You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/02 18:07:34 UTC

[2/5] incubator-beam git commit: Move DataflowPathValidator to sdk.util, rename to GcsPathValidator

Move DataflowPathValidator to sdk.util, rename to GcsPathValidator


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/724c88d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/724c88d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/724c88d9

Branch: refs/heads/master
Commit: 724c88d950d3d872b364c6a761a1c4faa32efe54
Parents: 1f87b84
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jul 29 16:11:33 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Aug 2 11:07:28 2016 -0700

----------------------------------------------------------------------
 .../options/DataflowPipelineDebugOptions.java   |  4 +-
 .../dataflow/util/DataflowPathValidator.java    | 98 --------------------
 .../DataflowPipelineTranslatorTest.java         |  2 +-
 .../util/DataflowPathValidatorTest.java         | 94 -------------------
 .../apache/beam/sdk/util/GcsPathValidator.java  | 97 +++++++++++++++++++
 .../dataflow/util/GcsPathValidatorTest.java     | 93 +++++++++++++++++++
 6 files changed, 193 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/724c88d9/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index 8765adf..bc92a5f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.dataflow.options;
 
-import org.apache.beam.runners.dataflow.util.DataflowPathValidator;
+import org.apache.beam.sdk.util.GcsPathValidator;
 import org.apache.beam.runners.dataflow.util.DataflowTransport;
 import org.apache.beam.runners.dataflow.util.GcsStager;
 import org.apache.beam.runners.dataflow.util.Stager;
