You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/04/18 23:19:28 UTC

[2/7] beam git commit: [BEAM-1871] Create new GCP core module package and move several GCP related classes from beam-sdks-java-core over.

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java
new file mode 100644
index 0000000..6fac6dc
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.client.util.BackOff;
+
+
+/**
+ * Implementation of {@link BackOff} that increases the back off period for each retry attempt
+ * using a randomization function that grows exponentially.
+ *
+ * <p>Example: The initial interval is .5 seconds and the maximum interval is 60 secs.
+ * For 14 tries the sequence will be (values in seconds):
+ *
+ * <pre>
+ * retry#      retry_interval     randomized_interval
+ * 1             0.5                [0.25,   0.75]
+ * 2             0.75               [0.375,  1.125]
+ * 3             1.125              [0.562,  1.687]
+ * 4             1.687              [0.8435, 2.53]
+ * 5             2.53               [1.265,  3.795]
+ * 6             3.795              [1.897,  5.692]
+ * 7             5.692              [2.846,  8.538]
+ * 8             8.538              [4.269, 12.807]
+ * 9            12.807              [6.403, 19.210]
+ * 10           28.832              [14.416, 43.248]
+ * 11           43.248              [21.624, 64.873]
+ * 12           60.0                [30.0, 90.0]
+ * 13           60.0                [30.0, 90.0]
+ * 14           60.0                [30.0, 90.0]
+ * </pre>
+ *
+ * <p>Implementation is not thread-safe.
+ */
+@Deprecated
+public class IntervalBoundedExponentialBackOff implements BackOff {
+  public static final double DEFAULT_MULTIPLIER = 1.5;
+  public static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5;
+  private final long maximumIntervalMillis;
+  private final long initialIntervalMillis;
+  private int currentAttempt;
+
+  public IntervalBoundedExponentialBackOff(long maximumIntervalMillis, long initialIntervalMillis) {
+    checkArgument(maximumIntervalMillis > 0, "Maximum interval must be greater than zero.");
+    checkArgument(initialIntervalMillis > 0, "Initial interval must be greater than zero.");
+    this.maximumIntervalMillis = maximumIntervalMillis;
+    this.initialIntervalMillis = initialIntervalMillis;
+    reset();
+  }
+
+  @Override
+  public void reset() {
+    currentAttempt = 1;
+  }
+
+  @Override
+  public long nextBackOffMillis() {
+    double currentIntervalMillis =
+        Math.min(
+            initialIntervalMillis * Math.pow(DEFAULT_MULTIPLIER, currentAttempt - 1),
+            maximumIntervalMillis);
+    double randomOffset =
+        (Math.random() * 2 - 1) * DEFAULT_RANDOMIZATION_FACTOR * currentIntervalMillis;
+    currentAttempt += 1;
+    return Math.round(currentIntervalMillis + randomOffset);
+  }
+
+  public boolean atMaxInterval() {
+    return initialIntervalMillis * Math.pow(DEFAULT_MULTIPLIER, currentAttempt - 1)
+        >= maximumIntervalMillis;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java
new file mode 100644
index 0000000..f703e4c
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.auth.Credentials;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Construct an oauth credential to be used by the SDK and the SDK workers.
+ * Always returns a null Credential object.
+ */
+public class NoopCredentialFactory implements CredentialFactory {
+  private static final NoopCredentialFactory INSTANCE = new NoopCredentialFactory();
+  private static final NoopCredentials NOOP_CREDENTIALS = new NoopCredentials();
+
+  public static NoopCredentialFactory fromOptions(PipelineOptions options) {
+    return INSTANCE;
+  }
+
+  @Override
+  public Credentials getCredential() throws IOException {
+    return NOOP_CREDENTIALS;
+  }
+
+  private static class NoopCredentials extends Credentials {
+    @Override
+    public String getAuthenticationType() {
+      return null;
+    }
+
+    @Override
+    public Map<String, List<String>> getRequestMetadata(URI uri) throws IOException {
+      return null;
+    }
+
+    @Override
+    public boolean hasRequestMetadata() {
+      return false;
+    }
+
+    @Override
+    public boolean hasRequestMetadataOnly() {
+      return false;
+    }
+
+    @Override
+    public void refresh() throws IOException {}
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/NullCredentialInitializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/NullCredentialInitializer.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/NullCredentialInitializer.java
new file mode 100644
index 0000000..4ed35c6
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/NullCredentialInitializer.java
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpUnsuccessfulResponseHandler;
+import java.io.IOException;
+
+/**
+ * A {@link HttpRequestInitializer} for requests that don't have credentials.
+ *
+ * <p>When the access is denied, it throws {@link IOException} with a detailed error message.
+ */
+public class NullCredentialInitializer implements HttpRequestInitializer {
+  private static final int ACCESS_DENIED = 401;
+  private static final String NULL_CREDENTIAL_REASON =
+      "Unable to get application default credentials. Please see "
+      + "https://developers.google.com/accounts/docs/application-default-credentials "
+      + "for details on how to specify credentials. This version of the SDK is "
+      + "dependent on the gcloud core component version 2015.02.05 or newer to "
+      + "be able to get credentials from the currently authorized user via gcloud auth.";
+
+  @Override
+  public void initialize(HttpRequest httpRequest) throws IOException {
+    httpRequest.setUnsuccessfulResponseHandler(new NullCredentialHttpUnsuccessfulResponseHandler());
+  }
+
+  private static class NullCredentialHttpUnsuccessfulResponseHandler
+      implements HttpUnsuccessfulResponseHandler {
+
+    @Override
+    public boolean handleResponse(
+        HttpRequest httpRequest,
+        HttpResponse httpResponse, boolean supportsRetry) throws IOException {
+      if (!httpResponse.isSuccessStatusCode() && httpResponse.getStatusCode() == ACCESS_DENIED) {
+        throwNullCredentialException();
+      }
+      return supportsRetry;
+    }
+  }
+
+  public static void throwNullCredentialException() {
+    throw new RuntimeException(NULL_CREDENTIAL_REASON);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java
new file mode 100644
index 0000000..80c093b
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.cloudresourcemanager.CloudResourceManager;
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.storage.Storage;
+import com.google.auth.Credentials;
+import com.google.auth.http.HttpCredentialsAdapter;
+import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.security.GeneralSecurityException;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.CloudResourceManagerOptions;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PubsubOptions;
+
+/**
+ * Helpers for cloud communication.
+ */
+public class Transport {
+
+  private static class SingletonHelper {
+    /** Global instance of the JSON factory. */
+    private static final JsonFactory JSON_FACTORY;
+
+    /** Global instance of the HTTP transport. */
+    private static final HttpTransport HTTP_TRANSPORT;
+
+    static {
+      try {
+        JSON_FACTORY = JacksonFactory.getDefaultInstance();
+        HTTP_TRANSPORT = GoogleNetHttpTransport.newTrustedTransport();
+      } catch (GeneralSecurityException | IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  public static HttpTransport getTransport() {
+    return SingletonHelper.HTTP_TRANSPORT;
+  }
+
+  public static JsonFactory getJsonFactory() {
+    return SingletonHelper.JSON_FACTORY;
+  }
+
+  private static class ApiComponents {
+    public String rootUrl;
+    public String servicePath;
+
+    public ApiComponents(String root, String path) {
+      this.rootUrl = root;
+      this.servicePath = path;
+    }
+  }
+
+  private static ApiComponents apiComponentsFromUrl(String urlString) {
+    try {
+      URL url = new URL(urlString);
+      String rootUrl = url.getProtocol() + "://" + url.getHost()
+          + (url.getPort() > 0 ? ":" + url.getPort() : "");
+      return new ApiComponents(rootUrl, url.getPath());
+    } catch (MalformedURLException e) {
+      throw new RuntimeException("Invalid URL: " + urlString);
+    }
+  }
+
+  /**
+   * Returns a BigQuery client builder using the specified {@link BigQueryOptions}.
+   */
+  public static Bigquery.Builder
+      newBigQueryClient(BigQueryOptions options) {
+    return new Bigquery.Builder(getTransport(), getJsonFactory(),
+        chainHttpRequestInitializer(
+            options.getGcpCredential(),
+            // Do not log 404. It clutters the output and is possibly even required by the caller.
+            new RetryHttpRequestInitializer(ImmutableList.of(404))))
+        .setApplicationName(options.getAppName())
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+  }
+
+  /**
+   * Returns a Pubsub client builder using the specified {@link PubsubOptions}.
+   *
+   * @deprecated Use an appropriate org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory
+   */
+  @Deprecated
+  public static Pubsub.Builder
+      newPubsubClient(PubsubOptions options) {
+    return new Pubsub.Builder(getTransport(), getJsonFactory(),
+        chainHttpRequestInitializer(
+            options.getGcpCredential(),
+            // Do not log 404. It clutters the output and is possibly even required by the caller.
+            new RetryHttpRequestInitializer(ImmutableList.of(404))))
+        .setRootUrl(options.getPubsubRootUrl())
+        .setApplicationName(options.getAppName())
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+  }
+
+  /**
+   * Returns a CloudResourceManager client builder using the specified
+   * {@link CloudResourceManagerOptions}.
+   */
+  public static CloudResourceManager.Builder
+      newCloudResourceManagerClient(CloudResourceManagerOptions options) {
+    Credentials credentials = options.getGcpCredential();
+    if (credentials == null) {
+      NullCredentialInitializer.throwNullCredentialException();
+    }
+    return new CloudResourceManager.Builder(getTransport(), getJsonFactory(),
+        chainHttpRequestInitializer(
+            credentials,
+            // Do not log 404. It clutters the output and is possibly even required by the caller.
+            new RetryHttpRequestInitializer(ImmutableList.of(404))))
+        .setApplicationName(options.getAppName())
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+  }
+
+  /**
+   * Returns a Cloud Storage client builder using the specified {@link GcsOptions}.
+   */
+  public static Storage.Builder
+      newStorageClient(GcsOptions options) {
+    String servicePath = options.getGcsEndpoint();
+    Storage.Builder storageBuilder = new Storage.Builder(getTransport(), getJsonFactory(),
+        chainHttpRequestInitializer(
+            options.getGcpCredential(),
+            // Do not log the code 404. Code up the stack will deal with 404's if needed, and
+            // logging it by default clutters the output during file staging.
+            new RetryHttpRequestInitializer(
+                ImmutableList.of(404), new UploadIdResponseInterceptor())))
+        .setApplicationName(options.getAppName())
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+    if (servicePath != null) {
+      ApiComponents components = apiComponentsFromUrl(servicePath);
+      storageBuilder.setRootUrl(components.rootUrl);
+      storageBuilder.setServicePath(components.servicePath);
+    }
+    return storageBuilder;
+  }
+
+  private static HttpRequestInitializer chainHttpRequestInitializer(
+      Credentials credential, HttpRequestInitializer httpRequestInitializer) {
+    if (credential == null) {
+      return new ChainingHttpRequestInitializer(
+          new NullCredentialInitializer(), httpRequestInitializer);
+    } else {
+      return new ChainingHttpRequestInitializer(
+          new HttpCredentialsAdapter(credential),
+          httpRequestInitializer);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/package-info.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/package-info.java
new file mode 100644
index 0000000..f8135e7
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Defines Google Cloud Platform component utilities that can be used by Beam runners. */
+package org.apache.beam.sdk.util;

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
new file mode 100644
index 0000000..37fb42d
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam;
+
+import static org.apache.beam.sdk.util.ApiSurface.containsOnlyPackages;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import org.apache.beam.sdk.util.ApiSurface;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** API surface verification for Google Cloud Platform core components. */
+@RunWith(JUnit4.class)
+public class GcpCoreApiSurfaceTest {
+
+  @Test
+  public void testApiSurface() throws Exception {
+
+    @SuppressWarnings("unchecked")
+    final Set<String> allowed =
+        ImmutableSet.of(
+            "org.apache.beam",
+            "com.google.api.client",
+            "com.google.api.services.bigquery",
+            "com.google.api.services.cloudresourcemanager",
+            "com.google.api.services.pubsub",
+            "com.google.api.services.storage",
+            "com.google.auth",
+            "com.google.protobuf",
+            "com.fasterxml.jackson.annotation",
+            "com.fasterxml.jackson.core",
+            "com.fasterxml.jackson.databind",
+            "org.apache.avro",
+            "org.hamcrest",
+            // via DataflowMatchers
+            "org.codehaus.jackson",
+            // via Avro
+            "org.joda.time",
+            "org.junit");
+
+    assertThat(
+        ApiSurface.getSdkApiSurface(getClass().getClassLoader()), containsOnlyPackages(allowed));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
new file mode 100644
index 0000000..288383e
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.beam.sdk.options.GcpOptions.DefaultProjectFactory;
+import org.apache.beam.sdk.testing.RestoreSystemProperties;
+import org.apache.beam.sdk.util.NoopPathValidator;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestRule;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GcpOptions}. */
+@RunWith(JUnit4.class)
+public class GcpOptionsTest {
+  @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testGetProjectFromCloudSdkConfigEnv() throws Exception {
+    Map<String, String> environment =
+        ImmutableMap.of("CLOUDSDK_CONFIG", tmpFolder.getRoot().getAbsolutePath());
+    assertEquals("test-project",
+        runGetProjectTest(tmpFolder.newFile("properties"), environment));
+  }
+
+  @Test
+  public void testGetProjectFromAppDataEnv() throws Exception {
+    Map<String, String> environment =
+        ImmutableMap.of("APPDATA", tmpFolder.getRoot().getAbsolutePath());
+    System.setProperty("os.name", "windows");
+    assertEquals("test-project",
+        runGetProjectTest(new File(tmpFolder.newFolder("gcloud"), "properties"),
+            environment));
+  }
+
+  @Test
+  public void testGetProjectFromUserHomeEnvOld() throws Exception {
+    Map<String, String> environment = ImmutableMap.of();
+    System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
+    assertEquals("test-project",
+        runGetProjectTest(
+            new File(tmpFolder.newFolder(".config", "gcloud"), "properties"),
+            environment));
+  }
+
+  @Test
+  public void testGetProjectFromUserHomeEnv() throws Exception {
+    Map<String, String> environment = ImmutableMap.of();
+    System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
+    assertEquals("test-project",
+        runGetProjectTest(
+            new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"),
+            environment));
+  }
+
+  @Test
+  public void testGetProjectFromUserHomeOldAndNewPrefersNew() throws Exception {
+    Map<String, String> environment = ImmutableMap.of();
+    System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
+    makePropertiesFileWithProject(new File(tmpFolder.newFolder(".config", "gcloud"), "properties"),
+        "old-project");
+    assertEquals("test-project",
+        runGetProjectTest(
+            new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"),
+            environment));
+  }
+
+  @Test
+  public void testUnableToGetDefaultProject() throws Exception {
+    System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
+    DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory());
+    when(projectFactory.getEnvironment()).thenReturn(ImmutableMap.<String, String>of());
+    assertNull(projectFactory.create(PipelineOptionsFactory.create()));
+  }
+
+  @Test
+  public void testEmptyGcpTempLocation() throws Exception {
+    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+    options.setProject("");
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("--project is a required option");
+    options.getGcpTempLocation();
+  }
+
+  @Test
+  public void testDefaultGcpTempLocation() throws Exception {
+    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+    String tempLocation = "gs://bucket";
+    options.setTempLocation(tempLocation);
+    options.as(GcsOptions.class).setPathValidatorClass(NoopPathValidator.class);
+    assertEquals(tempLocation, options.getGcpTempLocation());
+  }
+
+  @Test
+  public void testDefaultGcpTempLocationInvalid() throws Exception {
+    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+    options.setTempLocation("file://");
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Error constructing default value for gcpTempLocation: tempLocation is not"
+            + " a valid GCS path");
+    options.getGcpTempLocation();
+  }
+
+  @Test
+  public void testDefaultGcpTempLocationDoesNotExist() {
+    GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+    String tempLocation = "gs://does/not/exist";
+    options.setTempLocation(tempLocation);
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "Error constructing default value for gcpTempLocation: tempLocation is not"
+            + " a valid GCS path");
+    thrown.expectCause(
+        hasMessage(containsString("Output path does not exist or is not writeable")));
+
+    options.getGcpTempLocation();
+  }
+
+  private static void makePropertiesFileWithProject(File path, String projectId)
+      throws IOException {
+    String properties = String.format("[core]%n"
+        + "account = test-account@google.com%n"
+        + "project = %s%n"
+        + "%n"
+        + "[dataflow]%n"
+        + "magic = true%n", projectId);
+    Files.write(properties, path, StandardCharsets.UTF_8);
+  }
+
+  private static String runGetProjectTest(File path, Map<String, String> environment)
+      throws Exception {
+    makePropertiesFileWithProject(path, "test-project");
+    DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory());
+    when(projectFactory.getEnvironment()).thenReturn(environment);
+    return projectFactory.create(PipelineOptionsFactory.create());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java
new file mode 100644
index 0000000..dae7208
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.services.bigquery.Bigquery.Datasets.Delete;
+import com.google.api.services.storage.Storage;
+import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer;
+import org.apache.beam.sdk.util.TestCredential;
+import org.apache.beam.sdk.util.Transport;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GoogleApiDebugOptions}. */
+@RunWith(JUnit4.class)
+public class GoogleApiDebugOptionsTest {
+  private static final String STORAGE_GET_TRACE =
+      "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"}";
+  private static final String STORAGE_GET_AND_LIST_TRACE =
+      "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\","
+      + "\"Objects.List\":\"ListTraceDestination\"}";
+  private static final String STORAGE_TRACE = "--googleApiTrace={\"Storage\":\"TraceDestination\"}";
+
+  @Test
+  public void testWhenTracingMatches() throws Exception {
+    String[] args = new String[] {STORAGE_GET_TRACE};
+    GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
+    options.setGcpCredential(new TestCredential());
+    assertNotNull(options.getGoogleApiTrace());
+
+    Storage.Objects.Get request =
+        Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
+    assertEquals("GetTraceDestination", request.get("$trace"));
+  }
+
+  @Test
+  public void testWhenTracingDoesNotMatch() throws Exception {
+    String[] args = new String[] {STORAGE_GET_TRACE};
+    GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
+    options.setGcpCredential(new TestCredential());
+
+    assertNotNull(options.getGoogleApiTrace());
+
+    Storage.Objects.List request =
+        Transport.newStorageClient(options).build().objects().list("testProjectId");
+    assertNull(request.get("$trace"));
+  }
+
+  @Test
+  public void testWithMultipleTraces() throws Exception {
+    String[] args = new String[] {STORAGE_GET_AND_LIST_TRACE};
+    GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
+    options.setGcpCredential(new TestCredential());
+
+    assertNotNull(options.getGoogleApiTrace());
+
+    Storage.Objects.Get getRequest =
+        Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
+    assertEquals("GetTraceDestination", getRequest.get("$trace"));
+
+    Storage.Objects.List listRequest =
+        Transport.newStorageClient(options).build().objects().list("testProjectId");
+    assertEquals("ListTraceDestination", listRequest.get("$trace"));
+  }
+
+  @Test
+  public void testMatchingAllCalls() throws Exception {
+    String[] args = new String[] {STORAGE_TRACE};
+    GcsOptions options =
+        PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
+    options.setGcpCredential(new TestCredential());
+
+    assertNotNull(options.getGoogleApiTrace());
+
+    Storage.Objects.Get getRequest =
+        Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
+    assertEquals("TraceDestination", getRequest.get("$trace"));
+
+    Storage.Objects.List listRequest =
+        Transport.newStorageClient(options).build().objects().list("testProjectId");
+    assertEquals("TraceDestination", listRequest.get("$trace"));
+  }
+
+  @Test
+  public void testMatchingAgainstClient() throws Exception {
+    GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
+    options.setGcpCredential(new TestCredential());
+    options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor(
+        Transport.newStorageClient(options).build(), "TraceDestination"));
+
+    Storage.Objects.Get getRequest =
+        Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
+    assertEquals("TraceDestination", getRequest.get("$trace"));
+
+    Delete deleteRequest = Transport.newBigQueryClient(options.as(BigQueryOptions.class))
+        .build().datasets().delete("testProjectId", "testDatasetId");
+    assertNull(deleteRequest.get("$trace"));
+  }
+
+  @Test
+  public void testMatchingAgainstRequestType() throws Exception {
+    GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
+    options.setGcpCredential(new TestCredential());
+    options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor(
+        Transport.newStorageClient(options).build().objects()
+            .get("aProjectId", "aObjectId"), "TraceDestination"));
+
+    Storage.Objects.Get getRequest =
+        Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
+    assertEquals("TraceDestination", getRequest.get("$trace"));
+
+    Storage.Objects.List listRequest =
+        Transport.newStorageClient(options).build().objects().list("testProjectId");
+    assertNull(listRequest.get("$trace"));
+  }
+
+  @Test
+  public void testDeserializationAndSerializationOfGoogleApiTracer() throws Exception {
+    String serializedValue = "{\"Api\":\"Token\"}";
+    ObjectMapper objectMapper = new ObjectMapper();
+    assertEquals(serializedValue,
+        objectMapper.writeValueAsString(
+            objectMapper.readValue(serializedValue, GoogleApiTracer.class)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
new file mode 100644
index 0000000..3b35856
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.model.QueryRequest;
+import com.google.api.services.bigquery.model.QueryResponse;
+import com.google.api.services.bigquery.model.TableCell;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.math.BigInteger;
+import org.apache.beam.sdk.PipelineResult;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link BigqueryMatcher}.
+ */
+@RunWith(JUnit4.class)
+public class BigqueryMatcherTest {
+  private final String appName = "test-app";
+  private final String projectId = "test-project";
+  private final String query = "test-query";
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  @Rule public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
+  @Mock private Bigquery mockBigqueryClient;
+  @Mock private Bigquery.Jobs mockJobs;
+  @Mock private Bigquery.Jobs.Query mockQuery;
+  @Mock private PipelineResult mockResult;
+
+  @Before
+  public void setUp() throws IOException {
+    MockitoAnnotations.initMocks(this);
+    when(mockBigqueryClient.jobs()).thenReturn(mockJobs);
+    when(mockJobs.query(anyString(), any(QueryRequest.class))).thenReturn(mockQuery);
+  }
+
+  @Test
+  public void testBigqueryMatcherThatSucceeds() throws Exception {
+    BigqueryMatcher matcher = spy(
+        new BigqueryMatcher(
+            appName, projectId, query, "9bb47f5c90d2a99cad526453dff5ed5ec74650dc"));
+    doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString());
+    when(mockQuery.execute()).thenReturn(createResponseContainingTestData());
+
+    assertThat(mockResult, matcher);
+    verify(matcher).newBigqueryClient(eq(appName));
+    verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query)));
+  }
+
+  @Test
+  public void testBigqueryMatcherFailsForChecksumMismatch() throws IOException {
+    BigqueryMatcher matcher = spy(
+        new BigqueryMatcher(appName, projectId, query, "incorrect-checksum"));
+    doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString());
+    when(mockQuery.execute()).thenReturn(createResponseContainingTestData());
+
+    thrown.expect(AssertionError.class);
+    thrown.expectMessage("Total number of rows are: 1");
+    thrown.expectMessage("abc");
+    try {
+      assertThat(mockResult, matcher);
+    } finally {
+      verify(matcher).newBigqueryClient(eq(appName));
+      verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query)));
+    }
+  }
+
+  @Test
+  public void testBigqueryMatcherFailsWhenQueryJobNotComplete() throws Exception {
+    BigqueryMatcher matcher = spy(
+        new BigqueryMatcher(appName, projectId, query, "some-checksum"));
+    doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString());
+    when(mockQuery.execute()).thenReturn(new QueryResponse().setJobComplete(false));
+
+    thrown.expect(AssertionError.class);
+    thrown.expectMessage("The query job hasn't completed.");
+    thrown.expectMessage("jobComplete=false");
+    try {
+      assertThat(mockResult, matcher);
+    } finally {
+      verify(matcher).newBigqueryClient(eq(appName));
+      verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query)));
+    }
+  }
+
+  @Test
+  public void testQueryWithRetriesWhenServiceFails() throws Exception {
+    BigqueryMatcher matcher = spy(
+        new BigqueryMatcher(appName, projectId, query, "some-checksum"));
+    when(mockQuery.execute()).thenThrow(new IOException());
+
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("Unable to get BigQuery response after retrying");
+    try {
+      matcher.queryWithRetries(
+          mockBigqueryClient,
+          new QueryRequest(),
+          fastClock,
+          BigqueryMatcher.BACKOFF_FACTORY.backoff());
+    } finally {
+      verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES))
+          .query(eq(projectId), eq(new QueryRequest()));
+    }
+  }
+
+  @Test
+  public void testQueryWithRetriesWhenQueryResponseNull() throws Exception {
+    BigqueryMatcher matcher = spy(
+        new BigqueryMatcher(appName, projectId, query, "some-checksum"));
+    when(mockQuery.execute()).thenReturn(null);
+
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("Unable to get BigQuery response after retrying");
+    try {
+      matcher.queryWithRetries(
+          mockBigqueryClient,
+          new QueryRequest(),
+          fastClock,
+          BigqueryMatcher.BACKOFF_FACTORY.backoff());
+    } finally {
+      verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES))
+          .query(eq(projectId), eq(new QueryRequest()));
+    }
+  }
+
+  private QueryResponse createResponseContainingTestData() {
+    TableCell field1 = new TableCell();
+    field1.setV("abc");
+    TableCell field2 = new TableCell();
+    field2.setV("2");
+    TableCell field3 = new TableCell();
+    field3.setV("testing BigQuery matcher.");
+    TableRow row = new TableRow();
+    row.setF(Lists.newArrayList(field1, field2, field3));
+
+    QueryResponse response = new QueryResponse();
+    response.setJobComplete(true);
+    response.setRows(Lists.newArrayList(row));
+    response.setTotalRows(BigInteger.ONE);
+    return response;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java
new file mode 100644
index 0000000..395e1f3
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.storage.model.Bucket;
+import java.io.IOException;
+import org.apache.beam.sdk.options.CloudResourceManagerOptions;
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.MockitoAnnotations;
+import org.mockito.MockitoAnnotations.Mock;
+
+/** Tests for DefaultBucket. */
+@RunWith(JUnit4.class)
+public class DefaultBucketTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private PipelineOptions options;
+  @Mock
+  private GcsUtil gcsUtil;
+  @Mock
+  private GcpProjectUtil gcpUtil;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+    options = PipelineOptionsFactory.create();
+    options.as(GcsOptions.class).setGcsUtil(gcsUtil);
+    options.as(CloudResourceManagerOptions.class).setGcpProjectUtil(gcpUtil);
+    options.as(GcpOptions.class).setProject("foo");
+    options.as(GcpOptions.class).setZone("us-north1-a");
+  }
+
+  @Test
+  public void testCreateBucket() {
+    String bucket = DefaultBucket.tryCreateDefaultBucket(options);
+    assertEquals("gs://dataflow-staging-us-north1-0", bucket);
+  }
+
+  @Test
+  public void testCreateBucketProjectLookupFails() throws IOException {
+    when(gcpUtil.getProjectNumber("foo")).thenThrow(new IOException("badness"));
+
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("Unable to verify project");
+    DefaultBucket.tryCreateDefaultBucket(options);
+  }
+
+  @Test
+  public void testCreateBucketCreateBucketFails() throws IOException {
+    doThrow(new IOException("badness")).when(
+      gcsUtil).createBucket(any(String.class), any(Bucket.class));
+
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("Unable create default bucket");
+    DefaultBucket.tryCreateDefaultBucket(options);
+  }
+
+  @Test
+  public void testCannotGetBucketOwner() throws IOException {
+    when(gcsUtil.bucketOwner(any(GcsPath.class)))
+      .thenThrow(new IOException("badness"));
+
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("Unable to determine the owner");
+    DefaultBucket.tryCreateDefaultBucket(options);
+  }
+
+  @Test
+  public void testProjectMismatch() throws IOException {
+    when(gcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(5L);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Bucket owner does not match the project");
+    DefaultBucket.tryCreateDefaultBucket(options);
+  }
+
+  @Test
+  public void regionFromZone() throws IOException {
+    assertEquals("us-central1", DefaultBucket.getRegionFromZone("us-central1-a"));
+    assertEquals("asia-east", DefaultBucket.getRegionFromZone("asia-east-a"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java
new file mode 100644
index 0000000..23f0418
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.services.cloudresourcemanager.CloudResourceManager;
+import com.google.api.services.cloudresourcemanager.model.Project;
+import java.net.SocketTimeoutException;
+import org.apache.beam.sdk.options.CloudResourceManagerOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/** Test case for {@link GcpProjectUtil}. */
+@RunWith(JUnit4.class)
+public class GcpProjectUtilTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private static CloudResourceManagerOptions crmOptionsWithTestCredential() {
+    CloudResourceManagerOptions pipelineOptions =
+        PipelineOptionsFactory.as(CloudResourceManagerOptions.class);
+    pipelineOptions.setGcpCredential(new TestCredential());
+    return pipelineOptions;
+  }
+
+  @Test
+  public void testGetProjectNumber() throws Exception {
+    CloudResourceManagerOptions pipelineOptions = crmOptionsWithTestCredential();
+    GcpProjectUtil projectUtil = pipelineOptions.getGcpProjectUtil();
+
+    CloudResourceManager.Projects mockProjects = Mockito.mock(
+        CloudResourceManager.Projects.class);
+    CloudResourceManager mockCrm = Mockito.mock(CloudResourceManager.class);
+    projectUtil.setCrmClient(mockCrm);
+
+    CloudResourceManager.Projects.Get mockProjectsGet =
+        Mockito.mock(CloudResourceManager.Projects.Get.class);
+
+    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+    Project project = new Project();
+    project.setProjectNumber(5L);
+
+    when(mockCrm.projects()).thenReturn(mockProjects);
+    when(mockProjects.get(any(String.class))).thenReturn(mockProjectsGet);
+    when(mockProjectsGet.execute())
+      .thenThrow(new SocketTimeoutException("SocketException"))
+      .thenReturn(project);
+
+    assertEquals(5L, projectUtil.getProjectNumber(
+        "foo", mockBackOff, new FastNanoClockAndSleeper()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
new file mode 100644
index 0000000..a29dd45
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.util.ServiceLoader;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link GcsIOChannelFactoryRegistrar}.
+ */
+@RunWith(JUnit4.class)
+public class GcsIOChannelFactoryRegistrarTest {
+
+  @Test
+  public void testServiceLoader() {
+    for (IOChannelFactoryRegistrar registrar
+        : Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) {
+      if (registrar instanceof GcsIOChannelFactoryRegistrar) {
+        return;
+      }
+    }
+    fail("Expected to find " + GcsIOChannelFactoryRegistrar.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java
new file mode 100644
index 0000000..7248b38
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GcsIOChannelFactoryTest}. */
+@RunWith(JUnit4.class)
+public class GcsIOChannelFactoryTest {
+  private GcsIOChannelFactory factory;
+
+  @Before
+  public void setUp() {
+    factory = GcsIOChannelFactory.fromOptions(PipelineOptionsFactory.as(GcsOptions.class));
+  }
+
+  @Test
+  public void testResolve() throws Exception {
+    assertEquals("gs://bucket/object", factory.resolve("gs://bucket", "object"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
new file mode 100644
index 0000000..dc36319
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link GcsPathValidator}. */
+@RunWith(JUnit4.class)
+public class GcsPathValidatorTest {
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+
+  @Mock private GcsUtil mockGcsUtil;
+  private GcsPathValidator validator;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
+    GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
+    options.setGcpCredential(new TestCredential());
+    options.setGcsUtil(mockGcsUtil);
+    validator = GcsPathValidator.fromOptions(options);
+  }
+
+  @Test
+  public void testValidFilePattern() {
+    validator.validateInputFilePatternSupported("gs://bucket/path");
+  }
+
+  @Test
+  public void testInvalidFilePattern() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "Expected a valid 'gs://' path but was given '/local/path'");
+    validator.validateInputFilePatternSupported("/local/path");
+  }
+
+  @Test
+  public void testWhenBucketDoesNotExist() throws Exception {
+    when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(false);
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "Could not find file gs://non-existent-bucket/location");
+    validator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
+  }
+
+  @Test
+  public void testValidOutputPrefix() {
+    validator.validateOutputFilePrefixSupported("gs://bucket/path");
+  }
+
+  @Test
+  public void testInvalidOutputPrefix() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "Expected a valid 'gs://' path but was given '/local/path'");
+    validator.validateOutputFilePrefixSupported("/local/path");
+  }
+}