You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/02 18:07:33 UTC
[1/5] incubator-beam git commit: Move getPathValidator to GcsOptions and use it in temp/staging locations
Repository: incubator-beam
Updated Branches:
refs/heads/master d73e614b7 -> 63c5d19b3
Move getPathValidator to GcsOptions and use it in temp/staging locations
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/70f394b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/70f394b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/70f394b8
Branch: refs/heads/master
Commit: 70f394b8f238811b9f237b0c401f5388e12955bd
Parents: 724c88d
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jul 29 16:34:37 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Aug 2 11:07:28 2016 -0700
----------------------------------------------------------------------
.../options/DataflowPipelineDebugOptions.java | 43 -------------------
.../options/DataflowPipelineOptions.java | 5 ++-
.../runners/dataflow/DataflowRunnerTest.java | 4 +-
.../org/apache/beam/sdk/options/GcpOptions.java | 11 +++--
.../org/apache/beam/sdk/options/GcsOptions.java | 44 ++++++++++++++++++++
5 files changed, 57 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/70f394b8/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index bc92a5f..ac2e0b7 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.dataflow.options;
-import org.apache.beam.sdk.util.GcsPathValidator;
import org.apache.beam.runners.dataflow.util.DataflowTransport;
import org.apache.beam.runners.dataflow.util.GcsStager;
import org.apache.beam.runners.dataflow.util.Stager;
@@ -28,7 +27,6 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.util.PathValidator;
import com.google.api.services.dataflow.Dataflow;
@@ -101,31 +99,6 @@ public interface DataflowPipelineDebugOptions extends PipelineOptions {
void setDataflowJobFile(String value);
/**
- * The class of the validator that should be created and used to validate paths.
- * If pathValidator has not been set explicitly, an instance of this class will be
- * constructed and used as the path validator.
- */
- @Description("The class of the validator that should be created and used to validate paths. "
- + "If pathValidator has not been set explicitly, an instance of this class will be "
- + "constructed and used as the path validator.")
- @Default.Class(GcsPathValidator.class)
- Class<? extends PathValidator> getPathValidatorClass();
- void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
-
- /**
- * The path validator instance that should be used to validate paths.
- * If no path validator has been set explicitly, the default is to use the instance factory that
- * constructs a path validator based upon the currently set pathValidatorClass.
- */
- @JsonIgnore
- @Description("The path validator instance that should be used to validate paths. "
- + "If no path validator has been set explicitly, the default is to use the instance factory "
- + "that constructs a path validator based upon the currently set pathValidatorClass.")
- @Default.InstanceFactory(PathValidatorFactory.class)
- PathValidator getPathValidator();
- void setPathValidator(PathValidator validator);
-
- /**
* The class responsible for staging resources to be accessible by workers
* during job execution. If stager has not been set explicitly, an instance of this class
* will be created and used as the resource stager.
@@ -226,22 +199,6 @@ public interface DataflowPipelineDebugOptions extends PipelineOptions {
void setDumpHeapOnOOM(boolean dumpHeapBeforeExit);
/**
- * Creates a {@link PathValidator} object using the class specified in
- * {@link #getPathValidatorClass()}.
- */
- public static class PathValidatorFactory implements DefaultValueFactory<PathValidator> {
- @Override
- public PathValidator create(PipelineOptions options) {
- DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
- return InstanceBuilder.ofType(PathValidator.class)
- .fromClass(debugOptions.getPathValidatorClass())
- .fromFactoryMethod("fromOptions")
- .withArg(PipelineOptions.class, options)
- .build();
- }
- }
-
- /**
* Creates a {@link Stager} object using the class specified in
* {@link #getStagerClass()}.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/70f394b8/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index b69b6f9..0c5bf47 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -138,13 +138,14 @@ public interface DataflowPipelineOptions
@Override
public String create(PipelineOptions options) {
- String gcpTempLocation = options.as(GcpOptions.class).getGcpTempLocation();
+ GcsOptions gcsOptions = options.as(GcsOptions.class);
+ String gcpTempLocation = gcsOptions.getGcpTempLocation();
checkArgument(!isNullOrEmpty(gcpTempLocation),
"Error constructing default value for stagingLocation: gcpTempLocation is missing."
+ "Either stagingLocation must be set explicitly or a valid value must be provided"
+ "for gcpTempLocation.");
try {
- GcsPath.fromUri(gcpTempLocation);
+ gcsOptions.getPathValidator().validateOutputFilePrefixSupported(gcpTempLocation);
} catch (Exception e) {
throw new IllegalArgumentException(String.format(
"Error constructing default value for stagingLocation: gcpTempLocation is not"
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/70f394b8/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index d3f2f75..a29cdc9 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -229,7 +229,7 @@ public class DataflowRunnerTest {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject(PROJECT_ID);
- options.setTempLocation("gs://somebucket/some/path");
+ options.setGcpTempLocation("gs://somebucket/some/path");
// Set FILES_PROPERTY to empty to prevent a default value calculated from classpath.
options.setFilesToStage(new LinkedList<String>());
options.setDataflowClient(buildMockDataflow(jobCaptor));
@@ -601,7 +601,7 @@ public class DataflowRunnerTest {
buildMockGcsUtil(false /* temp bucket exists */, true /* staging bucket exists */);
DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
options.setGcsUtil(mockGcsUtil);
- options.setTempLocation("gs://non-existent-bucket/location");
+ options.setGcpTempLocation("gs://non-existent-bucket/location");
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(containsString(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/70f394b8/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
index 1bf4dd6..de3f133 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.options;
import org.apache.beam.sdk.util.CredentialFactory;
import org.apache.beam.sdk.util.GcpCredentialFactory;
import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.util.PathValidator;
import com.google.api.client.auth.oauth2.Credential;
import com.google.api.client.googleapis.auth.oauth2.GoogleOAuthConstants;
@@ -42,6 +42,8 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+
/**
* Options used to configure Google Cloud Platform project and credentials.
*
@@ -300,7 +302,7 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
*/
@Description("A GCS path for storing temporary files in GCP.")
@Default.InstanceFactory(GcpTempLocationFactory.class)
- String getGcpTempLocation();
+ @Nullable String getGcpTempLocation();
void setGcpTempLocation(String value);
/**
@@ -309,11 +311,14 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
public static class GcpTempLocationFactory implements DefaultValueFactory<String> {
@Override
+ @Nullable
public String create(PipelineOptions options) {
String tempLocation = options.getTempLocation();
if (!Strings.isNullOrEmpty(tempLocation)) {
try {
- GcsPath.fromUri(tempLocation);
+ PathValidator validator = options.as(GcsOptions.class).getPathValidator();
+ System.err.println(validator);
+ validator.validateOutputFilePrefixSupported(tempLocation);
} catch (Exception e) {
// Ignore the temp location because it is not a valid 'gs://' path.
return null;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/70f394b8/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
index bae4742..1b3436b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java
@@ -18,7 +18,10 @@
package org.apache.beam.sdk.options;
import org.apache.beam.sdk.util.AppEngineEnvironment;
+import org.apache.beam.sdk.util.GcsPathValidator;
import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.PathValidator;
import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
import com.google.common.util.concurrent.MoreExecutors;
@@ -85,6 +88,31 @@ public interface GcsOptions extends
void setGcsUploadBufferSizeBytes(Integer bytes);
/**
+ * The class of the validator that should be created and used to validate paths.
+ * If pathValidator has not been set explicitly, an instance of this class will be
+ * constructed and used as the path validator.
+ */
+ @Description("The class of the validator that should be created and used to validate paths. "
+ + "If pathValidator has not been set explicitly, an instance of this class will be "
+ + "constructed and used as the path validator.")
+ @Default.Class(GcsPathValidator.class)
+ Class<? extends PathValidator> getPathValidatorClass();
+ void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
+
+ /**
+ * The path validator instance that should be used to validate paths.
+ * If no path validator has been set explicitly, the default is to use the instance factory that
+ * constructs a path validator based upon the currently set pathValidatorClass.
+ */
+ @JsonIgnore
+ @Description("The path validator instance that should be used to validate paths. "
+ + "If no path validator has been set explicitly, the default is to use the instance factory "
+ + "that constructs a path validator based upon the currently set pathValidatorClass.")
+ @Default.InstanceFactory(PathValidatorFactory.class)
+ PathValidator getPathValidator();
+ void setPathValidator(PathValidator validator);
+
+ /**
* Returns the default {@link ExecutorService} to use within the Dataflow SDK. The
* {@link ExecutorService} is compatible with AppEngine.
*/
@@ -112,4 +140,20 @@ public interface GcsOptions extends
threadFactoryBuilder.build());
}
}
+
+ /**
+ * Creates a {@link PathValidator} object using the class specified in
+ * {@link #getPathValidatorClass()}.
+ */
+ public static class PathValidatorFactory implements DefaultValueFactory<PathValidator> {
+ @Override
+ public PathValidator create(PipelineOptions options) {
+ GcsOptions gcsOptions = options.as(GcsOptions.class);
+ return InstanceBuilder.ofType(PathValidator.class)
+ .fromClass(gcsOptions.getPathValidatorClass())
+ .fromFactoryMethod("fromOptions")
+ .withArg(PipelineOptions.class, options)
+ .build();
+ }
+ }
}
[4/5] incubator-beam git commit: Some fixups to tests
Posted by dh...@apache.org.
Some fixups to tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ab20823c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ab20823c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ab20823c
Branch: refs/heads/master
Commit: ab20823cda024ed3061034774c2a896d2ddc0671
Parents: 70f394b
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jul 29 16:51:09 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Aug 2 11:07:29 2016 -0700
----------------------------------------------------------------------
.../dataflow/options/DataflowPipelineOptions.java | 1 -
.../beam/runners/dataflow/DataflowRunnerTest.java | 3 ++-
.../options/DataflowPipelineOptionsTest.java | 3 +++
.../runners/dataflow/util/GcsPathValidatorTest.java | 15 +++++++++++++--
.../org/apache/beam/sdk/options/GcpOptionsTest.java | 2 ++
5 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab20823c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index 0c5bf47..841741f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -33,7 +33,6 @@ import org.apache.beam.sdk.options.PubsubOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
import com.google.common.base.MoreObjects;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab20823c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index a29cdc9..704410d 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -229,7 +229,7 @@ public class DataflowRunnerTest {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject(PROJECT_ID);
- options.setGcpTempLocation("gs://somebucket/some/path");
+ options.setTempLocation("gs://somebucket/some/path");
// Set FILES_PROPERTY to empty to prevent a default value calculated from classpath.
options.setFilesToStage(new LinkedList<String>());
options.setDataflowClient(buildMockDataflow(jobCaptor));
@@ -617,6 +617,7 @@ public class DataflowRunnerTest {
GcsUtil mockGcsUtil =
buildMockGcsUtil(true /* temp bucket exists */, false /* staging bucket exists */);
DataflowPipelineOptions options = buildPipelineOptions(jobCaptor);
+ options.setGcpTempLocation(options.getTempLocation()); // bypass validation for GcpTempLocation
options.setGcsUtil(mockGcsUtil);
options.setStagingLocation("gs://non-existent-bucket/location");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab20823c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
index b5ee5e9..c0422ee 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ResetDateTimeProvider;
import org.apache.beam.sdk.testing.RestoreSystemProperties;
import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.NoopPathValidator;
import org.junit.Rule;
import org.junit.Test;
@@ -109,6 +110,7 @@ public class DataflowPipelineOptionsTest {
public void testDefaultToTempLocation() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
IOChannelUtils.registerStandardIOFactories(options);
+ options.setPathValidatorClass(NoopPathValidator.class);
options.setTempLocation("gs://temp_location");
assertEquals("gs://temp_location", options.getGcpTempLocation());
assertEquals("gs://temp_location/staging", options.getStagingLocation());
@@ -118,6 +120,7 @@ public class DataflowPipelineOptionsTest {
public void testDefaultToGcpTempLocation() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
IOChannelUtils.registerStandardIOFactories(options);
+ options.setPathValidatorClass(NoopPathValidator.class);
options.setTempLocation("gs://temp_location");
options.setGcpTempLocation("gs://gcp_temp_location");
assertEquals("gs://gcp_temp_location/staging", options.getStagingLocation());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab20823c/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
index d101627..8913916 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
@@ -21,8 +21,11 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.when;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.util.GcsPathValidator;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.TestCredential;
@@ -45,12 +48,20 @@ public class GcsPathValidatorTest {
@Mock private GcsUtil mockGcsUtil;
private GcsPathValidator validator;
+ private class FakeRunner extends PipelineRunner<PipelineResult> {
+ @Override
+ public PipelineResult run(Pipeline pipeline) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true);
when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
+ options.setRunner(FakeRunner.class);
options.setGcpCredential(new TestCredential());
options.setGcsUtil(mockGcsUtil);
validator = GcsPathValidator.fromOptions(options);
@@ -65,7 +76,7 @@ public class GcsPathValidatorTest {
public void testInvalidFilePattern() {
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage(
- "DirectRunner expected a valid 'gs://' path but was given '/local/path'");
+ "FakeRunner expected a valid 'gs://' path but was given '/local/path'");
validator.validateInputFilePatternSupported("/local/path");
}
@@ -87,7 +98,7 @@ public class GcsPathValidatorTest {
public void testInvalidOutputPrefix() {
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage(
- "DirectRunner expected a valid 'gs://' path but was given '/local/path'");
+ "FakeRunner expected a valid 'gs://' path but was given '/local/path'");
validator.validateOutputFilePrefixSupported("/local/path");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab20823c/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
index c0f65d8..22359dc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;
import org.apache.beam.sdk.options.GcpOptions.DefaultProjectFactory;
import org.apache.beam.sdk.testing.RestoreSystemProperties;
+import org.apache.beam.sdk.util.NoopPathValidator;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
@@ -117,6 +118,7 @@ public class GcpOptionsTest {
GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
String tempLocation = "gs://bucket";
options.setTempLocation(tempLocation);
+ options.as(GcsOptions.class).setPathValidatorClass(NoopPathValidator.class);
assertEquals(tempLocation, options.getGcpTempLocation());
}
[3/5] incubator-beam git commit: DataflowRunner: test path validation
Posted by dh...@apache.org.
DataflowRunner: test path validation
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1f87b848
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1f87b848
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1f87b848
Branch: refs/heads/master
Commit: 1f87b84838f53c49151cfadcd20dcf6890f8491f
Parents: d73e614
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jul 29 15:30:16 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Aug 2 11:07:28 2016 -0700
----------------------------------------------------------------------
.../runners/dataflow/DataflowRunnerTest.java | 36 ++++++++++++++++++++
1 file changed, 36 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f87b848/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index fe288ad..d3f2f75 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow;
import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@@ -62,6 +63,7 @@ import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.TransformTreeNode;
import org.apache.beam.sdk.runners.dataflow.TestCountingSource;
+import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.PTransform;
@@ -70,6 +72,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.NoopCredentialFactory;
import org.apache.beam.sdk.util.NoopPathValidator;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.util.TestCredential;
@@ -88,6 +91,7 @@ import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.DataflowPackage;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.ListJobsResponse;
+import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -235,6 +239,38 @@ public class DataflowRunnerTest {
}
@Test
+ public void testPathValidation() {
+ String[] args = new String[] {
+ "--runner=DataflowRunner",
+ "--tempLocation=/tmp/not/a/gs/path",
+ "--project=test-project",
+ "--credentialFactoryClass=" + NoopCredentialFactory.class.getCanonicalName(),
+ };
+
+ try {
+ TestPipeline.fromOptions(PipelineOptionsFactory.fromArgs(args).create());
+ fail();
+ } catch (RuntimeException e) {
+ assertThat(
+ Throwables.getStackTraceAsString(e),
+ containsString("DataflowRunner requires gcpTempLocation"));
+ }
+ }
+
+ @Test
+ public void testPathValidatorOverride() {
+ String[] args = new String[] {
+ "--runner=DataflowRunner",
+ "--tempLocation=/tmp/testing",
+ "--project=test-project",
+ "--credentialFactoryClass=" + NoopCredentialFactory.class.getCanonicalName(),
+ "--pathValidatorClass=" + NoopPathValidator.class.getCanonicalName(),
+ };
+ // Should not crash, because gcpTempLocation should get set from tempLocation
+ TestPipeline.fromOptions(PipelineOptionsFactory.fromArgs(args).create());
+ }
+
+ @Test
public void testFromOptionsWithUppercaseConvertsToLowercase() throws Exception {
String mixedCase = "ThisJobNameHasMixedCase";
ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
[2/5] incubator-beam git commit: Move DataflowPathValidator to sdk.util, rename to GcsPathValidator
Posted by dh...@apache.org.
Move DataflowPathValidator to sdk.util, rename to GcsPathValidator
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/724c88d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/724c88d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/724c88d9
Branch: refs/heads/master
Commit: 724c88d950d3d872b364c6a761a1c4faa32efe54
Parents: 1f87b84
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jul 29 16:11:33 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Aug 2 11:07:28 2016 -0700
----------------------------------------------------------------------
.../options/DataflowPipelineDebugOptions.java | 4 +-
.../dataflow/util/DataflowPathValidator.java | 98 --------------------
.../DataflowPipelineTranslatorTest.java | 2 +-
.../util/DataflowPathValidatorTest.java | 94 -------------------
.../apache/beam/sdk/util/GcsPathValidator.java | 97 +++++++++++++++++++
.../dataflow/util/GcsPathValidatorTest.java | 93 +++++++++++++++++++
6 files changed, 193 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/724c88d9/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index 8765adf..bc92a5f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.dataflow.options;
-import org.apache.beam.runners.dataflow.util.DataflowPathValidator;
+import org.apache.beam.sdk.util.GcsPathValidator;
import org.apache.beam.runners.dataflow.util.DataflowTransport;
import org.apache.beam.runners.dataflow.util.GcsStager;
import org.apache.beam.runners.dataflow.util.Stager;
@@ -108,7 +108,7 @@ public interface DataflowPipelineDebugOptions extends PipelineOptions {
@Description("The class of the validator that should be created and used to validate paths. "
+ "If pathValidator has not been set explicitly, an instance of this class will be "
+ "constructed and used as the path validator.")
- @Default.Class(DataflowPathValidator.class)
+ @Default.Class(GcsPathValidator.class)
Class<? extends PathValidator> getPathValidatorClass();
void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/724c88d9/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java
deleted file mode 100644
index ec10b28..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.PathValidator;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-import java.io.IOException;
-
-/**
- * GCP implementation of {@link PathValidator}. Only GCS paths are allowed.
- */
-public class DataflowPathValidator implements PathValidator {
-
- private DataflowPipelineOptions dataflowOptions;
-
- DataflowPathValidator(DataflowPipelineOptions options) {
- this.dataflowOptions = options;
- }
-
- public static DataflowPathValidator fromOptions(PipelineOptions options) {
- return new DataflowPathValidator(options.as(DataflowPipelineOptions.class));
- }
-
- /**
- * Validates the the input GCS path is accessible and that the path
- * is well formed.
- */
- @Override
- public String validateInputFilePatternSupported(String filepattern) {
- GcsPath gcsPath = getGcsPath(filepattern);
- checkArgument(dataflowOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject()));
- String returnValue = verifyPath(filepattern);
- verifyPathIsAccessible(filepattern, "Could not find file %s");
- return returnValue;
- }
-
- /**
- * Validates the the output GCS path is accessible and that the path
- * is well formed.
- */
- @Override
- public String validateOutputFilePrefixSupported(String filePrefix) {
- String returnValue = verifyPath(filePrefix);
- verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
- return returnValue;
- }
-
- @Override
- public String verifyPath(String path) {
- GcsPath gcsPath = getGcsPath(path);
- checkArgument(gcsPath.isAbsolute(), "Must provide absolute paths for Dataflow");
- checkArgument(!gcsPath.getObject().contains("//"),
- "Dataflow Service does not allow objects with consecutive slashes");
- return gcsPath.toResourceName();
- }
-
- private void verifyPathIsAccessible(String path, String errorMessage) {
- GcsPath gcsPath = getGcsPath(path);
- try {
- checkArgument(dataflowOptions.getGcsUtil().bucketExists(gcsPath),
- errorMessage, path);
- } catch (IOException e) {
- throw new RuntimeException(
- String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()),
- e);
- }
- }
-
- private GcsPath getGcsPath(String path) {
- try {
- return GcsPath.fromUri(path);
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException(String.format(
- "%s expected a valid 'gs://' path but was given '%s'",
- dataflowOptions.getRunner().getSimpleName(), path), e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/724c88d9/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index d4d571b..7d89735 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -194,7 +194,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
settings.put("appName", "DataflowPipelineTranslatorTest");
settings.put("project", "some-project");
settings.put("pathValidatorClass",
- "org.apache.beam.runners.dataflow.util.DataflowPathValidator");
+ "org.apache.beam.sdk.util.GcsPathValidator");
settings.put("runner", "org.apache.beam.runners.dataflow.DataflowRunner");
settings.put("jobName", "some-job-name");
settings.put("tempLocation", "gs://somebucket/some/path");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/724c88d9/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
deleted file mode 100644
index a91f56c..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.util;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.TestCredential;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/** Tests for {@link DataflowPathValidator}. */
-@RunWith(JUnit4.class)
-public class DataflowPathValidatorTest {
- @Rule public ExpectedException expectedException = ExpectedException.none();
-
- @Mock private GcsUtil mockGcsUtil;
- private DataflowPathValidator validator;
-
- @Before
- public void setUp() throws Exception {
- MockitoAnnotations.initMocks(this);
- when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true);
- when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
- DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setGcpCredential(new TestCredential());
- options.setRunner(DataflowRunner.class);
- options.setGcsUtil(mockGcsUtil);
- validator = new DataflowPathValidator(options);
- }
-
- @Test
- public void testValidFilePattern() {
- validator.validateInputFilePatternSupported("gs://bucket/path");
- }
-
- @Test
- public void testInvalidFilePattern() {
- expectedException.expect(IllegalArgumentException.class);
- expectedException.expectMessage(
- "DataflowRunner expected a valid 'gs://' path but was given '/local/path'");
- validator.validateInputFilePatternSupported("/local/path");
- }
-
- @Test
- public void testWhenBucketDoesNotExist() throws Exception {
- when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(false);
- expectedException.expect(IllegalArgumentException.class);
- expectedException.expectMessage(
- "Could not find file gs://non-existent-bucket/location");
- validator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
- }
-
- @Test
- public void testValidOutputPrefix() {
- validator.validateOutputFilePrefixSupported("gs://bucket/path");
- }
-
- @Test
- public void testInvalidOutputPrefix() {
- expectedException.expect(IllegalArgumentException.class);
- expectedException.expectMessage(
- "DataflowRunner expected a valid 'gs://' path but was given '/local/path'");
- validator.validateOutputFilePrefixSupported("/local/path");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/724c88d9/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
new file mode 100644
index 0000000..87f9181
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+
+import java.io.IOException;
+
+/**
+ * GCP implementation of {@link PathValidator}. Only GCS paths are allowed.
+ */
+public class GcsPathValidator implements PathValidator {
+
+ private GcsOptions gcpOptions;
+
+ private GcsPathValidator(GcsOptions options) {
+ this.gcpOptions = options;
+ }
+
+ public static GcsPathValidator fromOptions(PipelineOptions options) {
+ return new GcsPathValidator(options.as(GcsOptions.class));
+ }
+
+ /**
+ * Validates that the input GCS path is accessible and that the path
+ * is well formed.
+ */
+ @Override
+ public String validateInputFilePatternSupported(String filepattern) {
+ GcsPath gcsPath = getGcsPath(filepattern);
+ checkArgument(gcpOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject()));
+ String returnValue = verifyPath(filepattern);
+ verifyPathIsAccessible(filepattern, "Could not find file %s");
+ return returnValue;
+ }
+
+ /**
+ * Validates that the output GCS path is accessible and that the path
+ * is well formed.
+ */
+ @Override
+ public String validateOutputFilePrefixSupported(String filePrefix) {
+ String returnValue = verifyPath(filePrefix);
+ verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
+ return returnValue;
+ }
+
+ @Override
+ public String verifyPath(String path) {
+ GcsPath gcsPath = getGcsPath(path);
+ checkArgument(gcsPath.isAbsolute(), "Must provide absolute paths for Dataflow");
+ checkArgument(!gcsPath.getObject().contains("//"),
+ "Dataflow Service does not allow objects with consecutive slashes");
+ return gcsPath.toResourceName();
+ }
+
+ private void verifyPathIsAccessible(String path, String errorMessage) {
+ GcsPath gcsPath = getGcsPath(path);
+ try {
+ checkArgument(gcpOptions.getGcsUtil().bucketExists(gcsPath),
+ errorMessage, path);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()),
+ e);
+ }
+ }
+
+ private GcsPath getGcsPath(String path) {
+ try {
+ return GcsPath.fromUri(path);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(String.format(
+ "%s expected a valid 'gs://' path but was given '%s'",
+ gcpOptions.getRunner().getSimpleName(), path), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/724c88d9/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
new file mode 100644
index 0000000..d101627
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.GcsPathValidator;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.TestCredential;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link GcsPathValidator}. */
+@RunWith(JUnit4.class)
+public class GcsPathValidatorTest {
+ @Rule public ExpectedException expectedException = ExpectedException.none();
+
+ @Mock private GcsUtil mockGcsUtil;
+ private GcsPathValidator validator;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true);
+ when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
+ GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
+ options.setGcpCredential(new TestCredential());
+ options.setGcsUtil(mockGcsUtil);
+ validator = GcsPathValidator.fromOptions(options);
+ }
+
+ @Test
+ public void testValidFilePattern() {
+ validator.validateInputFilePatternSupported("gs://bucket/path");
+ }
+
+ @Test
+ public void testInvalidFilePattern() {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(
+ "DirectRunner expected a valid 'gs://' path but was given '/local/path'");
+ validator.validateInputFilePatternSupported("/local/path");
+ }
+
+ @Test
+ public void testWhenBucketDoesNotExist() throws Exception {
+ when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(false);
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(
+ "Could not find file gs://non-existent-bucket/location");
+ validator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
+ }
+
+ @Test
+ public void testValidOutputPrefix() {
+ validator.validateOutputFilePrefixSupported("gs://bucket/path");
+ }
+
+ @Test
+ public void testInvalidOutputPrefix() {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(
+ "DirectRunner expected a valid 'gs://' path but was given '/local/path'");
+ validator.validateOutputFilePrefixSupported("/local/path");
+ }
+}
[5/5] incubator-beam git commit: Closes #761
Posted by dh...@apache.org.
Closes #761
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/63c5d19b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/63c5d19b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/63c5d19b
Branch: refs/heads/master
Commit: 63c5d19b3909516402e06afdb916be3e5effbb96
Parents: d73e614 ab20823
Author: Dan Halperin <dh...@google.com>
Authored: Tue Aug 2 11:07:29 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Aug 2 11:07:29 2016 -0700
----------------------------------------------------------------------
.../options/DataflowPipelineDebugOptions.java | 43 --------
.../options/DataflowPipelineOptions.java | 6 +-
.../dataflow/util/DataflowPathValidator.java | 98 -----------------
.../DataflowPipelineTranslatorTest.java | 2 +-
.../runners/dataflow/DataflowRunnerTest.java | 39 ++++++-
.../options/DataflowPipelineOptionsTest.java | 3 +
.../util/DataflowPathValidatorTest.java | 94 -----------------
.../org/apache/beam/sdk/options/GcpOptions.java | 11 +-
.../org/apache/beam/sdk/options/GcsOptions.java | 44 ++++++++
.../apache/beam/sdk/util/GcsPathValidator.java | 97 +++++++++++++++++
.../dataflow/util/GcsPathValidatorTest.java | 104 +++++++++++++++++++
.../apache/beam/sdk/options/GcpOptionsTest.java | 2 +
12 files changed, 300 insertions(+), 243 deletions(-)
----------------------------------------------------------------------