You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/09 18:03:51 UTC
[1/2] beam git commit: [BEAM-2211] Move PathValidator into GCP-Core
Repository: beam
Updated Branches:
refs/heads/master 5e4fd1b95 -> 28180c45b
[BEAM-2211] Move PathValidator into GCP-Core
For now, this does not need to be a Beam concept
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7d165a73
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7d165a73
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7d165a73
Branch: refs/heads/master
Commit: 7d165a73070468ff3a8907bcd5c3a3c4972d79e5
Parents: 5e4fd1b
Author: Dan Halperin <dh...@google.com>
Authored: Mon May 8 16:01:40 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 9 11:03:38 2017 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 2 +-
.../DataflowPipelineTranslatorTest.java | 2 +-
.../beam/sdk/io/fs/NoopPathValidator.java | 52 ---------
.../apache/beam/sdk/io/fs/PathValidator.java | 58 ----------
.../apache/beam/sdk/util/NoopPathValidator.java | 51 ---------
.../sdk/extensions/gcp/options/GcpOptions.java | 2 +-
.../sdk/extensions/gcp/options/GcsOptions.java | 4 +-
.../gcp/storage/GcsPathValidator.java | 107 +++++++++++++++++++
.../gcp/storage/NoopPathValidator.java | 53 +++++++++
.../extensions/gcp/storage/PathValidator.java | 59 ++++++++++
.../apache/beam/sdk/util/GcsPathValidator.java | 107 -------------------
.../apache/beam/sdk/util/NoopPathValidator.java | 51 +++++++++
.../gcp/storage/GcsPathValidatorTest.java | 107 +++++++++++++++++++
.../beam/sdk/util/GcsPathValidatorTest.java | 106 ------------------
14 files changed, 382 insertions(+), 379 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 250c064..2ef8737 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -83,11 +83,11 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.io.fs.PathValidator;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 9040f8f..93c1e5b 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -77,6 +77,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
+import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -93,7 +94,6 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.GcsPathValidator;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.KV;
http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/NoopPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/NoopPathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/NoopPathValidator.java
deleted file mode 100644
index d5be8f0..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/NoopPathValidator.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.fs;
-
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * <b>For internal use only; no backwards compatibility guarantees.</b>
- *
- * <p>Noop implementation of {@link PathValidator}. All paths are allowed and returned unchanged.
- */
-@Internal
-public class NoopPathValidator implements PathValidator {
-
- private NoopPathValidator() {
- }
-
- public static PathValidator fromOptions(
- @SuppressWarnings("unused") PipelineOptions options) {
- return new NoopPathValidator();
- }
-
- @Override
- public void validateInputFilePatternSupported(String filepattern) {}
-
- @Override
- public void validateOutputFilePrefixSupported(String filePrefix) {}
-
- @Override
- public void validateOutputResourceSupported(ResourceId resourceId) {}
-
- @Override
- public String verifyPath(String path) {
- return path;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/PathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/PathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/PathValidator.java
deleted file mode 100644
index b88a33e..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/PathValidator.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.fs;
-
-import org.apache.beam.sdk.annotations.Internal;
-
-/**
- * <b>For internal use only; no backwards compatibility guarantees.</b>
- *
- * <p>Interface for controlling validation of paths.
- */
-@Internal
-public interface PathValidator {
- /**
- * Validate that a file pattern is conforming.
- *
- * @param filepattern The file pattern to verify.
- */
- void validateInputFilePatternSupported(String filepattern);
-
- /**
- * Validate that an output file prefix is conforming.
- *
- * @param filePrefix the file prefix to verify.
- */
- void validateOutputFilePrefixSupported(String filePrefix);
-
- /**
- * Validates that an output path is conforming.
- *
- * @param resourceId the file prefix to verify.
- */
- void validateOutputResourceSupported(ResourceId resourceId);
-
- /**
- * Validate that a path is a valid path and that the path
- * is accessible.
- *
- * @param path The path to verify.
- * @return The post-validation path.
- */
- String verifyPath(String path);
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java
deleted file mode 100644
index 0015e59..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.io.fs.PathValidator;
-import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * @deprecated use {@link org.apache.beam.sdk.io.fs.NoopPathValidator}.
- */
-@Deprecated
-public class NoopPathValidator implements PathValidator {
-
- private NoopPathValidator() {
- }
-
- public static PathValidator fromOptions(
- @SuppressWarnings("unused") PipelineOptions options) {
- return new NoopPathValidator();
- }
-
- @Override
- public void validateInputFilePatternSupported(String filepattern) {}
-
- @Override
- public void validateOutputFilePrefixSupported(String filePrefix) {}
-
- @Override
- public void validateOutputResourceSupported(ResourceId resourceId) {}
-
- @Override
- public String verifyPath(String path) {
- return path;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
index 985520f..1e25560 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
@@ -48,7 +48,7 @@ import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.gcp.auth.CredentialFactory;
import org.apache.beam.sdk.extensions.gcp.auth.GcpCredentialFactory;
import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
-import org.apache.beam.sdk.io.fs.PathValidator;
+import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
index 78e233e..7ac9b69 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcsOptions.java
@@ -26,14 +26,14 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.io.fs.PathValidator;
+import org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator;
+import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.GcsPathValidator;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.InstanceBuilder;
http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java
new file mode 100644
index 0000000..e7257b2
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidator.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.storage;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+
+/**
+ * GCP implementation of {@link PathValidator}. Only GCS paths are allowed.
+ */
+public class GcsPathValidator implements PathValidator {
+
+ private GcsOptions gcpOptions;
+
+ private GcsPathValidator(GcsOptions options) {
+ this.gcpOptions = options;
+ }
+
+ public static GcsPathValidator fromOptions(PipelineOptions options) {
+ return new GcsPathValidator(options.as(GcsOptions.class));
+ }
+
+ /**
+ * Validates that the input GCS path is accessible and that the path
+ * is well formed.
+ */
+ @Override
+ public void validateInputFilePatternSupported(String filepattern) {
+ GcsPath gcsPath = getGcsPath(filepattern);
+ checkArgument(GcsUtil.isGcsPatternSupported(gcsPath.getObject()));
+ verifyPath(filepattern);
+ verifyPathIsAccessible(filepattern, "Could not find file %s");
+ }
+
+ /**
+ * Validates that the output GCS path is accessible and that the path
+ * is well formed.
+ */
+ @Override
+ public void validateOutputFilePrefixSupported(String filePrefix) {
+ verifyPath(filePrefix);
+ verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
+ }
+
+ @Override
+ public void validateOutputResourceSupported(ResourceId resourceId) {
+ checkArgument(
+ resourceId.getScheme().equals("gs"),
+ "Expected a valid 'gs://' path but was given: '%s'",
+ resourceId);
+ verifyPath(resourceId.toString());
+ }
+
+ @Override
+ public String verifyPath(String path) {
+ GcsPath gcsPath = getGcsPath(path);
+ checkArgument(gcsPath.isAbsolute(), "Must provide absolute paths for Dataflow");
+ checkArgument(!gcsPath.getObject().isEmpty(),
+ "Missing object or bucket in path: '%s', did you mean: 'gs://some-bucket/%s'?",
+ gcsPath, gcsPath.getBucket());
+ checkArgument(!gcsPath.getObject().contains("//"),
+ "Dataflow Service does not allow objects with consecutive slashes");
+ return gcsPath.toResourceName();
+ }
+
+ private void verifyPathIsAccessible(String path, String errorMessage) {
+ GcsPath gcsPath = getGcsPath(path);
+ try {
+ checkArgument(gcpOptions.getGcsUtil().bucketAccessible(gcsPath),
+ errorMessage, path);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()),
+ e);
+ }
+ }
+
+ private GcsPath getGcsPath(String path) {
+ try {
+ return GcsPath.fromUri(path);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(String.format(
+ "Expected a valid 'gs://' path but was given '%s'", path), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/NoopPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/NoopPathValidator.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/NoopPathValidator.java
new file mode 100644
index 0000000..79b8732
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/NoopPathValidator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.storage;
+
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * <b>For internal use only; no backwards compatibility guarantees.</b>
+ *
+ * <p>Noop implementation of {@link PathValidator}. All paths are allowed and returned unchanged.
+ */
+@Internal
+public class NoopPathValidator implements PathValidator {
+
+ private NoopPathValidator() {
+ }
+
+ public static PathValidator fromOptions(
+ @SuppressWarnings("unused") PipelineOptions options) {
+ return new NoopPathValidator();
+ }
+
+ @Override
+ public void validateInputFilePatternSupported(String filepattern) {}
+
+ @Override
+ public void validateOutputFilePrefixSupported(String filePrefix) {}
+
+ @Override
+ public void validateOutputResourceSupported(ResourceId resourceId) {}
+
+ @Override
+ public String verifyPath(String path) {
+ return path;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/PathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/PathValidator.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/PathValidator.java
new file mode 100644
index 0000000..cc769c8
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/PathValidator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.storage;
+
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.fs.ResourceId;
+
+/**
+ * <b>For internal use only; no backwards compatibility guarantees.</b>
+ *
+ * <p>Interface for controlling validation of paths.
+ */
+@Internal
+public interface PathValidator {
+ /**
+ * Validate that a file pattern is conforming.
+ *
+ * @param filepattern The file pattern to verify.
+ */
+ void validateInputFilePatternSupported(String filepattern);
+
+ /**
+ * Validate that an output file prefix is conforming.
+ *
+ * @param filePrefix the file prefix to verify.
+ */
+ void validateOutputFilePrefixSupported(String filePrefix);
+
+ /**
+ * Validates that an output path is conforming.
+ *
+ * @param resourceId the output resource to verify.
+ */
+ void validateOutputResourceSupported(ResourceId resourceId);
+
+ /**
+ * Validate that a path is a valid path and that the path
+ * is accessible.
+ *
+ * @param path The path to verify.
+ * @return The post-validation path.
+ */
+ String verifyPath(String path);
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
deleted file mode 100644
index c4e557b..0000000
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.IOException;
-import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
-import org.apache.beam.sdk.io.fs.PathValidator;
-import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-/**
- * GCP implementation of {@link PathValidator}. Only GCS paths are allowed.
- */
-public class GcsPathValidator implements PathValidator {
-
- private GcsOptions gcpOptions;
-
- private GcsPathValidator(GcsOptions options) {
- this.gcpOptions = options;
- }
-
- public static GcsPathValidator fromOptions(PipelineOptions options) {
- return new GcsPathValidator(options.as(GcsOptions.class));
- }
-
- /**
- * Validates the the input GCS path is accessible and that the path
- * is well formed.
- */
- @Override
- public void validateInputFilePatternSupported(String filepattern) {
- GcsPath gcsPath = getGcsPath(filepattern);
- checkArgument(GcsUtil.isGcsPatternSupported(gcsPath.getObject()));
- verifyPath(filepattern);
- verifyPathIsAccessible(filepattern, "Could not find file %s");
- }
-
- /**
- * Validates the the output GCS path is accessible and that the path
- * is well formed.
- */
- @Override
- public void validateOutputFilePrefixSupported(String filePrefix) {
- verifyPath(filePrefix);
- verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
- }
-
- @Override
- public void validateOutputResourceSupported(ResourceId resourceId) {
- checkArgument(
- resourceId.getScheme().equals("gs"),
- "Expected a valid 'gs://' path but was given: '%s'",
- resourceId);
- verifyPath(resourceId.toString());
- }
-
- @Override
- public String verifyPath(String path) {
- GcsPath gcsPath = getGcsPath(path);
- checkArgument(gcsPath.isAbsolute(), "Must provide absolute paths for Dataflow");
- checkArgument(!gcsPath.getObject().isEmpty(),
- "Missing object or bucket in path: '%s', did you mean: 'gs://some-bucket/%s'?",
- gcsPath, gcsPath.getBucket());
- checkArgument(!gcsPath.getObject().contains("//"),
- "Dataflow Service does not allow objects with consecutive slashes");
- return gcsPath.toResourceName();
- }
-
- private void verifyPathIsAccessible(String path, String errorMessage) {
- GcsPath gcsPath = getGcsPath(path);
- try {
- checkArgument(gcpOptions.getGcsUtil().bucketAccessible(gcsPath),
- errorMessage, path);
- } catch (IOException e) {
- throw new RuntimeException(
- String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()),
- e);
- }
- }
-
- private GcsPath getGcsPath(String path) {
- try {
- return GcsPath.fromUri(path);
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException(String.format(
- "Expected a valid 'gs://' path but was given '%s'", path), e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java
new file mode 100644
index 0000000..85b68b2
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * @deprecated use {@link org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator}.
+ */
+@Deprecated
+public class NoopPathValidator implements PathValidator {
+
+ private NoopPathValidator() {
+ }
+
+ public static PathValidator fromOptions(
+ @SuppressWarnings("unused") PipelineOptions options) {
+ return new NoopPathValidator();
+ }
+
+ @Override
+ public void validateInputFilePatternSupported(String filepattern) {}
+
+ @Override
+ public void validateOutputFilePrefixSupported(String filePrefix) {}
+
+ @Override
+ public void validateOutputResourceSupported(ResourceId resourceId) {}
+
+ @Override
+ public String verifyPath(String path) {
+ return path;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidatorTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidatorTest.java
new file mode 100644
index 0000000..91ac46c
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsPathValidatorTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.storage;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link GcsPathValidator}, run against a Mockito-mocked {@link GcsUtil}. */
+@RunWith(JUnit4.class)
+public class GcsPathValidatorTest {
+ @Rule public ExpectedException expectedException = ExpectedException.none();
+ // Stubbed in setUp to report every bucket as accessible.
+ @Mock private GcsUtil mockGcsUtil;
+ private GcsPathValidator validator;
+ /** Builds the validator from GcsOptions carrying a test credential and the mocked GcsUtil. */
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
+ GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
+ options.setGcpCredential(new TestCredential());
+ options.setGcsUtil(mockGcsUtil);
+ validator = GcsPathValidator.fromOptions(options);
+ }
+ /** A gs:// path with bucket and object is accepted as an input pattern (no exception). */
+ @Test
+ public void testValidFilePattern() {
+ validator.validateInputFilePatternSupported("gs://bucket/path");
+ }
+ /** A path without the gs:// scheme is rejected as an input pattern. */
+ @Test
+ public void testInvalidFilePattern() {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(
+ "Expected a valid 'gs://' path but was given '/local/path'");
+ validator.validateInputFilePatternSupported("/local/path");
+ }
+ /** An input pattern naming only one component after gs:// is rejected with a suggestion. */
+ @Test
+ public void testFilePatternMissingBucket() {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(
+ "Missing object or bucket in path: 'gs://input/', "
+ + "did you mean: 'gs://some-bucket/input'?");
+ validator.validateInputFilePatternSupported("gs://input");
+ }
+ /** When the mock reports the bucket as inaccessible, the input pattern is rejected. */
+ @Test
+ public void testWhenBucketDoesNotExist() throws Exception {
+ when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(false);
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(
+ "Could not find file gs://non-existent-bucket/location");
+ validator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
+ }
+ /** A gs:// path with bucket and object is accepted as an output prefix (no exception). */
+ @Test
+ public void testValidOutputPrefix() {
+ validator.validateOutputFilePrefixSupported("gs://bucket/path");
+ }
+ /** A path without the gs:// scheme is rejected as an output prefix. */
+ @Test
+ public void testInvalidOutputPrefix() {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(
+ "Expected a valid 'gs://' path but was given '/local/path'");
+ validator.validateOutputFilePrefixSupported("/local/path");
+ }
+ /** An output prefix naming only one component after gs:// is rejected with a suggestion. */
+ @Test
+ public void testOutputPrefixMissingBucket() {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(
+ "Missing object or bucket in path: 'gs://output/', "
+ + "did you mean: 'gs://some-bucket/output'?");
+ validator.validateOutputFilePrefixSupported("gs://output");
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/7d165a73/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
deleted file mode 100644
index 65fb228..0000000
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
-import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/** Tests for {@link GcsPathValidator}. */
-@RunWith(JUnit4.class)
-public class GcsPathValidatorTest {
- @Rule public ExpectedException expectedException = ExpectedException.none();
-
- @Mock private GcsUtil mockGcsUtil;
- private GcsPathValidator validator;
-
- @Before
- public void setUp() throws Exception {
- MockitoAnnotations.initMocks(this);
- when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
- GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
- options.setGcpCredential(new TestCredential());
- options.setGcsUtil(mockGcsUtil);
- validator = GcsPathValidator.fromOptions(options);
- }
-
- @Test
- public void testValidFilePattern() {
- validator.validateInputFilePatternSupported("gs://bucket/path");
- }
-
- @Test
- public void testInvalidFilePattern() {
- expectedException.expect(IllegalArgumentException.class);
- expectedException.expectMessage(
- "Expected a valid 'gs://' path but was given '/local/path'");
- validator.validateInputFilePatternSupported("/local/path");
- }
-
- @Test
- public void testFilePatternMissingBucket() {
- expectedException.expect(IllegalArgumentException.class);
- expectedException.expectMessage(
- "Missing object or bucket in path: 'gs://input/', "
- + "did you mean: 'gs://some-bucket/input'?");
- validator.validateInputFilePatternSupported("gs://input");
- }
-
- @Test
- public void testWhenBucketDoesNotExist() throws Exception {
- when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(false);
- expectedException.expect(IllegalArgumentException.class);
- expectedException.expectMessage(
- "Could not find file gs://non-existent-bucket/location");
- validator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
- }
-
- @Test
- public void testValidOutputPrefix() {
- validator.validateOutputFilePrefixSupported("gs://bucket/path");
- }
-
- @Test
- public void testInvalidOutputPrefix() {
- expectedException.expect(IllegalArgumentException.class);
- expectedException.expectMessage(
- "Expected a valid 'gs://' path but was given '/local/path'");
- validator.validateOutputFilePrefixSupported("/local/path");
- }
-
- @Test
- public void testOutputPrefixMissingBucket() {
- expectedException.expect(IllegalArgumentException.class);
- expectedException.expectMessage(
- "Missing object or bucket in path: 'gs://output/', "
- + "did you mean: 'gs://some-bucket/output'?");
- validator.validateOutputFilePrefixSupported("gs://output");
- }
-}
[2/2] beam git commit: This closes #2972
Posted by dh...@apache.org.
This closes #2972
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/28180c45
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/28180c45
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/28180c45
Branch: refs/heads/master
Commit: 28180c45b90c9e337857b1522d6f785df88d2f97
Parents: 5e4fd1b 7d165a7
Author: Dan Halperin <dh...@google.com>
Authored: Tue May 9 11:03:42 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 9 11:03:42 2017 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 2 +-
.../DataflowPipelineTranslatorTest.java | 2 +-
.../beam/sdk/io/fs/NoopPathValidator.java | 52 ---------
.../apache/beam/sdk/io/fs/PathValidator.java | 58 ----------
.../apache/beam/sdk/util/NoopPathValidator.java | 51 ---------
.../sdk/extensions/gcp/options/GcpOptions.java | 2 +-
.../sdk/extensions/gcp/options/GcsOptions.java | 4 +-
.../gcp/storage/GcsPathValidator.java | 107 +++++++++++++++++++
.../gcp/storage/NoopPathValidator.java | 53 +++++++++
.../extensions/gcp/storage/PathValidator.java | 59 ++++++++++
.../apache/beam/sdk/util/GcsPathValidator.java | 107 -------------------
.../apache/beam/sdk/util/NoopPathValidator.java | 51 +++++++++
.../gcp/storage/GcsPathValidatorTest.java | 107 +++++++++++++++++++
.../beam/sdk/util/GcsPathValidatorTest.java | 106 ------------------
14 files changed, 382 insertions(+), 379 deletions(-)
----------------------------------------------------------------------