You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/04 19:53:54 UTC

[29/50] [abbrv] beam git commit: [BEAM-59] Move GcsFileSystem to gcp-core

[BEAM-59] Move GcsFileSystem to gcp-core

It is used by both the runner and IO, so it should be in core.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b2a4ae2b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b2a4ae2b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b2a4ae2b

Branch: refs/heads/gearpump-runner
Commit: b2a4ae2b307b8c540ff8a40878521bf3d5e532ff
Parents: 027dd77
Author: Dan Halperin <dh...@google.com>
Authored: Tue May 2 11:08:16 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 2 16:17:57 2017 -0700

----------------------------------------------------------------------
 .../src/main/resources/beam/findbugs-filter.xml |   2 +-
 .../extensions/gcp/storage/GcsFileSystem.java   | 266 ++++++++++++++++++
 .../gcp/storage/GcsFileSystemRegistrar.java     |  43 +++
 .../extensions/gcp/storage/GcsResourceId.java   | 128 +++++++++
 .../extensions/gcp/storage/package-info.java    |  21 ++
 .../gcp/storage/GcsFileSystemRegistrarTest.java |  52 ++++
 .../gcp/storage/GcsFileSystemTest.java          | 274 +++++++++++++++++++
 .../gcp/storage/GcsResourceIdTest.java          | 169 ++++++++++++
 sdks/java/io/google-cloud-platform/pom.xml      |   5 -
 .../beam/sdk/io/gcp/storage/GcsFileSystem.java  | 266 ------------------
 .../io/gcp/storage/GcsFileSystemRegistrar.java  |  43 ---
 .../beam/sdk/io/gcp/storage/GcsResourceId.java  | 128 ---------
 .../beam/sdk/io/gcp/storage/package-info.java   |  21 --
 .../gcp/storage/GcsFileSystemRegistrarTest.java |  52 ----
 .../sdk/io/gcp/storage/GcsFileSystemTest.java   | 274 -------------------
 .../sdk/io/gcp/storage/GcsResourceIdTest.java   | 169 ------------
 16 files changed, 954 insertions(+), 959 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index d1d8b4d..28bbc3c 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -203,7 +203,7 @@
   </Match>
 
   <Match>
-    <Class name="org.apache.beam.sdk.io.gcp.storage.GcsResourceId"/>
+    <Class name="org.apache.beam.sdk.extensions.gcp.storage.GcsResourceId"/>
     <Method name="getCurrentDirectory" />
     <Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
     <!--

