You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) was not preserved in this copy.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/09 18:03:51 UTC

[1/2] beam git commit: [BEAM-2211] Move PathValidator into GCP-Core

Repository: beam
Updated Branches:
  refs/heads/master 5e4fd1b95 -> 28180c45b


[BEAM-2211] Move PathValidator into GCP-Core

For now, this does not need to be a Beam concept


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7d165a73
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7d165a73
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7d165a73

Branch: refs/heads/master
Commit: 7d165a73070468ff3a8907bcd5c3a3c4972d79e5
Parents: 5e4fd1b
Author: Dan Halperin <dh...@google.com>
Authored: Mon May 8 16:01:40 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 9 11:03:38 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |   2 +-
 .../DataflowPipelineTranslatorTest.java         |   2 +-
 .../beam/sdk/io/fs/NoopPathValidator.java       |  52 ---------
 .../apache/beam/sdk/io/fs/PathValidator.java    |  58 ----------
 .../apache/beam/sdk/util/NoopPathValidator.java |  51 ---------
 .../sdk/extensions/gcp/options/GcpOptions.java  |   2 +-
 .../sdk/extensions/gcp/options/GcsOptions.java  |   4 +-
 .../gcp/storage/GcsPathValidator.java           | 107 +++++++++++++++++++
 .../gcp/storage/NoopPathValidator.java          |  53 +++++++++
 .../extensions/gcp/storage/PathValidator.java   |  59 ++++++++++
 .../apache/beam/sdk/util/GcsPathValidator.java  | 107 -------------------
 .../apache/beam/sdk/util/NoopPathValidator.java |  51 +++++++++
 .../gcp/storage/GcsPathValidatorTest.java       | 107 +++++++++++++++++++
 .../beam/sdk/util/GcsPathValidatorTest.java     | 106 ------------------
 14 files changed, 382 insertions(+), 379 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 250c064..2ef8737 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -83,11 +83,11 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.io.fs.PathValidator;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;

http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 9040f8f..93c1e5b 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -77,6 +77,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
+import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -93,7 +94,6 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.GcsPathValidator;
 import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/NoopPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/NoopPathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/NoopPathValidator.java