@@ -108,7 +108,7 @@ public interface DataflowPipelineDebugOptions extends PipelineOptions {
   @Description("The class of the validator that should be created and used to validate paths. "
       + "If pathValidator has not been set explicitly, an instance of this class will be "
       + "constructed and used as the path validator.")
-  @Default.Class(DataflowPathValidator.class)
+  @Default.Class(GcsPathValidator.class)
   Class<? extends PathValidator> getPathValidatorClass();
   void setPathValidatorClass(Class<? extends PathValidator> validatorClass);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/724c88d9/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java
deleted file mode 100644
index ec10b28..0000000
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowPathValidator.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.PathValidator;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-import java.io.IOException;
-
-/**
- * GCP implementation of {@link PathValidator}. Only GCS paths are allowed.
- */
-public class DataflowPathValidator implements PathValidator {
-
-  private DataflowPipelineOptions dataflowOptions;
-
-  DataflowPathValidator(DataflowPipelineOptions options) {
-    this.dataflowOptions = options;
-  }
-
-  public static DataflowPathValidator fromOptions(PipelineOptions options) {
-    return new DataflowPathValidator(options.as(DataflowPipelineOptions.class));
-  }
-
-  /**
-   * Validates the the input GCS path is accessible and that the path
-   * is well formed.
-   */
-  @Override
-  public String validateInputFilePatternSupported(String filepattern) {
-    GcsPath gcsPath = getGcsPath(filepattern);
-    checkArgument(dataflowOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject()));
-    String returnValue = verifyPath(filepattern);
-    verifyPathIsAccessible(filepattern, "Could not find file %s");
-    return returnValue;
-  }
-
-  /**
-   * Validates the the output GCS path is accessible and that the path
-   * is well formed.
-   */
-  @Override
-  public String validateOutputFilePrefixSupported(String filePrefix) {
-    String returnValue = verifyPath(filePrefix);
-    verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
-    return returnValue;
-  }
-
-  @Override
-  public String verifyPath(String path) {
-    GcsPath gcsPath = getGcsPath(path);
-    checkArgument(gcsPath.isAbsolute(), "Must provide absolute paths for Dataflow");
-    checkArgument(!gcsPath.getObject().contains("//"),
-        "Dataflow Service does not allow objects with consecutive slashes");
-    return gcsPath.toResourceName();
-  }
-
-  private void verifyPathIsAccessible(String path, String errorMessage) {
-    GcsPath gcsPath = getGcsPath(path);
-    try {
-      checkArgument(dataflowOptions.getGcsUtil().bucketExists(gcsPath),
-        errorMessage, path);
-    } catch (IOException e) {
-      throw new RuntimeException(
-          String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()),
-          e);
-    }
-  }
-
-  private GcsPath getGcsPath(String path) {
-    try {
-      return GcsPath.fromUri(path);
-    } catch (IllegalArgumentException e) {
-      throw new IllegalArgumentException(String.format(
-          "%s expected a valid 'gs://' path but was given '%s'",
-          dataflowOptions.getRunner().getSimpleName(), path), e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/724c88d9/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index d4d571b..7d89735 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -194,7 +194,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     settings.put("appName", "DataflowPipelineTranslatorTest");
     settings.put("project", "some-project");
     settings.put("pathValidatorClass",
-        "org.apache.beam.runners.dataflow.util.DataflowPathValidator");
+        "org.apache.beam.sdk.util.GcsPathValidator");
     settings.put("runner", "org.apache.beam.runners.dataflow.DataflowRunner");
     settings.put("jobName", "some-job-name");
     settings.put("tempLocation", "gs://somebucket/some/path");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/724c88d9/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
deleted file mode 100644
index a91f56c..0000000
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/DataflowPathValidatorTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.util;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.TestCredential;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/** Tests for {@link DataflowPathValidator}. */
-@RunWith(JUnit4.class)
-public class DataflowPathValidatorTest {
-  @Rule public ExpectedException expectedException = ExpectedException.none();
-
-  @Mock private GcsUtil mockGcsUtil;
-  private DataflowPathValidator validator;
-
-  @Before
-  public void setUp() throws Exception {
-    MockitoAnnotations.initMocks(this);
-    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true);
-    when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
-    options.setGcpCredential(new TestCredential());
-    options.setRunner(DataflowRunner.class);
-    options.setGcsUtil(mockGcsUtil);
-    validator = new DataflowPathValidator(options);
-  }
-
-  @Test
-  public void testValidFilePattern() {
-    validator.validateInputFilePatternSupported("gs://bucket/path");
-  }
-
-  @Test
-  public void testInvalidFilePattern() {
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage(
-        "DataflowRunner expected a valid 'gs://' path but was given '/local/path'");
-    validator.validateInputFilePatternSupported("/local/path");
-  }
-
-  @Test
-  public void testWhenBucketDoesNotExist() throws Exception {
-    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(false);
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage(
-        "Could not find file gs://non-existent-bucket/location");
-    validator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
-  }
-
-  @Test
-  public void testValidOutputPrefix() {
-    validator.validateOutputFilePrefixSupported("gs://bucket/path");
-  }
-
-  @Test
-  public void testInvalidOutputPrefix() {
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage(
-        "DataflowRunner expected a valid 'gs://' path but was given '/local/path'");
-    validator.validateOutputFilePrefixSupported("/local/path");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/724c88d9/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
new file mode 100644
index 0000000..87f9181
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+
+import java.io.IOException;
+
+/**
+ * GCP implementation of {@link PathValidator}. Only GCS paths are allowed.
+ */
+public class GcsPathValidator implements PathValidator {
+
+  private GcsOptions gcpOptions;
+
+  private GcsPathValidator(GcsOptions options) {
+    this.gcpOptions = options;
+  }
+
+  public static GcsPathValidator fromOptions(PipelineOptions options) {
+    return new GcsPathValidator(options.as(GcsOptions.class));
+  }
+
+  /**
+   * Validates the the input GCS path is accessible and that the path
+   * is well formed.
+   */
+  @Override
+  public String validateInputFilePatternSupported(String filepattern) {
+    GcsPath gcsPath = getGcsPath(filepattern);
+    checkArgument(gcpOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject()));
+    String returnValue = verifyPath(filepattern);
+    verifyPathIsAccessible(filepattern, "Could not find file %s");
+    return returnValue;
+  }
+
+  /**
+   * Validates the the output GCS path is accessible and that the path
+   * is well formed.
+   */
+  @Override
+  public String validateOutputFilePrefixSupported(String filePrefix) {
+    String returnValue = verifyPath(filePrefix);
+    verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
+    return returnValue;
+  }
+
+  @Override
+  public String verifyPath(String path) {
+    GcsPath gcsPath = getGcsPath(path);
+    checkArgument(gcsPath.isAbsolute(), "Must provide absolute paths for Dataflow");
+    checkArgument(!gcsPath.getObject().contains("//"),
+        "Dataflow Service does not allow objects with consecutive slashes");
+    return gcsPath.toResourceName();
+  }
+
+  private void verifyPathIsAccessible(String path, String errorMessage) {
+    GcsPath gcsPath = getGcsPath(path);
+    try {
+      checkArgument(gcpOptions.getGcsUtil().bucketExists(gcsPath),
+        errorMessage, path);
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()),
+          e);
+    }
+  }
+
+  private GcsPath getGcsPath(String path) {
+    try {
+      return GcsPath.fromUri(path);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(String.format(
+          "%s expected a valid 'gs://' path but was given '%s'",
+          gcpOptions.getRunner().getSimpleName(), path), e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/724c88d9/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
new file mode 100644
index 0000000..d101627
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.util;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.GcsPathValidator;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.TestCredential;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link GcsPathValidator}. */
+@RunWith(JUnit4.class)
+public class GcsPathValidatorTest {
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+
+  @Mock private GcsUtil mockGcsUtil;
+  private GcsPathValidator validator;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true);
+    when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
+    GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
+    options.setGcpCredential(new TestCredential());
+    options.setGcsUtil(mockGcsUtil);
+    validator = GcsPathValidator.fromOptions(options);
+  }
+
+  @Test
+  public void testValidFilePattern() {
+    validator.validateInputFilePatternSupported("gs://bucket/path");
+  }
+
+  @Test
+  public void testInvalidFilePattern() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "DirectRunner expected a valid 'gs://' path but was given '/local/path'");
+    validator.validateInputFilePatternSupported("/local/path");
+  }
+
+  @Test
+  public void testWhenBucketDoesNotExist() throws Exception {
+    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(false);
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "Could not find file gs://non-existent-bucket/location");
+    validator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
+  }
+
+  @Test
+  public void testValidOutputPrefix() {
+    validator.validateOutputFilePrefixSupported("gs://bucket/path");
+  }
+
+  @Test
+  public void testInvalidOutputPrefix() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "DirectRunner expected a valid 'gs://' path but was given '/local/path'");
+    validator.validateOutputFilePrefixSupported("/local/path");
+  }
+}