You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/01/18 20:56:49 UTC
[1/2] beam git commit: Add a default bucket to GcpOptions
Repository: beam
Updated Branches:
refs/heads/master faa2277b5 -> d13549334
Add a default bucket to GcpOptions
If gcpStagingLocation is not specified, construct a stable Default
Bucket. This bucket is used to stage GCP artifacts.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f29a4761
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f29a4761
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f29a4761
Branch: refs/heads/master
Commit: f29a4761220d2e9fa5752af79c6bce690903c760
Parents: faa2277
Author: Sam McVeety <sg...@google.com>
Authored: Wed Jan 18 12:20:10 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Jan 18 12:56:04 2017 -0800
----------------------------------------------------------------------
.../options/DataflowPipelineOptionsTest.java | 4 +-
.../org/apache/beam/sdk/options/GcpOptions.java | 36 ++++--
.../org/apache/beam/sdk/util/DefaultBucket.java | 105 +++++++++++++++++
.../apache/beam/sdk/util/GcpProjectUtil.java | 2 +-
.../java/org/apache/beam/sdk/util/GcsUtil.java | 4 +-
.../apache/beam/sdk/options/GcpOptionsTest.java | 4 +-
.../apache/beam/sdk/util/DefaultBucketTest.java | 112 +++++++++++++++++++
7 files changed, 250 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f29a4761/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
index 9dacfb2..30eee0e 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java
@@ -183,9 +183,9 @@ public class DataflowPipelineOptionsTest {
@Test
public void testDefaultStagingLocationUnset() {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setProject("");
thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Error constructing default value for stagingLocation: "
- + "failed to retrieve gcpTempLocation.");
+ thrown.expectMessage("Error constructing default value for stagingLocation");
options.getStagingLocation();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f29a4761/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
index 042f4b4..c04e4f0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java
@@ -17,12 +17,11 @@
*/
package org.apache.beam.sdk.options;
-import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.auth.Credentials;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
@@ -34,6 +33,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.beam.sdk.util.CredentialFactory;
+import org.apache.beam.sdk.util.DefaultBucket;
import org.apache.beam.sdk.util.GcpCredentialFactory;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.PathValidator;
@@ -62,6 +62,17 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
void setProject(String value);
/**
+ * GCP <a href="https://developers.google.com/compute/docs/zones"
+ * >availability zone</a> for operations.
+ *
+ * <p>Default is set on a per-service basis.
+ */
+ @Description("GCP availability zone for running GCP operations. "
+ + "Default is up to the individual service.")
+ String getZone();
+ void setZone(String value);
+
+ /**
* The class of the credential factory that should be created and used to create
* credentials. If gcpCredential has not been set explicitly, an instance of this class will
* be constructed and used as a credential factory.
@@ -197,15 +208,18 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions {
@Nullable
public String create(PipelineOptions options) {
String tempLocation = options.getTempLocation();
- checkArgument(!Strings.isNullOrEmpty(options.getTempLocation()),
- "Error constructing default value for gcpTempLocation: tempLocation is not set");
- try {
- PathValidator validator = options.as(GcsOptions.class).getPathValidator();
- validator.validateOutputFilePrefixSupported(tempLocation);
- } catch (Exception e) {
- throw new IllegalArgumentException(String.format(
- "Error constructing default value for gcpTempLocation: tempLocation is not"
- + " a valid GCS path, %s. ", tempLocation), e);
+ if (isNullOrEmpty(tempLocation)) {
+ tempLocation = DefaultBucket.tryCreateDefaultBucket(options);
+ options.setTempLocation(tempLocation);
+ } else {
+ try {
+ PathValidator validator = options.as(GcsOptions.class).getPathValidator();
+ validator.validateOutputFilePrefixSupported(tempLocation);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format(
+ "Error constructing default value for gcpTempLocation: tempLocation is not"
+ + " a valid GCS path, %s. ", tempLocation), e);
+ }
}
return tempLocation;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f29a4761/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java
new file mode 100644
index 0000000..75954c0
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+
+import com.google.api.services.storage.model.Bucket;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import org.apache.beam.sdk.options.CloudResourceManagerOptions;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for handling default GCS buckets.
+ */
+public class DefaultBucket {
+  static final Logger LOG = LoggerFactory.getLogger(DefaultBucket.class);
+
+  static final String DEFAULT_REGION = "us-central1";
+
+  /**
+   * Creates a default bucket or verifies the existence and proper access control
+   * of an existing default bucket. Returns the location if successful.
+   *
+   * @param options options providing the project, optional zone, GcsUtil and GcpProjectUtil
+   * @return the {@code gs://} location of the default bucket
+   * @throws IllegalArgumentException if no project is set, the zone is malformed, or the
+   *     bucket is owned by a different project
+   * @throws RuntimeException if the project lookup, bucket creation or owner lookup fails
+   */
+  public static String tryCreateDefaultBucket(PipelineOptions options) {
+    GcsOptions gcsOptions = options.as(GcsOptions.class);
+
+    final String projectId = gcsOptions.getProject();
+    checkArgument(!isNullOrEmpty(projectId), "--project is a required option.");
+
+    // Look up the project number, to create a default bucket with a stable
+    // name with no special characters.
+    final long projectNumber;
+    try {
+      projectNumber = gcsOptions.as(CloudResourceManagerOptions.class)
+          .getGcpProjectUtil().getProjectNumber(projectId);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to verify project with ID " + projectId, e);
+    }
+    final String zone = gcsOptions.getZone();
+    final String region = isNullOrEmpty(zone) ? DEFAULT_REGION : getRegionFromZone(zone);
+    final String bucketName = "dataflow-staging-" + region + "-" + projectNumber;
+    LOG.info("No staging location provided, attempting to use default bucket: {}", bucketName);
+    Bucket bucket = new Bucket().setName(bucketName).setLocation(region);
+    // Always try to create the bucket before checking access, so that we do not
+    // race with other pipelines that may be attempting to do the same thing.
+    try {
+      gcsOptions.getGcsUtil().createBucket(projectId, bucket);
+    } catch (FileAlreadyExistsException e) {
+      LOG.debug("Bucket '{}' already exists, verifying access.", bucketName);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable create default bucket.", e);
+    }
+
+    // Once the bucket is expected to exist, verify that it is correctly owned
+    // by the project executing the job.
+    try {
+      long owner = gcsOptions.getGcsUtil().bucketOwner(GcsPath.fromComponents(bucketName, ""));
+      checkArgument(
+          owner == projectNumber,
+          "Bucket owner does not match the project from --project:"
+          + " %s vs. %s", owner, projectNumber);
+    } catch (IOException e) {
+      throw new RuntimeException(
+          "Unable to determine the owner of the default bucket at gs://" + bucketName, e);
+    }
+    return "gs://" + bucketName;
+  }
+
+  /** Derives the region from a zone name, e.g. {@code us-central1-a} yields us-central1. */
+  @VisibleForTesting
+  static String getRegionFromZone(String zone) {
+    String[] zoneParts = zone.split("-");
+    checkArgument(zoneParts.length >= 2 && !zoneParts[0].isEmpty() && !zoneParts[1].isEmpty(),
+        "Invalid zone provided: %s", zone);
+    return zoneParts[0] + "-" + zoneParts[1];
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f29a4761/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
index beac4e4..f73afe0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
@@ -76,7 +76,7 @@ public class GcpProjectUtil {
* Returns the project number or throws an exception if the project does not
* exist or has other access exceptions.
*/
- long getProjectNumber(String projectId) throws IOException {
+ public long getProjectNumber(String projectId) throws IOException {
return getProjectNumber(
projectId,
BACKOFF_FACTORY.backoff(),
http://git-wip-us.apache.org/repos/asf/beam/blob/f29a4761/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 521673c..a10ea28 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -451,7 +451,9 @@ public class GcsUtil {
void createBucket(String projectId, Bucket bucket, BackOff backoff, Sleeper sleeper)
throws IOException {
Storage.Buckets.Insert insertBucket =
- storageClient.buckets().insert(projectId, bucket);
+ storageClient.buckets().insert(projectId, bucket);
+ insertBucket.setPredefinedAcl("projectPrivate");
+ insertBucket.setPredefinedDefaultObjectAcl("projectPrivate");
try {
ResilientOperation.retry(
http://git-wip-us.apache.org/repos/asf/beam/blob/f29a4761/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
index 7854d67..288383e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
@@ -109,9 +109,9 @@ public class GcpOptionsTest {
@Test
public void testEmptyGcpTempLocation() throws Exception {
GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class);
+ options.setProject("");
thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage(
- "Error constructing default value for gcpTempLocation: tempLocation is not set");
+ thrown.expectMessage("--project is a required option");
options.getGcpTempLocation();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f29a4761/sdks/java/core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java
new file mode 100644
index 0000000..395e1f3
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.storage.model.Bucket;
+import java.io.IOException;
+import org.apache.beam.sdk.options.CloudResourceManagerOptions;
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.MockitoAnnotations;
+import org.mockito.MockitoAnnotations.Mock;
+
+/** Unit tests for {@link DefaultBucket}, run against mocked GCS and project utilities. */
+@RunWith(JUnit4.class)
+public class DefaultBucketTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  @Mock private GcsUtil gcsUtil;
+  @Mock private GcpProjectUtil gcpUtil;
+  private PipelineOptions options;
+
+  /** Builds options for project "foo" in zone "us-north1-a", backed by the mocks. */
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+    options = PipelineOptionsFactory.create();
+    GcpOptions gcpOptions = options.as(GcpOptions.class);
+    gcpOptions.setProject("foo");
+    gcpOptions.setZone("us-north1-a");
+    options.as(CloudResourceManagerOptions.class).setGcpProjectUtil(gcpUtil);
+    options.as(GcsOptions.class).setGcsUtil(gcsUtil);
+  }
+
+  /** The bucket name combines the zone's region and the project number (0 when unstubbed). */
+  @Test
+  public void testCreateBucket() {
+    String location = DefaultBucket.tryCreateDefaultBucket(options);
+    assertEquals("gs://dataflow-staging-us-north1-0", location);
+  }
+
+  /** A failed project-number lookup surfaces as a RuntimeException. */
+  @Test
+  public void testCreateBucketProjectLookupFails() throws IOException {
+    when(gcpUtil.getProjectNumber("foo")).thenThrow(new IOException("badness"));
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("Unable to verify project");
+    DefaultBucket.tryCreateDefaultBucket(options);
+  }
+
+  /** A failed bucket creation surfaces as a RuntimeException. */
+  @Test
+  public void testCreateBucketCreateBucketFails() throws IOException {
+    doThrow(new IOException("badness"))
+        .when(gcsUtil).createBucket(any(String.class), any(Bucket.class));
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("Unable create default bucket");
+    DefaultBucket.tryCreateDefaultBucket(options);
+  }
+
+  /** A failed bucket-owner lookup surfaces as a RuntimeException. */
+  @Test
+  public void testCannotGetBucketOwner() throws IOException {
+    when(gcsUtil.bucketOwner(any(GcsPath.class))).thenThrow(new IOException("badness"));
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("Unable to determine the owner");
+    DefaultBucket.tryCreateDefaultBucket(options);
+  }
+
+  /** A bucket whose owner differs from the looked-up project number is rejected. */
+  @Test
+  public void testProjectMismatch() throws IOException {
+    when(gcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(5L);
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Bucket owner does not match the project");
+    DefaultBucket.tryCreateDefaultBucket(options);
+  }
+
+  /** The region is the first two dash-separated components of the zone. */
+  @Test
+  public void regionFromZone() throws IOException {
+    assertEquals("us-central1", DefaultBucket.getRegionFromZone("us-central1-a"));
+    assertEquals("asia-east", DefaultBucket.getRegionFromZone("asia-east-a"));
+  }
+}
[2/2] beam git commit: This closes #1041
Posted by tg...@apache.org.
This closes #1041
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d1354933
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d1354933
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d1354933
Branch: refs/heads/master
Commit: d13549334bfa45656b4efa5494364889ebc77f28
Parents: faa2277 f29a476
Author: Thomas Groh <tg...@google.com>
Authored: Wed Jan 18 12:56:35 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Jan 18 12:56:35 2017 -0800
----------------------------------------------------------------------
.../options/DataflowPipelineOptionsTest.java | 4 +-
.../org/apache/beam/sdk/options/GcpOptions.java | 36 ++++--
.../org/apache/beam/sdk/util/DefaultBucket.java | 105 +++++++++++++++++
.../apache/beam/sdk/util/GcpProjectUtil.java | 2 +-
.../java/org/apache/beam/sdk/util/GcsUtil.java | 4 +-
.../apache/beam/sdk/options/GcpOptionsTest.java | 4 +-
.../apache/beam/sdk/util/DefaultBucketTest.java | 112 +++++++++++++++++++
7 files changed, 250 insertions(+), 17 deletions(-)
----------------------------------------------------------------------