http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
new file mode 100644
index 0000000..69dd8fc
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.storage;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.api.services.storage.model.Objects;
+import com.google.api.services.storage.model.StorageObject;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.fs.CreateOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MatchResult.Status;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link FileSystem} implementation for Google Cloud Storage.
+ *
+ * <p>Every storage operation is delegated to the {@link GcsUtil} returned by
+ * {@link GcsOptions#getGcsUtil()} on the options supplied at construction time.
+ */
+class GcsFileSystem extends FileSystem<GcsResourceId> {
+  private static final Logger LOG = LoggerFactory.getLogger(GcsFileSystem.class);
+
+  private final GcsOptions options;
+
+  /**
+   * Creates a file system that performs all storage calls through {@code options}.
+   *
+   * @throws NullPointerException if {@code options} is null
+   */
+  GcsFileSystem(GcsOptions options) {
+    this.options = checkNotNull(options, "options");
+  }
+
+  /**
+   * Matches every spec against Google Cloud Storage.
+   *
+   * <p>Glob specs are expanded by listing objects ({@link #expand}), while non-glob specs are
+   * looked up directly with one {@link GcsUtil#getObjects} call ({@link #matchNonGlobs}). The two
+   * result sequences are then re-interleaved so that the returned list holds exactly one
+   * {@link MatchResult} per input spec, in the same order as {@code specs}.
+   */
+  @Override
+  protected List<MatchResult> match(List<String> specs) throws IOException {
+    List<GcsPath> gcsPaths = toGcsPaths(specs);
+
+    List<GcsPath> globs = Lists.newArrayList();
+    List<GcsPath> nonGlobs = Lists.newArrayList();
+    // Records, per input position, which of the two lists the spec was routed to, so the
+    // results can be merged back in input order below.
+    List<Boolean> isGlobBooleans = Lists.newArrayList();
+
+    for (GcsPath path : gcsPaths) {
+      if (GcsUtil.isGlob(path)) {
+        globs.add(path);
+        isGlobBooleans.add(true);
+      } else {
+        nonGlobs.add(path);
+        isGlobBooleans.add(false);
+      }
+    }
+
+    Iterator<MatchResult> globsMatchResults = matchGlobs(globs).iterator();
+    Iterator<MatchResult> nonGlobsMatchResults = matchNonGlobs(nonGlobs).iterator();
+
+    ImmutableList.Builder<MatchResult> ret = ImmutableList.builder();
+    for (Boolean isGlob : isGlobBooleans) {
+      if (isGlob) {
+        checkState(globsMatchResults.hasNext(), "Expect globsMatchResults has next.");
+        ret.add(globsMatchResults.next());
+      } else {
+        checkState(nonGlobsMatchResults.hasNext(), "Expect nonGlobsMatchResults has next.");
+        ret.add(nonGlobsMatchResults.next());
+      }
+    }
+    // Each helper must have produced exactly one result per path it was given.
+    checkState(!globsMatchResults.hasNext(), "Expect no more elements in globsMatchResults.");
+    checkState(!nonGlobsMatchResults.hasNext(), "Expect no more elements in nonGlobsMatchResults.");
+    return ret.build();
+  }
+
+  /**
+   * Opens a {@link WritableByteChannel} that writes the object at {@code resourceId}, using the
+   * MIME type from {@code createOptions}.
+   */
+  @Override
+  protected WritableByteChannel create(GcsResourceId resourceId, CreateOptions createOptions)
+      throws IOException {
+    return options.getGcsUtil().create(resourceId.getGcsPath(), createOptions.mimeType());
+  }
+
+  /** Opens a {@link ReadableByteChannel} over the object at {@code resourceId}. */
+  @Override
+  protected ReadableByteChannel open(GcsResourceId resourceId) throws IOException {
+    return options.getGcsUtil().open(resourceId.getGcsPath());
+  }
+
+  /**
+   * Renames objects by copying the sources to the destinations and then deleting the sources.
+   *
+   * <p>The copy and the delete are separate calls, so this is not atomic: a failure after the
+   * copy can leave both the sources and the destinations in place.
+   */
+  @Override
+  protected void rename(
+      List<GcsResourceId> srcResourceIds,
+      List<GcsResourceId> destResourceIds) throws IOException {
+    copy(srcResourceIds, destResourceIds);
+    delete(srcResourceIds);
+  }
+
+  /** Removes the objects identified by {@code resourceIds}. */
+  @Override
+  protected void delete(Collection<GcsResourceId> resourceIds) throws IOException {
+    options.getGcsUtil().remove(toFilenames(resourceIds));
+  }
+
+  /**
+   * Builds a {@link GcsResourceId} for a single Google Cloud Storage URI.
+   *
+   * <p>Directory specs are normalized to end with {@code '/'}; file specs must not end with
+   * {@code '/'}.
+   *
+   * @throws IllegalArgumentException if {@code isDirectory} is false and the spec ends with
+   *     {@code '/'}
+   */
+  @Override
+  protected GcsResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
+    if (isDirectory) {
+      if (!singleResourceSpec.endsWith("/")) {
+        singleResourceSpec += '/';
+      }
+    } else {
+      checkArgument(
+          !singleResourceSpec.endsWith("/"),
+          "Expected a file path, but [%s], ends with '/'. This is unsupported in GcsFileSystem.",
+          singleResourceSpec);
+    }
+    GcsPath path = GcsPath.fromUri(singleResourceSpec);
+    return GcsResourceId.fromGcsPath(path);
+  }
+
+  /**
+   * Copies {@code srcResourceIds} to {@code destResourceIds} via {@link GcsUtil#copy}; both lists
+   * are converted to path strings and passed through in their given order.
+   */
+  @Override
+  protected void copy(List<GcsResourceId> srcResourceIds, List<GcsResourceId> destResourceIds)
+      throws IOException {
+    options.getGcsUtil().copy(toFilenames(srcResourceIds), toFilenames(destResourceIds));
+  }
+
+  /** Returns {@code "gs"}, the URI scheme handled by this file system. */
+  @Override
+  protected String getScheme() {
+    return "gs";
+  }
+
+  /**
+   * Expands each glob with {@link #expand}, one at a time. An {@link IOException} from a single
+   * glob is captured as a {@link Status#ERROR} result for that glob instead of failing the call.
+   */
+  private List<MatchResult> matchGlobs(List<GcsPath> globs) {
+    // TODO: Executes in parallel, address https://issues.apache.org/jira/browse/BEAM-1503.
+    return FluentIterable.from(globs)
+        .transform(new Function<GcsPath, MatchResult>() {
+          @Override
+          public MatchResult apply(GcsPath gcsPath) {
+            try {
+              return expand(gcsPath);
+            } catch (IOException e) {
+              return MatchResult.create(Status.ERROR, e);
+            }
+          }})
+        .toList();
+  }
+
+  /**
+   * Expands a pattern into {@link MatchResult}.
+   *
+   * <p>Objects are listed page by page under the prefix returned by
+   * {@link GcsUtil#getGlobPrefix}, and kept when their name matches the pattern's regular
+   * expression and does not end with {@code '/'}. An empty listing yields a {@link Status#OK}
+   * result with no metadata.
+   *
+   * @throws IllegalArgumentException if {@code gcsPattern} does not contain globs.
+   */
+  @VisibleForTesting
+  MatchResult expand(GcsPath gcsPattern) throws IOException {
+    String prefix = GcsUtil.getGlobPrefix(gcsPattern.getObject());
+    Pattern p = Pattern.compile(GcsUtil.globToRegexp(gcsPattern.getObject()));
+
+    // Pass the Pattern itself rather than p.toString(): SLF4J only renders the argument when
+    // debug logging is enabled.
+    LOG.debug("matching files in bucket {}, prefix {} against pattern {}", gcsPattern.getBucket(),
+        prefix, p);
+
+    String pageToken = null;
+    List<Metadata> results = new LinkedList<>();
+    do {
+      Objects objects = options.getGcsUtil().listObjects(gcsPattern.getBucket(), prefix, pageToken);
+      if (objects.getItems() == null) {
+        // A page without items ends the listing.
+        break;
+      }
+
+      // Filter objects based on the regex.
+      for (StorageObject o : objects.getItems()) {
+        String name = o.getName();
+        // Skip directories, which end with a slash.
+        if (p.matcher(name).matches() && !name.endsWith("/")) {
+          LOG.debug("Matched object: {}", name);
+          results.add(toMetadata(o));
+        }
+      }
+      pageToken = objects.getNextPageToken();
+    } while (pageToken != null);
+    return MatchResult.create(Status.OK, results);
+  }
+
+  /**
+   * Returns {@link MatchResult MatchResults} for the given {@link GcsPath GcsPaths}.
+   *
+   * <p>The number of returned {@link MatchResult MatchResults} equals the number of given
+   * {@link GcsPath GcsPaths}, in the same order. Each successful {@link MatchResult} contains one
+   * {@link Metadata}.
+   */
+  @VisibleForTesting
+  List<MatchResult> matchNonGlobs(List<GcsPath> gcsPaths) throws IOException {
+    List<StorageObjectOrIOException> results = options.getGcsUtil().getObjects(gcsPaths);
+
+    ImmutableList.Builder<MatchResult> ret = ImmutableList.builder();
+    for (StorageObjectOrIOException result : results) {
+      ret.add(toMatchResult(result));
+    }
+    return ret.build();
+  }
+
+  /**
+   * Converts a single lookup outcome into a {@link MatchResult}: a
+   * {@link FileNotFoundException} becomes {@link Status#NOT_FOUND}, any other
+   * {@link IOException} becomes {@link Status#ERROR}, and an object becomes {@link Status#OK}
+   * with one {@link Metadata}.
+   */
+  private MatchResult toMatchResult(StorageObjectOrIOException objectOrException) {
+    @Nullable IOException exception = objectOrException.ioException();
+    if (exception instanceof FileNotFoundException) {
+      return MatchResult.create(Status.NOT_FOUND, exception);
+    } else if (exception != null) {
+      return MatchResult.create(Status.ERROR, exception);
+    } else {
+      StorageObject object = objectOrException.storageObject();
+      assert object != null; // fix a warning; guaranteed by StorageObjectOrIOException semantics.
+      return MatchResult.create(Status.OK, ImmutableList.of(toMetadata(object)));
+    }
+  }
+
+  /**
+   * Builds {@link Metadata} for an object, marking it read-seek-efficient and setting its size
+   * only when the object reports one.
+   */
+  private Metadata toMetadata(StorageObject storageObject) {
+    // TODO: Address https://issues.apache.org/jira/browse/BEAM-1494
+    // It is incorrect to set IsReadSeekEfficient true for files with content encoding set to gzip.
+    Metadata.Builder ret = Metadata.builder()
+        .setIsReadSeekEfficient(true)
+        .setResourceId(GcsResourceId.fromGcsPath(GcsPath.fromObject(storageObject)));
+    BigInteger size = storageObject.getSize();
+    if (size != null) {
+      ret.setSizeBytes(size.longValue());
+    }
+    return ret.build();
+  }
+
+  /** Returns the {@link GcsPath} string of each resource, preserving iteration order. */
+  private List<String> toFilenames(Collection<GcsResourceId> resources) {
+    return FluentIterable.from(resources)
+        .transform(
+            new Function<GcsResourceId, String>() {
+              @Override
+              public String apply(GcsResourceId resource) {
+                return resource.getGcsPath().toString();
+              }})
+        .toList();
+  }
+
+  /** Parses each spec with {@link GcsPath#fromUri}, preserving iteration order. */
+  private List<GcsPath> toGcsPaths(Collection<String> specs) {
+    return FluentIterable.from(specs)
+        .transform(new Function<String, GcsPath>() {
+          @Override
+          public GcsPath apply(String spec) {
+            return GcsPath.fromUri(spec);
+          }})
+        .toList();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
new file mode 100644
index 0000000..9f5980a
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrar.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.storage;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import javax.annotation.Nonnull;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.FileSystemRegistrar;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * {@link AutoService} registrar for the {@link GcsFileSystem}.
+ */
+@AutoService(FileSystemRegistrar.class)
+public class GcsFileSystemRegistrar implements FileSystemRegistrar {
+
+  @Override
+  public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
+    checkNotNull(
+        options,
+        "Expect the runner have called FileSystems.setDefaultConfigInWorkers().");
+    return ImmutableList.<FileSystem>of(new GcsFileSystem(options.as(GcsOptions.class)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceId.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceId.java
new file mode 100644
index 0000000..e53e5fa
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceId.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.storage;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+
+/**
+ * {@link ResourceId} implementation for Google Cloud Storage.
+ *
+ * <p>Wraps a single {@link GcsPath}. A resource is a directory exactly when that path ends with
+ * {@code '/'}; equality, hashing and {@link #toString()} all delegate to the path.
+ */
+public class GcsResourceId implements ResourceId {
+
+  private final GcsPath gcsPath;
+
+  /**
+   * Wraps {@code gcsPath} in a {@link GcsResourceId}.
+   *
+   * @throws NullPointerException if {@code gcsPath} is null
+   */
+  static GcsResourceId fromGcsPath(GcsPath gcsPath) {
+    checkNotNull(gcsPath, "gcsPath");
+    return new GcsResourceId(gcsPath);
+  }
+
+  private GcsResourceId(GcsPath gcsPath) {
+    this.gcsPath = gcsPath;
+  }
+
+  /**
+   * Resolves {@code other} against this directory.
+   *
+   * <p>With {@link StandardResolveOptions#RESOLVE_FILE}, {@code other} must not end with
+   * {@code '/'}. With {@link StandardResolveOptions#RESOLVE_DIRECTORY}, a trailing {@code '/'} is
+   * appended to {@code other} unless it is already present, so the result is a directory.
+   *
+   * @throws IllegalStateException if this resource is not a directory
+   * @throws IllegalArgumentException if {@code resolveOptions} is neither standard option, or if
+   *     a file is requested and {@code other} ends with {@code '/'}
+   */
+  @Override
+  public GcsResourceId resolve(String other, ResolveOptions resolveOptions) {
+    // Message templates are only formatted when a check fails, unlike String.format(...), which
+    // was evaluated on every call.
+    checkState(
+        isDirectory(),
+        "Expected the gcsPath is a directory, but had [%s].",
+        gcsPath);
+    checkArgument(
+        resolveOptions.equals(StandardResolveOptions.RESOLVE_FILE)
+            || resolveOptions.equals(StandardResolveOptions.RESOLVE_DIRECTORY),
+        "ResolveOptions: [%s] is not supported.",
+        resolveOptions);
+    if (resolveOptions.equals(StandardResolveOptions.RESOLVE_FILE)) {
+      checkArgument(
+          !other.endsWith("/"),
+          "The resolved file: [%s] should not end with '/'.", other);
+      return fromGcsPath(gcsPath.resolve(other));
+    }
+    // StandardResolveOptions.RESOLVE_DIRECTORY: the result must carry the '/' delimiter.
+    // Callers are not expected to supply it, but an already-terminated name is accepted as is.
+    String directoryName = other.endsWith("/") ? other : other + "/";
+    return fromGcsPath(gcsPath.resolve(directoryName));
+  }
+
+  /**
+   * Returns this resource if it is a directory, otherwise the resource for its parent path.
+   *
+   * @throws IllegalStateException if this is a file whose path has no parent
+   */
+  @Override
+  public GcsResourceId getCurrentDirectory() {
+    if (isDirectory()) {
+      return this;
+    }
+    GcsPath parent = gcsPath.getParent();
+    checkState(
+        parent != null,
+        "Failed to get the current directory for path: [%s].",
+        gcsPath);
+    return fromGcsPath(parent);
+  }
+
+  /** Returns whether the underlying path ends with {@code '/'}. */
+  @Override
+  public boolean isDirectory() {
+    return gcsPath.endsWith("/");
+  }
+
+  /** Returns {@code "gs"}. */
+  @Override
+  public String getScheme() {
+    return "gs";
+  }
+
+  /**
+   * Returns the string form of the path's file name, or {@code null} when the path has at most
+   * one name element or {@link GcsPath#getFileName()} returns {@code null}.
+   */
+  @Override
+  @Nullable public String getFilename() {
+    if (gcsPath.getNameCount() <= 1) {
+      return null;
+    } else {
+      GcsPath gcsFilename = gcsPath.getFileName();
+      return gcsFilename == null ? null : gcsFilename.toString();
+    }
+  }
+
+  /** Returns the wrapped {@link GcsPath}. */
+  GcsPath getGcsPath() {
+    return gcsPath;
+  }
+
+  @Override
+  public String toString() {
+    return gcsPath.toString();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof GcsResourceId)) {
+      return false;
+    }
+    GcsResourceId other = (GcsResourceId) obj;
+    return this.gcsPath.equals(other.gcsPath);
+  }
+
+  @Override
+  public int hashCode() {
+    return gcsPath.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/package-info.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/package-info.java
new file mode 100644
index 0000000..ee8552f
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines the {@code FileSystem} implementation for Google Cloud Storage.
+ */
+package org.apache.beam.sdk.extensions.gcp.storage;

http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrarTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrarTest.java
new file mode 100644
index 0000000..c9ce1e5
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemRegistrarTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.storage;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.io.FileSystem;
+import org.apache.beam.sdk.io.FileSystemRegistrar;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link GcsFileSystemRegistrar}.
+ */
+@RunWith(JUnit4.class)
+public class GcsFileSystemRegistrarTest {
+
+  /**
+   * Checks that {@link ServiceLoader} discovers {@link GcsFileSystemRegistrar} and that it
+   * supplies exactly one {@link GcsFileSystem}.
+   */
+  @Test
+  public void testServiceLoader() {
+    Iterable<FileSystemRegistrar> registrars =
+        Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator());
+    for (FileSystemRegistrar candidate : registrars) {
+      if (!(candidate instanceof GcsFileSystemRegistrar)) {
+        continue;
+      }
+      Iterable<FileSystem> discovered = candidate.fromOptions(PipelineOptionsFactory.create());
+      assertThat(discovered, contains(instanceOf(GcsFileSystem.class)));
+      return;
+    }
+    fail("Expected to find " + GcsFileSystemRegistrar.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java
new file mode 100644
index 0000000..37ff9c8
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystemTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.storage;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.storage.model.Objects;
+import com.google.api.services.storage.model.StorageObject;
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MatchResult.Status;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link GcsFileSystem}.
+ *
+ * <p>{@link GcsUtil} is replaced by a Mockito mock, so these tests exercise only the matching and
+ * glob-expansion logic of {@link GcsFileSystem}.
+ */
+@RunWith(JUnit4.class)
+public class GcsFileSystemTest {
+
+  @Rule
+  public transient ExpectedException thrown = ExpectedException.none();
+  @Mock
+  private GcsUtil mockGcsUtil;
+  private GcsOptions gcsOptions;
+  private GcsFileSystem gcsFileSystem;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+    gcsOptions = PipelineOptionsFactory.as(GcsOptions.class);
+    gcsOptions.setGcsUtil(mockGcsUtil);
+    gcsFileSystem = new GcsFileSystem(gcsOptions);
+  }
+
+  @Test
+  public void testMatch() throws Exception {
+    Objects modelObjects = new Objects();
+    List<StorageObject> items = new ArrayList<>();
+    // A directory
+    items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/"));
+
+    // Files within the directory
+    items.add(createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */));
+    items.add(createStorageObject("gs://testbucket/testdirectory/file2name", 2L /* fileSize */));
+    items.add(createStorageObject("gs://testbucket/testdirectory/file3name", 3L /* fileSize */));
+    items.add(createStorageObject("gs://testbucket/testdirectory/file4name", 4L /* fileSize */));
+    items.add(createStorageObject("gs://testbucket/testdirectory/otherfile", 5L /* fileSize */));
+    items.add(createStorageObject("gs://testbucket/testdirectory/anotherfile", 6L /* fileSize */));
+
+    modelObjects.setItems(items);
+    // Serves the glob spec, which is expanded by listing objects.
+    when(mockGcsUtil.listObjects(eq("testbucket"), anyString(), isNull(String.class)))
+        .thenReturn(modelObjects);
+
+    // Serves the two non-glob specs, which are looked up together in one batch.
+    List<GcsPath> gcsPaths = ImmutableList.of(
+        GcsPath.fromUri("gs://testbucket/testdirectory/non-exist-file"),
+        GcsPath.fromUri("gs://testbucket/testdirectory/otherfile"));
+
+    when(mockGcsUtil.getObjects(eq(gcsPaths))).thenReturn(
+        ImmutableList.of(
+            StorageObjectOrIOException.create(new FileNotFoundException()),
+            StorageObjectOrIOException.create(
+                createStorageObject("gs://testbucket/testdirectory/otherfile", 5L))));
+
+    List<String> specs = ImmutableList.of(
+        "gs://testbucket/testdirectory/file[1-3]*",
+        "gs://testbucket/testdirectory/non-exist-file",
+        "gs://testbucket/testdirectory/otherfile");
+    List<MatchResult> matchResults = gcsFileSystem.match(specs);
+
+    // One result per spec, in the same order as the specs.
+    assertEquals(3, matchResults.size());
+    assertEquals(Status.OK, matchResults.get(0).status());
+    assertThat(
+        toFilenames(matchResults.get(0)),
+        contains(
+            "gs://testbucket/testdirectory/file1name",
+            "gs://testbucket/testdirectory/file2name",
+            "gs://testbucket/testdirectory/file3name"));
+    assertEquals(Status.NOT_FOUND, matchResults.get(1).status());
+    assertEquals(Status.OK, matchResults.get(2).status());
+    assertThat(
+        toFilenames(matchResults.get(2)),
+        contains("gs://testbucket/testdirectory/otherfile"));
+  }
+
+  @Test
+  public void testGlobExpansion() throws IOException {
+    Objects modelObjects = new Objects();
+    List<StorageObject> items = new ArrayList<>();
+    // A directory
+    items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/"));
+
+    // Files within the directory
+    items.add(createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */));
+    items.add(createStorageObject("gs://testbucket/testdirectory/file2name", 2L /* fileSize */));
+    items.add(createStorageObject("gs://testbucket/testdirectory/file3name", 3L /* fileSize */));
+    items.add(createStorageObject("gs://testbucket/testdirectory/otherfile", 4L /* fileSize */));
+    items.add(createStorageObject("gs://testbucket/testdirectory/anotherfile", 5L /* fileSize */));
+    items.add(createStorageObject(
+        "gs://testbucket/testotherdirectory/file4name", 6L /* fileSize */));
+
+    modelObjects.setItems(items);
+
+    // The mock returns every object regardless of prefix, so filtering is done by the pattern.
+    when(mockGcsUtil.listObjects(eq("testbucket"), anyString(), isNull(String.class)))
+        .thenReturn(modelObjects);
+
+    // Test patterns.
+    {
+      GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file*");
+      List<String> expectedFiles = ImmutableList.of(
+          "gs://testbucket/testdirectory/file1name",
+          "gs://testbucket/testdirectory/file2name",
+          "gs://testbucket/testdirectory/file3name");
+
+      assertThat(
+          toFilenames(gcsFileSystem.expand(pattern)),
+          contains(expectedFiles.toArray()));
+    }
+
+    {
+      // A '*' in the middle of the name must still be anchored by the literal suffix.
+      GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file*name");
+      List<String> expectedFiles = ImmutableList.of(
+          "gs://testbucket/testdirectory/file1name",
+          "gs://testbucket/testdirectory/file2name",
+          "gs://testbucket/testdirectory/file3name");
+
+      assertThat(
+          toFilenames(gcsFileSystem.expand(pattern)),
+          contains(expectedFiles.toArray()));
+    }
+
+    {
+      GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file[1-3]*");
+      List<String> expectedFiles = ImmutableList.of(
+          "gs://testbucket/testdirectory/file1name",
+          "gs://testbucket/testdirectory/file2name",
+          "gs://testbucket/testdirectory/file3name");
+
+      assertThat(
+          toFilenames(gcsFileSystem.expand(pattern)),
+          contains(expectedFiles.toArray()));
+    }
+
+    {
+      GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file?name");
+      List<String> expectedFiles = ImmutableList.of(
+          "gs://testbucket/testdirectory/file1name",
+          "gs://testbucket/testdirectory/file2name",
+          "gs://testbucket/testdirectory/file3name");
+
+      assertThat(
+          toFilenames(gcsFileSystem.expand(pattern)),
+          contains(expectedFiles.toArray()));
+    }
+
+    {
+      GcsPath pattern = GcsPath.fromUri("gs://testbucket/test*ectory/fi*name");
+      List<String> expectedFiles = ImmutableList.of(
+          "gs://testbucket/testdirectory/file1name",
+          "gs://testbucket/testdirectory/file2name",
+          "gs://testbucket/testdirectory/file3name",
+          "gs://testbucket/testotherdirectory/file4name");
+
+      assertThat(
+          toFilenames(gcsFileSystem.expand(pattern)),
+          contains(expectedFiles.toArray()));
+    }
+  }
+
+  @Test
+  public void testExpandNonGlob() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Glob expression: [testdirectory/otherfile] is not expandable.");
+    gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/testdirectory/otherfile"));
+  }
+
+  // Patterns that contain recursive wildcards ('**') are not supported.
+  @Test
+  public void testRecursiveGlobExpansionFails() throws IOException {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Unsupported wildcard usage");
+    gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/test**"));
+  }
+
+  @Test
+  public void testMatchNonGlobs() throws Exception {
+    List<StorageObjectOrIOException> items = new ArrayList<>();
+    // One lookup outcome per requested path: found, missing, failed, found.
+    items.add(StorageObjectOrIOException.create(
+        createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */)));
+    items.add(StorageObjectOrIOException.create(new FileNotFoundException()));
+    items.add(StorageObjectOrIOException.create(new IOException()));
+    items.add(StorageObjectOrIOException.create(
+        createStorageObject("gs://testbucket/testdirectory/file4name", 4L /* fileSize */)));
+
+    List<GcsPath> gcsPaths = ImmutableList.of(
+        GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
+        GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
+        GcsPath.fromUri("gs://testbucket/testdirectory/file3name"),
+        GcsPath.fromUri("gs://testbucket/testdirectory/file4name"));
+
+    when(mockGcsUtil.getObjects(eq(gcsPaths))).thenReturn(items);
+    List<MatchResult> matchResults = gcsFileSystem.matchNonGlobs(gcsPaths);
+
+    assertEquals(4, matchResults.size());
+    assertEquals(Status.OK, matchResults.get(0).status());
+    assertThat(
+        toFilenames(matchResults.get(0)),
+        contains("gs://testbucket/testdirectory/file1name"));
+    assertEquals(Status.NOT_FOUND, matchResults.get(1).status());
+    assertEquals(Status.ERROR, matchResults.get(2).status());
+    assertEquals(Status.OK, matchResults.get(3).status());
+    assertThat(
+        toFilenames(matchResults.get(3)),
+        contains("gs://testbucket/testdirectory/file4name"));
+  }
+
+  /** Builds a {@link StorageObject} for {@code gcsFilename} with the given size in bytes. */
+  private StorageObject createStorageObject(String gcsFilename, long fileSize) {
+    GcsPath gcsPath = GcsPath.fromUri(gcsFilename);
+    return new StorageObject()
+        .setBucket(gcsPath.getBucket())
+        .setName(gcsPath.getObject())
+        .setSize(BigInteger.valueOf(fileSize));
+  }
+
+  /** Returns the {@code gs://} paths of every {@link Metadata} in {@code matchResult}, in order. */
+  private List<String> toFilenames(MatchResult matchResult) throws IOException {
+    return FluentIterable
+        .from(matchResult.metadata())
+        .transform(new Function<Metadata, String>() {
+          @Override
+          public String apply(Metadata metadata) {
+            return ((GcsResourceId) metadata.resourceId()).getGcsPath().toString();
+          }})
+        .toList();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java
new file mode 100644
index 0000000..b245610
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/storage/GcsResourceIdTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.storage;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link GcsResourceId}.
+ */
+@RunWith(JUnit4.class)
+public class GcsResourceIdTest {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testResolve() throws Exception {
+    // Tests for common gcs paths.
+    assertEquals(
+        toResourceIdentifier("gs://bucket/tmp/aa"),
+        toResourceIdentifier("gs://bucket/tmp/")
+            .resolve("aa", StandardResolveOptions.RESOLVE_FILE));
+    assertEquals(
+        toResourceIdentifier("gs://bucket/tmp/aa/bb/cc/"),
+        toResourceIdentifier("gs://bucket/tmp/")
+            .resolve("aa", StandardResolveOptions.RESOLVE_DIRECTORY)
+            .resolve("bb", StandardResolveOptions.RESOLVE_DIRECTORY)
+            .resolve("cc", StandardResolveOptions.RESOLVE_DIRECTORY));
+
+    // Tests absolute path.
+    assertEquals(
+        toResourceIdentifier("gs://bucket/tmp/aa"),
+        toResourceIdentifier("gs://bucket/tmp/bb/")
+            .resolve("gs://bucket/tmp/aa", StandardResolveOptions.RESOLVE_FILE));
+
+    // Tests bucket with no ending '/'.
+    assertEquals(
+        toResourceIdentifier("gs://my_bucket/tmp"),
+        toResourceIdentifier("gs://my_bucket")
+            .resolve("tmp", StandardResolveOptions.RESOLVE_FILE));
+
+    // Tests path with unicode
+    assertEquals(
+        toResourceIdentifier("gs://bucket/输出 目录/输出 文件01.txt"),
+        toResourceIdentifier("gs://bucket/输出 目录/")
+            .resolve("输出 文件01.txt", StandardResolveOptions.RESOLVE_FILE));
+  }
+
+  @Test
+  public void testResolveHandleBadInputs() throws Exception {
+    // A trailing '/' on a directory name must not produce a doubled separator.
+    assertEquals(
+        toResourceIdentifier("gs://my_bucket/tmp/"),
+        toResourceIdentifier("gs://my_bucket/")
+            .resolve("tmp/", StandardResolveOptions.RESOLVE_DIRECTORY));
+  }
+
+  @Test
+  public void testResolveInvalidInputs() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("The resolved file: [tmp/] should not end with '/'.");
+    toResourceIdentifier("gs://my_bucket/").resolve("tmp/", StandardResolveOptions.RESOLVE_FILE);
+  }
+
+  @Test
+  public void testResolveInvalidNotDirectory() throws Exception {
+    ResourceId tmpDir = toResourceIdentifier("gs://my_bucket/")
+        .resolve("tmp dir", StandardResolveOptions.RESOLVE_FILE);
+
+    // Resolving against a file (not a directory) is rejected.
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Expected the gcsPath is a directory, but had [gs://my_bucket/tmp dir].");
+    tmpDir.resolve("aa", StandardResolveOptions.RESOLVE_FILE);
+  }
+
+  @Test
+  public void testGetCurrentDirectory() throws Exception {
+    // Tests gcs paths.
+    assertEquals(
+        toResourceIdentifier("gs://my_bucket/tmp dir/"),
+        toResourceIdentifier("gs://my_bucket/tmp dir/").getCurrentDirectory());
+
+    // Tests path with unicode.
+    assertEquals(
+        toResourceIdentifier("gs://my_bucket/输出 目录/"),
+        toResourceIdentifier("gs://my_bucket/输出 目录/文件01.txt").getCurrentDirectory());
+
+    // Tests bucket with no ending '/'.
+    assertEquals(
+        toResourceIdentifier("gs://my_bucket/"),
+        toResourceIdentifier("gs://my_bucket").getCurrentDirectory());
+  }
+
+  @Test
+  public void testIsDirectory() throws Exception {
+    assertTrue(toResourceIdentifier("gs://my_bucket/tmp dir/").isDirectory());
+    assertTrue(toResourceIdentifier("gs://my_bucket/").isDirectory());
+    assertTrue(toResourceIdentifier("gs://my_bucket").isDirectory());
+
+    assertFalse(toResourceIdentifier("gs://my_bucket/file").isDirectory());
+  }
+
+  @Test
+  public void testInvalidGcsPath() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid GCS URI: gs://");
+    toResourceIdentifier("gs://");
+  }
+
+  @Test
+  public void testGetScheme() throws Exception {
+    // Tests gcs paths.
+    assertEquals("gs", toResourceIdentifier("gs://my_bucket/tmp dir/").getScheme());
+
+    // Tests bucket with no ending '/'.
+    assertEquals("gs", toResourceIdentifier("gs://my_bucket").getScheme());
+  }
+
+  @Test
+  public void testEquals() throws Exception {
+    assertEquals(
+        toResourceIdentifier("gs://my_bucket/tmp/"),
+        toResourceIdentifier("gs://my_bucket/tmp/"));
+
+    // A file and a directory with the same name are distinct resources.
+    assertNotEquals(
+        toResourceIdentifier("gs://my_bucket/tmp"),
+        toResourceIdentifier("gs://my_bucket/tmp/"));
+  }
+
+  @Test
+  public void testGetFilename() throws Exception {
+    // JUnit's assertEquals takes (expected, actual); keep that order so failures read correctly.
+    // The bucket root has no filename.
+    assertEquals(null, toResourceIdentifier("gs://my_bucket/").getFilename());
+    assertEquals("abc", toResourceIdentifier("gs://my_bucket/abc").getFilename());
+    // A trailing '/' (directory) does not change the reported filename.
+    assertEquals("abc", toResourceIdentifier("gs://my_bucket/abc/").getFilename());
+    assertEquals("xyz.txt", toResourceIdentifier("gs://my_bucket/abc/xyz.txt").getFilename());
+  }
+
+  /** Parses {@code str} as a {@link GcsPath} and wraps it in a {@link GcsResourceId}. */
+  private GcsResourceId toResourceIdentifier(String str) throws Exception {
+    return GcsResourceId.fromGcsPath(GcsPath.fromUri(str));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index 3bdc5d0..9051d98 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -88,11 +88,6 @@
 
     <dependency>
       <groupId>com.google.apis</groupId>
-      <artifactId>google-api-services-storage</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.apis</groupId>
       <artifactId>google-api-services-pubsub</artifactId>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
deleted file mode 100644
index 2663864..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.storage;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.api.services.storage.model.Objects;
-import com.google.api.services.storage.model.StorageObject;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.math.BigInteger;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.regex.Pattern;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
-import org.apache.beam.sdk.io.FileSystem;
-import org.apache.beam.sdk.io.fs.CreateOptions;
-import org.apache.beam.sdk.io.fs.MatchResult;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.io.fs.MatchResult.Status;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link FileSystem} implementation for Google Cloud Storage.
- *
- * <p>All storage operations are delegated to the {@link GcsUtil} obtained from
- * {@link GcsOptions#getGcsUtil()}.
- */
-class GcsFileSystem extends FileSystem<GcsResourceId> {
-  private static final Logger LOG = LoggerFactory.getLogger(GcsFileSystem.class);
-
-  private final GcsOptions options;
-
-  GcsFileSystem(GcsOptions options) {
-    this.options = checkNotNull(options, "options");
-  }
-
-  /**
-   * Returns one {@link MatchResult} per spec, in the same order as {@code specs}.
-   *
-   * <p>Glob specs are expanded one at a time by listing objects. Non-glob specs are looked up
-   * together in a single {@link GcsUtil#getObjects} call. The two result streams are then
-   * re-interleaved to restore the original spec order.
-   */
-  @Override
-  protected List<MatchResult> match(List<String> specs) throws IOException {
-    List<GcsPath> gcsPaths = toGcsPaths(specs);
-
-    List<GcsPath> globs = Lists.newArrayList();
-    List<GcsPath> nonGlobs = Lists.newArrayList();
-    // Records, per spec, which group it went into so results can be put back in input order.
-    List<Boolean> isGlobBooleans = Lists.newArrayList();
-
-    for (GcsPath path : gcsPaths) {
-      if (GcsUtil.isGlob(path)) {
-        globs.add(path);
-        isGlobBooleans.add(true);
-      } else {
-        nonGlobs.add(path);
-        isGlobBooleans.add(false);
-      }
-    }
-
-    Iterator<MatchResult> globsMatchResults = matchGlobs(globs).iterator();
-    Iterator<MatchResult> nonGlobsMatchResults = matchNonGlobs(nonGlobs).iterator();
-
-    ImmutableList.Builder<MatchResult> ret = ImmutableList.builder();
-    for (Boolean isGlob : isGlobBooleans) {
-      if (isGlob) {
-        checkState(globsMatchResults.hasNext(), "Expect globsMatchResults has next.");
-        ret.add(globsMatchResults.next());
-      } else {
-        checkState(nonGlobsMatchResults.hasNext(), "Expect nonGlobsMatchResults has next.");
-        ret.add(nonGlobsMatchResults.next());
-      }
-    }
-    // Each group must have produced exactly one result per input path.
-    checkState(!globsMatchResults.hasNext(), "Expect no more elements in globsMatchResults.");
-    checkState(!nonGlobsMatchResults.hasNext(), "Expect no more elements in nonGlobsMatchResults.");
-    return ret.build();
-  }
-
-  /** Opens a channel for writing {@code resourceId}; only the MIME type is taken from options. */
-  @Override
-  protected WritableByteChannel create(GcsResourceId resourceId, CreateOptions createOptions)
-      throws IOException {
-    return options.getGcsUtil().create(resourceId.getGcsPath(), createOptions.mimeType());
-  }
-
-  /** Opens a channel for reading {@code resourceId}. */
-  @Override
-  protected ReadableByteChannel open(GcsResourceId resourceId) throws IOException {
-    return options.getGcsUtil().open(resourceId.getGcsPath());
-  }
-
-  /**
-   * Renames by copying every source to its destination and then deleting the sources.
-   *
-   * <p>This is two separate operations, not one atomic step. If the delete fails, some or all of
-   * the sources may remain alongside their copies.
-   */
-  @Override
-  protected void rename(
-      List<GcsResourceId> srcResourceIds,
-      List<GcsResourceId> destResourceIds) throws IOException {
-    copy(srcResourceIds, destResourceIds);
-    delete(srcResourceIds);
-  }
-
-  /** Removes all {@code resourceIds} with a single {@link GcsUtil#remove} call. */
-  @Override
-  protected void delete(Collection<GcsResourceId> resourceIds) throws IOException {
-    options.getGcsUtil().remove(toFilenames(resourceIds));
-  }
-
-  /**
-   * Builds a {@link GcsResourceId} for a single, non-glob spec.
-   *
-   * <p>Directory specs are normalized to end with '/'. File specs ending with '/' are rejected
-   * with an {@link IllegalArgumentException}.
-   */
-  @Override
-  protected GcsResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
-    if (isDirectory) {
-      if (!singleResourceSpec.endsWith("/")) {
-        singleResourceSpec += '/';
-      }
-    } else {
-      checkArgument(
-          !singleResourceSpec.endsWith("/"),
-          "Expected a file path, but [%s], ends with '/'. This is unsupported in GcsFileSystem.",
-          singleResourceSpec);
-    }
-    GcsPath path = GcsPath.fromUri(singleResourceSpec);
-    return GcsResourceId.fromGcsPath(path);
-  }
-
-  /** Copies each source to the destination at the same index, via {@link GcsUtil#copy}. */
-  @Override
-  protected void copy(List<GcsResourceId> srcResourceIds, List<GcsResourceId> destResourceIds)
-      throws IOException {
-    options.getGcsUtil().copy(toFilenames(srcResourceIds), toFilenames(destResourceIds));
-  }
-
-  @Override
-  protected String getScheme() {
-    return "gs";
-  }
-
-  /**
-   * Expands each glob in order. An {@link IOException} from one glob becomes a
-   * {@link Status#ERROR} result for that glob instead of failing the whole call.
-   */
-  private List<MatchResult> matchGlobs(List<GcsPath> globs) {
-    // TODO: Executes in parallel, address https://issues.apache.org/jira/browse/BEAM-1503.
-    return FluentIterable.from(globs)
-        .transform(new Function<GcsPath, MatchResult>() {
-          @Override
-          public MatchResult apply(GcsPath gcsPath) {
-            try {
-              return expand(gcsPath);
-            } catch (IOException e) {
-              return MatchResult.create(Status.ERROR, e);
-            }
-          }})
-        .toList();
-  }
-
-  /**
-   * Expands a pattern into {@link MatchResult}.
-   *
-   * <p>Lists the objects in the pattern's bucket under the pattern's literal (non-wildcard)
-   * prefix, page by page. Keeps the names that fully match the glob converted to a regular
-   * expression, and skips "directory" placeholder objects whose names end with '/'.
-   *
-   * @throws IllegalArgumentException if {@code gcsPattern} does not contain globs.
-   */
-  // NOTE(review): this method performs no glob check itself; presumably GcsUtil.getGlobPrefix or
-  // GcsUtil.globToRegexp rejects non-glob input — confirm.
-  @VisibleForTesting
-  MatchResult expand(GcsPath gcsPattern) throws IOException {
-    String prefix = GcsUtil.getGlobPrefix(gcsPattern.getObject());
-    Pattern p = Pattern.compile(GcsUtil.globToRegexp(gcsPattern.getObject()));
-
-    LOG.debug("matching files in bucket {}, prefix {} against pattern {}", gcsPattern.getBucket(),
-        prefix, p.toString());
-
-    String pageToken = null;
-    List<Metadata> results = new LinkedList<>();
-    do {
-      Objects objects = options.getGcsUtil().listObjects(gcsPattern.getBucket(), prefix, pageToken);
-      // A page with no items ends the listing.
-      if (objects.getItems() == null) {
-        break;
-      }
-
-      // Filter objects based on the regex.
-      for (StorageObject o : objects.getItems()) {
-        String name = o.getName();
-        // Skip directories, which end with a slash.
-        if (p.matcher(name).matches() && !name.endsWith("/")) {
-          LOG.debug("Matched object: {}", name);
-          results.add(toMetadata(o));
-        }
-      }
-      // A null token means the last page has been read.
-      pageToken = objects.getNextPageToken();
-    } while (pageToken != null);
-    return MatchResult.create(Status.OK, results);
-  }
-
-  /**
-   * Returns {@link MatchResult MatchResults} for the given {@link GcsPath GcsPaths}.
-   *
-   * <p>The number of returned {@link MatchResult MatchResults} equals the number of given
-   * {@link GcsPath GcsPaths}, in the same order. All paths are fetched with one
-   * {@link GcsUtil#getObjects} call. Each successful {@link MatchResult} contains one
-   * {@link Metadata}.
-   */
-  @VisibleForTesting
-  List<MatchResult> matchNonGlobs(List<GcsPath> gcsPaths) throws IOException {
-    List<StorageObjectOrIOException> results = options.getGcsUtil().getObjects(gcsPaths);
-
-    ImmutableList.Builder<MatchResult> ret = ImmutableList.builder();
-    for (StorageObjectOrIOException result : results) {
-      ret.add(toMatchResult(result));
-    }
-    return ret.build();
-  }
-
-  /**
-   * Converts one lookup outcome into a {@link MatchResult}.
-   *
-   * <p>A {@link FileNotFoundException} becomes {@link Status#NOT_FOUND}. Any other
-   * {@link IOException} becomes {@link Status#ERROR}. Otherwise the result is {@link Status#OK}
-   * with a single {@link Metadata}.
-   */
-  private MatchResult toMatchResult(StorageObjectOrIOException objectOrException) {
-    @Nullable IOException exception = objectOrException.ioException();
-    if (exception instanceof FileNotFoundException) {
-      return MatchResult.create(Status.NOT_FOUND, exception);
-    } else if (exception != null) {
-      return MatchResult.create(Status.ERROR, exception);
-    } else {
-      StorageObject object = objectOrException.storageObject();
-      assert object != null; // fix a warning; guaranteed by StorageObjectOrIOException semantics.
-      return MatchResult.create(Status.OK, ImmutableList.of(toMetadata(object)));
-    }
-  }
-
-  /**
-   * Builds {@link Metadata} for {@code storageObject}. The size is set only when GCS reports one,
-   * and read-seek efficiency is always reported as {@code true}.
-   */
-  private Metadata toMetadata(StorageObject storageObject) {
-    // TODO: Address https://issues.apache.org/jira/browse/BEAM-1494
-    // It is incorrect to set IsReadSeekEfficient true for files with content encoding set to gzip.
-    Metadata.Builder ret = Metadata.builder()
-        .setIsReadSeekEfficient(true)
-        .setResourceId(GcsResourceId.fromGcsPath(GcsPath.fromObject(storageObject)));
-    BigInteger size = storageObject.getSize();
-    if (size != null) {
-      ret.setSizeBytes(size.longValue());
-    }
-    return ret.build();
-  }
-
-  /** Converts resource ids to their {@code gs://bucket/object} string form, preserving order. */
-  private List<String> toFilenames(Collection<GcsResourceId> resources) {
-    return FluentIterable.from(resources)
-        .transform(
-            new Function<GcsResourceId, String>() {
-              @Override
-              public String apply(GcsResourceId resource) {
-                return resource.getGcsPath().toString();
-              }})
-        .toList();
-  }
-
-  /** Parses each spec with {@link GcsPath#fromUri}, preserving order. */
-  private List<GcsPath> toGcsPaths(Collection<String> specs) {
-    return FluentIterable.from(specs)
-        .transform(new Function<String, GcsPath>() {
-          @Override
-          public GcsPath apply(String spec) {
-            return GcsPath.fromUri(spec);
-          }})
-        .toList();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java
deleted file mode 100644
index 1d4e4ad..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrar.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.storage;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-import javax.annotation.Nonnull;
-import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
-import org.apache.beam.sdk.io.FileSystem;
-import org.apache.beam.sdk.io.FileSystemRegistrar;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * {@link AutoService} registrar for the {@link GcsFileSystem}.
- *
- * <p>Registered as a {@link FileSystemRegistrar} service so it can be discovered through
- * {@link java.util.ServiceLoader}.
- */
-@AutoService(FileSystemRegistrar.class)
-public class GcsFileSystemRegistrar implements FileSystemRegistrar {
-
-  /**
-   * Returns a single {@link GcsFileSystem} configured from {@code options} viewed as
-   * {@link GcsOptions}.
-   *
-   * @throws NullPointerException if {@code options} is null
-   */
-  @Override
-  public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
-    checkNotNull(
-        options,
-        "Expect the runner have called FileSystems.setDefaultConfigInWorkers().");
-    return ImmutableList.<FileSystem>of(new GcsFileSystem(options.as(GcsOptions.class)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java
deleted file mode 100644
index 29215e7..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceId.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.storage;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.io.fs.ResolveOptions;
-import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
-import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-
-/**
- * {@link ResourceId} implementation for Google Cloud Storage.
- */
-public class GcsResourceId implements ResourceId {
-
-  private final GcsPath gcsPath;
-
-  static GcsResourceId fromGcsPath(GcsPath gcsPath) {
-    checkNotNull(gcsPath, "gcsPath");
-    return new GcsResourceId(gcsPath);
-  }
-
-  private GcsResourceId(GcsPath gcsPath) {
-    this.gcsPath = gcsPath;
-  }
-
-  @Override
-  public GcsResourceId resolve(String other, ResolveOptions resolveOptions) {
-    checkState(
-        isDirectory(),
-        String.format("Expected the gcsPath is a directory, but had [%s].", gcsPath));
-    checkArgument(
-        resolveOptions.equals(StandardResolveOptions.RESOLVE_FILE)
-            || resolveOptions.equals(StandardResolveOptions.RESOLVE_DIRECTORY),
-        String.format("ResolveOptions: [%s] is not supported.", resolveOptions));
-    if (resolveOptions.equals(StandardResolveOptions.RESOLVE_FILE)) {
-      checkArgument(
-          !other.endsWith("/"),
-          "The resolved file: [%s] should not end with '/'.", other);
-      return fromGcsPath(gcsPath.resolve(other));
-    } else {
-      // StandardResolveOptions.RESOLVE_DIRECTORY
-      if (other.endsWith("/")) {
-        // other already contains the delimiter for gcs.
-        // It is not recommended for callers to set the delimiter.
-        // However, we consider it as a valid input.
-        return fromGcsPath(gcsPath.resolve(other));
-      } else {
-        return fromGcsPath(gcsPath.resolve(other + "/"));
-      }
-    }
-  }
-
-  @Override
-  public GcsResourceId getCurrentDirectory() {
-    if (isDirectory()) {
-      return this;
-    } else {
-      GcsPath parent = gcsPath.getParent();
-      checkState(
-          parent != null,
-          String.format("Failed to get the current directory for path: [%s].", gcsPath));
-      return fromGcsPath(parent);
-    }
-  }
-
-  @Override
-  public boolean isDirectory() {
-    return gcsPath.endsWith("/");
-  }
-
-  @Override
-  public String getScheme() {
-    return "gs";
-  }
-
-  @Override
-  @Nullable public String getFilename() {
-    if (gcsPath.getNameCount() <= 1) {
-      return null;
-    } else {
-      GcsPath gcsFilename = gcsPath.getFileName();
-      return gcsFilename == null ? null : gcsFilename.toString();
-    }
-  }
-
-  GcsPath getGcsPath() {
-    return gcsPath;
-  }
-
-  @Override
-  public String toString() {
-    return gcsPath.toString();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (!(obj instanceof GcsResourceId)) {
-      return false;
-    }
-    GcsResourceId other = (GcsResourceId) obj;
-    return this.gcsPath.equals(other.gcsPath);
-  }
-
-  @Override
-  public int hashCode() {
-    return gcsPath.hashCode();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/package-info.java
deleted file mode 100644
index b5378be..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Defines IO connectors for Google Cloud Storage.
- */
-package org.apache.beam.sdk.io.gcp.storage;

http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java
deleted file mode 100644
index 2fc337a..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemRegistrarTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.storage;
-
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
-import com.google.common.collect.Lists;
-import java.util.ServiceLoader;
-import org.apache.beam.sdk.io.FileSystem;
-import org.apache.beam.sdk.io.FileSystemRegistrar;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link GcsFileSystemRegistrar}.
- */
-@RunWith(JUnit4.class)
-public class GcsFileSystemRegistrarTest {
-
-  @Test
-  public void testServiceLoader() {
-    for (FileSystemRegistrar registrar
-        : Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) {
-      if (registrar instanceof GcsFileSystemRegistrar) {
-        Iterable<FileSystem> fileSystems = registrar.fromOptions(PipelineOptionsFactory.create());
-        assertThat(fileSystems, contains(instanceOf(GcsFileSystem.class)));
-        return;
-      }
-    }
-    fail("Expected to find " + GcsFileSystemRegistrar.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java
deleted file mode 100644
index cc2e0c4..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystemTest.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.storage;
-
-import static org.hamcrest.Matchers.contains;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isNull;
-import static org.mockito.Mockito.when;
-
-import com.google.api.services.storage.model.Objects;
-import com.google.api.services.storage.model.StorageObject;
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
-import org.apache.beam.sdk.io.fs.MatchResult;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.io.fs.MatchResult.Status;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.GcsUtil;
-import org.apache.beam.sdk.util.GcsUtil.StorageObjectOrIOException;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link GcsFileSystem}.
- */
-@RunWith(JUnit4.class)
-public class GcsFileSystemTest {
-
-  @Rule
-  public transient ExpectedException thrown = ExpectedException.none();
-  @Mock
-  private GcsUtil mockGcsUtil;
-  private GcsOptions gcsOptions;
-  private GcsFileSystem gcsFileSystem;
-
-  @Before
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-    gcsOptions = PipelineOptionsFactory.as(GcsOptions.class);
-    gcsOptions.setGcsUtil(mockGcsUtil);
-    gcsFileSystem = new GcsFileSystem(gcsOptions);
-  }
-
-  @Test
-  public void testMatch() throws Exception {
-    Objects modelObjects = new Objects();
-    List<StorageObject> items = new ArrayList<>();
-    // A directory
-    items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/"));
-
-    // Files within the directory
-    items.add(createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */));
-    items.add(createStorageObject("gs://testbucket/testdirectory/file2name", 2L /* fileSize */));
-    items.add(createStorageObject("gs://testbucket/testdirectory/file3name", 3L /* fileSize */));
-    items.add(createStorageObject("gs://testbucket/testdirectory/file4name", 4L /* fileSize */));
-    items.add(createStorageObject("gs://testbucket/testdirectory/otherfile", 5L /* fileSize */));
-    items.add(createStorageObject("gs://testbucket/testdirectory/anotherfile", 6L /* fileSize */));
-
-    modelObjects.setItems(items);
-    when(mockGcsUtil.listObjects(eq("testbucket"), anyString(), isNull(String.class)))
-        .thenReturn(modelObjects);
-
-    List<GcsPath> gcsPaths = ImmutableList.of(
-        GcsPath.fromUri("gs://testbucket/testdirectory/non-exist-file"),
-        GcsPath.fromUri("gs://testbucket/testdirectory/otherfile"));
-
-    when(mockGcsUtil.getObjects(eq(gcsPaths))).thenReturn(
-        ImmutableList.of(
-            StorageObjectOrIOException.create(new FileNotFoundException()),
-            StorageObjectOrIOException.create(
-                createStorageObject("gs://testbucket/testdirectory/otherfile", 4L))));
-
-    List<String> specs = ImmutableList.of(
-        "gs://testbucket/testdirectory/file[1-3]*",
-        "gs://testbucket/testdirectory/non-exist-file",
-        "gs://testbucket/testdirectory/otherfile");
-    List<MatchResult> matchResults = gcsFileSystem.match(specs);
-    assertEquals(3, matchResults.size());
-    assertEquals(Status.OK, matchResults.get(0).status());
-    assertThat(
-        ImmutableList.of(
-            "gs://testbucket/testdirectory/file1name",
-            "gs://testbucket/testdirectory/file2name",
-            "gs://testbucket/testdirectory/file3name"),
-        contains(toFilenames(matchResults.get(0)).toArray()));
-    assertEquals(Status.NOT_FOUND, matchResults.get(1).status());
-    assertEquals(Status.OK, matchResults.get(2).status());
-    assertThat(
-        ImmutableList.of("gs://testbucket/testdirectory/otherfile"),
-        contains(toFilenames(matchResults.get(2)).toArray()));
-
-  }
-
-  @Test
-  public void testGlobExpansion() throws IOException {
-    Objects modelObjects = new Objects();
-    List<StorageObject> items = new ArrayList<>();
-    // A directory
-    items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/"));
-
-    // Files within the directory
-    items.add(createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */));
-    items.add(createStorageObject("gs://testbucket/testdirectory/file2name", 2L /* fileSize */));
-    items.add(createStorageObject("gs://testbucket/testdirectory/file3name", 3L /* fileSize */));
-    items.add(createStorageObject("gs://testbucket/testdirectory/otherfile", 4L /* fileSize */));
-    items.add(createStorageObject("gs://testbucket/testdirectory/anotherfile", 5L /* fileSize */));
-    items.add(createStorageObject(
-        "gs://testbucket/testotherdirectory/file4name", 6L /* fileSize */));
-
-    modelObjects.setItems(items);
-
-    when(mockGcsUtil.listObjects(eq("testbucket"), anyString(), isNull(String.class)))
-        .thenReturn(modelObjects);
-
-    // Test patterns.
-    {
-      GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file*");
-      List<String> expectedFiles = ImmutableList.of(
-          "gs://testbucket/testdirectory/file1name",
-          "gs://testbucket/testdirectory/file2name",
-          "gs://testbucket/testdirectory/file3name");
-
-      assertThat(
-          expectedFiles,
-          contains(toFilenames(gcsFileSystem.expand(pattern)).toArray()));
-    }
-
-    {
-      GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file*");
-      List<String> expectedFiles = ImmutableList.of(
-          "gs://testbucket/testdirectory/file1name",
-          "gs://testbucket/testdirectory/file2name",
-          "gs://testbucket/testdirectory/file3name");
-
-      assertThat(
-          expectedFiles,
-          contains(toFilenames(gcsFileSystem.expand(pattern)).toArray()));
-    }
-
-    {
-      GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file[1-3]*");
-      List<String> expectedFiles = ImmutableList.of(
-          "gs://testbucket/testdirectory/file1name",
-          "gs://testbucket/testdirectory/file2name",
-          "gs://testbucket/testdirectory/file3name");
-
-      assertThat(
-          expectedFiles,
-          contains(toFilenames(gcsFileSystem.expand(pattern)).toArray()));
-    }
-
-    {
-      GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file?name");
-      List<String> expectedFiles = ImmutableList.of(
-          "gs://testbucket/testdirectory/file1name",
-          "gs://testbucket/testdirectory/file2name",
-          "gs://testbucket/testdirectory/file3name");
-
-      assertThat(
-          expectedFiles,
-          contains(toFilenames(gcsFileSystem.expand(pattern)).toArray()));
-    }
-
-    {
-      GcsPath pattern = GcsPath.fromUri("gs://testbucket/test*ectory/fi*name");
-      List<String> expectedFiles = ImmutableList.of(
-          "gs://testbucket/testdirectory/file1name",
-          "gs://testbucket/testdirectory/file2name",
-          "gs://testbucket/testdirectory/file3name",
-          "gs://testbucket/testotherdirectory/file4name");
-
-      assertThat(
-          expectedFiles,
-          contains(toFilenames(gcsFileSystem.expand(pattern)).toArray()));
-    }
-  }
-
-  @Test
-  public void testExpandNonGlob() throws Exception {
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Glob expression: [testdirectory/otherfile] is not expandable.");
-    gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/testdirectory/otherfile"));
-  }
-
-  // Patterns that contain recursive wildcards ('**') are not supported.
-  @Test
-  public void testRecursiveGlobExpansionFails() throws IOException {
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Unsupported wildcard usage");
-    gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/test**"));
-  }
-
-  @Test
-  public void testMatchNonGlobs() throws Exception {
-    List<StorageObjectOrIOException> items = new ArrayList<>();
-    // Files within the directory
-    items.add(StorageObjectOrIOException.create(
-        createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */)));
-    items.add(StorageObjectOrIOException.create(new FileNotFoundException()));
-    items.add(StorageObjectOrIOException.create(new IOException()));
-    items.add(StorageObjectOrIOException.create(
-        createStorageObject("gs://testbucket/testdirectory/file4name", 4L /* fileSize */)));
-
-    List<GcsPath> gcsPaths = ImmutableList.of(
-        GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
-        GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
-        GcsPath.fromUri("gs://testbucket/testdirectory/file3name"),
-        GcsPath.fromUri("gs://testbucket/testdirectory/file4name"));
-
-    when(mockGcsUtil.getObjects(eq(gcsPaths))).thenReturn(items);
-    List<MatchResult> matchResults = gcsFileSystem.matchNonGlobs(gcsPaths);
-
-    assertEquals(4, matchResults.size());
-    assertThat(
-        ImmutableList.of("gs://testbucket/testdirectory/file1name"),
-        contains(toFilenames(matchResults.get(0)).toArray()));
-    assertEquals(Status.NOT_FOUND, matchResults.get(1).status());
-    assertEquals(Status.ERROR, matchResults.get(2).status());
-    assertThat(
-        ImmutableList.of("gs://testbucket/testdirectory/file4name"),
-        contains(toFilenames(matchResults.get(3)).toArray()));
-  }
-
-  private StorageObject createStorageObject(String gcsFilename, long fileSize) {
-    GcsPath gcsPath = GcsPath.fromUri(gcsFilename);
-    return new StorageObject()
-        .setBucket(gcsPath.getBucket())
-        .setName(gcsPath.getObject())
-        .setSize(BigInteger.valueOf(fileSize));
-  }
-
-  private List<String> toFilenames(MatchResult matchResult) throws IOException {
-    return FluentIterable
-        .from(matchResult.metadata())
-        .transform(new Function<Metadata, String>() {
-          @Override
-          public String apply(Metadata metadata) {
-            return ((GcsResourceId) metadata.resourceId()).getGcsPath().toString();
-          }})
-        .toList();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/b2a4ae2b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java
deleted file mode 100644
index 702e754..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsResourceIdTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.storage;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
-import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link GcsResourceId}.
- */
-@RunWith(JUnit4.class)
-public class GcsResourceIdTest {
-
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  @Test
-  public void testResolve() throws Exception {
-    // Tests for common gcs paths.
-    assertEquals(
-        toResourceIdentifier("gs://bucket/tmp/aa"),
-        toResourceIdentifier("gs://bucket/tmp/")
-            .resolve("aa", StandardResolveOptions.RESOLVE_FILE));
-    assertEquals(
-        toResourceIdentifier("gs://bucket/tmp/aa/bb/cc/"),
-        toResourceIdentifier("gs://bucket/tmp/")
-            .resolve("aa", StandardResolveOptions.RESOLVE_DIRECTORY)
-            .resolve("bb", StandardResolveOptions.RESOLVE_DIRECTORY)
-            .resolve("cc", StandardResolveOptions.RESOLVE_DIRECTORY));
-
-    // Tests absolute path.
-    assertEquals(
-        toResourceIdentifier("gs://bucket/tmp/aa"),
-        toResourceIdentifier("gs://bucket/tmp/bb/")
-            .resolve("gs://bucket/tmp/aa", StandardResolveOptions.RESOLVE_FILE));
-
-    // Tests bucket with no ending '/'.
-    assertEquals(
-        toResourceIdentifier("gs://my_bucket/tmp"),
-        toResourceIdentifier("gs://my_bucket")
-            .resolve("tmp", StandardResolveOptions.RESOLVE_FILE));
-
-    // Tests path with unicode
-    assertEquals(
-        toResourceIdentifier("gs://bucket/输出 目录/输出 文件01.txt"),
-        toResourceIdentifier("gs://bucket/输出 目录/")
-            .resolve("输出 文件01.txt", StandardResolveOptions.RESOLVE_FILE));
-  }
-
-  @Test
-  public void testResolveHandleBadInputs() throws Exception {
-    assertEquals(
-        toResourceIdentifier("gs://my_bucket/tmp/"),
-        toResourceIdentifier("gs://my_bucket/")
-            .resolve("tmp/", StandardResolveOptions.RESOLVE_DIRECTORY));
-  }
-
-  @Test
-  public void testResolveInvalidInputs() throws Exception {
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("The resolved file: [tmp/] should not end with '/'.");
-    toResourceIdentifier("gs://my_bucket/").resolve("tmp/", StandardResolveOptions.RESOLVE_FILE);
-  }
-
-  @Test
-  public void testResolveInvalidNotDirectory() throws Exception {
-    ResourceId tmpDir = toResourceIdentifier("gs://my_bucket/")
-        .resolve("tmp dir", StandardResolveOptions.RESOLVE_FILE);
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Expected the gcsPath is a directory, but had [gs://my_bucket/tmp dir].");
-    tmpDir.resolve("aa", StandardResolveOptions.RESOLVE_FILE);
-  }
-
-  @Test
-  public void testGetCurrentDirectory() throws Exception {
-    // Tests gcs paths.
-    assertEquals(
-        toResourceIdentifier("gs://my_bucket/tmp dir/"),
-        toResourceIdentifier("gs://my_bucket/tmp dir/").getCurrentDirectory());
-
-    // Tests path with unicode.
-    assertEquals(
-        toResourceIdentifier("gs://my_bucket/输出 目录/"),
-        toResourceIdentifier("gs://my_bucket/输出 目录/文件01.txt").getCurrentDirectory());
-
-    // Tests bucket with no ending '/'.
-    assertEquals(
-        toResourceIdentifier("gs://my_bucket/"),
-        toResourceIdentifier("gs://my_bucket").getCurrentDirectory());
-  }
-
-  @Test
-  public void testIsDirectory() throws Exception {
-    assertTrue(toResourceIdentifier("gs://my_bucket/tmp dir/").isDirectory());
-    assertTrue(toResourceIdentifier("gs://my_bucket/").isDirectory());
-    assertTrue(toResourceIdentifier("gs://my_bucket").isDirectory());
-
-    assertFalse(toResourceIdentifier("gs://my_bucket/file").isDirectory());
-  }
-
-  @Test
-  public void testInvalidGcsPath() throws Exception {
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Invalid GCS URI: gs://");
-    toResourceIdentifier("gs://");
-  }
-
-  @Test
-  public void testGetScheme() throws Exception {
-    // Tests gcs paths.
-    assertEquals("gs", toResourceIdentifier("gs://my_bucket/tmp dir/").getScheme());
-
-    // Tests bucket with no ending '/'.
-    assertEquals("gs", toResourceIdentifier("gs://my_bucket").getScheme());
-  }
-
-  @Test
-  public void testEquals() throws Exception {
-    assertEquals(
-        toResourceIdentifier("gs://my_bucket/tmp/"),
-        toResourceIdentifier("gs://my_bucket/tmp/"));
-
-    assertNotEquals(
-        toResourceIdentifier("gs://my_bucket/tmp"),
-        toResourceIdentifier("gs://my_bucket/tmp/"));
-  }
-
-  @Test
-  public void testGetFilename() throws Exception {
-    assertEquals(toResourceIdentifier("gs://my_bucket/").getFilename(), null);
-    assertEquals(toResourceIdentifier("gs://my_bucket/abc").getFilename(),
-        "abc");
-    assertEquals(toResourceIdentifier("gs://my_bucket/abc/").getFilename(),
-        "abc");
-    assertEquals(toResourceIdentifier("gs://my_bucket/abc/xyz.txt").getFilename(),
-        "xyz.txt");
-  }
-
-  private GcsResourceId toResourceIdentifier(String str) throws Exception {
-    return GcsResourceId.fromGcsPath(GcsPath.fromUri(str));
-  }
-}