You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/04/18 23:19:28 UTC
[2/7] beam git commit: [BEAM-1871] Create new GCP core module package
and move several GCP related classes from beam-sdks-java-core over.
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java
new file mode 100644
index 0000000..6fac6dc
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.client.util.BackOff;
+
+
+/**
+ * Implementation of {@link BackOff} that increases the back off period for each retry attempt
+ * using a randomization function that grows exponentially.
+ *
+ * <p>Example: The initial interval is .5 seconds and the maximum interval is 60 secs.
+ * For 14 tries the sequence will be (values in seconds):
+ *
+ * <pre>
+ * retry# retry_interval randomized_interval
+ * 1 0.5 [0.25, 0.75]
+ * 2 0.75 [0.375, 1.125]
+ * 3 1.125 [0.562, 1.687]
+ * 4 1.687 [0.8435, 2.53]
+ * 5 2.53 [1.265, 3.795]
+ * 6 3.795 [1.897, 5.692]
+ * 7 5.692 [2.846, 8.538]
+ * 8 8.538 [4.269, 12.807]
+ * 9 12.807 [6.403, 19.210]
+ * 10 28.832 [14.416, 43.248]
+ * 11 43.248 [21.624, 64.873]
+ * 12 60.0 [30.0, 90.0]
+ * 13 60.0 [30.0, 90.0]
+ * 14 60.0 [30.0, 90.0]
+ * </pre>
+ *
+ * <p>Implementation is not thread-safe.
+ */
+@Deprecated
+public class IntervalBoundedExponentialBackOff implements BackOff {
+ public static final double DEFAULT_MULTIPLIER = 1.5;
+ public static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5;
+ private final long maximumIntervalMillis;
+ private final long initialIntervalMillis;
+ private int currentAttempt;
+
+ public IntervalBoundedExponentialBackOff(long maximumIntervalMillis, long initialIntervalMillis) {
+ checkArgument(maximumIntervalMillis > 0, "Maximum interval must be greater than zero.");
+ checkArgument(initialIntervalMillis > 0, "Initial interval must be greater than zero.");
+ this.maximumIntervalMillis = maximumIntervalMillis;
+ this.initialIntervalMillis = initialIntervalMillis;
+ reset();
+ }
+
+ @Override
+ public void reset() {
+ currentAttempt = 1;
+ }
+
+ @Override
+ public long nextBackOffMillis() {
+ double currentIntervalMillis =
+ Math.min(
+ initialIntervalMillis * Math.pow(DEFAULT_MULTIPLIER, currentAttempt - 1),
+ maximumIntervalMillis);
+ double randomOffset =
+ (Math.random() * 2 - 1) * DEFAULT_RANDOMIZATION_FACTOR * currentIntervalMillis;
+ currentAttempt += 1;
+ return Math.round(currentIntervalMillis + randomOffset);
+ }
+
+ public boolean atMaxInterval() {
+ return initialIntervalMillis * Math.pow(DEFAULT_MULTIPLIER, currentAttempt - 1)
+ >= maximumIntervalMillis;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java
new file mode 100644
index 0000000..f703e4c
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.auth.Credentials;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * Construct an oauth credential to be used by the SDK and the SDK workers.
+ * Always returns a null Credential object.
+ */
+public class NoopCredentialFactory implements CredentialFactory {
+ private static final NoopCredentialFactory INSTANCE = new NoopCredentialFactory();
+ private static final NoopCredentials NOOP_CREDENTIALS = new NoopCredentials();
+
+ public static NoopCredentialFactory fromOptions(PipelineOptions options) {
+ return INSTANCE;
+ }
+
+ @Override
+ public Credentials getCredential() throws IOException {
+ return NOOP_CREDENTIALS;
+ }
+
+ private static class NoopCredentials extends Credentials {
+ @Override
+ public String getAuthenticationType() {
+ return null;
+ }
+
+ @Override
+ public Map<String, List<String>> getRequestMetadata(URI uri) throws IOException {
+ return null;
+ }
+
+ @Override
+ public boolean hasRequestMetadata() {
+ return false;
+ }
+
+ @Override
+ public boolean hasRequestMetadataOnly() {
+ return false;
+ }
+
+ @Override
+ public void refresh() throws IOException {}
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/NullCredentialInitializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/NullCredentialInitializer.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/NullCredentialInitializer.java
new file mode 100644
index 0000000..4ed35c6
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/NullCredentialInitializer.java
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpUnsuccessfulResponseHandler;
+import java.io.IOException;
+
+/**
+ * A {@link HttpRequestInitializer} for requests that don't have credentials.
+ *
+ * <p>When the access is denied, it throws {@link IOException} with a detailed error message.
+ */
+public class NullCredentialInitializer implements HttpRequestInitializer {
+ private static final int ACCESS_DENIED = 401;
+ private static final String NULL_CREDENTIAL_REASON =
+ "Unable to get application default credentials. Please see "
+ + "https://developers.google.com/accounts/docs/application-default-credentials "
+ + "for details on how to specify credentials. This version of the SDK is "
+ + "dependent on the gcloud core component version 2015.02.05 or newer to "
+ + "be able to get credentials from the currently authorized user via gcloud auth.";
+
+ @Override
+ public void initialize(HttpRequest httpRequest) throws IOException {
+ httpRequest.setUnsuccessfulResponseHandler(new NullCredentialHttpUnsuccessfulResponseHandler());
+ }
+
+ private static class NullCredentialHttpUnsuccessfulResponseHandler
+ implements HttpUnsuccessfulResponseHandler {
+
+ @Override
+ public boolean handleResponse(
+ HttpRequest httpRequest,
+ HttpResponse httpResponse, boolean supportsRetry) throws IOException {
+ if (!httpResponse.isSuccessStatusCode() && httpResponse.getStatusCode() == ACCESS_DENIED) {
+ throwNullCredentialException();
+ }
+ return supportsRetry;
+ }
+ }
+
+ public static void throwNullCredentialException() {
+ throw new RuntimeException(NULL_CREDENTIAL_REASON);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java
new file mode 100644
index 0000000..80c093b
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/Transport.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.HttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.cloudresourcemanager.CloudResourceManager;
+import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.storage.Storage;
+import com.google.auth.Credentials;
+import com.google.auth.http.HttpCredentialsAdapter;
+import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.security.GeneralSecurityException;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.CloudResourceManagerOptions;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PubsubOptions;
+
+/**
+ * Helpers for cloud communication.
+ */
+public class Transport {
+
+ private static class SingletonHelper {
+ /** Global instance of the JSON factory. */
+ private static final JsonFactory JSON_FACTORY;
+
+ /** Global instance of the HTTP transport. */
+ private static final HttpTransport HTTP_TRANSPORT;
+
+ static {
+ try {
+ JSON_FACTORY = JacksonFactory.getDefaultInstance();
+ HTTP_TRANSPORT = GoogleNetHttpTransport.newTrustedTransport();
+ } catch (GeneralSecurityException | IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public static HttpTransport getTransport() {
+ return SingletonHelper.HTTP_TRANSPORT;
+ }
+
+ public static JsonFactory getJsonFactory() {
+ return SingletonHelper.JSON_FACTORY;
+ }
+
+ private static class ApiComponents {
+ public String rootUrl;
+ public String servicePath;
+
+ public ApiComponents(String root, String path) {
+ this.rootUrl = root;
+ this.servicePath = path;
+ }
+ }
+
+ private static ApiComponents apiComponentsFromUrl(String urlString) {
+ try {
+ URL url = new URL(urlString);
+ String rootUrl = url.getProtocol() + "://" + url.getHost()
+ + (url.getPort() > 0 ? ":" + url.getPort() : "");
+ return new ApiComponents(rootUrl, url.getPath());
+ } catch (MalformedURLException e) {
+ throw new RuntimeException("Invalid URL: " + urlString);
+ }
+ }
+
+ /**
+ * Returns a BigQuery client builder using the specified {@link BigQueryOptions}.
+ */
+ public static Bigquery.Builder
+ newBigQueryClient(BigQueryOptions options) {
+ return new Bigquery.Builder(getTransport(), getJsonFactory(),
+ chainHttpRequestInitializer(
+ options.getGcpCredential(),
+ // Do not log 404. It clutters the output and is possibly even required by the caller.
+ new RetryHttpRequestInitializer(ImmutableList.of(404))))
+ .setApplicationName(options.getAppName())
+ .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+ }
+
+ /**
+ * Returns a Pubsub client builder using the specified {@link PubsubOptions}.
+ *
+ * @deprecated Use an appropriate org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory
+ */
+ @Deprecated
+ public static Pubsub.Builder
+ newPubsubClient(PubsubOptions options) {
+ return new Pubsub.Builder(getTransport(), getJsonFactory(),
+ chainHttpRequestInitializer(
+ options.getGcpCredential(),
+ // Do not log 404. It clutters the output and is possibly even required by the caller.
+ new RetryHttpRequestInitializer(ImmutableList.of(404))))
+ .setRootUrl(options.getPubsubRootUrl())
+ .setApplicationName(options.getAppName())
+ .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+ }
+
+ /**
+ * Returns a CloudResourceManager client builder using the specified
+ * {@link CloudResourceManagerOptions}.
+ */
+ public static CloudResourceManager.Builder
+ newCloudResourceManagerClient(CloudResourceManagerOptions options) {
+ Credentials credentials = options.getGcpCredential();
+ if (credentials == null) {
+ NullCredentialInitializer.throwNullCredentialException();
+ }
+ return new CloudResourceManager.Builder(getTransport(), getJsonFactory(),
+ chainHttpRequestInitializer(
+ credentials,
+ // Do not log 404. It clutters the output and is possibly even required by the caller.
+ new RetryHttpRequestInitializer(ImmutableList.of(404))))
+ .setApplicationName(options.getAppName())
+ .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+ }
+
+ /**
+ * Returns a Cloud Storage client builder using the specified {@link GcsOptions}.
+ */
+ public static Storage.Builder
+ newStorageClient(GcsOptions options) {
+ String servicePath = options.getGcsEndpoint();
+ Storage.Builder storageBuilder = new Storage.Builder(getTransport(), getJsonFactory(),
+ chainHttpRequestInitializer(
+ options.getGcpCredential(),
+ // Do not log the code 404. Code up the stack will deal with 404's if needed, and
+ // logging it by default clutters the output during file staging.
+ new RetryHttpRequestInitializer(
+ ImmutableList.of(404), new UploadIdResponseInterceptor())))
+ .setApplicationName(options.getAppName())
+ .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+ if (servicePath != null) {
+ ApiComponents components = apiComponentsFromUrl(servicePath);
+ storageBuilder.setRootUrl(components.rootUrl);
+ storageBuilder.setServicePath(components.servicePath);
+ }
+ return storageBuilder;
+ }
+
+ private static HttpRequestInitializer chainHttpRequestInitializer(
+ Credentials credential, HttpRequestInitializer httpRequestInitializer) {
+ if (credential == null) {
+ return new ChainingHttpRequestInitializer(
+ new NullCredentialInitializer(), httpRequestInitializer);
+ } else {
+ return new ChainingHttpRequestInitializer(
+ new HttpCredentialsAdapter(credential),
+ httpRequestInitializer);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/package-info.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/package-info.java
new file mode 100644
index 0000000..f8135e7
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Defines Google Cloud Platform component utilities that can be used by Beam runners. */
+package org.apache.beam.sdk.util;
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
new file mode 100644
index 0000000..37fb42d
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam;
+
+import static org.apache.beam.sdk.util.ApiSurface.containsOnlyPackages;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import org.apache.beam.sdk.util.ApiSurface;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** API surface verification for Google Cloud Platform core components. */
+@RunWith(JUnit4.class)
+public class GcpCoreApiSurfaceTest {
+
+ @Test
+ public void testApiSurface() throws Exception {
+
+ @SuppressWarnings("unchecked")
+ final Set<String> allowed =
+ ImmutableSet.of(
+ "org.apache.beam",
+ "com.google.api.client",
+ "com.google.api.services.bigquery",
+ "com.google.api.services.cloudresourcemanager",
+ "com.google.api.services.pubsub",
+ "com.google.api.services.storage",
+ "com.google.auth",
+ "com.google.protobuf",
+ "com.fasterxml.jackson.annotation",
+ "com.fasterxml.jackson.core",
+ "com.fasterxml.jackson.databind",
+ "org.apache.avro",
+ "org.hamcrest",
+ // via DataflowMatchers
+ "org.codehaus.jackson",
+ // via Avro
+ "org.joda.time",
+ "org.junit");
+
+ assertThat(
+ ApiSurface.getSdkApiSurface(getClass().getClassLoader()), containsOnlyPackages(allowed));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
new file mode 100644
index 0000000..288383e
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.beam.sdk.options.GcpOptions.DefaultProjectFactory;
+import org.apache.beam.sdk.testing.RestoreSystemProperties;
+import org.apache.beam.sdk.util.NoopPathValidator;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestRule;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GcpOptions}. */
+@RunWith(JUnit4.class)
+public class GcpOptionsTest {
+ @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
+ @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testGetProjectFromCloudSdkConfigEnv() throws Exception {
+ Map<String, String> environment =
+ ImmutableMap.of("CLOUDSDK_CONFIG", tmpFolder.getRoot().getAbsolutePath());
+ assertEquals("test-project",
+ runGetProjectTest(tmpFolder.newFile("properties"), environment));
+ }
+
+ @Test
+ public void testGetProjectFromAppDataEnv() throws Exception {
+ Map<String, String> environment =
+ ImmutableMap.of("APPDATA", tmpFolder.getRoot().getAbsolutePath());
+ System.setProperty("os.name", "windows");
+ assertEquals("test-project",
+ runGetProjectTest(new File(tmpFolder.newFolder("gcloud"), "properties"),
+ environment));
+ }
+
+ @Test
+ public void testGetProjectFromUserHomeEnvOld() throws Exception {
+ Map<String, String> environment = ImmutableMap.of();
+ System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
+ assertEquals("test-project",
+ runGetProjectTest(
+ new File(tmpFolder.newFolder(".config", "gcloud"), "properties"),
+ environment));
+ }
+
+ @Test
+ public void testGetProjectFromUserHomeEnv() throws Exception {
+ Map<String, String> environment = ImmutableMap.of();
+ System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
+ assertEquals("test-project",
+ runGetProjectTest(
+ new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"),
+ environment));
+ }
+
+ @Test
+ public void testGetProjectFromUserHomeOldAndNewPrefersNew() throws Exception {
+ Map<String, String> environment = ImmutableMap.of();
+ System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
+ makePropertiesFileWithProject(new File(tmpFolder.newFolder(".config", "gcloud"), "properties"),
+ "old-project");
+ assertEquals("test-project",
+ runGetProjectTest(
+ new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"),
+ environment));
+ }
+
+ @Test
+ public void testUnableToGetDefaultProject() throws Exception {
+ System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
+ DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory());
+ when(projectFactory.getEnvironment()).thenReturn(ImmutableMap.<String, String>of());
+ assertNull(projectFactory.create(PipelineOptionsFactory.create()));
+ }
+
+ @Test
+ public void testEmptyGcpTempLocation() throws Exception {
+ GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ options.setProject("");
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("--project is a required option");
+ options.getGcpTempLocation();
+ }
+
+ @Test
+ public void testDefaultGcpTempLocation() throws Exception {
+ GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ String tempLocation = "gs://bucket";
+ options.setTempLocation(tempLocation);
+ options.as(GcsOptions.class).setPathValidatorClass(NoopPathValidator.class);
+ assertEquals(tempLocation, options.getGcpTempLocation());
+ }
+
+ @Test
+ public void testDefaultGcpTempLocationInvalid() throws Exception {
+ GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ options.setTempLocation("file://");
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ "Error constructing default value for gcpTempLocation: tempLocation is not"
+ + " a valid GCS path");
+ options.getGcpTempLocation();
+ }
+
+ @Test
+ public void testDefaultGcpTempLocationDoesNotExist() {
+ GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ String tempLocation = "gs://does/not/exist";
+ options.setTempLocation(tempLocation);
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ "Error constructing default value for gcpTempLocation: tempLocation is not"
+ + " a valid GCS path");
+ thrown.expectCause(
+ hasMessage(containsString("Output path does not exist or is not writeable")));
+
+ options.getGcpTempLocation();
+ }
+
+ private static void makePropertiesFileWithProject(File path, String projectId)
+ throws IOException {
+ String properties = String.format("[core]%n"
+ + "account = test-account@google.com%n"
+ + "project = %s%n"
+ + "%n"
+ + "[dataflow]%n"
+ + "magic = true%n", projectId);
+ Files.write(properties, path, StandardCharsets.UTF_8);
+ }
+
+ private static String runGetProjectTest(File path, Map<String, String> environment)
+ throws Exception {
+ makePropertiesFileWithProject(path, "test-project");
+ DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory());
+ when(projectFactory.getEnvironment()).thenReturn(environment);
+ return projectFactory.create(PipelineOptionsFactory.create());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java
new file mode 100644
index 0000000..dae7208
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.services.bigquery.Bigquery.Datasets.Delete;
+import com.google.api.services.storage.Storage;
+import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer;
+import org.apache.beam.sdk.util.TestCredential;
+import org.apache.beam.sdk.util.Transport;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GoogleApiDebugOptions}. */
+@RunWith(JUnit4.class)
+public class GoogleApiDebugOptionsTest {
+ private static final String STORAGE_GET_TRACE =
+ "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"}";
+ private static final String STORAGE_GET_AND_LIST_TRACE =
+ "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\","
+ + "\"Objects.List\":\"ListTraceDestination\"}";
+ private static final String STORAGE_TRACE = "--googleApiTrace={\"Storage\":\"TraceDestination\"}";
+
+ @Test
+ public void testWhenTracingMatches() throws Exception {
+ String[] args = new String[] {STORAGE_GET_TRACE};
+ GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
+ options.setGcpCredential(new TestCredential());
+ assertNotNull(options.getGoogleApiTrace());
+
+ Storage.Objects.Get request =
+ Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
+ assertEquals("GetTraceDestination", request.get("$trace"));
+ }
+
+ @Test
+ public void testWhenTracingDoesNotMatch() throws Exception {
+ String[] args = new String[] {STORAGE_GET_TRACE};
+ GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
+ options.setGcpCredential(new TestCredential());
+
+ assertNotNull(options.getGoogleApiTrace());
+
+ Storage.Objects.List request =
+ Transport.newStorageClient(options).build().objects().list("testProjectId");
+ assertNull(request.get("$trace"));
+ }
+
+ @Test
+ public void testWithMultipleTraces() throws Exception {
+ String[] args = new String[] {STORAGE_GET_AND_LIST_TRACE};
+ GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
+ options.setGcpCredential(new TestCredential());
+
+ assertNotNull(options.getGoogleApiTrace());
+
+ Storage.Objects.Get getRequest =
+ Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
+ assertEquals("GetTraceDestination", getRequest.get("$trace"));
+
+ Storage.Objects.List listRequest =
+ Transport.newStorageClient(options).build().objects().list("testProjectId");
+ assertEquals("ListTraceDestination", listRequest.get("$trace"));
+ }
+
+ @Test
+ public void testMatchingAllCalls() throws Exception {
+ String[] args = new String[] {STORAGE_TRACE};
+ GcsOptions options =
+ PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
+ options.setGcpCredential(new TestCredential());
+
+ assertNotNull(options.getGoogleApiTrace());
+
+ Storage.Objects.Get getRequest =
+ Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
+ assertEquals("TraceDestination", getRequest.get("$trace"));
+
+ Storage.Objects.List listRequest =
+ Transport.newStorageClient(options).build().objects().list("testProjectId");
+ assertEquals("TraceDestination", listRequest.get("$trace"));
+ }
+
+ @Test
+ public void testMatchingAgainstClient() throws Exception {
+ GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
+ options.setGcpCredential(new TestCredential());
+ options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor(
+ Transport.newStorageClient(options).build(), "TraceDestination"));
+
+ Storage.Objects.Get getRequest =
+ Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
+ assertEquals("TraceDestination", getRequest.get("$trace"));
+
+ Delete deleteRequest = Transport.newBigQueryClient(options.as(BigQueryOptions.class))
+ .build().datasets().delete("testProjectId", "testDatasetId");
+ assertNull(deleteRequest.get("$trace"));
+ }
+
+ @Test
+ public void testMatchingAgainstRequestType() throws Exception {
+ GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
+ options.setGcpCredential(new TestCredential());
+ options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor(
+ Transport.newStorageClient(options).build().objects()
+ .get("aProjectId", "aObjectId"), "TraceDestination"));
+
+ Storage.Objects.Get getRequest =
+ Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
+ assertEquals("TraceDestination", getRequest.get("$trace"));
+
+ Storage.Objects.List listRequest =
+ Transport.newStorageClient(options).build().objects().list("testProjectId");
+ assertNull(listRequest.get("$trace"));
+ }
+
+ @Test
+ public void testDeserializationAndSerializationOfGoogleApiTracer() throws Exception {
+ String serializedValue = "{\"Api\":\"Token\"}";
+ ObjectMapper objectMapper = new ObjectMapper();
+ assertEquals(serializedValue,
+ objectMapper.writeValueAsString(
+ objectMapper.readValue(serializedValue, GoogleApiTracer.class)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
new file mode 100644
index 0000000..3b35856
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/testing/BigqueryMatcherTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.model.QueryRequest;
+import com.google.api.services.bigquery.model.QueryResponse;
+import com.google.api.services.bigquery.model.TableCell;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.math.BigInteger;
+import org.apache.beam.sdk.PipelineResult;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link BigqueryMatcher}.
+ */
+@RunWith(JUnit4.class)
+public class BigqueryMatcherTest {
+ private final String appName = "test-app";
+ private final String projectId = "test-project";
+ private final String query = "test-query";
+
+ @Rule public ExpectedException thrown = ExpectedException.none();
+ @Rule public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
+ @Mock private Bigquery mockBigqueryClient;
+ @Mock private Bigquery.Jobs mockJobs;
+ @Mock private Bigquery.Jobs.Query mockQuery;
+ @Mock private PipelineResult mockResult;
+
+ @Before
+ public void setUp() throws IOException {
+ MockitoAnnotations.initMocks(this);
+ when(mockBigqueryClient.jobs()).thenReturn(mockJobs);
+ when(mockJobs.query(anyString(), any(QueryRequest.class))).thenReturn(mockQuery);
+ }
+
+ @Test
+ public void testBigqueryMatcherThatSucceeds() throws Exception {
+ BigqueryMatcher matcher = spy(
+ new BigqueryMatcher(
+ appName, projectId, query, "9bb47f5c90d2a99cad526453dff5ed5ec74650dc"));
+ doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString());
+ when(mockQuery.execute()).thenReturn(createResponseContainingTestData());
+
+ assertThat(mockResult, matcher);
+ verify(matcher).newBigqueryClient(eq(appName));
+ verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query)));
+ }
+
+ @Test
+ public void testBigqueryMatcherFailsForChecksumMismatch() throws IOException {
+ BigqueryMatcher matcher = spy(
+ new BigqueryMatcher(appName, projectId, query, "incorrect-checksum"));
+ doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString());
+ when(mockQuery.execute()).thenReturn(createResponseContainingTestData());
+
+ thrown.expect(AssertionError.class);
+ thrown.expectMessage("Total number of rows are: 1");
+ thrown.expectMessage("abc");
+ try {
+ assertThat(mockResult, matcher);
+ } finally {
+ verify(matcher).newBigqueryClient(eq(appName));
+ verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query)));
+ }
+ }
+
+ @Test
+ public void testBigqueryMatcherFailsWhenQueryJobNotComplete() throws Exception {
+ BigqueryMatcher matcher = spy(
+ new BigqueryMatcher(appName, projectId, query, "some-checksum"));
+ doReturn(mockBigqueryClient).when(matcher).newBigqueryClient(anyString());
+ when(mockQuery.execute()).thenReturn(new QueryResponse().setJobComplete(false));
+
+ thrown.expect(AssertionError.class);
+ thrown.expectMessage("The query job hasn't completed.");
+ thrown.expectMessage("jobComplete=false");
+ try {
+ assertThat(mockResult, matcher);
+ } finally {
+ verify(matcher).newBigqueryClient(eq(appName));
+ verify(mockJobs).query(eq(projectId), eq(new QueryRequest().setQuery(query)));
+ }
+ }
+
+ @Test
+ public void testQueryWithRetriesWhenServiceFails() throws Exception {
+ BigqueryMatcher matcher = spy(
+ new BigqueryMatcher(appName, projectId, query, "some-checksum"));
+ when(mockQuery.execute()).thenThrow(new IOException());
+
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("Unable to get BigQuery response after retrying");
+ try {
+ matcher.queryWithRetries(
+ mockBigqueryClient,
+ new QueryRequest(),
+ fastClock,
+ BigqueryMatcher.BACKOFF_FACTORY.backoff());
+ } finally {
+ verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES))
+ .query(eq(projectId), eq(new QueryRequest()));
+ }
+ }
+
+ @Test
+ public void testQueryWithRetriesWhenQueryResponseNull() throws Exception {
+ BigqueryMatcher matcher = spy(
+ new BigqueryMatcher(appName, projectId, query, "some-checksum"));
+ when(mockQuery.execute()).thenReturn(null);
+
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("Unable to get BigQuery response after retrying");
+ try {
+ matcher.queryWithRetries(
+ mockBigqueryClient,
+ new QueryRequest(),
+ fastClock,
+ BigqueryMatcher.BACKOFF_FACTORY.backoff());
+ } finally {
+ verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES))
+ .query(eq(projectId), eq(new QueryRequest()));
+ }
+ }
+
+ private QueryResponse createResponseContainingTestData() {
+ TableCell field1 = new TableCell();
+ field1.setV("abc");
+ TableCell field2 = new TableCell();
+ field2.setV("2");
+ TableCell field3 = new TableCell();
+ field3.setV("testing BigQuery matcher.");
+ TableRow row = new TableRow();
+ row.setF(Lists.newArrayList(field1, field2, field3));
+
+ QueryResponse response = new QueryResponse();
+ response.setJobComplete(true);
+ response.setRows(Lists.newArrayList(row));
+ response.setTotalRows(BigInteger.ONE);
+ return response;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java
new file mode 100644
index 0000000..395e1f3
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.storage.model.Bucket;
+import java.io.IOException;
+import org.apache.beam.sdk.options.CloudResourceManagerOptions;
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.MockitoAnnotations;
+import org.mockito.MockitoAnnotations.Mock;
+
+/** Tests for DefaultBucket. */
+@RunWith(JUnit4.class)
+public class DefaultBucketTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ private PipelineOptions options;
+ @Mock
+ private GcsUtil gcsUtil;
+ @Mock
+ private GcpProjectUtil gcpUtil;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ options = PipelineOptionsFactory.create();
+ options.as(GcsOptions.class).setGcsUtil(gcsUtil);
+ options.as(CloudResourceManagerOptions.class).setGcpProjectUtil(gcpUtil);
+ options.as(GcpOptions.class).setProject("foo");
+ options.as(GcpOptions.class).setZone("us-north1-a");
+ }
+
+ @Test
+ public void testCreateBucket() {
+ String bucket = DefaultBucket.tryCreateDefaultBucket(options);
+ assertEquals("gs://dataflow-staging-us-north1-0", bucket);
+ }
+
+ @Test
+ public void testCreateBucketProjectLookupFails() throws IOException {
+ when(gcpUtil.getProjectNumber("foo")).thenThrow(new IOException("badness"));
+
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("Unable to verify project");
+ DefaultBucket.tryCreateDefaultBucket(options);
+ }
+
+ @Test
+ public void testCreateBucketCreateBucketFails() throws IOException {
+ doThrow(new IOException("badness")).when(
+ gcsUtil).createBucket(any(String.class), any(Bucket.class));
+
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("Unable create default bucket");
+ DefaultBucket.tryCreateDefaultBucket(options);
+ }
+
+ @Test
+ public void testCannotGetBucketOwner() throws IOException {
+ when(gcsUtil.bucketOwner(any(GcsPath.class)))
+ .thenThrow(new IOException("badness"));
+
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("Unable to determine the owner");
+ DefaultBucket.tryCreateDefaultBucket(options);
+ }
+
+ @Test
+ public void testProjectMismatch() throws IOException {
+ when(gcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(5L);
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Bucket owner does not match the project");
+ DefaultBucket.tryCreateDefaultBucket(options);
+ }
+
+ @Test
+ public void regionFromZone() throws IOException {
+ assertEquals("us-central1", DefaultBucket.getRegionFromZone("us-central1-a"));
+ assertEquals("asia-east", DefaultBucket.getRegionFromZone("asia-east-a"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java
new file mode 100644
index 0000000..23f0418
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.services.cloudresourcemanager.CloudResourceManager;
+import com.google.api.services.cloudresourcemanager.model.Project;
+import java.net.SocketTimeoutException;
+import org.apache.beam.sdk.options.CloudResourceManagerOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/** Test case for {@link GcpProjectUtil}. */
+@RunWith(JUnit4.class)
+public class GcpProjectUtilTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ private static CloudResourceManagerOptions crmOptionsWithTestCredential() {
+ CloudResourceManagerOptions pipelineOptions =
+ PipelineOptionsFactory.as(CloudResourceManagerOptions.class);
+ pipelineOptions.setGcpCredential(new TestCredential());
+ return pipelineOptions;
+ }
+
+ @Test
+ public void testGetProjectNumber() throws Exception {
+ CloudResourceManagerOptions pipelineOptions = crmOptionsWithTestCredential();
+ GcpProjectUtil projectUtil = pipelineOptions.getGcpProjectUtil();
+
+ CloudResourceManager.Projects mockProjects = Mockito.mock(
+ CloudResourceManager.Projects.class);
+ CloudResourceManager mockCrm = Mockito.mock(CloudResourceManager.class);
+ projectUtil.setCrmClient(mockCrm);
+
+ CloudResourceManager.Projects.Get mockProjectsGet =
+ Mockito.mock(CloudResourceManager.Projects.Get.class);
+
+ BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+ Project project = new Project();
+ project.setProjectNumber(5L);
+
+ when(mockCrm.projects()).thenReturn(mockProjects);
+ when(mockProjects.get(any(String.class))).thenReturn(mockProjectsGet);
+ when(mockProjectsGet.execute())
+ .thenThrow(new SocketTimeoutException("SocketException"))
+ .thenReturn(project);
+
+ assertEquals(5L, projectUtil.getProjectNumber(
+ "foo", mockBackOff, new FastNanoClockAndSleeper()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
new file mode 100644
index 0000000..a29dd45
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrarTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.util.ServiceLoader;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link GcsIOChannelFactoryRegistrar}.
+ */
+@RunWith(JUnit4.class)
+public class GcsIOChannelFactoryRegistrarTest {
+
+ @Test
+ public void testServiceLoader() {
+ for (IOChannelFactoryRegistrar registrar
+ : Lists.newArrayList(ServiceLoader.load(IOChannelFactoryRegistrar.class).iterator())) {
+ if (registrar instanceof GcsIOChannelFactoryRegistrar) {
+ return;
+ }
+ }
+ fail("Expected to find " + GcsIOChannelFactoryRegistrar.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java
new file mode 100644
index 0000000..7248b38
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsIOChannelFactoryTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GcsIOChannelFactoryTest}. */
+@RunWith(JUnit4.class)
+public class GcsIOChannelFactoryTest {
+ private GcsIOChannelFactory factory;
+
+ @Before
+ public void setUp() {
+ factory = GcsIOChannelFactory.fromOptions(PipelineOptionsFactory.as(GcsOptions.class));
+ }
+
+ @Test
+ public void testResolve() throws Exception {
+ assertEquals("gs://bucket/object", factory.resolve("gs://bucket", "object"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/be92f595/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
new file mode 100644
index 0000000..dc36319
--- /dev/null
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcsPathValidatorTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link GcsPathValidator}. */
+@RunWith(JUnit4.class)
+public class GcsPathValidatorTest {
+ @Rule public ExpectedException expectedException = ExpectedException.none();
+
+ @Mock private GcsUtil mockGcsUtil;
+ private GcsPathValidator validator;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
+ GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
+ options.setGcpCredential(new TestCredential());
+ options.setGcsUtil(mockGcsUtil);
+ validator = GcsPathValidator.fromOptions(options);
+ }
+
+ @Test
+ public void testValidFilePattern() {
+ validator.validateInputFilePatternSupported("gs://bucket/path");
+ }
+
+ @Test
+ public void testInvalidFilePattern() {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(
+ "Expected a valid 'gs://' path but was given '/local/path'");
+ validator.validateInputFilePatternSupported("/local/path");
+ }
+
+ @Test
+ public void testWhenBucketDoesNotExist() throws Exception {
+ when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(false);
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(
+ "Could not find file gs://non-existent-bucket/location");
+ validator.validateInputFilePatternSupported("gs://non-existent-bucket/location");
+ }
+
+ @Test
+ public void testValidOutputPrefix() {
+ validator.validateOutputFilePrefixSupported("gs://bucket/path");
+ }
+
+ @Test
+ public void testInvalidOutputPrefix() {
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(
+ "Expected a valid 'gs://' path but was given '/local/path'");
+ validator.validateOutputFilePrefixSupported("/local/path");
+ }
+}