You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/02 18:07:34 UTC
[2/5] incubator-beam git commit: Move DataflowPathValidator to
sdk.util, rename to GcsPathValidator
Move DataflowPathValidator to sdk.util, rename to GcsPathValidator
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/724c88d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/724c88d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/724c88d9
Branch: refs/heads/master
Commit: 724c88d950d3d872b364c6a761a1c4faa32efe54
Parents: 1f87b84
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jul 29 16:11:33 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Aug 2 11:07:28 2016 -0700
----------------------------------------------------------------------
.../options/DataflowPipelineDebugOptions.java | 4 +-
.../dataflow/util/DataflowPathValidator.java | 98 --------------------
.../DataflowPipelineTranslatorTest.java | 2 +-
.../util/DataflowPathValidatorTest.java | 94 -------------------
.../apache/beam/sdk/util/GcsPathValidator.java | 97 +++++++++++++++++++
.../dataflow/util/GcsPathValidatorTest.java | 93 +++++++++++++++++++
6 files changed, 193 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/724c88d9/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index 8765adf..bc92a5f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.dataflow.options;
-import org.apache.beam.runners.dataflow.util.DataflowPathValidator;
+import org.apache.beam.sdk.util.GcsPathValidator;
import org.apache.beam.runners.dataflow.util.DataflowTransport;
import org.apache.beam.runners.dataflow.util.GcsStager;
import org.apache.beam.runners.dataflow.util.Stager;
@@ -108,7 +108,7 @@ public interface DataflowPipelineDebugOptions extends PipelineOptions {
@Description("The class of the validator that should be created and used to validate paths. "
+ "If pathValidator has not been set explicitly, an instance of this class will be "
+ "constructed and used as the path validator.")
- @Default.Class(DataflowPathValidator.class)
+ @Default.Class(GcsPathValidator.class)
Class<? extends PathValidator> getPathValidatorClass();
void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/724c88d9/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java
deleted file mode 100644
index ec10b28..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.PathValidator;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-import java.io.IOException;
-
-/**
- * GCP implementation of {@link PathValidator}. Only GCS paths are allowed.
- */
-public class DataflowPathValidator implements PathValidator {
-
- private DataflowPipelineOptions dataflowOptions;
-
- DataflowPathValidator(DataflowPipelineOptions options) {
- this.dataflowOptions = options;
- }
-
- public static DataflowPathValidator fromOptions(PipelineOptions options) {
- return new DataflowPathValidator(options.as(DataflowPipelineOptions.class));
- }
-
- /**
- * Validates the the input GCS path is accessible and that the path
- * is well formed.
- */
- @Override
- public String validateInputFilePatternSupported(String filepattern) {
- GcsPath gcsPath = getGcsPath(filepattern);
- checkArgument(dataflowOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject()));
- String returnValue = verifyPath(filepattern);
- verifyPathIsAccessible(filepattern, "Could not find file %s");
- return returnValue;
- }
-
- /**
- * Validates the the output GCS path is accessible and that the path
- * is well formed.
- */
- @Override
- public String validateOutputFilePrefixSupported(String filePrefix) {
- String returnValue = verifyPath(filePrefix);
- verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
- return returnValue;
- }
-
- @Override
- public String verifyPath(String path) {
- GcsPath gcsPath = getGcsPath(path);
- checkArgument(gcsPath.isAbsolute(), "Must provide absolute paths for Dataflow");
- checkArgument(!gcsPath.getObject().contains("//"),
- "Dataflow Service does not allow objects with consecutive slashes");
- return gcsPath.toResourceName();
- }
-
- private void verifyPathIsAccessible(String path, String errorMessage) {
- GcsPath gcsPath = getGcsPath(path);
- try {
- checkArgument(dataflowOptions.getGcsUtil().bucketExists(gcsPath),
- errorMessage, path);
- } catch (IOException e) {
- throw new RuntimeException(
- String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()),
- e);
- }
- }
-
- private GcsPath getGcsPath(String path) {
- try {
- return GcsPath.fromUri(path);
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException(String.format(
- "%s expected a valid 'gs://' path but was given '%s'",
- dataflowOptions.getRunner().getSimpleName(), path), e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/724c88d9/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index d4d571b..7d89735 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -194,7 +194,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
settings.put("appName", "DataflowPipelineTranslatorTest");
settings.put("project", "some-project");
settings.put("pathValidatorClass",
- "org.apache.beam.runners.dataflow.util.DataflowPathValidator");
+ "org.apache.beam.sdk.util.GcsPathValidator");
settings.put("runner", "org.apache.beam.runners.dataflow.DataflowRunner");
settings.put("jobName", "some-job-name");
settings.put("tempLocation", "gs://somebucket/some/path");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/724c88d9/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
deleted file mode 100644
index a91f56c..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.util;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.TestCredential;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/** Tests for {@link DataflowPathValidator}. */
-@RunWith(JUnit4.class)
-public class DataflowPathValidatorTest {
- @Rule public ExpectedException expectedException = ExpectedException.none();
-
- @Mock private GcsUtil mockGcsUtil;
- private DataflowPathValidator validator;
-
- @Before
- public void setUp() throws Exception {
- MockitoAnnotations.initMocks(this);
- when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true);
- when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
- DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
- options.setGcpCredential(new TestCredential());
- options.setRunner(DataflowRunner.class);
- options.setGcsUtil(mockGcsUtil);
- validator = new DataflowPathValidator(options);
- }
-
- @Test
- public void testValidFilePattern() {
- validator.validateInputFilePatternSupported("gs://bucket/path");
- }
-
- @Test
- public void testInvalidFilePattern() {
- expectedException.expect(IllegalArgumentException.class);
- expectedException.expectMessage(
- "DataflowRunner expected a valid 'gs://' path but was given '/local/path'");
- validator.validateInputFilePatternSupported("/local/path");
- }
-
- @Test
- public void testWhenBucketDoesNotExist() throws Exception {
- when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(false);
- expectedException.expect(IllegalArgumentException.class);
- expectedException.expectMessage(
- "Could not find file gs://non-existent-bucket/location");
- validator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
- }
-
- @Test
- public void testValidOutputPrefix() {
- validator.validateOutputFilePrefixSupported("gs://bucket/path");
- }
-
- @Test
- public void testInvalidOutputPrefix() {
- expectedException.expect(IllegalArgumentException.class);
- expectedException.expectMessage(
- "DataflowRunner expected a valid 'gs://' path but was given '/local/path'");
- validator.validateOutputFilePrefixSupported("/local/path");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/724c88d9/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
new file mode 100644
index 0000000..87f9181
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+
+import java.io.IOException;
+
+/**
+ * GCS implementation of {@link PathValidator}. Only {@code gs://} paths are allowed.
+ */
+public class GcsPathValidator implements PathValidator {
+
+  private final GcsOptions gcpOptions;
+
+  private GcsPathValidator(GcsOptions options) {
+    this.gcpOptions = options;
+  }
+
+  /** Creates a validator using the {@link GcsOptions} view of the given options. */
+  public static GcsPathValidator fromOptions(PipelineOptions options) {
+    return new GcsPathValidator(options.as(GcsOptions.class));
+  }
+
+  /**
+   * Checks that the input pattern is a supported, well-formed GCS path whose bucket exists.
+   */
+  @Override
+  public String validateInputFilePatternSupported(String filepattern) {
+    GcsPath gcsPath = getGcsPath(filepattern);
+    checkArgument(gcpOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject()),
+        "Unsupported GCS file pattern: %s", filepattern);
+    String returnValue = verifyPath(filepattern);
+    verifyPathIsAccessible(filepattern, "Could not find file %s");
+    return returnValue;
+  }
+
+  /**
+   * Checks that the output prefix is a well-formed GCS path whose bucket exists.
+   */
+  @Override
+  public String validateOutputFilePrefixSupported(String filePrefix) {
+    String returnValue = verifyPath(filePrefix);
+    verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
+    return returnValue;
+  }
+
+  @Override
+  public String verifyPath(String path) {
+    GcsPath gcsPath = getGcsPath(path);
+    checkArgument(gcsPath.isAbsolute(), "Must provide absolute paths for Dataflow");
+    checkArgument(!gcsPath.getObject().contains("//"),
+        "Dataflow Service does not allow objects with consecutive slashes");
+    return gcsPath.toResourceName();
+  }
+
+  // Checks only that the path's bucket exists; object-level permissions are not verified.
+  private void verifyPathIsAccessible(String path, String errorMessage) {
+    GcsPath gcsPath = getGcsPath(path);
+    try {
+      checkArgument(gcpOptions.getGcsUtil().bucketExists(gcsPath), errorMessage, path);
+    } catch (IOException e) {
+      throw new RuntimeException(String.format(
+          "Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()), e);
+    }
+  }
+
+  // Parses a gs:// URI; non-GCS paths fail with a message naming the configured runner.
+  private GcsPath getGcsPath(String path) {
+    try {
+      return GcsPath.fromUri(path);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(String.format(
+          "%s expected a valid 'gs://' path but was given '%s'",
+          gcpOptions.getRunner().getSimpleName(), path), e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/724c88d9/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
new file mode 100644
index 0000000..d101627
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.GcsPathValidator;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.TestCredential;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Unit tests for {@link GcsPathValidator}, run against a mocked {@link GcsUtil}. */
+@RunWith(JUnit4.class)
+public class GcsPathValidatorTest {
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+
+  @Mock private GcsUtil gcsUtil;
+  private GcsPathValidator pathValidator;
+
+  @Before
+  public void setUp() throws Exception {
+    GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
+    options.setGcpCredential(new TestCredential());
+    MockitoAnnotations.initMocks(this);
+    // Every bucket exists unless a test re-stubs bucketExists.
+    when(gcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true);
+    when(gcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
+    options.setGcsUtil(gcsUtil);
+    pathValidator = GcsPathValidator.fromOptions(options);
+  }
+
+  @Test
+  public void testValidFilePattern() {
+    pathValidator.validateInputFilePatternSupported("gs://bucket/path");
+  }
+
+  @Test
+  public void testInvalidFilePattern() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "DirectRunner expected a valid 'gs://' path but was given '/local/path'");
+    pathValidator.validateInputFilePatternSupported("/local/path");
+  }
+
+  @Test
+  public void testWhenBucketDoesNotExist() throws Exception {
+    when(gcsUtil.bucketExists(any(GcsPath.class))).thenReturn(false);
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Could not find file gs://non-existent-bucket/location");
+    pathValidator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
+  }
+
+  @Test
+  public void testValidOutputPrefix() {
+    pathValidator.validateOutputFilePrefixSupported("gs://bucket/path");
+  }
+
+  @Test
+  public void testInvalidOutputPrefix() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "DirectRunner expected a valid 'gs://' path but was given '/local/path'");
+    pathValidator.validateOutputFilePrefixSupported("/local/path");
+  }
+}