You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/02 23:18:18 UTC
[1/2] beam git commit: [BEAM-59] Move GcsFileSystem to gcp-core
Repository: beam
Updated Branches:
refs/heads/master 027dd777d -> 5bfd3e049
[BEAM-59] Move GcsFileSystem to gcp-core
It is used by both runner and IO, so should be in core.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b2a4ae2b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b2a4ae2b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b2a4ae2b
Branch: refs/heads/master
Commit: b2a4ae2b307b8c540ff8a40878521bf3d5e532ff
Parents: 027dd77
Author: Dan Halperin <dh...@google.com>
Authored: Tue May 2 11:08:16 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 2 16:17:57 2017 -0700
----------------------------------------------------------------------
.../src/main/resources/beam/findbugs-filter.xml | 2 +-
.../extensions/gcp/storage/GcsFileSystem.java | 266 ++++++++++++++++++
.../gcp/storage/GcsFileSystemRegistrar.java | 43 +++
.../extensions/gcp/storage/GcsResourceId.java | 128 +++++++++
.../extensions/gcp/storage/package-info.java | 21 ++
.../gcp/storage/GcsFileSystemRegistrarTest.java | 52 ++++
.../gcp/storage/GcsFileSystemTest.java | 274 +++++++++++++++++++
.../gcp/storage/GcsResourceIdTest.java | 169 ++++++++++++
sdks/java/io/google-cloud-platform/pom.xml | 5 -
.../beam/sdk/io/gcp/storage/GcsFileSystem.java | 266 ------------------
.../io/gcp/storage/GcsFileSystemRegistrar.java | 43 ---
.../beam/sdk/io/gcp/storage/GcsResourceId.java | 128 ---------
.../beam/sdk/io/gcp/storage/package-info.java | 21 --
.../gcp/storage/GcsFileSystemRegistrarTest.java | 52 ----
.../sdk/io/gcp/storage/GcsFileSystemTest.java | 274 -------------------
.../sdk/io/gcp/storage/GcsResourceIdTest.java | 169 ------------
16 files changed, 954 insertions(+), 959 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index d1d8b4d..28bbc3c 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -203,7 +203,7 @@
</Match>
<Match>
- <Class name="org.apache.beam.sdk.io.gcp.storage.GcsResourceId"/>
+ <Class name="org.apache.beam.sdk.extensions.gcp.storage.GcsResourceId"/>
<Method name="getCurrentDirectory" />
<Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
<!--
http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
new file mode 100644
index 0000000..69dd8fc
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.storage;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.api.services.storage.model.Objects;
+import com.google.api.services.storage.model.StorageObject;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.fs.CreateOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MatchResult.Status;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link FileSystem} implementation for Google Cloud Storage.
+ */
+class GcsFileSystem extends FileSystem<GcsResourceId> {
+ private static final Logger LOG = LoggerFactory.getLogger(GcsFileSystem.class);
+
+ private final GcsOptions options;
+
+ GcsFileSystem(GcsOptions options) {
+ this.options = checkNotNull(options, "options");
+ }
+
+ @Override
+ protected List<MatchResult> match(List<String> specs) throws IOException {
+ List<GcsPath> gcsPaths = toGcsPaths(specs);
+
+ List<GcsPath> globs = Lists.newArrayList();
+ List<GcsPath> nonGlobs = Lists.newArrayList();
+ List<Boolean> isGlobBooleans = Lists.newArrayList();
+
+ for (GcsPath path : gcsPaths) {
+ if (GcsUtil.isGlob(path)) {
+ globs.add(path);
+ isGlobBooleans.add(true);
+ } else {
+ nonGlobs.add(path);
+ isGlobBooleans.add(false);
+ }
+ }
+
+ Iterator<MatchResult> globsMatchResults = matchGlobs(globs).iterator();
+ Iterator<MatchResult> nonGlobsMatchResults = matchNonGlobs(nonGlobs).iterator();
+
+ ImmutableList.Builder<MatchResult> ret = ImmutableList.builder();
+ for (Boolean isGlob : isGlobBooleans) {
+ if (isGlob) {
+ checkState(globsMatchResults.hasNext(), "Expect globsMatchResults has next.");
+ ret.add(globsMatchResults.next());
+ } else {
+ checkState(nonGlobsMatchResults.hasNext(), "Expect nonGlobsMatchResults has next.");
+ ret.add(nonGlobsMatchResults.next());
+ }
+ }
+ checkState(!globsMatchResults.hasNext(), "Expect no more elements in globsMatchResults.");
+ checkState(!nonGlobsMatchResults.hasNext(), "Expect no more elements in nonGlobsMatchResults.");
+ return ret.build();
+ }
+
+ @Override
+ protected WritableByteChannel create(GcsResourceId resourceId, CreateOptions createOptions)
+ throws IOException {
+ return options.getGcsUtil().create(resourceId.getGcsPath(), createOptions.mimeType());
+ }
+
+ @Override
+ protected ReadableByteChannel open(GcsResourceId resourceId) throws IOException {
+ return options.getGcsUtil().open(resourceId.getGcsPath());
+ }
+
+ @Override
+ protected void rename(
+ List<GcsResourceId> srcResourceIds,
+ List<GcsResourceId> destResourceIds) throws IOException {
+ copy(srcResourceIds, destResourceIds);
+ delete(srcResourceIds);
+ }
+
+ @Override
+ protected void delete(Collection<GcsResourceId> resourceIds) throws IOException {
+ options.getGcsUtil().remove(toFilenames(resourceIds));
+ }
+
+ @Override
+ protected GcsResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
+ if (isDirectory) {
+ if (!singleResourceSpec.endsWith("/")) {
+ singleResourceSpec += '/';
+ }
+ } else {
+ checkArgument(
+ !singleResourceSpec.endsWith("/"),
+ "Expected a file path, but [%s], ends with '/'. This is unsupported in GcsFileSystem.",
+ singleResourceSpec);
+ }
+ GcsPath path = GcsPath.fromUri(singleResourceSpec);
+ return GcsResourceId.fromGcsPath(path);
+ }
+
+ @Override
+ protected void copy(List<GcsResourceId> srcResourceIds, List<GcsResourceId> destResourceIds)
+ throws IOException {
+ options.getGcsUtil().copy(toFilenames(srcResourceIds), toFilenames(destResourceIds));
+ }
+
+ @Override
+ protected String getScheme() {
+ return "gs";
+ }
+
+ private List<MatchResult> matchGlobs(List<GcsPath> globs) {
+ // TODO: Executes in parallel, address https://issues.apache.org/jira/browse/BEAM-1503.
+ return FluentIterable.from(globs)
+ .transform(new Function<GcsPath, MatchResult>() {
+ @Override
+ public MatchResult apply(GcsPath gcsPath) {
+ try {
+ return expand(gcsPath);
+ } catch (IOException e) {
+ return MatchResult.create(Status.ERROR, e);
+ }
+ }})
+ .toList();
+ }
+
+ /**
+ * Expands a glob pattern into a {@link MatchResult}.
+ *
+ * <p>Lists the pattern's bucket page by page using the prefix from {@link GcsUtil#getGlobPrefix}
+ * and keeps every object whose name matches the glob. Names ending in '/' are skipped.
+ *
+ * @throws IllegalArgumentException if {@code gcsPattern} does not contain globs.
+ */
+ @VisibleForTesting
+ MatchResult expand(GcsPath gcsPattern) throws IOException {
+ String prefix = GcsUtil.getGlobPrefix(gcsPattern.getObject());
+ Pattern p = Pattern.compile(GcsUtil.globToRegexp(gcsPattern.getObject()));
+ // Hand the Pattern itself to SLF4J so it is only stringified when debug logging is enabled.
+ LOG.debug("matching files in bucket {}, prefix {} against pattern {}", gcsPattern.getBucket(),
+ prefix, p);
+ String pageToken = null;
+ List<Metadata> results = new LinkedList<>();
+ do {
+ Objects objects = options.getGcsUtil().listObjects(gcsPattern.getBucket(), prefix, pageToken);
+ if (objects.getItems() == null) {
+ break;
+ }
+ for (StorageObject o : objects.getItems()) {
+ String name = o.getName();
+ // Skip directories, which end with a slash; the cheap suffix test runs before the regex.
+ if (!name.endsWith("/") && p.matcher(name).matches()) {
+ LOG.debug("Matched object: {}", name);
+ results.add(toMetadata(o));
+ }
+ }
+ pageToken = objects.getNextPageToken();
+ } while (pageToken != null);
+ return MatchResult.create(Status.OK, results);
+ }
+
+ /**
+ * Returns {@link MatchResult MatchResults} for the given {@link GcsPath GcsPaths}.
+ *
+ * <p>One {@link MatchResult} is produced per entry returned by {@link GcsUtil#getObjects}, which
+ * is expected to be one per given path. Each {@link Status#OK} result has one {@link Metadata}.
+ */
+ @VisibleForTesting
+ List<MatchResult> matchNonGlobs(List<GcsPath> gcsPaths) throws IOException {
+ List<StorageObjectOrIOException> results = options.getGcsUtil().getObjects(gcsPaths);
+
+ ImmutableList.Builder<MatchResult> ret = ImmutableList.builder();
+ for (StorageObjectOrIOException result : results) {
+ ret.add(toMatchResult(result));
+ }
+ return ret.build();
+ }
+
+ private MatchResult toMatchResult(StorageObjectOrIOException objectOrException) {
+ @Nullable IOException failure = objectOrException.ioException();
+ if (failure == null) {
+ StorageObject found = objectOrException.storageObject();
+ assert found != null; // fix a warning; guaranteed by StorageObjectOrIOException semantics.
+ return MatchResult.create(Status.OK, ImmutableList.of(toMetadata(found)));
+ }
+ // A missing object is reported as NOT_FOUND; any other I/O failure as ERROR.
+ Status failureStatus =
+ failure instanceof FileNotFoundException ? Status.NOT_FOUND : Status.ERROR;
+ return MatchResult.create(failureStatus, failure);
+ }
+
+ private Metadata toMetadata(StorageObject storageObject) {
+ // TODO: Address https://issues.apache.org/jira/browse/BEAM-1494
+ // It is incorrect to set IsReadSeekEfficient true for files with content encoding set to gzip.
+ Metadata.Builder ret = Metadata.builder()
+ .setIsReadSeekEfficient(true)
+ .setResourceId(GcsResourceId.fromGcsPath(GcsPath.fromObject(storageObject)));
+ BigInteger size = storageObject.getSize();
+ if (size != null) {
+ ret.setSizeBytes(size.longValue());
+ }
+ return ret.build();
+ }
+
+ private List<String> toFilenames(Collection<GcsResourceId> resources) {
+ return FluentIterable.from(resources)
+ .transform(
+ new Function<GcsResourceId, String>() {
+ @Override
+ public String apply(GcsResourceId resource) {
+ return resource.getGcsPath().toString();
+ }})
+ .toList();
+ }
+
+ private List<GcsPath> toGcsPaths(Collection<String> specs) {
+ return FluentIterable.from(specs)
+ .transform(new Function<String, GcsPath>() {
+ @Override
+ public GcsPath apply(String spec) {
+ return GcsPath.fromUri(spec);
+ }})
+ .toList();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
new file mode 100644
index 0000000..9f5980a
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.storage;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import javax.annotation.Nonnull;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.FileSystemRegistrar;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * {@link AutoService} registrar for the {@link GcsFileSystem}.
+ */
+@AutoService(FileSystemRegistrar.class)
+public class GcsFileSystemRegistrar implements FileSystemRegistrar {
+
+ @Override
+ public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
+ checkNotNull(
+ options,
+ "Expect the runner have called FileSystems.setDefaultConfigInWorkers().");
+ return ImmutableList.<FileSystem>of(new GcsFileSystem(options.as(GcsOptions.class)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceId.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceId.java
new file mode 100644
index 0000000..e53e5fa
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceId.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.storage;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+
+/**
+ * {@link ResourceId} implementation for Google Cloud Storage.
+ */
+public class GcsResourceId implements ResourceId {
+
+ private final GcsPath gcsPath;
+
+ static GcsResourceId fromGcsPath(GcsPath gcsPath) {
+ checkNotNull(gcsPath, "gcsPath");
+ return new GcsResourceId(gcsPath);
+ }
+
+ private GcsResourceId(GcsPath gcsPath) {
+ this.gcsPath = gcsPath;
+ }
+
+ /**
+ * Resolves {@code other} against this directory resource as a file or a sub-directory.
+ *
+ * @param other the name to resolve; it may end with '/' only when resolving a directory.
+ * @param resolveOptions {@code RESOLVE_FILE} or {@code RESOLVE_DIRECTORY}.
+ * @throws IllegalStateException if this resource is not a directory.
+ * @throws IllegalArgumentException if {@code resolveOptions} is unsupported, or if a file is
+ * requested but {@code other} ends with '/'.
+ */
+ @Override
+ public GcsResourceId resolve(String other, ResolveOptions resolveOptions) {
+ // Message templates are only formatted when a check fails, unlike an eager String.format.
+ checkState(isDirectory(), "Expected the gcsPath is a directory, but had [%s].", gcsPath);
+ checkArgument(
+ resolveOptions.equals(StandardResolveOptions.RESOLVE_FILE)
+ || resolveOptions.equals(StandardResolveOptions.RESOLVE_DIRECTORY),
+ "ResolveOptions: [%s] is not supported.", resolveOptions);
+ if (resolveOptions.equals(StandardResolveOptions.RESOLVE_FILE)) {
+ checkArgument(
+ !other.endsWith("/"),
+ "The resolved file: [%s] should not end with '/'.", other);
+ return fromGcsPath(gcsPath.resolve(other));
+ }
+ // RESOLVE_DIRECTORY: a trailing '/' on other is accepted; otherwise it is appended.
+ return fromGcsPath(gcsPath.resolve(other.endsWith("/") ? other : other + "/"));
+ }
+
+ /**
+ * Returns this resource if it is a directory, otherwise the resource for its parent path.
+ */
+ @Override
+ public GcsResourceId getCurrentDirectory() {
+ if (isDirectory()) {
+ return this;
+ }
+ GcsPath parent = gcsPath.getParent();
+ checkState(parent != null, "Failed to get the current directory for path: [%s].", gcsPath);
+ return fromGcsPath(parent);
+ }
+
+ @Override
+ public boolean isDirectory() {
+ return gcsPath.endsWith("/");
+ }
+
+ @Override
+ public String getScheme() {
+ return "gs";
+ }
+
+ @Override
+ @Nullable public String getFilename() {
+ // Paths with at most one name element are reported as having no file name.
+ if (gcsPath.getNameCount() <= 1) {
+ return null;
+ }
+ GcsPath lastElement = gcsPath.getFileName();
+ return (lastElement != null) ? lastElement.toString() : null;
+ }
+
+ GcsPath getGcsPath() {
+ return gcsPath;
+ }
+
+ @Override
+ public String toString() {
+ return gcsPath.toString();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ // Equality is delegated entirely to the wrapped GcsPath.
+ if (obj instanceof GcsResourceId) {
+ return gcsPath.equals(((GcsResourceId) obj).gcsPath);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return gcsPath.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/package-info.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/package-info.java
new file mode 100644
index 0000000..ee8552f
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines the {@link org.apache.beam.sdk.io.FileSystem} implementation for Google Cloud Storage.
+ */
+package org.apache.beam.sdk.extensions.gcp.storage;
http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrarTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrarTest.java
new file mode 100644
index 0000000..c9ce1e5
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrarTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.storage;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.FileSystemRegistrar;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link GcsFileSystemRegistrar}.
+ */
+@RunWith(JUnit4.class)
+public class GcsFileSystemRegistrarTest {
+
+ @Test
+ public void testServiceLoader() {
+ for (FileSystemRegistrar registrar
+ : Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) {
+ if (registrar instanceof GcsFileSystemRegistrar) {
+ Iterable<FileSystem> fileSystems = registrar.fromOptions(PipelineOptionsFactory.create());
+ assertThat(fileSystems, contains(instanceOf(GcsFileSystem.class)));
+ return;
+ }
+ }
+ fail("Expected to find " + GcsFileSystemRegistrar.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java
new file mode 100644
index 0000000..37ff9c8
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.storage;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.storage.model.Objects;
+import com.google.api.services.storage.model.StorageObject;
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MatchResult.Status;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link GcsFileSystem}.
+ */
+@RunWith(JUnit4.class)
+public class GcsFileSystemTest {
+
+ @Rule
+ public transient ExpectedException thrown = ExpectedException.none();
+ @Mock
+ private GcsUtil mockGcsUtil;
+ private GcsOptions gcsOptions;
+ private GcsFileSystem gcsFileSystem;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ gcsOptions = PipelineOptionsFactory.as(GcsOptions.class);
+ gcsOptions.setGcsUtil(mockGcsUtil);
+ gcsFileSystem = new GcsFileSystem(gcsOptions);
+ }
+
+ @Test
+ public void testMatch() throws Exception {
+ Objects modelObjects = new Objects();
+ List<StorageObject> items = new ArrayList<>();
+ // A directory
+ items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/"));
+
+ // Files within the directory
+ items.add(createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */));
+ items.add(createStorageObject("gs://testbucket/testdirectory/file2name", 2L /* fileSize */));
+ items.add(createStorageObject("gs://testbucket/testdirectory/file3name", 3L /* fileSize */));
+ items.add(createStorageObject("gs://testbucket/testdirectory/file4name", 4L /* fileSize */));
+ items.add(createStorageObject("gs://testbucket/testdirectory/otherfile", 5L /* fileSize */));
+ items.add(createStorageObject("gs://testbucket/testdirectory/anotherfile", 6L /* fileSize */));
+
+ modelObjects.setItems(items);
+ when(mockGcsUtil.listObjects(eq("testbucket"), anyString(), isNull(String.class)))
+ .thenReturn(modelObjects);
+
+ List<GcsPath> gcsPaths = ImmutableList.of(
+ GcsPath.fromUri("gs://testbucket/testdirectory/non-exist-file"),
+ GcsPath.fromUri("gs://testbucket/testdirectory/otherfile"));
+
+ when(mockGcsUtil.getObjects(eq(gcsPaths))).thenReturn(
+ ImmutableList.of(
+ StorageObjectOrIOException.create(new FileNotFoundException()),
+ StorageObjectOrIOException.create(
+ createStorageObject("gs://testbucket/testdirectory/otherfile", 4L))));
+
+ List<String> specs = ImmutableList.of(
+ "gs://testbucket/testdirectory/file[1-3]*",
+ "gs://testbucket/testdirectory/non-exist-file",
+ "gs://testbucket/testdirectory/otherfile");
+ List<MatchResult> matchResults = gcsFileSystem.match(specs);
+ assertEquals(3, matchResults.size());
+ assertEquals(Status.OK, matchResults.get(0).status());
+ assertThat(
+ ImmutableList.of(
+ "gs://testbucket/testdirectory/file1name",
+ "gs://testbucket/testdirectory/file2name",
+ "gs://testbucket/testdirectory/file3name"),
+ contains(toFilenames(matchResults.get(0)).toArray()));
+ assertEquals(Status.NOT_FOUND, matchResults.get(1).status());
+ assertEquals(Status.OK, matchResults.get(2).status());
+ assertThat(
+ ImmutableList.of("gs://testbucket/testdirectory/otherfile"),
+ contains(toFilenames(matchResults.get(2)).toArray()));
+
+ }
+
+ // Checks that expand() keeps exactly the listed objects whose names match each glob.
+ @Test
+ public void testGlobExpansion() throws IOException {
+ Objects modelObjects = new Objects();
+ List<StorageObject> items = new ArrayList<>();
+ // A directory
+ items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/"));
+
+ // Files within the directory
+ items.add(createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */));
+ items.add(createStorageObject("gs://testbucket/testdirectory/file2name", 2L /* fileSize */));
+ items.add(createStorageObject("gs://testbucket/testdirectory/file3name", 3L /* fileSize */));
+ items.add(createStorageObject("gs://testbucket/testdirectory/otherfile", 4L /* fileSize */));
+ items.add(createStorageObject("gs://testbucket/testdirectory/anotherfile", 5L /* fileSize */));
+ items.add(createStorageObject(
+ "gs://testbucket/testotherdirectory/file4name", 6L /* fileSize */));
+
+ modelObjects.setItems(items);
+ when(mockGcsUtil.listObjects(eq("testbucket"), anyString(), isNull(String.class)))
+ .thenReturn(modelObjects);
+
+ // Test patterns.
+ {
+ GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file*");
+ List<String> expectedFiles = ImmutableList.of(
+ "gs://testbucket/testdirectory/file1name",
+ "gs://testbucket/testdirectory/file2name",
+ "gs://testbucket/testdirectory/file3name");
+
+ assertThat(
+ expectedFiles,
+ contains(toFilenames(gcsFileSystem.expand(pattern)).toArray()));
+ }
+
+ {
+ // A wildcard in the middle of the file name (this block used to repeat the "file*" case).
+ GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file*name");
+ List<String> expectedFiles = ImmutableList.of(
+ "gs://testbucket/testdirectory/file1name",
+ "gs://testbucket/testdirectory/file2name",
+ "gs://testbucket/testdirectory/file3name");
+ assertThat(
+ expectedFiles,
+ contains(toFilenames(gcsFileSystem.expand(pattern)).toArray()));
+ }
+
+ {
+ GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file[1-3]*");
+ List<String> expectedFiles = ImmutableList.of(
+ "gs://testbucket/testdirectory/file1name",
+ "gs://testbucket/testdirectory/file2name",
+ "gs://testbucket/testdirectory/file3name");
+
+ assertThat(
+ expectedFiles,
+ contains(toFilenames(gcsFileSystem.expand(pattern)).toArray()));
+ }
+
+ {
+ GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file?name");
+ List<String> expectedFiles = ImmutableList.of(
+ "gs://testbucket/testdirectory/file1name",
+ "gs://testbucket/testdirectory/file2name",
+ "gs://testbucket/testdirectory/file3name");
+
+ assertThat(
+ expectedFiles,
+ contains(toFilenames(gcsFileSystem.expand(pattern)).toArray()));
+ }
+
+ {
+ GcsPath pattern = GcsPath.fromUri("gs://testbucket/test*ectory/fi*name");
+ List<String> expectedFiles = ImmutableList.of(
+ "gs://testbucket/testdirectory/file1name",
+ "gs://testbucket/testdirectory/file2name",
+ "gs://testbucket/testdirectory/file3name",
+ "gs://testbucket/testotherdirectory/file4name");
+
+ assertThat(
+ expectedFiles,
+ contains(toFilenames(gcsFileSystem.expand(pattern)).toArray()));
+ }
+ }
+
+ @Test
+ public void testExpandNonGlob() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Glob expression: [testdirectory/otherfile] is not expandable.");
+ gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/testdirectory/otherfile"));
+ }
+
+ // Patterns that contain recursive wildcards ('**') are not supported.
+ @Test
+ public void testRecursiveGlobExpansionFails() throws IOException {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Unsupported wildcard usage");
+ gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/test**"));
+ }
+
+ @Test
+ public void testMatchNonGlobs() throws Exception {
+ List<StorageObjectOrIOException> items = new ArrayList<>();
+ // Files within the directory
+ items.add(StorageObjectOrIOException.create(
+ createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */)));
+ items.add(StorageObjectOrIOException.create(new FileNotFoundException()));
+ items.add(StorageObjectOrIOException.create(new IOException()));
+ items.add(StorageObjectOrIOException.create(
+ createStorageObject("gs://testbucket/testdirectory/file4name", 4L /* fileSize */)));
+
+ List<GcsPath> gcsPaths = ImmutableList.of(
+ GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
+ GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
+ GcsPath.fromUri("gs://testbucket/testdirectory/file3name"),
+ GcsPath.fromUri("gs://testbucket/testdirectory/file4name"));
+
+ when(mockGcsUtil.getObjects(eq(gcsPaths))).thenReturn(items);
+ List<MatchResult> matchResults = gcsFileSystem.matchNonGlobs(gcsPaths);
+
+ assertEquals(4, matchResults.size());
+ assertThat(
+ ImmutableList.of("gs://testbucket/testdirectory/file1name"),
+ contains(toFilenames(matchResults.get(0)).toArray()));
+ assertEquals(Status.NOT_FOUND, matchResults.get(1).status());
+ assertEquals(Status.ERROR, matchResults.get(2).status());
+ assertThat(
+ ImmutableList.of("gs://testbucket/testdirectory/file4name"),
+ contains(toFilenames(matchResults.get(3)).toArray()));
+ }
+
+ private StorageObject createStorageObject(String gcsFilename, long fileSize) {
+ GcsPath gcsPath = GcsPath.fromUri(gcsFilename);
+ return new StorageObject()
+ .setBucket(gcsPath.getBucket())
+ .setName(gcsPath.getObject())
+ .setSize(BigInteger.valueOf(fileSize));
+ }
+
+ private List<String> toFilenames(MatchResult matchResult) throws IOException {
+ return FluentIterable
+ .from(matchResult.metadata())
+ .transform(new Function<Metadata, String>() {
+ @Override
+ public String apply(Metadata metadata) {
+ return ((GcsResourceId) metadata.resourceId()).getGcsPath().toString();
+ }})
+ .toList();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java
new file mode 100644
index 0000000..b245610
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.storage;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link GcsResourceId}.
+ */
+@RunWith(JUnit4.class)
+public class GcsResourceIdTest {
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testResolve() throws Exception {
+ // Tests for common gcs paths.
+ assertEquals(
+ toResourceIdentifier("gs://bucket/tmp/aa"),
+ toResourceIdentifier("gs://bucket/tmp/")
+ .resolve("aa", StandardResolveOptions.RESOLVE_FILE));
+ assertEquals(
+ toResourceIdentifier("gs://bucket/tmp/aa/bb/cc/"),
+ toResourceIdentifier("gs://bucket/tmp/")
+ .resolve("aa", StandardResolveOptions.RESOLVE_DIRECTORY)
+ .resolve("bb", StandardResolveOptions.RESOLVE_DIRECTORY)
+ .resolve("cc", StandardResolveOptions.RESOLVE_DIRECTORY));
+
+ // Tests absolute path.
+ assertEquals(
+ toResourceIdentifier("gs://bucket/tmp/aa"),
+ toResourceIdentifier("gs://bucket/tmp/bb/")
+ .resolve("gs://bucket/tmp/aa", StandardResolveOptions.RESOLVE_FILE));
+
+ // Tests bucket with no ending '/'.
+ assertEquals(
+ toResourceIdentifier("gs://my_bucket/tmp"),
+ toResourceIdentifier("gs://my_bucket")
+ .resolve("tmp", StandardResolveOptions.RESOLVE_FILE));
+
+ // Tests path with unicode
+ assertEquals(
+ toResourceIdentifier("gs://bucket/输出 目录/输出 文件01.txt"),
+ toResourceIdentifier("gs://bucket/输出 目录/")
+ .resolve("输出 文件01.txt", StandardResolveOptions.RESOLVE_FILE));
+ }
+
+ @Test
+ public void testResolveHandleBadInputs() throws Exception {
+ assertEquals(
+ toResourceIdentifier("gs://my_bucket/tmp/"),
+ toResourceIdentifier("gs://my_bucket/")
+ .resolve("tmp/", StandardResolveOptions.RESOLVE_DIRECTORY));
+ }
+
+ @Test
+ public void testResolveInvalidInputs() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("The resolved file: [tmp/] should not end with '/'.");
+ toResourceIdentifier("gs://my_bucket/").resolve("tmp/", StandardResolveOptions.RESOLVE_FILE);
+ }
+
+ @Test
+ public void testResolveInvalidNotDirectory() throws Exception {
+ ResourceId tmpDir = toResourceIdentifier("gs://my_bucket/")
+ .resolve("tmp dir", StandardResolveOptions.RESOLVE_FILE);
+
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("Expected the gcsPath is a directory, but had [gs://my_bucket/tmp dir].");
+ tmpDir.resolve("aa", StandardResolveOptions.RESOLVE_FILE);
+ }
+
+ @Test
+ public void testGetCurrentDirectory() throws Exception {
+ // Tests gcs paths.
+ assertEquals(
+ toResourceIdentifier("gs://my_bucket/tmp dir/"),
+ toResourceIdentifier("gs://my_bucket/tmp dir/").getCurrentDirectory());
+
+ // Tests path with unicode.
+ assertEquals(
+ toResourceIdentifier("gs://my_bucket/输出 目录/"),
+ toResourceIdentifier("gs://my_bucket/输出 目录/文件01.txt").getCurrentDirectory());
+
+ // Tests bucket with no ending '/'.
+ assertEquals(
+ toResourceIdentifier("gs://my_bucket/"),
+ toResourceIdentifier("gs://my_bucket").getCurrentDirectory());
+ }
+
+ @Test
+ public void testIsDirectory() throws Exception {
+ assertTrue(toResourceIdentifier("gs://my_bucket/tmp dir/").isDirectory());
+ assertTrue(toResourceIdentifier("gs://my_bucket/").isDirectory());
+ assertTrue(toResourceIdentifier("gs://my_bucket").isDirectory());
+
+ assertFalse(toResourceIdentifier("gs://my_bucket/file").isDirectory());
+ }
+
+ @Test
+ public void testInvalidGcsPath() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Invalid GCS URI: gs://");
+ toResourceIdentifier("gs://");
+ }
+
+ @Test
+ public void testGetScheme() throws Exception {
+ // Tests gcs paths.
+ assertEquals("gs", toResourceIdentifier("gs://my_bucket/tmp dir/").getScheme());
+
+ // Tests bucket with no ending '/'.
+ assertEquals("gs", toResourceIdentifier("gs://my_bucket").getScheme());
+ }
+
+ @Test
+ public void testEquals() throws Exception {
+ assertEquals(
+ toResourceIdentifier("gs://my_bucket/tmp/"),
+ toResourceIdentifier("gs://my_bucket/tmp/"));
+
+ assertNotEquals(
+ toResourceIdentifier("gs://my_bucket/tmp"),
+ toResourceIdentifier("gs://my_bucket/tmp/"));
+ }
+
+ @Test
+ public void testGetFilename() throws Exception {
+ assertEquals(toResourceIdentifier("gs://my_bucket/").getFilename(), null);
+ assertEquals(toResourceIdentifier("gs://my_bucket/abc").getFilename(),
+ "abc");
+ assertEquals(toResourceIdentifier("gs://my_bucket/abc/").getFilename(),
+ "abc");
+ assertEquals(toResourceIdentifier("gs://my_bucket/abc/xyz.txt").getFilename(),
+ "xyz.txt");
+ }
+
+ private GcsResourceId toResourceIdentifier(String str) throws Exception {
+ return GcsResourceId.fromGcsPath(GcsPath.fromUri(str));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index 3bdc5d0..9051d98 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -88,11 +88,6 @@
<dependency>
<groupId>com.google.apis</groupId>
- <artifactId>google-api-services-storage</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.apis</groupId>
<artifactId>google-api-services-pubsub</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
deleted file mode 100644
index 2663864..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.storage;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.api.services.storage.model.Objects;
-import com.google.api.services.storage.model.StorageObject;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.math.BigInteger;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.regex.Pattern;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
-import org.apache.beam.sdk.io.FileSystem;
-import org.apache.beam.sdk.io.fs.CreateOptions;
-import org.apache.beam.sdk.io.fs.MatchResult;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.io.fs.MatchResult.Status;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link FileSystem} implementation for Google Cloud Storage.
- */
-class GcsFileSystem extends FileSystem<GcsResourceId> {
- private static final Logger LOG = LoggerFactory.getLogger(GcsFileSystem.class);
-
- private final GcsOptions options;
-
- GcsFileSystem(GcsOptions options) {
- this.options = checkNotNull(options, "options");
- }
-
- @Override
- protected List<MatchResult> match(List<String> specs) throws IOException {
- List<GcsPath> gcsPaths = toGcsPaths(specs);
-
- List<GcsPath> globs = Lists.newArrayList();
- List<GcsPath> nonGlobs = Lists.newArrayList();
- List<Boolean> isGlobBooleans = Lists.newArrayList();
-
- for (GcsPath path : gcsPaths) {
- if (GcsUtil.isGlob(path)) {
- globs.add(path);
- isGlobBooleans.add(true);
- } else {
- nonGlobs.add(path);
- isGlobBooleans.add(false);
- }
- }
-
- Iterator<MatchResult> globsMatchResults = matchGlobs(globs).iterator();
- Iterator<MatchResult> nonGlobsMatchResults = matchNonGlobs(nonGlobs).iterator();
-
- ImmutableList.Builder<MatchResult> ret = ImmutableList.builder();
- for (Boolean isGlob : isGlobBooleans) {
- if (isGlob) {
- checkState(globsMatchResults.hasNext(), "Expect globsMatchResults has next.");
- ret.add(globsMatchResults.next());
- } else {
- checkState(nonGlobsMatchResults.hasNext(), "Expect nonGlobsMatchResults has next.");
- ret.add(nonGlobsMatchResults.next());
- }
- }
- checkState(!globsMatchResults.hasNext(), "Expect no more elements in globsMatchResults.");
- checkState(!nonGlobsMatchResults.hasNext(), "Expect no more elements in nonGlobsMatchResults.");
- return ret.build();
- }
-
- @Override
- protected WritableByteChannel create(GcsResourceId resourceId, CreateOptions createOptions)
- throws IOException {
- return options.getGcsUtil().create(resourceId.getGcsPath(), createOptions.mimeType());
- }
-
- @Override
- protected ReadableByteChannel open(GcsResourceId resourceId) throws IOException {
- return options.getGcsUtil().open(resourceId.getGcsPath());
- }
-
- @Override
- protected void rename(
- List<GcsResourceId> srcResourceIds,
- List<GcsResourceId> destResourceIds) throws IOException {
- copy(srcResourceIds, destResourceIds);
- delete(srcResourceIds);
- }
-
- @Override
- protected void delete(Collection<GcsResourceId> resourceIds) throws IOException {
- options.getGcsUtil().remove(toFilenames(resourceIds));
- }
-
- @Override
- protected GcsResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
- if (isDirectory) {
- if (!singleResourceSpec.endsWith("/")) {
- singleResourceSpec += '/';
- }
- } else {
- checkArgument(
- !singleResourceSpec.endsWith("/"),
- "Expected a file path, but [%s], ends with '/'. This is unsupported in GcsFileSystem.",
- singleResourceSpec);
- }
- GcsPath path = GcsPath.fromUri(singleResourceSpec);
- return GcsResourceId.fromGcsPath(path);
- }
-
- @Override
- protected void copy(List<GcsResourceId> srcResourceIds, List<GcsResourceId> destResourceIds)
- throws IOException {
- options.getGcsUtil().copy(toFilenames(srcResourceIds), toFilenames(destResourceIds));
- }
-
- @Override
- protected String getScheme() {
- return "gs";
- }
-
- private List<MatchResult> matchGlobs(List<GcsPath> globs) {
- // TODO: Executes in parallel, address https://issues.apache.org/jira/browse/BEAM-1503.
- return FluentIterable.from(globs)
- .transform(new Function<GcsPath, MatchResult>() {
- @Override
- public MatchResult apply(GcsPath gcsPath) {
- try {
- return expand(gcsPath);
- } catch (IOException e) {
- return MatchResult.create(Status.ERROR, e);
- }
- }})
- .toList();
- }
-
- /**
- * Expands a pattern into {@link MatchResult}.
- *
- * @throws IllegalArgumentException if {@code gcsPattern} does not contain globs.
- */
- @VisibleForTesting
- MatchResult expand(GcsPath gcsPattern) throws IOException {
- String prefix = GcsUtil.getGlobPrefix(gcsPattern.getObject());
- Pattern p = Pattern.compile(GcsUtil.globToRegexp(gcsPattern.getObject()));
-
- LOG.debug("matching files in bucket {}, prefix {} against pattern {}", gcsPattern.getBucket(),
- prefix, p.toString());
-
- String pageToken = null;
- List<Metadata> results = new LinkedList<>();
- do {
- Objects objects = options.getGcsUtil().listObjects(gcsPattern.getBucket(), prefix, pageToken);
- if (objects.getItems() == null) {
- break;
- }
-
- // Filter objects based on the regex.
- for (StorageObject o : objects.getItems()) {
- String name = o.getName();
- // Skip directories, which end with a slash.
- if (p.matcher(name).matches() && !name.endsWith("/")) {
- LOG.debug("Matched object: {}", name);
- results.add(toMetadata(o));
- }
- }
- pageToken = objects.getNextPageToken();
- } while (pageToken != null);
- return MatchResult.create(Status.OK, results);
- }
-
- /**
- * Returns {@link MatchResult MatchResults} for the given {@link GcsPath GcsPaths}.
- *
- *<p>The number of returned {@link MatchResult MatchResults} equals to the number of given
- * {@link GcsPath GcsPaths}. Each {@link MatchResult} contains one {@link Metadata}.
- */
- @VisibleForTesting
- List<MatchResult> matchNonGlobs(List<GcsPath> gcsPaths) throws IOException {
- List<StorageObjectOrIOException> results = options.getGcsUtil().getObjects(gcsPaths);
-
- ImmutableList.Builder<MatchResult> ret = ImmutableList.builder();
- for (StorageObjectOrIOException result : results) {
- ret.add(toMatchResult(result));
- }
- return ret.build();
- }
-
- private MatchResult toMatchResult(StorageObjectOrIOException objectOrException) {
- @Nullable IOException exception = objectOrException.ioException();
- if (exception instanceof FileNotFoundException) {
- return MatchResult.create(Status.NOT_FOUND, exception);
- } else if (exception != null) {
- return MatchResult.create(Status.ERROR, exception);
- } else {
- StorageObject object = objectOrException.storageObject();
- assert object != null; // fix a warning; guaranteed by StorageObjectOrIOException semantics.
- return MatchResult.create(Status.OK, ImmutableList.of(toMetadata(object)));
- }
- }
-
- private Metadata toMetadata(StorageObject storageObject) {
- // TODO: Address https://issues.apache.org/jira/browse/BEAM-1494
- // It is incorrect to set IsReadSeekEfficient true for files with content encoding set to gzip.
- Metadata.Builder ret = Metadata.builder()
- .setIsReadSeekEfficient(true)
- .setResourceId(GcsResourceId.fromGcsPath(GcsPath.fromObject(storageObject)));
- BigInteger size = storageObject.getSize();
- if (size != null) {
- ret.setSizeBytes(size.longValue());
- }
- return ret.build();
- }
-
- private List<String> toFilenames(Collection<GcsResourceId> resources) {
- return FluentIterable.from(resources)
- .transform(
- new Function<GcsResourceId, String>() {
- @Override
- public String apply(GcsResourceId resource) {
- return resource.getGcsPath().toString();
- }})
- .toList();
- }
-
- private List<GcsPath> toGcsPaths(Collection<String> specs) {
- return FluentIterable.from(specs)
- .transform(new Function<String, GcsPath>() {
- @Override
- public GcsPath apply(String spec) {
- return GcsPath.fromUri(spec);
- }})
- .toList();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java
deleted file mode 100644
index 1d4e4ad..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.storage;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-import javax.annotation.Nonnull;
-import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
-import org.apache.beam.sdk.io.FileSystem;
-import org.apache.beam.sdk.io.FileSystemRegistrar;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * {@link AutoService} registrar for the {@link GcsFileSystem}.
- */
-@AutoService(FileSystemRegistrar.class)
-public class GcsFileSystemRegistrar implements FileSystemRegistrar {
-
- @Override
- public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
- checkNotNull(
- options,
- "Expect the runner have called FileSystems.setDefaultConfigInWorkers().");
- return ImmutableList.<FileSystem>of(new GcsFileSystem(options.as(GcsOptions.class)));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java
deleted file mode 100644
index 29215e7..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.storage;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.io.fs.ResolveOptions;
-import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
-import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-/**
- * {@link ResourceId} implementation for Google Cloud Storage.
- */
-public class GcsResourceId implements ResourceId {
-
- private final GcsPath gcsPath;
-
- static GcsResourceId fromGcsPath(GcsPath gcsPath) {
- checkNotNull(gcsPath, "gcsPath");
- return new GcsResourceId(gcsPath);
- }
-
- private GcsResourceId(GcsPath gcsPath) {
- this.gcsPath = gcsPath;
- }
-
- @Override
- public GcsResourceId resolve(String other, ResolveOptions resolveOptions) {
- checkState(
- isDirectory(),
- String.format("Expected the gcsPath is a directory, but had [%s].", gcsPath));
- checkArgument(
- resolveOptions.equals(StandardResolveOptions.RESOLVE_FILE)
- || resolveOptions.equals(StandardResolveOptions.RESOLVE_DIRECTORY),
- String.format("ResolveOptions: [%s] is not supported.", resolveOptions));
- if (resolveOptions.equals(StandardResolveOptions.RESOLVE_FILE)) {
- checkArgument(
- !other.endsWith("/"),
- "The resolved file: [%s] should not end with '/'.", other);
- return fromGcsPath(gcsPath.resolve(other));
- } else {
- // StandardResolveOptions.RESOLVE_DIRECTORY
- if (other.endsWith("/")) {
- // other already contains the delimiter for gcs.
- // It is not recommended for callers to set the delimiter.
- // However, we consider it as a valid input.
- return fromGcsPath(gcsPath.resolve(other));
- } else {
- return fromGcsPath(gcsPath.resolve(other + "/"));
- }
- }
- }
-
- @Override
- public GcsResourceId getCurrentDirectory() {
- if (isDirectory()) {
- return this;
- } else {
- GcsPath parent = gcsPath.getParent();
- checkState(
- parent != null,
- String.format("Failed to get the current directory for path: [%s].", gcsPath));
- return fromGcsPath(parent);
- }
- }
-
- @Override
- public boolean isDirectory() {
- return gcsPath.endsWith("/");
- }
-
- @Override
- public String getScheme() {
- return "gs";
- }
-
- @Override
- @Nullable public String getFilename() {
- if (gcsPath.getNameCount() <= 1) {
- return null;
- } else {
- GcsPath gcsFilename = gcsPath.getFileName();
- return gcsFilename == null ? null : gcsFilename.toString();
- }
- }
-
- GcsPath getGcsPath() {
- return gcsPath;
- }
-
- @Override
- public String toString() {
- return gcsPath.toString();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof GcsResourceId)) {
- return false;
- }
- GcsResourceId other = (GcsResourceId) obj;
- return this.gcsPath.equals(other.gcsPath);
- }
-
- @Override
- public int hashCode() {
- return gcsPath.hashCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/package-info.java
deleted file mode 100644
index b5378be..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Defines IO connectors for Google Cloud Storage.
- */
-package org.apache.beam.sdk.io.gcp.storage;
http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java
deleted file mode 100644
index 2fc337a..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.storage;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
-import com.google.common.collect.Lists;
-import java.util.ServiceLoader;
-import org.apache.beam.sdk.io.FileSystem;
-import org.apache.beam.sdk.io.FileSystemRegistrar;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link GcsFileSystemRegistrar}.
- */
-@RunWith(JUnit4.class)
-public class GcsFileSystemRegistrarTest {
-
- @Test
- public void testServiceLoader() {
- for (FileSystemRegistrar registrar
- : Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) {
- if (registrar instanceof GcsFileSystemRegistrar) {
- Iterable<FileSystem> fileSystems = registrar.fromOptions(PipelineOptionsFactory.create());
- assertThat(fileSystems, contains(instanceOf(GcsFileSystem.class)));
- return;
- }
- }
- fail("Expected to find " + GcsFileSystemRegistrar.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java
deleted file mode 100644
index cc2e0c4..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.storage;
-
-import static org.hamcrest.Matchers.contains;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isNull;
-import static org.mockito.Mockito.when;
-
-import com.google.api.services.storage.model.Objects;
-import com.google.api.services.storage.model.StorageObject;
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
-import org.apache.beam.sdk.io.fs.MatchResult;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.io.fs.MatchResult.Status;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link GcsFileSystem}.
- */
-@RunWith(JUnit4.class)
-public class GcsFileSystemTest {
-
- @Rule
- public transient ExpectedException thrown = ExpectedException.none();
- @Mock
- private GcsUtil mockGcsUtil;
- private GcsOptions gcsOptions;
- private GcsFileSystem gcsFileSystem;
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
- gcsOptions = PipelineOptionsFactory.as(GcsOptions.class);
- gcsOptions.setGcsUtil(mockGcsUtil);
- gcsFileSystem = new GcsFileSystem(gcsOptions);
- }
-
- @Test
- public void testMatch() throws Exception {
- Objects modelObjects = new Objects();
- List<StorageObject> items = new ArrayList<>();
- // A directory
- items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/"));
-
- // Files within the directory
- items.add(createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */));
- items.add(createStorageObject("gs://testbucket/testdirectory/file2name", 2L /* fileSize */));
- items.add(createStorageObject("gs://testbucket/testdirectory/file3name", 3L /* fileSize */));
- items.add(createStorageObject("gs://testbucket/testdirectory/file4name", 4L /* fileSize */));
- items.add(createStorageObject("gs://testbucket/testdirectory/otherfile", 5L /* fileSize */));
- items.add(createStorageObject("gs://testbucket/testdirectory/anotherfile", 6L /* fileSize */));
-
- modelObjects.setItems(items);
- when(mockGcsUtil.listObjects(eq("testbucket"), anyString(), isNull(String.class)))
- .thenReturn(modelObjects);
-
- List<GcsPath> gcsPaths = ImmutableList.of(
- GcsPath.fromUri("gs://testbucket/testdirectory/non-exist-file"),
- GcsPath.fromUri("gs://testbucket/testdirectory/otherfile"));
-
- when(mockGcsUtil.getObjects(eq(gcsPaths))).thenReturn(
- ImmutableList.of(
- StorageObjectOrIOException.create(new FileNotFoundException()),
- StorageObjectOrIOException.create(
- createStorageObject("gs://testbucket/testdirectory/otherfile", 4L))));
-
- List<String> specs = ImmutableList.of(
- "gs://testbucket/testdirectory/file[1-3]*",
- "gs://testbucket/testdirectory/non-exist-file",
- "gs://testbucket/testdirectory/otherfile");
- List<MatchResult> matchResults = gcsFileSystem.match(specs);
- assertEquals(3, matchResults.size());
- assertEquals(Status.OK, matchResults.get(0).status());
- assertThat(
- ImmutableList.of(
- "gs://testbucket/testdirectory/file1name",
- "gs://testbucket/testdirectory/file2name",
- "gs://testbucket/testdirectory/file3name"),
- contains(toFilenames(matchResults.get(0)).toArray()));
- assertEquals(Status.NOT_FOUND, matchResults.get(1).status());
- assertEquals(Status.OK, matchResults.get(2).status());
- assertThat(
- ImmutableList.of("gs://testbucket/testdirectory/otherfile"),
- contains(toFilenames(matchResults.get(2)).toArray()));
-
- }
-
- @Test
- public void testGlobExpansion() throws IOException {
- Objects modelObjects = new Objects();
- List<StorageObject> items = new ArrayList<>();
- // A directory
- items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/"));
-
- // Files within the directory
- items.add(createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */));
- items.add(createStorageObject("gs://testbucket/testdirectory/file2name", 2L /* fileSize */));
- items.add(createStorageObject("gs://testbucket/testdirectory/file3name", 3L /* fileSize */));
- items.add(createStorageObject("gs://testbucket/testdirectory/otherfile", 4L /* fileSize */));
- items.add(createStorageObject("gs://testbucket/testdirectory/anotherfile", 5L /* fileSize */));
- items.add(createStorageObject(
- "gs://testbucket/testotherdirectory/file4name", 6L /* fileSize */));
-
- modelObjects.setItems(items);
-
- when(mockGcsUtil.listObjects(eq("testbucket"), anyString(), isNull(String.class)))
- .thenReturn(modelObjects);
-
- // Test patterns.
- {
- GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file*");
- List<String> expectedFiles = ImmutableList.of(
- "gs://testbucket/testdirectory/file1name",
- "gs://testbucket/testdirectory/file2name",
- "gs://testbucket/testdirectory/file3name");
-
- assertThat(
- expectedFiles,
- contains(toFilenames(gcsFileSystem.expand(pattern)).toArray()));
- }
-
- {
- GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file*");
- List<String> expectedFiles = ImmutableList.of(
- "gs://testbucket/testdirectory/file1name",
- "gs://testbucket/testdirectory/file2name",
- "gs://testbucket/testdirectory/file3name");
-
- assertThat(
- expectedFiles,
- contains(toFilenames(gcsFileSystem.expand(pattern)).toArray()));
- }
-
- {
- GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file[1-3]*");
- List<String> expectedFiles = ImmutableList.of(
- "gs://testbucket/testdirectory/file1name",
- "gs://testbucket/testdirectory/file2name",
- "gs://testbucket/testdirectory/file3name");
-
- assertThat(
- expectedFiles,
- contains(toFilenames(gcsFileSystem.expand(pattern)).toArray()));
- }
-
- {
- GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file?name");
- List<String> expectedFiles = ImmutableList.of(
- "gs://testbucket/testdirectory/file1name",
- "gs://testbucket/testdirectory/file2name",
- "gs://testbucket/testdirectory/file3name");
-
- assertThat(
- expectedFiles,
- contains(toFilenames(gcsFileSystem.expand(pattern)).toArray()));
- }
-
- {
- GcsPath pattern = GcsPath.fromUri("gs://testbucket/test*ectory/fi*name");
- List<String> expectedFiles = ImmutableList.of(
- "gs://testbucket/testdirectory/file1name",
- "gs://testbucket/testdirectory/file2name",
- "gs://testbucket/testdirectory/file3name",
- "gs://testbucket/testotherdirectory/file4name");
-
- assertThat(
- expectedFiles,
- contains(toFilenames(gcsFileSystem.expand(pattern)).toArray()));
- }
- }
-
- @Test
- public void testExpandNonGlob() throws Exception {
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Glob expression: [testdirectory/otherfile] is not expandable.");
- gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/testdirectory/otherfile"));
- }
-
- // Patterns that contain recursive wildcards ('**') are not supported.
- @Test
- public void testRecursiveGlobExpansionFails() throws IOException {
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Unsupported wildcard usage");
- gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/test**"));
- }
-
- @Test
- public void testMatchNonGlobs() throws Exception {
- List<StorageObjectOrIOException> items = new ArrayList<>();
- // Files within the directory
- items.add(StorageObjectOrIOException.create(
- createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */)));
- items.add(StorageObjectOrIOException.create(new FileNotFoundException()));
- items.add(StorageObjectOrIOException.create(new IOException()));
- items.add(StorageObjectOrIOException.create(
- createStorageObject("gs://testbucket/testdirectory/file4name", 4L /* fileSize */)));
-
- List<GcsPath> gcsPaths = ImmutableList.of(
- GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
- GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
- GcsPath.fromUri("gs://testbucket/testdirectory/file3name"),
- GcsPath.fromUri("gs://testbucket/testdirectory/file4name"));
-
- when(mockGcsUtil.getObjects(eq(gcsPaths))).thenReturn(items);
- List<MatchResult> matchResults = gcsFileSystem.matchNonGlobs(gcsPaths);
-
- assertEquals(4, matchResults.size());
- assertThat(
- ImmutableList.of("gs://testbucket/testdirectory/file1name"),
- contains(toFilenames(matchResults.get(0)).toArray()));
- assertEquals(Status.NOT_FOUND, matchResults.get(1).status());
- assertEquals(Status.ERROR, matchResults.get(2).status());
- assertThat(
- ImmutableList.of("gs://testbucket/testdirectory/file4name"),
- contains(toFilenames(matchResults.get(3)).toArray()));
- }
-
- private StorageObject createStorageObject(String gcsFilename, long fileSize) {
- GcsPath gcsPath = GcsPath.fromUri(gcsFilename);
- return new StorageObject()
- .setBucket(gcsPath.getBucket())
- .setName(gcsPath.getObject())
- .setSize(BigInteger.valueOf(fileSize));
- }
-
- private List<String> toFilenames(MatchResult matchResult) throws IOException {
- return FluentIterable
- .from(matchResult.metadata())
- .transform(new Function<Metadata, String>() {
- @Override
- public String apply(Metadata metadata) {
- return ((GcsResourceId) metadata.resourceId()).getGcsPath().toString();
- }})
- .toList();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java
deleted file mode 100644
index 702e754..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.storage;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
-import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link GcsResourceId}.
- *
- * <p>Every resource id is created from a {@code gs://} URI via {@link GcsPath#fromUri}.
- */
-@RunWith(JUnit4.class)
-public class GcsResourceIdTest {
-
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  /** Resolving relative and absolute names against a directory yields the expected ids. */
-  @Test
-  public void testResolve() throws Exception {
-    // Tests for common gcs paths.
-    assertEquals(
-        toResourceIdentifier("gs://bucket/tmp/aa"),
-        toResourceIdentifier("gs://bucket/tmp/")
-            .resolve("aa", StandardResolveOptions.RESOLVE_FILE));
-    assertEquals(
-        toResourceIdentifier("gs://bucket/tmp/aa/bb/cc/"),
-        toResourceIdentifier("gs://bucket/tmp/")
-            .resolve("aa", StandardResolveOptions.RESOLVE_DIRECTORY)
-            .resolve("bb", StandardResolveOptions.RESOLVE_DIRECTORY)
-            .resolve("cc", StandardResolveOptions.RESOLVE_DIRECTORY));
-
-    // Tests absolute path.
-    assertEquals(
-        toResourceIdentifier("gs://bucket/tmp/aa"),
-        toResourceIdentifier("gs://bucket/tmp/bb/")
-            .resolve("gs://bucket/tmp/aa", StandardResolveOptions.RESOLVE_FILE));
-
-    // Tests bucket with no ending '/'.
-    assertEquals(
-        toResourceIdentifier("gs://my_bucket/tmp"),
-        toResourceIdentifier("gs://my_bucket")
-            .resolve("tmp", StandardResolveOptions.RESOLVE_FILE));
-
-    // Tests path with unicode
-    assertEquals(
-        toResourceIdentifier("gs://bucket/输出 目录/输出 文件01.txt"),
-        toResourceIdentifier("gs://bucket/输出 目录/")
-            .resolve("输出 文件01.txt", StandardResolveOptions.RESOLVE_FILE));
-  }
-
-  /** A directory name that already ends with '/' resolves to the same directory id. */
-  @Test
-  public void testResolveHandleBadInputs() throws Exception {
-    assertEquals(
-        toResourceIdentifier("gs://my_bucket/tmp/"),
-        toResourceIdentifier("gs://my_bucket/")
-            .resolve("tmp/", StandardResolveOptions.RESOLVE_DIRECTORY));
-  }
-
-  /** Resolving a name that ends with '/' as a file must be rejected. */
-  @Test
-  public void testResolveInvalidInputs() throws Exception {
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("The resolved file: [tmp/] should not end with '/'.");
-    toResourceIdentifier("gs://my_bucket/").resolve("tmp/", StandardResolveOptions.RESOLVE_FILE);
-  }
-
-  /** Resolving against an id that is a file, not a directory, must be rejected. */
-  @Test
-  public void testResolveInvalidNotDirectory() throws Exception {
-    ResourceId tmpDir = toResourceIdentifier("gs://my_bucket/")
-        .resolve("tmp dir", StandardResolveOptions.RESOLVE_FILE);
-
-    // Expectations are registered only after the setup call above has succeeded, so the
-    // exception must come from the second resolve.
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Expected the gcsPath is a directory, but had [gs://my_bucket/tmp dir].");
-    tmpDir.resolve("aa", StandardResolveOptions.RESOLVE_FILE);
-  }
-
-  /** The current directory of a directory is itself; of a file, its parent directory. */
-  @Test
-  public void testGetCurrentDirectory() throws Exception {
-    // Tests gcs paths.
-    assertEquals(
-        toResourceIdentifier("gs://my_bucket/tmp dir/"),
-        toResourceIdentifier("gs://my_bucket/tmp dir/").getCurrentDirectory());
-
-    // Tests path with unicode.
-    assertEquals(
-        toResourceIdentifier("gs://my_bucket/输出 目录/"),
-        toResourceIdentifier("gs://my_bucket/输出 目录/文件01.txt").getCurrentDirectory());
-
-    // Tests bucket with no ending '/'.
-    assertEquals(
-        toResourceIdentifier("gs://my_bucket/"),
-        toResourceIdentifier("gs://my_bucket").getCurrentDirectory());
-  }
-
-  /** Paths ending in '/' and bare buckets are directories; other paths are not. */
-  @Test
-  public void testIsDirectory() throws Exception {
-    assertTrue(toResourceIdentifier("gs://my_bucket/tmp dir/").isDirectory());
-    assertTrue(toResourceIdentifier("gs://my_bucket/").isDirectory());
-    assertTrue(toResourceIdentifier("gs://my_bucket").isDirectory());
-
-    assertFalse(toResourceIdentifier("gs://my_bucket/file").isDirectory());
-  }
-
-  /** A URI without a bucket must be rejected. */
-  @Test
-  public void testInvalidGcsPath() throws Exception {
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Invalid GCS URI: gs://");
-    toResourceIdentifier("gs://");
-  }
-
-  /** The scheme is always {@code "gs"}. */
-  @Test
-  public void testGetScheme() throws Exception {
-    // Tests gcs paths.
-    assertEquals("gs", toResourceIdentifier("gs://my_bucket/tmp dir/").getScheme());
-
-    // Tests bucket with no ending '/'.
-    assertEquals("gs", toResourceIdentifier("gs://my_bucket").getScheme());
-  }
-
-  /** Ids with the same path are equal; a trailing '/' makes them different. */
-  @Test
-  public void testEquals() throws Exception {
-    assertEquals(
-        toResourceIdentifier("gs://my_bucket/tmp/"),
-        toResourceIdentifier("gs://my_bucket/tmp/"));
-
-    assertNotEquals(
-        toResourceIdentifier("gs://my_bucket/tmp"),
-        toResourceIdentifier("gs://my_bucket/tmp/"));
-  }
-
-  /**
-   * The filename is the last path component (without a trailing '/'), or {@code null} for a
-   * bucket root.
-   */
-  @Test
-  public void testGetFilename() throws Exception {
-    // Expected value goes first so that JUnit failure messages read correctly.
-    assertEquals(null, toResourceIdentifier("gs://my_bucket/").getFilename());
-    assertEquals("abc", toResourceIdentifier("gs://my_bucket/abc").getFilename());
-    assertEquals("abc", toResourceIdentifier("gs://my_bucket/abc/").getFilename());
-    assertEquals("xyz.txt", toResourceIdentifier("gs://my_bucket/abc/xyz.txt").getFilename());
-  }
-
-  /** Parses {@code str} as a GCS URI and wraps it in a {@link GcsResourceId}. */
-  private GcsResourceId toResourceIdentifier(String str) throws Exception {
-    return GcsResourceId.fromGcsPath(GcsPath.fromUri(str));
-  }
-}
[2/2] beam git commit: This closes #2834
Posted by dh...@apache.org.
This closes #2834
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5bfd3e04
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5bfd3e04
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5bfd3e04
Branch: refs/heads/master
Commit: 5bfd3e049c0ca0744165b0243a645e8e427032d5
Parents: 027dd77 b2a4ae2
Author: Dan Halperin <dh...@google.com>
Authored: Tue May 2 16:18:01 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 2 16:18:01 2017 -0700
----------------------------------------------------------------------
.../src/main/resources/beam/findbugs-filter.xml | 2 +-
.../extensions/gcp/storage/GcsFileSystem.java | 266 ++++++++++++++++++
.../gcp/storage/GcsFileSystemRegistrar.java | 43 +++
.../extensions/gcp/storage/GcsResourceId.java | 128 +++++++++
.../extensions/gcp/storage/package-info.java | 21 ++
.../gcp/storage/GcsFileSystemRegistrarTest.java | 52 ++++
.../gcp/storage/GcsFileSystemTest.java | 274 +++++++++++++++++++
.../gcp/storage/GcsResourceIdTest.java | 169 ++++++++++++
sdks/java/io/google-cloud-platform/pom.xml | 5 -
.../beam/sdk/io/gcp/storage/GcsFileSystem.java | 266 ------------------
.../io/gcp/storage/GcsFileSystemRegistrar.java | 43 ---
.../beam/sdk/io/gcp/storage/GcsResourceId.java | 128 ---------
.../beam/sdk/io/gcp/storage/package-info.java | 21 --
.../gcp/storage/GcsFileSystemRegistrarTest.java | 52 ----
.../sdk/io/gcp/storage/GcsFileSystemTest.java | 274 -------------------
.../sdk/io/gcp/storage/GcsResourceIdTest.java | 169 ------------
16 files changed, 954 insertions(+), 959 deletions(-)
----------------------------------------------------------------------