You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/04/29 16:12:13 UTC
[1/2] beam git commit: [BEAM-1871] Hide internal implementation
details of how we create a DefaultBucket for GCP Temp Location
Repository: beam
Updated Branches:
refs/heads/master 47821ad69 -> f29444bf8
[BEAM-1871] Hide internal implementation details of how we create a DefaultBucket for GCP Temp Location
Moved relevant contents of GcpProjectUtil and DefaultProject into GcpOptions.GcpTempLocation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c1b35f46
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c1b35f46
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c1b35f46
Branch: refs/heads/master
Commit: c1b35f46ddd321b29132606d3633d45ff134ff6c
Parents: 47821ad
Author: Luke Cwik <lc...@google.com>
Authored: Thu Apr 27 13:50:00 2017 -0700
Committer: Lukasz Cwik <lc...@google.com>
Committed: Sat Apr 29 09:06:12 2017 -0700
----------------------------------------------------------------------
.../options/CloudResourceManagerOptions.java | 14 -
.../sdk/extensions/gcp/options/GcpOptions.java | 124 ++++++-
.../org/apache/beam/sdk/util/DefaultBucket.java | 105 ------
.../apache/beam/sdk/util/GcpProjectUtil.java | 106 ------
.../extensions/gcp/options/GcpOptionsTest.java | 325 ++++++++++++-------
.../apache/beam/sdk/util/DefaultBucketTest.java | 112 -------
.../beam/sdk/util/GcpProjectUtilTest.java | 77 -----
7 files changed, 335 insertions(+), 528 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b35f46/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java
index 68432cf..87557e5 100644
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java
@@ -17,14 +17,10 @@
*/
package org.apache.beam.sdk.extensions.gcp.options;
-import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.beam.sdk.options.ApplicationNameOptions;
-import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.beam.sdk.util.GcpProjectUtil;
/**
* Properties needed when using Google CloudResourceManager with the Apache Beam SDK.
@@ -33,14 +29,4 @@ import org.apache.beam.sdk.util.GcpProjectUtil;
+ "https://cloud.google.com/resource-manager/ for details on CloudResourceManager.")
public interface CloudResourceManagerOptions extends ApplicationNameOptions, GcpOptions,
PipelineOptions, StreamingOptions {
- /**
- * The GcpProjectUtil instance that should be used to communicate with Google Cloud Storage.
- */
- @JsonIgnore
- @Description("The GcpProjectUtil instance that should be used to communicate"
- + " with Google Cloud Resource Manager.")
- @Default.InstanceFactory(GcpProjectUtil.GcpProjectUtilFactory.class)
- @Hidden
- GcpProjectUtil getGcpProjectUtil();
- void setGcpProjectUtil(GcpProjectUtil value);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b35f46/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
index 09904b6..b2a83e9 100644
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
+++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java
@@ -17,15 +17,24 @@
*/
package org.apache.beam.sdk.extensions.gcp.options;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.cloudresourcemanager.CloudResourceManager;
+import com.google.api.services.cloudresourcemanager.model.Project;
+import com.google.api.services.storage.model.Bucket;
import com.google.auth.Credentials;
+import com.google.cloud.hadoop.util.ResilientOperation;
+import com.google.cloud.hadoop.util.RetryDeterminer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.nio.file.FileAlreadyExistsException;
import java.security.GeneralSecurityException;
import java.util.Locale;
import java.util.Map;
@@ -38,9 +47,12 @@ import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.DefaultBucket;
+import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.PathValidator;
+import org.apache.beam.sdk.util.Transport;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -207,13 +219,18 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
* Returns {@link PipelineOptions#getTempLocation} as the default GCP temp location.
*/
class GcpTempLocationFactory implements DefaultValueFactory<String> {
+ private static final FluentBackoff BACKOFF_FACTORY =
+ FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200));
+ static final String DEFAULT_REGION = "us-central1";
+ static final Logger LOG = LoggerFactory.getLogger(GcpTempLocationFactory.class);
@Override
@Nullable
public String create(PipelineOptions options) {
String tempLocation = options.getTempLocation();
if (isNullOrEmpty(tempLocation)) {
- tempLocation = DefaultBucket.tryCreateDefaultBucket(options);
+ tempLocation = tryCreateDefaultBucket(options, Transport.newCloudResourceManagerClient(
+ options.as(CloudResourceManagerOptions.class)).build());
options.setTempLocation(tempLocation);
} else {
try {
@@ -227,5 +244,108 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
}
return tempLocation;
}
+
+ /**
+ * Creates a default bucket or verifies the existence and proper access control
+ * of an existing default bucket. Returns the location if successful.
+ */
+ @VisibleForTesting
+ static String tryCreateDefaultBucket(
+ PipelineOptions options, CloudResourceManager crmClient) {
+ GcsOptions gcpOptions = options.as(GcsOptions.class);
+
+ final String projectId = gcpOptions.getProject();
+ checkArgument(!isNullOrEmpty(projectId),
+ "--project is a required option.");
+
+ // Look up the project number, to create a default bucket with a stable
+ // name with no special characters.
+ long projectNumber = 0L;
+ try {
+ projectNumber = getProjectNumber(projectId, crmClient);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to verify project with ID " + projectId, e);
+ }
+ String region = DEFAULT_REGION;
+ if (!isNullOrEmpty(gcpOptions.getZone())) {
+ region = getRegionFromZone(gcpOptions.getZone());
+ }
+ final String bucketName =
+ "dataflow-staging-" + region + "-" + projectNumber;
+ LOG.info("No staging location provided, attempting to use default bucket: {}",
+ bucketName);
+ Bucket bucket = new Bucket()
+ .setName(bucketName)
+ .setLocation(region);
+ // Always try to create the bucket before checking access, so that we do not
+ // race with other pipelines that may be attempting to do the same thing.
+ try {
+ gcpOptions.getGcsUtil().createBucket(projectId, bucket);
+ } catch (FileAlreadyExistsException e) {
+ LOG.debug("Bucket '{}'' already exists, verifying access.", bucketName);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable create default bucket.", e);
+ }
+
+ // Once the bucket is expected to exist, verify that it is correctly owned
+ // by the project executing the job.
+ try {
+ long owner = gcpOptions.getGcsUtil().bucketOwner(
+ GcsPath.fromComponents(bucketName, ""));
+ checkArgument(
+ owner == projectNumber,
+ "Bucket owner does not match the project from --project:"
+ + " %s vs. %s", owner, projectNumber);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Unable to determine the owner of the default bucket at gs://" + bucketName, e);
+ }
+ return "gs://" + bucketName;
+ }
+
+ /**
+ * Returns the project number or throws an exception if the project does not
+ * exist or has other access exceptions.
+ */
+ private static long getProjectNumber(
+ String projectId,
+ CloudResourceManager crmClient) throws IOException {
+ return getProjectNumber(
+ projectId,
+ crmClient,
+ BACKOFF_FACTORY.backoff(),
+ Sleeper.DEFAULT);
+ }
+
+ /**
+ * Returns the project number or throws an error if the project does not
+ * exist or has other access errors.
+ */
+ private static long getProjectNumber(
+ String projectId,
+ CloudResourceManager crmClient,
+ BackOff backoff,
+ Sleeper sleeper) throws IOException {
+ CloudResourceManager.Projects.Get getProject =
+ crmClient.projects().get(projectId);
+ try {
+ Project project = ResilientOperation.retry(
+ ResilientOperation.getGoogleRequestCallable(getProject),
+ backoff,
+ RetryDeterminer.SOCKET_ERRORS,
+ IOException.class,
+ sleeper);
+ return project.getProjectNumber();
+ } catch (Exception e) {
+ throw new IOException("Unable to get project number", e);
+ }
+ }
+
+ @VisibleForTesting
+ static String getRegionFromZone(String zone) {
+ String[] zoneParts = zone.split("-");
+ checkArgument(zoneParts.length >= 2, "Invalid zone provided: %s", zone);
+ return zoneParts[0] + "-" + zoneParts[1];
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b35f46/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java
deleted file mode 100644
index 6e7298c..0000000
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Strings.isNullOrEmpty;
-
-import com.google.api.services.storage.model.Bucket;
-import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.nio.file.FileAlreadyExistsException;
-import org.apache.beam.sdk.extensions.gcp.options.CloudResourceManagerOptions;
-import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utility class for handling default GCS buckets.
- */
-public class DefaultBucket {
- static final Logger LOG = LoggerFactory.getLogger(DefaultBucket.class);
-
- static final String DEFAULT_REGION = "us-central1";
-
- /**
- * Creates a default bucket or verifies the existence and proper access control
- * of an existing default bucket. Returns the location if successful.
- */
- public static String tryCreateDefaultBucket(PipelineOptions options) {
- GcsOptions gcpOptions = options.as(GcsOptions.class);
-
- final String projectId = gcpOptions.getProject();
- checkArgument(!isNullOrEmpty(projectId),
- "--project is a required option.");
-
- // Look up the project number, to create a default bucket with a stable
- // name with no special characters.
- long projectNumber = 0L;
- try {
- projectNumber = gcpOptions.as(CloudResourceManagerOptions.class)
- .getGcpProjectUtil().getProjectNumber(projectId);
- } catch (IOException e) {
- throw new RuntimeException("Unable to verify project with ID " + projectId, e);
- }
- String region = DEFAULT_REGION;
- if (!isNullOrEmpty(gcpOptions.getZone())) {
- region = getRegionFromZone(gcpOptions.getZone());
- }
- final String bucketName =
- "dataflow-staging-" + region + "-" + projectNumber;
- LOG.info("No staging location provided, attempting to use default bucket: {}",
- bucketName);
- Bucket bucket = new Bucket()
- .setName(bucketName)
- .setLocation(region);
- // Always try to create the bucket before checking access, so that we do not
- // race with other pipelines that may be attempting to do the same thing.
- try {
- gcpOptions.getGcsUtil().createBucket(projectId, bucket);
- } catch (FileAlreadyExistsException e) {
- LOG.debug("Bucket '{}'' already exists, verifying access.", bucketName);
- } catch (IOException e) {
- throw new RuntimeException("Unable create default bucket.", e);
- }
-
- // Once the bucket is expected to exist, verify that it is correctly owned
- // by the project executing the job.
- try {
- long owner = gcpOptions.getGcsUtil().bucketOwner(
- GcsPath.fromComponents(bucketName, ""));
- checkArgument(
- owner == projectNumber,
- "Bucket owner does not match the project from --project:"
- + " %s vs. %s", owner, projectNumber);
- } catch (IOException e) {
- throw new RuntimeException(
- "Unable to determine the owner of the default bucket at gs://" + bucketName, e);
- }
- return "gs://" + bucketName;
- }
-
- @VisibleForTesting
- static String getRegionFromZone(String zone) {
- String[] zoneParts = zone.split("-");
- checkArgument(zoneParts.length >= 2, "Invalid zone provided: %s", zone);
- return zoneParts[0] + "-" + zoneParts[1];
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b35f46/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
deleted file mode 100644
index 02b402a..0000000
--- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.cloudresourcemanager.CloudResourceManager;
-import com.google.api.services.cloudresourcemanager.model.Project;
-import com.google.cloud.hadoop.util.ResilientOperation;
-import com.google.cloud.hadoop.util.RetryDeterminer;
-import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import org.apache.beam.sdk.extensions.gcp.options.CloudResourceManagerOptions;
-import org.apache.beam.sdk.options.DefaultValueFactory;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provides operations on Google Cloud Platform Projects.
- */
-public class GcpProjectUtil {
- /**
- * A {@link DefaultValueFactory} able to create a {@link GcpProjectUtil} using
- * any transport flags specified on the {@link PipelineOptions}.
- */
- public static class GcpProjectUtilFactory implements DefaultValueFactory<GcpProjectUtil> {
- /**
- * Returns an instance of {@link GcpProjectUtil} based on the
- * {@link PipelineOptions}.
- */
- @Override
- public GcpProjectUtil create(PipelineOptions options) {
- LOG.debug("Creating new GcpProjectUtil");
- CloudResourceManagerOptions crmOptions = options.as(CloudResourceManagerOptions.class);
- return new GcpProjectUtil(
- Transport.newCloudResourceManagerClient(crmOptions).build());
- }
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(GcpProjectUtil.class);
-
- private static final FluentBackoff BACKOFF_FACTORY =
- FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200));
-
- /** Client for the CRM API. */
- private CloudResourceManager crmClient;
-
- private GcpProjectUtil(CloudResourceManager crmClient) {
- this.crmClient = crmClient;
- }
-
- // Use this only for testing purposes.
- @VisibleForTesting
- void setCrmClient(CloudResourceManager crmClient) {
- this.crmClient = crmClient;
- }
-
- /**
- * Returns the project number or throws an exception if the project does not
- * exist or has other access exceptions.
- */
- public long getProjectNumber(String projectId) throws IOException {
- return getProjectNumber(
- projectId,
- BACKOFF_FACTORY.backoff(),
- Sleeper.DEFAULT);
- }
-
- /**
- * Returns the project number or throws an error if the project does not
- * exist or has other access errors.
- */
- @VisibleForTesting
- long getProjectNumber(String projectId, BackOff backoff, Sleeper sleeper) throws IOException {
- CloudResourceManager.Projects.Get getProject =
- crmClient.projects().get(projectId);
- try {
- Project project = ResilientOperation.retry(
- ResilientOperation.getGoogleRequestCallable(getProject),
- backoff,
- RetryDeterminer.SOCKET_ERRORS,
- IOException.class,
- sleeper);
- return project.getProjectNumber();
- } catch (Exception e) {
- throw new IOException("Unable to get project number", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b35f46/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
index d334359..68b3818 100644
--- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
+++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java
@@ -21,152 +21,253 @@ import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
+import com.google.api.services.cloudresourcemanager.CloudResourceManager;
+import com.google.api.services.cloudresourcemanager.CloudResourceManager.Projects;
+import com.google.api.services.cloudresourcemanager.CloudResourceManager.Projects.Get;
+import com.google.api.services.cloudresourcemanager.model.Project;
+import com.google.api.services.storage.model.Bucket;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
+import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions.DefaultProjectFactory;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions.GcpTempLocationFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.RestoreSystemProperties;
+import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.NoopPathValidator;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
/** Tests for {@link GcpOptions}. */
-@RunWith(JUnit4.class)
+@RunWith(Enclosed.class)
public class GcpOptionsTest {
- @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
- @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
- @Rule public ExpectedException thrown = ExpectedException.none();
-
- @Test
- public void testGetProjectFromCloudSdkConfigEnv() throws Exception {
- Map<String, String> environment =
- ImmutableMap.of("CLOUDSDK_CONFIG", tmpFolder.getRoot().getAbsolutePath());
- assertEquals("test-project",
- runGetProjectTest(tmpFolder.newFile("properties"), environment));
- }
- @Test
- public void testGetProjectFromAppDataEnv() throws Exception {
- Map<String, String> environment =
- ImmutableMap.of("APPDATA", tmpFolder.getRoot().getAbsolutePath());
- System.setProperty("os.name", "windows");
- assertEquals("test-project",
- runGetProjectTest(new File(tmpFolder.newFolder("gcloud"), "properties"),
- environment));
- }
+ /** Tests for the majority of methods. */
+ @RunWith(JUnit4.class)
+ public static class Common {
+ @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties();
+ @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+ @Rule public ExpectedException thrown = ExpectedException.none();
- @Test
- public void testGetProjectFromUserHomeEnvOld() throws Exception {
- Map<String, String> environment = ImmutableMap.of();
- System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
- assertEquals("test-project",
- runGetProjectTest(
- new File(tmpFolder.newFolder(".config", "gcloud"), "properties"),
- environment));
- }
+ @Test
+ public void testGetProjectFromCloudSdkConfigEnv() throws Exception {
+ Map<String, String> environment =
+ ImmutableMap.of("CLOUDSDK_CONFIG", tmpFolder.getRoot().getAbsolutePath());
+ assertEquals("test-project",
+ runGetProjectTest(tmpFolder.newFile("properties"), environment));
+ }
- @Test
- public void testGetProjectFromUserHomeEnv() throws Exception {
- Map<String, String> environment = ImmutableMap.of();
- System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
- assertEquals("test-project",
- runGetProjectTest(
- new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"),
- environment));
- }
+ @Test
+ public void testGetProjectFromAppDataEnv() throws Exception {
+ Map<String, String> environment =
+ ImmutableMap.of("APPDATA", tmpFolder.getRoot().getAbsolutePath());
+ System.setProperty("os.name", "windows");
+ assertEquals("test-project",
+ runGetProjectTest(new File(tmpFolder.newFolder("gcloud"), "properties"),
+ environment));
+ }
- @Test
- public void testGetProjectFromUserHomeOldAndNewPrefersNew() throws Exception {
- Map<String, String> environment = ImmutableMap.of();
- System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
- makePropertiesFileWithProject(new File(tmpFolder.newFolder(".config", "gcloud"), "properties"),
- "old-project");
- assertEquals("test-project",
- runGetProjectTest(
- new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"),
- environment));
- }
+ @Test
+ public void testGetProjectFromUserHomeEnvOld() throws Exception {
+ Map<String, String> environment = ImmutableMap.of();
+ System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
+ assertEquals("test-project",
+ runGetProjectTest(
+ new File(tmpFolder.newFolder(".config", "gcloud"), "properties"),
+ environment));
+ }
- @Test
- public void testUnableToGetDefaultProject() throws Exception {
- System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
- DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory());
- when(projectFactory.getEnvironment()).thenReturn(ImmutableMap.<String, String>of());
- assertNull(projectFactory.create(PipelineOptionsFactory.create()));
- }
+ @Test
+ public void testGetProjectFromUserHomeEnv() throws Exception {
+ Map<String, String> environment = ImmutableMap.of();
+ System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
+ assertEquals("test-project", runGetProjectTest(
+ new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"),
+ environment));
+ }
- @Test
- public void testEmptyGcpTempLocation() throws Exception {
- GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
- options.setProject("");
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("--project is a required option");
- options.getGcpTempLocation();
- }
+ @Test
+ public void testGetProjectFromUserHomeOldAndNewPrefersNew() throws Exception {
+ Map<String, String> environment = ImmutableMap.of();
+ System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
+ makePropertiesFileWithProject(
+ new File(tmpFolder.newFolder(".config", "gcloud"), "properties"), "old-project");
+ assertEquals("test-project", runGetProjectTest(
+ new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"),
+ environment));
+ }
- @Test
- public void testDefaultGcpTempLocation() throws Exception {
- GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
- String tempLocation = "gs://bucket";
- options.setTempLocation(tempLocation);
- options.as(GcsOptions.class).setPathValidatorClass(NoopPathValidator.class);
- assertEquals(tempLocation, options.getGcpTempLocation());
- }
+ @Test
+ public void testUnableToGetDefaultProject() throws Exception {
+ System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath());
+ DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory());
+ when(projectFactory.getEnvironment()).thenReturn(ImmutableMap.<String, String>of());
+ assertNull(projectFactory.create(PipelineOptionsFactory.create()));
+ }
- @Test
- public void testDefaultGcpTempLocationInvalid() throws Exception {
- GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
- options.setTempLocation("file://");
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage(
- "Error constructing default value for gcpTempLocation: tempLocation is not"
- + " a valid GCS path");
- options.getGcpTempLocation();
- }
+ @Test
+ public void testEmptyGcpTempLocation() throws Exception {
+ GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ options.setGcpCredential(new TestCredential());
+ options.setProject("");
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("--project is a required option");
+ options.getGcpTempLocation();
+ }
- @Test
- public void testDefaultGcpTempLocationDoesNotExist() {
- GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
- String tempLocation = "gs://does/not/exist";
- options.setTempLocation(tempLocation);
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage(
- "Error constructing default value for gcpTempLocation: tempLocation is not"
- + " a valid GCS path");
- thrown.expectCause(
- hasMessage(containsString("Output path does not exist or is not writeable")));
-
- options.getGcpTempLocation();
- }
+ @Test
+ public void testDefaultGcpTempLocation() throws Exception {
+ GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ String tempLocation = "gs://bucket";
+ options.setTempLocation(tempLocation);
+ options.as(GcsOptions.class).setPathValidatorClass(NoopPathValidator.class);
+ assertEquals(tempLocation, options.getGcpTempLocation());
+ }
+
+ @Test
+ public void testDefaultGcpTempLocationInvalid() throws Exception {
+ GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ options.setTempLocation("file://");
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ "Error constructing default value for gcpTempLocation: tempLocation is not"
+ + " a valid GCS path");
+ options.getGcpTempLocation();
+ }
+
+ @Test
+ public void testDefaultGcpTempLocationDoesNotExist() {
+ GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ String tempLocation = "gs://does/not/exist";
+ options.setTempLocation(tempLocation);
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ "Error constructing default value for gcpTempLocation: tempLocation is not"
+ + " a valid GCS path");
+ thrown.expectCause(
+ hasMessage(containsString("Output path does not exist or is not writeable")));
+
+ options.getGcpTempLocation();
+ }
- private static void makePropertiesFileWithProject(File path, String projectId)
- throws IOException {
- String properties = String.format("[core]%n"
- + "account = test-account@google.com%n"
- + "project = %s%n"
- + "%n"
- + "[dataflow]%n"
- + "magic = true%n", projectId);
- Files.write(properties, path, StandardCharsets.UTF_8);
+ private static void makePropertiesFileWithProject(File path, String projectId)
+ throws IOException {
+ String properties = String.format("[core]%n"
+ + "account = test-account@google.com%n"
+ + "project = %s%n"
+ + "%n"
+ + "[dataflow]%n"
+ + "magic = true%n", projectId);
+ Files.write(properties, path, StandardCharsets.UTF_8);
+ }
+
+ private static String runGetProjectTest(File path, Map<String, String> environment)
+ throws Exception {
+ makePropertiesFileWithProject(path, "test-project");
+ DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory());
+ when(projectFactory.getEnvironment()).thenReturn(environment);
+ return projectFactory.create(PipelineOptionsFactory.create());
+ }
}
- private static String runGetProjectTest(File path, Map<String, String> environment)
- throws Exception {
- makePropertiesFileWithProject(path, "test-project");
- DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory());
- when(projectFactory.getEnvironment()).thenReturn(environment);
- return projectFactory.create(PipelineOptionsFactory.create());
+ /** Tests related to determining the GCP temp location. */
+ @RunWith(JUnit4.class)
+ public static class GcpTempLocation {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+ @Mock private GcsUtil mockGcsUtil;
+ @Mock private CloudResourceManager mockCrmClient;
+ @Mock private Projects mockProjects;
+ @Mock private Get mockGet;
+ private Project fakeProject;
+ private PipelineOptions options;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ options = PipelineOptionsFactory.create();
+ options.as(GcsOptions.class).setGcsUtil(mockGcsUtil);
+ options.as(GcpOptions.class).setProject("foo");
+ options.as(GcpOptions.class).setZone("us-north1-a");
+ when(mockCrmClient.projects()).thenReturn(mockProjects);
+ when(mockProjects.get(any(String.class))).thenReturn(mockGet);
+ fakeProject = new Project().setProjectNumber(1L);
+ }
+
+ @Test
+ public void testCreateBucket() throws Exception {
+ doReturn(fakeProject).when(mockGet).execute();
+ when(mockGcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(1L);
+
+ String bucket = GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
+ assertEquals("gs://dataflow-staging-us-north1-1", bucket);
+ }
+
+ @Test
+ public void testCreateBucketProjectLookupFails() throws Exception {
+ doThrow(new IOException("badness")).when(mockGet).execute();
+
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("Unable to verify project");
+ GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
+ }
+
+ @Test
+ public void testCreateBucketCreateBucketFails() throws Exception {
+ doReturn(fakeProject).when(mockGet).execute();
+ doThrow(new IOException("badness")).when(
+ mockGcsUtil).createBucket(any(String.class), any(Bucket.class));
+
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("Unable create default bucket");
+ GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
+ }
+
+ @Test
+ public void testCannotGetBucketOwner() throws Exception {
+ doReturn(fakeProject).when(mockGet).execute();
+ when(mockGcsUtil.bucketOwner(any(GcsPath.class)))
+ .thenThrow(new IOException("badness"));
+
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("Unable to determine the owner");
+ GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
+ }
+
+ @Test
+ public void testProjectMismatch() throws Exception {
+ doReturn(fakeProject).when(mockGet).execute();
+ when(mockGcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(5L);
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Bucket owner does not match the project");
+ GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient);
+ }
+
+ @Test
+ public void regionFromZone() throws Exception {
+ assertEquals("us-central1", GcpTempLocationFactory.getRegionFromZone("us-central1-a"));
+ assertEquals("asia-east", GcpTempLocationFactory.getRegionFromZone("asia-east-a"));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b35f46/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java
deleted file mode 100644
index 65cb90b..0000000
--- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.when;
-
-import com.google.api.services.storage.model.Bucket;
-import java.io.IOException;
-import org.apache.beam.sdk.extensions.gcp.options.CloudResourceManagerOptions;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.MockitoAnnotations;
-import org.mockito.MockitoAnnotations.Mock;
-
-/** Tests for DefaultBucket. */
-@RunWith(JUnit4.class)
-public class DefaultBucketTest {
- @Rule public ExpectedException thrown = ExpectedException.none();
-
- private PipelineOptions options;
- @Mock
- private GcsUtil gcsUtil;
- @Mock
- private GcpProjectUtil gcpUtil;
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
- options = PipelineOptionsFactory.create();
- options.as(GcsOptions.class).setGcsUtil(gcsUtil);
- options.as(CloudResourceManagerOptions.class).setGcpProjectUtil(gcpUtil);
- options.as(GcpOptions.class).setProject("foo");
- options.as(GcpOptions.class).setZone("us-north1-a");
- }
-
- @Test
- public void testCreateBucket() {
- String bucket = DefaultBucket.tryCreateDefaultBucket(options);
- assertEquals("gs://dataflow-staging-us-north1-0", bucket);
- }
-
- @Test
- public void testCreateBucketProjectLookupFails() throws IOException {
- when(gcpUtil.getProjectNumber("foo")).thenThrow(new IOException("badness"));
-
- thrown.expect(RuntimeException.class);
- thrown.expectMessage("Unable to verify project");
- DefaultBucket.tryCreateDefaultBucket(options);
- }
-
- @Test
- public void testCreateBucketCreateBucketFails() throws IOException {
- doThrow(new IOException("badness")).when(
- gcsUtil).createBucket(any(String.class), any(Bucket.class));
-
- thrown.expect(RuntimeException.class);
- thrown.expectMessage("Unable create default bucket");
- DefaultBucket.tryCreateDefaultBucket(options);
- }
-
- @Test
- public void testCannotGetBucketOwner() throws IOException {
- when(gcsUtil.bucketOwner(any(GcsPath.class)))
- .thenThrow(new IOException("badness"));
-
- thrown.expect(RuntimeException.class);
- thrown.expectMessage("Unable to determine the owner");
- DefaultBucket.tryCreateDefaultBucket(options);
- }
-
- @Test
- public void testProjectMismatch() throws IOException {
- when(gcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(5L);
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Bucket owner does not match the project");
- DefaultBucket.tryCreateDefaultBucket(options);
- }
-
- @Test
- public void regionFromZone() throws IOException {
- assertEquals("us-central1", DefaultBucket.getRegionFromZone("us-central1-a"));
- assertEquals("asia-east", DefaultBucket.getRegionFromZone("asia-east-a"));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b35f46/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java
deleted file mode 100644
index 253787d..0000000
--- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.when;
-
-import com.google.api.client.util.BackOff;
-import com.google.api.services.cloudresourcemanager.CloudResourceManager;
-import com.google.api.services.cloudresourcemanager.model.Project;
-import java.net.SocketTimeoutException;
-import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
-import org.apache.beam.sdk.extensions.gcp.options.CloudResourceManagerOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
-
-/** Test case for {@link GcpProjectUtil}. */
-@RunWith(JUnit4.class)
-public class GcpProjectUtilTest {
- @Rule public ExpectedException thrown = ExpectedException.none();
-
- private static CloudResourceManagerOptions crmOptionsWithTestCredential() {
- CloudResourceManagerOptions pipelineOptions =
- PipelineOptionsFactory.as(CloudResourceManagerOptions.class);
- pipelineOptions.setGcpCredential(new TestCredential());
- return pipelineOptions;
- }
-
- @Test
- public void testGetProjectNumber() throws Exception {
- CloudResourceManagerOptions pipelineOptions = crmOptionsWithTestCredential();
- GcpProjectUtil projectUtil = pipelineOptions.getGcpProjectUtil();
-
- CloudResourceManager.Projects mockProjects = Mockito.mock(
- CloudResourceManager.Projects.class);
- CloudResourceManager mockCrm = Mockito.mock(CloudResourceManager.class);
- projectUtil.setCrmClient(mockCrm);
-
- CloudResourceManager.Projects.Get mockProjectsGet =
- Mockito.mock(CloudResourceManager.Projects.Get.class);
-
- BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
- Project project = new Project();
- project.setProjectNumber(5L);
-
- when(mockCrm.projects()).thenReturn(mockProjects);
- when(mockProjects.get(any(String.class))).thenReturn(mockProjectsGet);
- when(mockProjectsGet.execute())
- .thenThrow(new SocketTimeoutException("SocketException"))
- .thenReturn(project);
-
- assertEquals(5L, projectUtil.getProjectNumber(
- "foo", mockBackOff, new FastNanoClockAndSleeper()));
- }
-}
[2/2] beam git commit: [BEAM-1871] Hide internal implementation
details of how we create a DefaultBucket for GCP Temp Location
Posted by lc...@apache.org.
[BEAM-1871] Hide internal implementation details of how we create a DefaultBucket for GCP Temp Location
This closes #2747
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f29444bf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f29444bf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f29444bf
Branch: refs/heads/master
Commit: f29444bf80281807736d01fea7f4238660d1c9a9
Parents: 47821ad c1b35f4
Author: Lukasz Cwik <lc...@google.com>
Authored: Sat Apr 29 09:11:58 2017 -0700
Committer: Lukasz Cwik <lc...@google.com>
Committed: Sat Apr 29 09:11:58 2017 -0700
----------------------------------------------------------------------
.../options/CloudResourceManagerOptions.java | 14 -
.../sdk/extensions/gcp/options/GcpOptions.java | 124 ++++++-
.../org/apache/beam/sdk/util/DefaultBucket.java | 105 ------
.../apache/beam/sdk/util/GcpProjectUtil.java | 106 ------
.../extensions/gcp/options/GcpOptionsTest.java | 325 ++++++++++++-------
.../apache/beam/sdk/util/DefaultBucketTest.java | 112 -------
.../beam/sdk/util/GcpProjectUtilTest.java | 77 -----
7 files changed, 335 insertions(+), 528 deletions(-)
----------------------------------------------------------------------