You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/02 18:07:33 UTC

[1/5] incubator-beam git commit: Move getPathValidator to GcsOptions and use it in temp/staging locations

Repository: incubator-beam
Updated Branches:
  refs/heads/master d73e614b7 -> 63c5d19b3


Move getPathValidator to GcsOptions and use it in temp/staging locations


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/70f394b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/70f394b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/70f394b8

Branch: refs/heads/master
Commit: 70f394b8f238811b9f237b0c401f5388e12955bd
Parents: 724c88d
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jul 29 16:34:37 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Aug 2 11:07:28 2016 -0700

----------------------------------------------------------------------
 .../options/DataflowPipelineDebugOptions.java   | 43 -------------------
 .../options/DataflowPipelineOptions.java        |  5 ++-
 .../runners/dataflow/DataflowRunnerTest.java    |  4 +-
 .../org/apache/beam/sdk/options/GcpOptions.java | 11 +++--
 .../org/apache/beam/sdk/options/GcsOptions.java | 44 ++++++++++++++++++++
 5 files changed, 57 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/70f394b8/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index bc92a5f..ac2e0b7 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.dataflow.options;
 
-import org.apache.beam.sdk.util.GcsPathValidator;
 import org.apache.beam.runners.dataflow.util.DataflowTransport;
 import org.apache.beam.runners.dataflow.util.GcsStager;
 import org.apache.beam.runners.dataflow.util.Stager;
@@ -28,7 +27,6 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.Hidden;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.util.PathValidator;
 
 import com.google.api.services.dataflow.Dataflow;
 
@@ -101,31 +99,6 @@ public interface DataflowPipelineDebugOptions extends PipelineOptions {
   void setDataflowJobFile(String value);
 
   /**
-   * The class of the validator that should be created and used to validate paths.
-   * If pathValidator has not been set explicitly, an instance of this class will be
-   * constructed and used as the path validator.
-   */
-  @Description("The class of the validator that should be created and used to validate paths. "
-      + "If pathValidator has not been set explicitly, an instance of this class will be "
-      + "constructed and used as the path validator.")
-  @Default.Class(GcsPathValidator.class)
-  Class<? extends PathValidator> getPathValidatorClass();
-  void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
-
-  /**
-   * The path validator instance that should be used to validate paths.
-   * If no path validator has been set explicitly, the default is to use the instance factory that
-   * constructs a path validator based upon the currently set pathValidatorClass.
-   */
-  @JsonIgnore
-  @Description("The path validator instance that should be used to validate paths. "
-      + "If no path validator has been set explicitly, the default is to use the instance factory "
-      + "that constructs a path validator based upon the currently set pathValidatorClass.")
-  @Default.InstanceFactory(PathValidatorFactory.class)
-  PathValidator getPathValidator();
-  void setPathValidator(PathValidator validator);
-
-  /**
    * The class responsible for staging resources to be accessible by workers
    * during job execution. If stager has not been set explicitly, an instance of this class
    * will be created and used as the resource stager.
@@ -226,22 +199,6 @@ public interface DataflowPipelineDebugOptions extends PipelineOptions {
   void setDumpHeapOnOOM(boolean dumpHeapBeforeExit);
 
   /**
-   * Creates a {@link PathValidator} object using the class specified in
-   * {@link #getPathValidatorClass()}.
-   */
-  public static class PathValidatorFactory implements DefaultValueFactory<PathValidator> {
-      @Override
-      public PathValidator create(PipelineOptions options) {
-      DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
-      return InstanceBuilder.ofType(PathValidator.class)
-          .fromClass(debugOptions.getPathValidatorClass())
-          .fromFactoryMethod("fromOptions")
-          .withArg(PipelineOptions.class, options)
-          .build();
-    }
-  }
-
-  /**
    * Creates a {@link Stager} object using the class specified in
    * {@link #getStagerClass()}.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/70f394b8/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index b69b6f9..0c5bf47 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -138,13 +138,14 @@ public interface DataflowPipelineOptions
 
     @Override
     public String create(PipelineOptions options) {
-      String gcpTempLocation = options.as(GcpOptions.class).getGcpTempLocation();
+      GcsOptions gcsOptions = options.as(GcsOptions.class);
+      String gcpTempLocation = gcsOptions.getGcpTempLocation();
       checkArgument(!isNullOrEmpty(gcpTempLocation),
           "Error constructing default value for stagingLocation: gcpTempLocation is missing."
           + "Either stagingLocation must be set explicitly or a valid value must be provided"
           + "for gcpTempLocation.");
       try {
-        GcsPath.fromUri(gcpTempLocation);
+        gcsOptions.getPathValidator().validateOutputFilePrefixSupported(gcpTempLocation);
       } catch (Exception e) {
         throw new IllegalArgumentException(String.format(
             "Error constructing default value for stagingLocation: gcpTempLocation is not"

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/70f394b8/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index d3f2f75..a29cdc9 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -229,7 +229,7 @@ public class DataflowRunnerTest {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     options.setRunner(DataflowRunner.class);
     options.setProject(PROJECT_ID);
-    options.setTempLocation("gs://somebucket/some/path");
+    options.setGcpTempLocation("gs://somebucket/some/path");
     // Set FILES_PROPERTY to empty to prevent a default value calculated from classpath.
     options.setFilesToStage(new LinkedList<String>());
     options.setDataflowClient(buildMockDataflow(jobCaptor));
@@ -601,7 +601,7 @@ public class DataflowRunnerTest {
         buildMockGcsUtil(false /* temp bucket exists */, true /* staging bucket exists */);
     DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
     options.setGcsUtil(mockGcsUtil);