deleted file mode 100644
index d5be8f0..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/NoopPathValidator.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.fs;
-
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * <b>For internal use only; no backwards compatibility guarantees.</b>
- *
- * <p>Noop implementation of {@link PathValidator}. All paths are allowed and returned unchanged.
- */
-@Internal
-public class NoopPathValidator implements PathValidator {
-
-  private NoopPathValidator() {
-  }
-
-  public static PathValidator fromOptions(
-      @SuppressWarnings("unused") PipelineOptions options) {
-    return new NoopPathValidator();
-  }
-
-  @Override
-  public void validateInputFilePatternSupported(String filepattern) {}
-
-  @Override
-  public void validateOutputFilePrefixSupported(String filePrefix) {}
-
-  @Override
-  public void validateOutputResourceSupported(ResourceId resourceId) {}
-
-  @Override
-  public String verifyPath(String path) {
-    return path;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/PathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/PathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/PathValidator.java
deleted file mode 100644
index b88a33e..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/PathValidator.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.fs;
-
-import org.apache.beam.sdk.annotations.Internal;
-
-/**
- * <b>For internal use only; no backwards compatibility guarantees.</b>
- *
- * <p>Interface for controlling validation of paths.
- */
-@Internal
-public interface PathValidator {
-  /**
-   * Validate that a file pattern is conforming.
-   *
-   * @param filepattern The file pattern to verify.
-   */
-  void validateInputFilePatternSupported(String filepattern);
-
-  /**
-   * Validate that an output file prefix is conforming.
-   *
-   * @param filePrefix the file prefix to verify.
-   */
-  void validateOutputFilePrefixSupported(String filePrefix);
-
-  /**
-   * Validates that an output path is conforming.
-   *
-   * @param resourceId the file prefix to verify.
-   */
-  void validateOutputResourceSupported(ResourceId resourceId);
-
-  /**
-   * Validate that a path is a valid path and that the path
-   * is accessible.
-   *
-   * @param path The path to verify.
-   * @return The post-validation path.
-   */
-  String verifyPath(String path);
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java
deleted file mode 100644
index 0015e59..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.io.fs.PathValidator;
-import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * @deprecated use {@link org.apache.beam.sdk.io.fs.NoopPathValidator}.
- */
-@Deprecated
-public class NoopPathValidator implements PathValidator {
-
-  private NoopPathValidator() {
-  }
-
-  public static PathValidator fromOptions(
-      @SuppressWarnings("unused") PipelineOptions options) {
-    return new NoopPathValidator();
-  }
-
-  @Override
-  public void validateInputFilePatternSupported(String filepattern) {}
-
-  @Override
-  public void validateOutputFilePrefixSupported(String filePrefix) {}
-
-  @Override
-  public void validateOutputResourceSupported(ResourceId resourceId) {}
-
-  @Override
-  public String verifyPath(String path) {
-    return path;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
index 985520f..1e25560 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
@@ -48,7 +48,7 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.extensions.gcp.auth.CredentialFactory;
 import org.apache.beam.sdk.extensions.gcp.auth.GcpCredentialFactory;
 import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
-import org.apache.beam.sdk.io.fs.PathValidator;
+import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;

http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
index 78e233e..7ac9b69 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
@@ -26,14 +26,14 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.io.fs.PathValidator;
+import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator;
+import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.Hidden;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.GcsPathValidator;
 import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.InstanceBuilder;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java
new file mode 100644
index 0000000..e7257b2
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.storage;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+
+/**
+ * GCP implementation of {@link PathValidator}. Only GCS paths are allowed.
+ */
+public class GcsPathValidator implements PathValidator {
+
+  private GcsOptions gcpOptions;
+
+  private GcsPathValidator(GcsOptions options) {
+    this.gcpOptions = options;
+  }
+
+  public static GcsPathValidator fromOptions(PipelineOptions options) {
+    return new GcsPathValidator(options.as(GcsOptions.class));
+  }
+
+  /**
+   * Validates that the input GCS path is accessible and that the path
+   * is well formed.
+   */
+  @Override
+  public void validateInputFilePatternSupported(String filepattern) {
+    GcsPath gcsPath = getGcsPath(filepattern);
+    checkArgument(GcsUtil.isGcsPatternSupported(gcsPath.getObject()));
+    verifyPath(filepattern);
+    verifyPathIsAccessible(filepattern, "Could not find file %s");
+  }
+
+  /**
+   * Validates the the output GCS path is accessible and that the path
+   * is well formed.
+   */
+  @Override
+  public void validateOutputFilePrefixSupported(String filePrefix) {
+    verifyPath(filePrefix);
+    verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
+  }
+
+  @Override
+  public void validateOutputResourceSupported(ResourceId resourceId) {
+    checkArgument(
+        resourceId.getScheme().equals("gs"),
+        "Expected a valid 'gs://' path but was given: '%s'",
+        resourceId);
+    verifyPath(resourceId.toString());
+  }
+
+  @Override
+  public String verifyPath(String path) {
+    GcsPath gcsPath = getGcsPath(path);
+    checkArgument(gcsPath.isAbsolute(), "Must provide absolute paths for Dataflow");
+    checkArgument(!gcsPath.getObject().isEmpty(),
+        "Missing object or bucket in path: '%s', did you mean: 'gs://some-bucket/%s'?",
+        gcsPath, gcsPath.getBucket());
+    checkArgument(!gcsPath.getObject().contains("//"),
+        "Dataflow Service does not allow objects with consecutive slashes");
+    return gcsPath.toResourceName();
+  }
+
+  private void verifyPathIsAccessible(String path, String errorMessage) {
+    GcsPath gcsPath = getGcsPath(path);
+    try {
+      checkArgument(gcpOptions.getGcsUtil().bucketAccessible(gcsPath),
+        errorMessage, path);
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()),
+          e);
+    }
+  }
+
+  private GcsPath getGcsPath(String path) {
+    try {
+      return GcsPath.fromUri(path);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(String.format(
+          "Expected a valid 'gs://' path but was given '%s'", path), e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/NoopPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/NoopPathValidator.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/NoopPathValidator.java
new file mode 100644
index 0000000..79b8732
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/NoopPathValidator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.storage;
+
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * <b>For internal use only; no backwards compatibility guarantees.</b>
+ *
+ * <p>Noop implementation of {@link PathValidator}. All paths are allowed and returned unchanged.
+ */
+@Internal
+public class NoopPathValidator implements PathValidator {
+
+  private NoopPathValidator() {
+  }
+
+  public static PathValidator fromOptions(
+      @SuppressWarnings("unused") PipelineOptions options) {
+    return new NoopPathValidator();
+  }
+
+  @Override
+  public void validateInputFilePatternSupported(String filepattern) {}
+
+  @Override
+  public void validateOutputFilePrefixSupported(String filePrefix) {}
+
+  @Override
+  public void validateOutputResourceSupported(ResourceId resourceId) {}
+
+  @Override
+  public String verifyPath(String path) {
+    return path;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/PathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/PathValidator.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/PathValidator.java
new file mode 100644
index 0000000..cc769c8
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/PathValidator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.storage;
+
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.fs.ResourceId;
+
+/**
+ * <b>For internal use only; no backwards compatibility guarantees.</b>
+ *
+ * <p>Interface for controlling validation of paths.
+ */
+@Internal
+public interface PathValidator {
+  /**
+   * Validate that a file pattern is conforming.
+   *
+   * @param filepattern The file pattern to verify.
+   */
+  void validateInputFilePatternSupported(String filepattern);
+
+  /**
+   * Validate that an output file prefix is conforming.
+   *
+   * @param filePrefix the file prefix to verify.
+   */
+  void validateOutputFilePrefixSupported(String filePrefix);
+
+  /**
+   * Validates that an output path is conforming.
+   *
+   * @param resourceId the output resource to verify.
+   */
+  void validateOutputResourceSupported(ResourceId resourceId);
+
+  /**
+   * Validate that a path is well formed for this validator and return the
+   * post-validation form of the path.
+   *
+   * @param path The path to verify.
+   * @return The post-validation path.
+   */
+  String verifyPath(String path);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
deleted file mode 100644
index c4e557b..0000000
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.IOException;
-import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
-import org.apache.beam.sdk.io.fs.PathValidator;
-import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-/**
- * GCP implementation of {@link PathValidator}. Only GCS paths are allowed.
- */
-public class GcsPathValidator implements PathValidator {
-
-  private GcsOptions gcpOptions;
-
-  private GcsPathValidator(GcsOptions options) {
-    this.gcpOptions = options;
-  }
-
-  public static GcsPathValidator fromOptions(PipelineOptions options) {
-    return new GcsPathValidator(options.as(GcsOptions.class));
-  }
-
-  /**
-   * Validates the the input GCS path is accessible and that the path
-   * is well formed.
-   */
-  @Override
-  public void validateInputFilePatternSupported(String filepattern) {
-    GcsPath gcsPath = getGcsPath(filepattern);
-    checkArgument(GcsUtil.isGcsPatternSupported(gcsPath.getObject()));
-    verifyPath(filepattern);
-    verifyPathIsAccessible(filepattern, "Could not find file %s");
-  }
-
-  /**
-   * Validates the the output GCS path is accessible and that the path
-   * is well formed.
-   */
-  @Override
-  public void validateOutputFilePrefixSupported(String filePrefix) {
-    verifyPath(filePrefix);
-    verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
-  }
-
-  @Override
-  public void validateOutputResourceSupported(ResourceId resourceId) {
-    checkArgument(
-        resourceId.getScheme().equals("gs"),
-        "Expected a valid 'gs://' path but was given: '%s'",
-        resourceId);
-    verifyPath(resourceId.toString());
-  }
-
-  @Override
-  public String verifyPath(String path) {
-    GcsPath gcsPath = getGcsPath(path);
-    checkArgument(gcsPath.isAbsolute(), "Must provide absolute paths for Dataflow");
-    checkArgument(!gcsPath.getObject().isEmpty(),
-        "Missing object or bucket in path: '%s', did you mean: 'gs://some-bucket/%s'?",
-        gcsPath, gcsPath.getBucket());
-    checkArgument(!gcsPath.getObject().contains("//"),
-        "Dataflow Service does not allow objects with consecutive slashes");
-    return gcsPath.toResourceName();
-  }
-
-  private void verifyPathIsAccessible(String path, String errorMessage) {
-    GcsPath gcsPath = getGcsPath(path);
-    try {
-      checkArgument(gcpOptions.getGcsUtil().bucketAccessible(gcsPath),
-        errorMessage, path);
-    } catch (IOException e) {
-      throw new RuntimeException(
-          String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()),
-          e);
-    }
-  }
-
-  private GcsPath getGcsPath(String path) {
-    try {
-      return GcsPath.fromUri(path);
-    } catch (IllegalArgumentException e) {
-      throw new IllegalArgumentException(String.format(
-          "Expected a valid 'gs://' path but was given '%s'", path), e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java
new file mode 100644
index 0000000..85b68b2
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * @deprecated use {@link org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator}.
+ */
+@Deprecated
+public class NoopPathValidator implements PathValidator {
+
+  private NoopPathValidator() {
+  }
+
+  public static PathValidator fromOptions(
+      @SuppressWarnings("unused") PipelineOptions options) {
+    return new NoopPathValidator();
+  }
+
+  @Override
+  public void validateInputFilePatternSupported(String filepattern) {}
+
+  @Override
+  public void validateOutputFilePrefixSupported(String filePrefix) {}
+
+  @Override
+  public void validateOutputResourceSupported(ResourceId resourceId) {}
+
+  @Override
+  public String verifyPath(String path) {
+    return path;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidatorTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidatorTest.java
new file mode 100644
index 0000000..91ac46c
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidatorTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.storage;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link GcsPathValidator}. */
+@RunWith(JUnit4.class)
+public class GcsPathValidatorTest {
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+
+  @Mock private GcsUtil mockGcsUtil;
+  private GcsPathValidator validator;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
+    GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
+    options.setGcpCredential(new TestCredential());
+    options.setGcsUtil(mockGcsUtil);
+    validator = GcsPathValidator.fromOptions(options);
+  }
+
+  @Test
+  public void testValidFilePattern() {
+    validator.validateInputFilePatternSupported("gs://bucket/path");
+  }
+
+  @Test
+  public void testInvalidFilePattern() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "Expected a valid 'gs://' path but was given '/local/path'");
+    validator.validateInputFilePatternSupported("/local/path");
+  }
+
+  @Test
+  public void testFilePatternMissingBucket() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "Missing object or bucket in path: 'gs://input/', "
+            + "did you mean: 'gs://some-bucket/input'?");
+    validator.validateInputFilePatternSupported("gs://input");
+  }
+
+  @Test
+  public void testWhenBucketDoesNotExist() throws Exception {
+    when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(false);
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "Could not find file gs://non-existent-bucket/location");
+    validator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
+  }
+
+  @Test
+  public void testValidOutputPrefix() {
+    validator.validateOutputFilePrefixSupported("gs://bucket/path");
+  }
+
+  @Test
+  public void testInvalidOutputPrefix() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "Expected a valid 'gs://' path but was given '/local/path'");
+    validator.validateOutputFilePrefixSupported("/local/path");
+  }
+
+  @Test
+  public void testOutputPrefixMissingBucket() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "Missing object or bucket in path: 'gs://output/', "
+            + "did you mean: 'gs://some-bucket/output'?");
+    validator.validateOutputFilePrefixSupported("gs://output");
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
deleted file mode 100644
index 65fb228..0000000
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
-import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/** Tests for {@link GcsPathValidator}. */
-@RunWith(JUnit4.class)
-public class GcsPathValidatorTest {
-  @Rule public ExpectedException expectedException = ExpectedException.none();
-
-  @Mock private GcsUtil mockGcsUtil;
-  private GcsPathValidator validator;
-
-  @Before
-  public void setUp() throws Exception {
-    MockitoAnnotations.initMocks(this);
-    when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
-    GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
-    options.setGcpCredential(new TestCredential());
-    options.setGcsUtil(mockGcsUtil);
-    validator = GcsPathValidator.fromOptions(options);
-  }
-
-  @Test
-  public void testValidFilePattern() {
-    validator.validateInputFilePatternSupported("gs://bucket/path");
-  }
-
-  @Test
-  public void testInvalidFilePattern() {
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage(
-        "Expected a valid 'gs://' path but was given '/local/path'");
-    validator.validateInputFilePatternSupported("/local/path");
-  }
-
-  @Test
-  public void testFilePatternMissingBucket() {
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage(
-        "Missing object or bucket in path: 'gs://input/', "
-            + "did you mean: 'gs://some-bucket/input'?");
-    validator.validateInputFilePatternSupported("gs://input");
-  }
-
-  @Test
-  public void testWhenBucketDoesNotExist() throws Exception {
-    when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(false);
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage(
-        "Could not find file gs://non-existent-bucket/location");
-    validator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
-  }
-
-  @Test
-  public void testValidOutputPrefix() {
-    validator.validateOutputFilePrefixSupported("gs://bucket/path");
-  }
-
-  @Test
-  public void testInvalidOutputPrefix() {
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage(
-        "Expected a valid 'gs://' path but was given '/local/path'");
-    validator.validateOutputFilePrefixSupported("/local/path");
-  }
-
-  @Test
-  public void testOutputPrefixMissingBucket() {
-    expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage(
-        "Missing object or bucket in path: 'gs://output/', "
-            + "did you mean: 'gs://some-bucket/output'?");
-    validator.validateOutputFilePrefixSupported("gs://output");
-  }
-}


[2/2] beam git commit: This closes #2972

Posted by dh...@apache.org.
This closes #2972


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/28180c45
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/28180c45
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/28180c45

Branch: refs/heads/master
Commit: 28180c45b90c9e337857b1522d6f785df88d2f97
Parents: 5e4fd1b 7d165a7
Author: Dan Halperin <dh...@google.com>
Authored: Tue May 9 11:03:42 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 9 11:03:42 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |   2 +-
 .../DataflowPipelineTranslatorTest.java         |   2 +-
 .../beam/sdk/io/fs/NoopPathValidator.java       |  52 ---------
 .../apache/beam/sdk/io/fs/PathValidator.java    |  58 ----------
 .../apache/beam/sdk/util/NoopPathValidator.java |  51 ---------
 .../sdk/extensions/gcp/options/GcpOptions.java  |   2 +-
 .../sdk/extensions/gcp/options/GcsOptions.java  |   4 +-
 .../gcp/storage/GcsPathValidator.java           | 107 +++++++++++++++++++
 .../gcp/storage/NoopPathValidator.java          |  53 +++++++++
 .../extensions/gcp/storage/PathValidator.java   |  59 ++++++++++
 .../apache/beam/sdk/util/GcsPathValidator.java  | 107 -------------------
 .../apache/beam/sdk/util/NoopPathValidator.java |  51 +++++++++
 .../gcp/storage/GcsPathValidatorTest.java       | 107 +++++++++++++++++++
 .../beam/sdk/util/GcsPathValidatorTest.java     | 106 ------------------
 14 files changed, 382 insertions(+), 379 deletions(-)
----------------------------------------------------------------------