-    options.setTempLocation("gs://non-existent-bucket/location");
+    options.setGcpTempLocation("gs://non-existent-bucket/location");
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(containsString(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/70f394b8/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
index 1bf4dd6..de3f133 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.options;
 import org.apache.beam.sdk.util.CredentialFactory;
 import org.apache.beam.sdk.util.GcpCredentialFactory;
 import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.util.PathValidator;
 
 import com.google.api.client.auth.oauth2.Credential;
 import com.google.api.client.googleapis.auth.oauth2.GoogleOAuthConstants;
@@ -42,6 +42,8 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import javax.annotation.Nullable;
+
 /**
  * Options used to configure Google Cloud Platform project and credentials.
  *
@@ -300,7 +302,7 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
    */
   @Description("A GCS path for storing temporary files in GCP.")
   @Default.InstanceFactory(GcpTempLocationFactory.class)
-  String getGcpTempLocation();
+  @Nullable String getGcpTempLocation();
   void setGcpTempLocation(String value);
 
   /**
@@ -309,11 +311,14 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
   public static class GcpTempLocationFactory implements DefaultValueFactory<String> {
 
     @Override
+    @Nullable
     public String create(PipelineOptions options) {
       String tempLocation = options.getTempLocation();
       if (!Strings.isNullOrEmpty(tempLocation)) {
         try {
-          GcsPath.fromUri(tempLocation);
+          PathValidator validator = options.as(GcsOptions.class).getPathValidator();
+          System.err.println(validator);
+          validator.validateOutputFilePrefixSupported(tempLocation);
         } catch (Exception e) {
           // Ignore the temp location because it is not a valid 'gs://' path.
           return null;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/70f394b8/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
index bae4742..1b3436b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
@@ -18,7 +18,10 @@
 package org.apache.beam.sdk.options;
 
 import org.apache.beam.sdk.util.AppEngineEnvironment;
+import org.apache.beam.sdk.util.GcsPathValidator;
 import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.PathValidator;
 
 import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -85,6 +88,31 @@ public interface GcsOptions extends
   void setGcsUploadBufferSizeBytes(Integer bytes);
 
   /**
+   * The class of the validator that should be created and used to validate paths.
+   * If pathValidator has not been set explicitly, an instance of this class will be
+   * constructed and used as the path validator.
+   */
+  @Description("The class of the validator that should be created and used to validate paths. "
+      + "If pathValidator has not been set explicitly, an instance of this class will be "
+      + "constructed and used as the path validator.")
+  @Default.Class(GcsPathValidator.class)
+  Class<? extends PathValidator> getPathValidatorClass();
+  void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
+
+  /**
+   * The path validator instance that should be used to validate paths.
+   * If no path validator has been set explicitly, the default is to use the instance factory that
+   * constructs a path validator based upon the currently set pathValidatorClass.
+   */
+  @JsonIgnore
+  @Description("The path validator instance that should be used to validate paths. "
+      + "If no path validator has been set explicitly, the default is to use the instance factory "
+      + "that constructs a path validator based upon the currently set pathValidatorClass.")
+  @Default.InstanceFactory(PathValidatorFactory.class)
+  PathValidator getPathValidator();
+  void setPathValidator(PathValidator validator);
+
+  /**
    * Returns the default {@link ExecutorService} to use within the Dataflow SDK. The
    * {@link ExecutorService} is compatible with AppEngine.
    */
@@ -112,4 +140,20 @@ public interface GcsOptions extends
           threadFactoryBuilder.build());
     }
   }
+
+  /**
+   * Creates a {@link PathValidator} object using the class specified in
+   * {@link #getPathValidatorClass()}.
+   */
+  public static class PathValidatorFactory implements DefaultValueFactory<PathValidator> {
+    @Override
+    public PathValidator create(PipelineOptions options) {
+      GcsOptions gcsOptions = options.as(GcsOptions.class);
+      return InstanceBuilder.ofType(PathValidator.class)
+          .fromClass(gcsOptions.getPathValidatorClass())
+          .fromFactoryMethod("fromOptions")
+          .withArg(PipelineOptions.class, options)
+          .build();
+    }
+  }
 }


[4/5] incubator-beam git commit: Some fixups to tests

Posted by dh...@apache.org.
Some fixups to tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ab20823c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ab20823c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ab20823c

Branch: refs/heads/master
Commit: ab20823cda024ed3061034774c2a896d2ddc0671
Parents: 70f394b
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jul 29 16:51:09 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Aug 2 11:07:29 2016 -0700

----------------------------------------------------------------------
 .../dataflow/options/DataflowPipelineOptions.java    |  1 -
 .../beam/runners/dataflow/DataflowRunnerTest.java    |  3 ++-
 .../options/DataflowPipelineOptionsTest.java         |  3 +++
 .../runners/dataflow/util/GcsPathValidatorTest.java  | 15 +++++++++++++--
 .../org/apache/beam/sdk/options/GcpOptionsTest.java  |  2 ++
 5 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab20823c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index 0c5bf47..841741f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -33,7 +33,6 @@ import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
 
 import com.google.common.base.MoreObjects;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab20823c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index a29cdc9..704410d 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -229,7 +229,7 @@ public class DataflowRunnerTest {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     options.setRunner(DataflowRunner.class);
     options.setProject(PROJECT_ID);
-    options.setGcpTempLocation("gs://somebucket/some/path");
+    options.setTempLocation("gs://somebucket/some/path");
     // Set FILES_PROPERTY to empty to prevent a default value calculated from classpath.
     options.setFilesToStage(new LinkedList<String>());
     options.setDataflowClient(buildMockDataflow(jobCaptor));
@@ -617,6 +617,7 @@ public class DataflowRunnerTest {
     GcsUtil mockGcsUtil =
         buildMockGcsUtil(true /* temp bucket exists */, false /* staging bucket exists */);
     DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+    options.setGcpTempLocation(options.getTempLocation()); // bypass validation for GcpTempLocation
     options.setGcsUtil(mockGcsUtil);
     options.setStagingLocation("gs://non-existent-bucket/location");
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab20823c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
index b5ee5e9..c0422ee 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ResetDateTimeProvider;
 import org.apache.beam.sdk.testing.RestoreSystemProperties;
 import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.NoopPathValidator;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -109,6 +110,7 @@ public class DataflowPipelineOptionsTest {
   public void testDefaultToTempLocation() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     IOChannelUtils.registerStandardIOFactories(options);
+    options.setPathValidatorClass(NoopPathValidator.class);
     options.setTempLocation("gs://temp_location");
     assertEquals("gs://temp_location", options.getGcpTempLocation());
     assertEquals("gs://temp_location/staging", options.getStagingLocation());
@@ -118,6 +120,7 @@ public class DataflowPipelineOptionsTest {
   public void testDefaultToGcpTempLocation() {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     IOChannelUtils.registerStandardIOFactories(options);
+    options.setPathValidatorClass(NoopPathValidator.class);
     options.setTempLocation("gs://temp_location");
     options.setGcpTempLocation("gs://gcp_temp_location");
     assertEquals("gs://gcp_temp_location/staging", options.getStagingLocation());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab20823c/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
index d101627..8913916 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
@@ -21,8 +21,11 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.when;
 
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.util.GcsPathValidator;
 import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.TestCredential;
@@ -45,12 +48,20 @@ public class GcsPathValidatorTest {
   @Mock private GcsUtil mockGcsUtil;
   private GcsPathValidator validator;
 
+  private class FakeRunner extends PipelineRunner<PipelineResult> {
+    @Override
+    public PipelineResult run(Pipeline pipeline) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
   @Before
   public void setUp() throws Exception {
     MockitoAnnotations.initMocks(this);
     when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true);
     when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
     GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
+    options.setRunner(FakeRunner.class);
     options.setGcpCredential(new TestCredential());
     options.setGcsUtil(mockGcsUtil);
     validator = GcsPathValidator.fromOptions(options);
@@ -65,7 +76,7 @@ public class GcsPathValidatorTest {
   public void testInvalidFilePattern() {
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage(
-        "DirectRunner expected a valid 'gs://' path but was given '/local/path'");
+        "FakeRunner expected a valid 'gs://' path but was given '/local/path'");
     validator.validateInputFilePatternSupported("/local/path");
   }
 
@@ -87,7 +98,7 @@ public class GcsPathValidatorTest {
   public void testInvalidOutputPrefix() {
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage(
-        "DirectRunner expected a valid 'gs://' path but was given '/local/path'");
+        "FakeRunner expected a valid 'gs://' path but was given '/local/path'");
     validator.validateOutputFilePrefixSupported("/local/path");
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab20823c/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
index c0f65d8..22359dc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;
 
 import org.apache.beam.sdk.options.GcpOptions.DefaultProjectFactory;
 import org.apache.beam.sdk.testing.RestoreSystemProperties;
+import org.apache.beam.sdk.util.NoopPathValidator;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.Files;
@@ -117,6 +118,7 @@ public class GcpOptionsTest {
     GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
     String tempLocation = "gs://bucket";
     options.setTempLocation(tempLocation);
+    options.as(GcsOptions.class).setPathValidatorClass(NoopPathValidator.class);
     assertEquals(tempLocation, options.getGcpTempLocation());
   }
 


[3/5] incubator-beam git commit: DataflowRunner: test path validation

Posted by dh...@apache.org.
DataflowRunner: test path validation


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1f87b848
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1f87b848
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1f87b848

Branch: refs/heads/master
Commit: 1f87b84838f53c49151cfadcd20dcf6890f8491f
Parents: d73e614
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jul 29 15:30:16 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Aug 2 11:07:28 2016 -0700

----------------------------------------------------------------------
 .../runners/dataflow/DataflowRunnerTest.java    | 36 ++++++++++++++++++++
 1 file changed, 36 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f87b848/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index fe288ad..d3f2f75 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.dataflow;
 
 import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -62,6 +63,7 @@ import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.TransformTreeNode;
 import org.apache.beam.sdk.runners.dataflow.TestCountingSource;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -70,6 +72,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.NoopCredentialFactory;
 import org.apache.beam.sdk.util.NoopPathValidator;
 import org.apache.beam.sdk.util.ReleaseInfo;
 import org.apache.beam.sdk.util.TestCredential;
@@ -88,6 +91,7 @@ import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.model.DataflowPackage;
 import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.ListJobsResponse;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 
@@ -235,6 +239,38 @@ public class DataflowRunnerTest {
   }
 
   @Test
+  public void testPathValidation() {
+    String[] args = new String[] {
+        "--runner=DataflowRunner",
+        "--tempLocation=/tmp/not/a/gs/path",
+        "--project=test-project",
+        "--credentialFactoryClass=" + NoopCredentialFactory.class.getCanonicalName(),
+    };
+
+    try {
+      TestPipeline.fromOptions(PipelineOptionsFactory.fromArgs(args).create());
+      fail();
+    } catch (RuntimeException e) {
+      assertThat(
+          Throwables.getStackTraceAsString(e),
+          containsString("DataflowRunner requires gcpTempLocation"));
+    }
+  }
+
+  @Test
+  public void testPathValidatorOverride() {
+    String[] args = new String[] {
+        "--runner=DataflowRunner",
+        "--tempLocation=/tmp/testing",
+        "--project=test-project",
+        "--credentialFactoryClass=" + NoopCredentialFactory.class.getCanonicalName(),
+        "--pathValidatorClass=" + NoopPathValidator.class.getCanonicalName(),
+    };
+    // Should not crash, because gcpTempLocation should get set from tempLocation
+    TestPipeline.fromOptions(PipelineOptionsFactory.fromArgs(args).create());
+  }
+
+  @Test
   public void testFromOptionsWithUppercaseConvertsToLowercase() throws Exception {
     String mixedCase = "ThisJobNameHasMixedCase";
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);


[2/5] incubator-beam git commit: Move DataflowPathValidator to sdk.util, rename to GcsPathValidator

Posted by dh...@apache.org.
Move DataflowPathValidator to sdk.util, rename to GcsPathValidator


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/724c88d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/724c88d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/724c88d9

Branch: refs/heads/master
Commit: 724c88d950d3d872b364c6a761a1c4faa32efe54
Parents: 1f87b84
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jul 29 16:11:33 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Aug 2 11:07:28 2016 -0700

----------------------------------------------------------------------
 .../options/DataflowPipelineDebugOptions.java   |  4 +-
 .../dataflow/util/DataflowPathValidator.java    | 98 --------------------
 .../DataflowPipelineTranslatorTest.java         |  2 +-
 .../util/DataflowPathValidatorTest.java         | 94 -------------------
 .../apache/beam/sdk/util/GcsPathValidator.java  | 97 +++++++++++++++++++
 .../dataflow/util/GcsPathValidatorTest.java     | 93 +++++++++++++++++++
 6 files changed, 193 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/724c88d9/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index 8765adf..bc92a5f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.dataflow.options;
 
-import org.apache.beam.runners.dataflow.util.DataflowPathValidator;
+import org.apache.beam.sdk.util.GcsPathValidator;
 import org.apache.beam.runners.dataflow.util.DataflowTransport;
 import org.apache.beam.runners.dataflow.util.GcsStager;
 import org.apache.beam.runners.dataflow.util.Stager;
@@ -108,7 +108,7 @@ public interface DataflowPipelineDebugOptions extends PipelineOptions {
   @Description("The class of the validator that should be created and used to validate paths. "
       + "If pathValidator has not been set explicitly, an instance of this class will be "
       + "constructed and used as the path validator.")
-  @Default.Class(DataflowPathValidator.class)
+  @Default.Class(GcsPathValidator.class)
   Class<? extends PathValidator> getPathValidatorClass();
   void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/724c88d9/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java
deleted file mode 100644
index ec10b28..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.PathValidator;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-import java.io.IOException;
-
-/**
- * GCP implementation of {@link PathValidator}. Only GCS paths are allowed.
- */
-public class DataflowPathValidator implements PathValidator {
-
-  private DataflowPipelineOptions dataflowOptions;
-
-  DataflowPathValidator(DataflowPipelineOptions options) {
-    this.dataflowOptions = options;
-  }
-
-  public static DataflowPathValidator fromOptions(PipelineOptions options) {
-    return new DataflowPathValidator(options.as(DataflowPipelineOptions.class));
-  }
-
-  /**
-   * Validates the the input GCS path is accessible and that the path
-   * is well formed.
-   */
-  @Override
-  public String validateInputFilePatternSupported(String filepattern) {
-    GcsPath gcsPath = getGcsPath(filepattern);
-    checkArgument(dataflowOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject()));
-    String returnValue = verifyPath(filepattern);
-    verifyPathIsAccessible(filepattern, "Could not find file %s");
-    return returnValue;
-  }
-
-  /**
-   * Validates the the output GCS path is accessible and that the path
-   * is well formed.
-   */
-  @Override
-  public String validateOutputFilePrefixSupported(String filePrefix) {
-    String returnValue = verifyPath(filePrefix);
-    verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
-    return returnValue;
-  }
-
-  @Override
-  public String verifyPath(String path) {
-    GcsPath gcsPath = getGcsPath(path);
-    checkArgument(gcsPath.isAbsolute(), "Must provide absolute paths for Dataflow");
-    checkArgument(!gcsPath.getObject().contains("//"),
-        "Dataflow Service does not allow objects with consecutive slashes");
-    return gcsPath.toResourceName();
-  }
-
-  private void verifyPathIsAccessible(String path, String errorMessage) {
-    GcsPath gcsPath = getGcsPath(path);
-    try {
-      checkArgument(dataflowOptions.getGcsUtil().bucketExists(gcsPath),
-        errorMessage, path);
-    } catch (IOException e) {
-      throw new RuntimeException(
-          String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()),
-          e);
-    }
-  }
-
-  private GcsPath getGcsPath(String path) {
-    try {
-      return GcsPath.fromUri(path);
-    } catch (IllegalArgumentException e) {
-      throw new IllegalArgumentException(String.format(
-          "%s expected a valid 'gs://' path but was given '%s'",
-          dataflowOptions.getRunner().getSimpleName(), path), e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/724c88d9/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index d4d571b..7d89735 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -194,7 +194,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     settings.put("appName", "DataflowPipelineTranslatorTest");
     settings.put("project", "some-project");
     settings.put("pathValidatorClass",
-        "org.apache.beam.runners.dataflow.util.DataflowPathValidator");
+        "org.apache.beam.sdk.util.GcsPathValidator");
     settings.put("runner", "org.apache.beam.runners.dataflow.DataflowRunner");
     settings.put("jobName", "some-job-name");
     settings.put("tempLocation", "gs://somebucket/some/path");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/724c88d9/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
deleted file mode 100644
index a91f56c..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.util;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.TestCredential;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/** Tests for {@link DataflowPathValidator}. */
-@RunWith(JUnit4.class)
-public class DataflowPathValidatorTest {
-  @Rule public ExpectedException expectedException = ExpectedException.none();
-
-  @Mock private GcsUtil mockGcsUtil;
-  private DataflowPathValidator validator;
-
-  @Before
-  public void setUp() throws Exception {
-    MockitoAnnotations.initMocks(this);
-    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true);
-    when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setGcpCredential(new TestCredential());
-    options.setRunner(DataflowRunner.class);
-    options.setGcsUtil(mockGcsUtil);
-    validator = new DataflowPathValidator(options);
-  }
-
-  @Test
-  public void testValidFilePattern() {
-    validator.validateInputFilePatternSupported("gs://bucket/path");
-  }
-
-  @Test
-  public void testInvalidFilePattern() {
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage(
-        "DataflowRunner expected a valid 'gs://' path but was given '/local/path'");
-    validator.validateInputFilePatternSupported("/local/path");
-  }
-
-  @Test
-  public void testWhenBucketDoesNotExist() throws Exception {
-    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(false);
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage(
-        "Could not find file gs://non-existent-bucket/location");
-    validator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
-  }
-
-  @Test
-  public void testValidOutputPrefix() {
-    validator.validateOutputFilePrefixSupported("gs://bucket/path");
-  }
-
-  @Test
-  public void testInvalidOutputPrefix() {
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage(
-        "DataflowRunner expected a valid 'gs://' path but was given '/local/path'");
-    validator.validateOutputFilePrefixSupported("/local/path");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/724c88d9/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
new file mode 100644
index 0000000..87f9181
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+
+import java.io.IOException;
+
+/**
+ * GCP implementation of {@link PathValidator}. Only GCS paths are allowed.
+ */
+public class GcsPathValidator implements PathValidator {
+
+  private GcsOptions gcpOptions;
+
+  private GcsPathValidator(GcsOptions options) {
+    this.gcpOptions = options;
+  }
+
+  public static GcsPathValidator fromOptions(PipelineOptions options) {
+    return new GcsPathValidator(options.as(GcsOptions.class));
+  }
+
+  /**
+   * Validates that the input GCS path is accessible and that the path
+   * is well formed.
+   */
+  @Override
+  public String validateInputFilePatternSupported(String filepattern) {
+    GcsPath gcsPath = getGcsPath(filepattern);
+    checkArgument(gcpOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject()));
+    String returnValue = verifyPath(filepattern);
+    verifyPathIsAccessible(filepattern, "Could not find file %s");
+    return returnValue;
+  }
+
+  /**
+   * Validates that the output GCS path is accessible and that the path
+   * is well formed.
+   */
+  @Override
+  public String validateOutputFilePrefixSupported(String filePrefix) {
+    String returnValue = verifyPath(filePrefix);
+    verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
+    return returnValue;
+  }
+
+  @Override
+  public String verifyPath(String path) {
+    GcsPath gcsPath = getGcsPath(path);
+    checkArgument(gcsPath.isAbsolute(), "Must provide absolute paths for Dataflow");
+    checkArgument(!gcsPath.getObject().contains("//"),
+        "Dataflow Service does not allow objects with consecutive slashes");
+    return gcsPath.toResourceName();
+  }
+
+  private void verifyPathIsAccessible(String path, String errorMessage) {
+    GcsPath gcsPath = getGcsPath(path);
+    try {
+      checkArgument(gcpOptions.getGcsUtil().bucketExists(gcsPath),
+        errorMessage, path);
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()),
+          e);
+    }
+  }
+
+  private GcsPath getGcsPath(String path) {
+    try {
+      return GcsPath.fromUri(path);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(String.format(
+          "%s expected a valid 'gs://' path but was given '%s'",
+          gcpOptions.getRunner().getSimpleName(), path), e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/724c88d9/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
new file mode 100644
index 0000000..d101627
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.GcsPathValidator;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.TestCredential;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link GcsPathValidator}. */
+@RunWith(JUnit4.class)
+public class GcsPathValidatorTest {
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+
+  @Mock private GcsUtil mockGcsUtil;
+  private GcsPathValidator validator;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true);
+    when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
+    GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
+    options.setGcpCredential(new TestCredential());
+    options.setGcsUtil(mockGcsUtil);
+    validator = GcsPathValidator.fromOptions(options);
+  }
+
+  @Test
+  public void testValidFilePattern() {
+    validator.validateInputFilePatternSupported("gs://bucket/path");
+  }
+
+  @Test
+  public void testInvalidFilePattern() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "DirectRunner expected a valid 'gs://' path but was given '/local/path'");
+    validator.validateInputFilePatternSupported("/local/path");
+  }
+
+  @Test
+  public void testWhenBucketDoesNotExist() throws Exception {
+    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(false);
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "Could not find file gs://non-existent-bucket/location");
+    validator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
+  }
+
+  @Test
+  public void testValidOutputPrefix() {
+    validator.validateOutputFilePrefixSupported("gs://bucket/path");
+  }
+
+  @Test
+  public void testInvalidOutputPrefix() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "DirectRunner expected a valid 'gs://' path but was given '/local/path'");
+    validator.validateOutputFilePrefixSupported("/local/path");
+  }
+}


[5/5] incubator-beam git commit: Closes #761

Posted by dh...@apache.org.
Closes #761


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/63c5d19b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/63c5d19b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/63c5d19b

Branch: refs/heads/master
Commit: 63c5d19b3909516402e06afdb916be3e5effbb96
Parents: d73e614 ab20823
Author: Dan Halperin <dh...@google.com>
Authored: Tue Aug 2 11:07:29 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Aug 2 11:07:29 2016 -0700

----------------------------------------------------------------------
 .../options/DataflowPipelineDebugOptions.java   |  43 --------
 .../options/DataflowPipelineOptions.java        |   6 +-
 .../dataflow/util/DataflowPathValidator.java    |  98 -----------------
 .../DataflowPipelineTranslatorTest.java         |   2 +-
 .../runners/dataflow/DataflowRunnerTest.java    |  39 ++++++-
 .../options/DataflowPipelineOptionsTest.java    |   3 +
 .../util/DataflowPathValidatorTest.java         |  94 -----------------
 .../org/apache/beam/sdk/options/GcpOptions.java |  11 +-
 .../org/apache/beam/sdk/options/GcsOptions.java |  44 ++++++++
 .../apache/beam/sdk/util/GcsPathValidator.java  |  97 +++++++++++++++++
 .../dataflow/util/GcsPathValidatorTest.java     | 104 +++++++++++++++++++
 .../apache/beam/sdk/options/GcpOptionsTest.java |   2 +
 12 files changed, 300 insertions(+), 243 deletions(-)
----------------------------------------------------------------------