You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/10/03 15:33:54 UTC
[1/3] incubator-beam git commit: Add default bucket scaffolding.
Repository: incubator-beam
Updated Branches:
refs/heads/master 202acd1d6 -> 2ee444d15
Add default bucket scaffolding.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9d6e7c71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9d6e7c71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9d6e7c71
Branch: refs/heads/master
Commit: 9d6e7c71b6863a3a87eb3b4ad7b4a5ce75707955
Parents: ccd9bad
Author: sammcveety <sa...@gmail.com>
Authored: Fri Sep 30 21:24:58 2016 -0400
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Oct 3 08:20:12 2016 -0700
----------------------------------------------------------------------
pom.xml | 13 +++
.../DataflowPipelineTranslatorTest.java | 2 +-
.../runners/dataflow/DataflowRunnerTest.java | 7 +-
sdks/java/core/pom.xml | 5 +
.../options/CloudResourceManagerOptions.java | 40 +++++++
.../apache/beam/sdk/util/GcpProjectUtil.java | 106 ++++++++++++++++++
.../apache/beam/sdk/util/GcsPathValidator.java | 2 +-
.../java/org/apache/beam/sdk/util/GcsUtil.java | 67 ++++++-----
.../org/apache/beam/sdk/util/Transport.java | 17 +++
.../dataflow/util/GcsPathValidatorTest.java | 4 +-
.../apache/beam/sdk/util/ApiSurfaceTest.java | 1 +
.../beam/sdk/util/GcpProjectUtilTest.java | 76 +++++++++++++
.../org/apache/beam/sdk/util/GcsUtilTest.java | 112 ++++++++++++++++++-
13 files changed, 413 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3cd5255..7295261 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,7 @@
<!-- If updating dependencies, please update any relevant javadoc offlineLinks -->
<avro.version>1.8.1</avro.version>
<bigquery.version>v2-rev295-1.22.0</bigquery.version>
+ <cloudresourcemanager.version>v1-rev6-1.22.0</cloudresourcemanager.version>
<pubsubgrpc.version>0.1.0</pubsubgrpc.version>
<clouddebugger.version>v2-rev8-1.22.0</clouddebugger.version>
<dataflow.version>v1b3-rev36-1.22.0</dataflow.version>
@@ -483,6 +484,18 @@
<dependency>
<groupId>com.google.apis</groupId>
+ <artifactId>google-api-services-cloudresourcemanager</artifactId>
+ <version>${cloudresourcemanager.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava-jdk5</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.apis</groupId>
<artifactId>google-api-services-pubsub</artifactId>
<version>${pubsub.version}</version>
<exclusions>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 2b7013d..98d2fb0 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -155,7 +155,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
}
});
- when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true);
+ when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 58a01aa..b0ee231 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -187,9 +187,9 @@ public class DataflowRunnerTest {
* Build a mock {@link GcsUtil} with return values.
*
* @param bucketExist first return value
- * @param bucketExists next return values
+ * @param bucketAccessible next return values
*/
- private GcsUtil buildMockGcsUtil(Boolean bucketExist, Boolean... bucketExists)
+ private GcsUtil buildMockGcsUtil(Boolean bucketExist, Boolean... bucketAccessible)
throws IOException {
GcsUtil mockGcsUtil = mock(GcsUtil.class);
when(mockGcsUtil.create(any(GcsPath.class), anyString()))
@@ -209,7 +209,8 @@ public class DataflowRunnerTest {
return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
}
});
- when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(bucketExist, bucketExists);
+ when(mockGcsUtil.bucketAccessible(any(GcsPath.class)))
+ .thenReturn(bucketExist, bucketAccessible);
return mockGcsUtil;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index c4d3e64..aa0ad09 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -369,6 +369,11 @@
<dependency>
<groupId>com.google.apis</groupId>
+ <artifactId>google-api-services-cloudresourcemanager</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.apis</groupId>
<artifactId>google-api-services-pubsub</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java
new file mode 100644
index 0000000..ed532db
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.beam.sdk.util.GcpProjectUtil;
+
+/**
+ * Properties needed when using CloudResourceManager with the Beam SDK.
+ */
+@Description("Options that are used to configure CloudResourceManager. See "
+ + "https://cloud.google.com/resource-manager/ for details on CloudResourceManager.")
+public interface CloudResourceManagerOptions extends ApplicationNameOptions, GcpOptions,
+ PipelineOptions, StreamingOptions {
+ /**
+ * The GcpProjectUtil instance that should be used to communicate with Cloud Resource Manager.
+ */
+ @JsonIgnore
+ @Description("The GcpProjectUtil instance that should be used to communicate"
+ + " with Google Cloud Resource Manager.")
+ @Default.InstanceFactory(GcpProjectUtil.GcpProjectUtilFactory.class)
+ @Hidden
+ GcpProjectUtil getGcpProjectUtil();
+ void setGcpProjectUtil(GcpProjectUtil value);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
new file mode 100644
index 0000000..beac4e4
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.cloudresourcemanager.CloudResourceManager;
+import com.google.api.services.cloudresourcemanager.model.Project;
+import com.google.cloud.hadoop.util.ResilientOperation;
+import com.google.cloud.hadoop.util.RetryDeterminer;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import org.apache.beam.sdk.options.CloudResourceManagerOptions;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides operations on Google Cloud Platform Projects.
+ */
+public class GcpProjectUtil {
+ /**
+ * A {@link DefaultValueFactory} able to create a {@link GcpProjectUtil} using
+ * any transport flags specified on the {@link PipelineOptions}.
+ */
+ public static class GcpProjectUtilFactory implements DefaultValueFactory<GcpProjectUtil> {
+ /**
+ * Returns an instance of {@link GcpProjectUtil} based on the
+ * {@link PipelineOptions}.
+ */
+ @Override
+ public GcpProjectUtil create(PipelineOptions options) {
+ LOG.debug("Creating new GcpProjectUtil");
+ CloudResourceManagerOptions crmOptions = options.as(CloudResourceManagerOptions.class);
+ return new GcpProjectUtil(
+ Transport.newCloudResourceManagerClient(crmOptions).build());
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(GcpProjectUtil.class);
+
+ private static final FluentBackoff BACKOFF_FACTORY =
+ FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200));
+
+ /** Client for the CRM API. */
+ private CloudResourceManager crmClient;
+
+ private GcpProjectUtil(CloudResourceManager crmClient) {
+ this.crmClient = crmClient;
+ }
+
+ // Use this only for testing purposes.
+ @VisibleForTesting
+ void setCrmClient(CloudResourceManager crmClient) {
+ this.crmClient = crmClient;
+ }
+
+ /**
+ * Returns the project number or throws an exception if the project does not
+ * exist or has other access exceptions.
+ */
+ long getProjectNumber(String projectId) throws IOException {
+ return getProjectNumber(
+ projectId,
+ BACKOFF_FACTORY.backoff(),
+ Sleeper.DEFAULT);
+ }
+
+ /**
+ * Returns the project number or throws an exception if the project does not
+ * exist or has other access exceptions.
+ */
+ @VisibleForTesting
+ long getProjectNumber(String projectId, BackOff backoff, Sleeper sleeper) throws IOException {
+ CloudResourceManager.Projects.Get getProject =
+ crmClient.projects().get(projectId);
+ try {
+ Project project = ResilientOperation.retry(
+ ResilientOperation.getGoogleRequestCallable(getProject),
+ backoff,
+ RetryDeterminer.SOCKET_ERRORS,
+ IOException.class,
+ sleeper);
+ return project.getProjectNumber();
+ } catch (Exception e) {
+ throw new IOException("Unable to get project number", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
index 89363ce..c8da4d8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
@@ -75,7 +75,7 @@ public class GcsPathValidator implements PathValidator {
private void verifyPathIsAccessible(String path, String errorMessage) {
GcsPath gcsPath = getGcsPath(path);
try {
- checkArgument(gcpOptions.getGcsUtil().bucketExists(gcsPath),
+ checkArgument(gcpOptions.getGcsUtil().bucketAccessible(gcsPath),
errorMessage, path);
} catch (IOException e) {
throw new RuntimeException(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 4befb1a..ce4604b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -28,6 +28,7 @@ import com.google.api.client.http.HttpHeaders;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.services.storage.Storage;
+import com.google.api.services.storage.model.Bucket;
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel;
@@ -49,6 +50,8 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.FileAlreadyExistsException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
@@ -338,11 +341,10 @@ public class GcsUtil {
}
/**
- * Returns whether the GCS bucket exists. If the bucket exists, it must
- * be accessible otherwise the permissions exception will be propagated.
+ * Returns whether the GCS bucket exists and is accessible.
*/
- public boolean bucketExists(GcsPath path) throws IOException {
- return bucketExists(
+ public boolean bucketAccessible(GcsPath path) throws IOException {
+ return bucketAccessible(
path,
BACKOFF_FACTORY.backoff(),
Sleeper.DEFAULT);
@@ -351,24 +353,23 @@ public class GcsUtil {
/**
* Returns the project number of the project which owns this bucket.
* If the bucket exists, it must be accessible otherwise the permissions
- * exception will be propagated.
+ * exception will be propagated. If the bucket does not exist, an exception
+ * will be thrown.
*/
public long bucketOwner(GcsPath path) throws IOException {
return getBucket(
path,
BACKOFF_FACTORY.backoff(),
- Sleeper.DEFAULT).getProjectNumber();
+ Sleeper.DEFAULT).getProjectNumber().longValue();
}
/**
- * Creates a bucket for the provided project or propagates an error.
+ * Creates a {@link Bucket} under the specified project in Cloud Storage or
+ * propagates an exception.
*/
- public void createBucket(GcsPath path, long projectNumber) throws IOException {
- return createBucket(
- path,
- projectNumber,
- BACKOFF_FACTORY.backoff(),
- Sleeper.DEFAULT);
+ public void createBucket(String projectId, Bucket bucket) throws IOException {
+ createBucket(
+ projectId, bucket, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
}
/**
@@ -376,18 +377,22 @@ public class GcsUtil {
* is inaccessible due to permissions.
*/
@VisibleForTesting
- boolean bucketExists(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
- return getBucket(path, backoff, sleeper) != null;
+ boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
+ try {
+ return getBucket(path, backoff, sleeper) != null;
+ } catch (AccessDeniedException | FileNotFoundException e) {
+ return false;
+ }
}
@VisibleForTesting
@Nullable
- Storage.Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
+ Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
Storage.Buckets.Get getBucket =
storageClient.buckets().get(path.getBucket());
try {
- Storage.Bucket bucket = ResilientOperation.retry(
+ Bucket bucket = ResilientOperation.retry(
ResilientOperation.getGoogleRequestCallable(getBucket),
backoff,
new RetryDeterminer<IOException>() {
@@ -401,11 +406,14 @@ public class GcsUtil {
},
IOException.class,
sleeper);
-
+
return bucket;
} catch (GoogleJsonResponseException e) {
- if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) {
- return null;
+ if (errorExtractor.accessDenied(e)) {
+ throw new AccessDeniedException(path.toString(), null, e.getMessage());
+ }
+ if (errorExtractor.itemNotFound(e)) {
+ throw new FileNotFoundException(e.getMessage());
}
throw e;
} catch (InterruptedException e) {
@@ -417,11 +425,10 @@ public class GcsUtil {
}
@VisibleForTesting
- void createBucket(GcsPath path, long projectNumber, BackOff backoff, Sleeper sleeper)
+ void createBucket(String projectId, Bucket bucket, BackOff backoff, Sleeper sleeper)
throws IOException {
Storage.Buckets.Insert insertBucket =
- storageClient.buckets().insert(path.getBucket());
- insertBucket.setProject(String.valueOf(projectNumber));
+ storageClient.buckets().insert(projectId, bucket);
try {
ResilientOperation.retry(
@@ -429,8 +436,8 @@ public class GcsUtil {
backoff,
new RetryDeterminer<IOException>() {
@Override
- public boolean shouldRetry(IOException e) {
- if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) {
+ public boolean shouldRetry(IOException e) {
+ if (errorExtractor.itemAlreadyExists(e) || errorExtractor.accessDenied(e)) {
return false;
}
return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e);
@@ -439,11 +446,19 @@ public class GcsUtil {
IOException.class,
sleeper);
return;
+ } catch (GoogleJsonResponseException e) {
+ if (errorExtractor.accessDenied(e)) {
+ throw new AccessDeniedException(bucket.getName(), null, e.getMessage());
+ }
+ if (errorExtractor.itemAlreadyExists(e)) {
+ throw new FileAlreadyExistsException(bucket.getName(), null, e.getMessage());
+ }
+ throw e;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(
String.format("Error while attempting to create bucket gs://%s for rproject %s",
- path.getBucket(), projectNumber), e);
+ bucket.getName(), projectId), e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
index d824207..1f61299 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
@@ -24,6 +24,7 @@ import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.cloudresourcemanager.CloudResourceManager;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.storage.Storage;
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
@@ -33,6 +34,7 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.security.GeneralSecurityException;
import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.CloudResourceManagerOptions;
import org.apache.beam.sdk.options.GcsOptions;
import org.apache.beam.sdk.options.PubsubOptions;
@@ -121,6 +123,21 @@ public class Transport {
}
/**
+ * Returns a CloudResourceManager client builder using the specified
+ * {@link CloudResourceManagerOptions}.
+ */
+ public static CloudResourceManager.Builder
+ newCloudResourceManagerClient(CloudResourceManagerOptions options) {
+ return new CloudResourceManager.Builder(getTransport(), getJsonFactory(),
+ chainHttpRequestInitializer(
+ options.getGcpCredential(),
+ // Do not log 404. It clutters the output and is possibly even required by the caller.
+ new RetryHttpRequestInitializer(ImmutableList.of(404))))
+ .setApplicationName(options.getAppName())
+ .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+ }
+
+ /**
* Returns a Cloud Storage client builder using the specified {@link GcsOptions}.
*/
public static Storage.Builder
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
index 398fa63..adf4fc2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
@@ -57,7 +57,7 @@ public class GcsPathValidatorTest {
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
- when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true);
+ when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
options.setRunner(FakeRunner.class);
@@ -81,7 +81,7 @@ public class GcsPathValidatorTest {
@Test
public void testWhenBucketDoesNotExist() throws Exception {
- when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(false);
+ when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(false);
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage(
"Could not find file gs://non-existent-bucket/location");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java
index 4b76277..ea771b4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java
@@ -76,6 +76,7 @@ public class ApiSurfaceTest {
inPackage("org.apache.beam"),
inPackage("com.google.api.client"),
inPackage("com.google.api.services.bigquery"),
+ inPackage("com.google.api.services.cloudresourcemanager"),
inPackage("com.google.api.services.dataflow"),
inPackage("com.google.api.services.pubsub"),
inPackage("com.google.api.services.storage"),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java
new file mode 100644
index 0000000..23f0418
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.services.cloudresourcemanager.CloudResourceManager;
+import com.google.api.services.cloudresourcemanager.model.Project;
+import java.net.SocketTimeoutException;
+import org.apache.beam.sdk.options.CloudResourceManagerOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/** Test case for {@link GcpProjectUtil}. */
+@RunWith(JUnit4.class)
+public class GcpProjectUtilTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ private static CloudResourceManagerOptions crmOptionsWithTestCredential() {
+ CloudResourceManagerOptions pipelineOptions =
+ PipelineOptionsFactory.as(CloudResourceManagerOptions.class);
+ pipelineOptions.setGcpCredential(new TestCredential());
+ return pipelineOptions;
+ }
+
+ @Test
+ public void testGetProjectNumber() throws Exception {
+ CloudResourceManagerOptions pipelineOptions = crmOptionsWithTestCredential();
+ GcpProjectUtil projectUtil = pipelineOptions.getGcpProjectUtil();
+
+ CloudResourceManager.Projects mockProjects = Mockito.mock(
+ CloudResourceManager.Projects.class);
+ CloudResourceManager mockCrm = Mockito.mock(CloudResourceManager.class);
+ projectUtil.setCrmClient(mockCrm);
+
+ CloudResourceManager.Projects.Get mockProjectsGet =
+ Mockito.mock(CloudResourceManager.Projects.Get.class);
+
+ BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+ Project project = new Project();
+ project.setProjectNumber(5L);
+
+ when(mockCrm.projects()).thenReturn(mockProjects);
+ when(mockProjects.get(any(String.class))).thenReturn(mockProjectsGet);
+ when(mockProjectsGet.execute())
+ .thenThrow(new SocketTimeoutException("SocketException"))
+ .thenReturn(project);
+
+ assertEquals(5L, projectUtil.getProjectNumber(
+ "foo", mockBackOff, new FastNanoClockAndSleeper()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
index 9504b4c..df3bf6e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.when;
import com.google.api.client.googleapis.batch.BatchRequest;
@@ -58,6 +59,7 @@ import java.io.IOException;
import java.math.BigInteger;
import java.net.SocketTimeoutException;
import java.nio.channels.SeekableByteChannel;
+import java.nio.file.AccessDeniedException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -305,7 +307,7 @@ public class GcsUtilTest {
GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/accessdeniedfile");
GoogleJsonResponseException expectedException =
googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN,
- "Waves hand mysteriously", "These aren't the buckets your looking for");
+ "Waves hand mysteriously", "These aren't the buckets you're looking for");
when(mockStorage.objects()).thenReturn(mockStorageObjects);
when(mockStorageObjects.get(pattern.getBucket(), pattern.getObject())).thenReturn(
@@ -380,7 +382,57 @@ public class GcsUtilTest {
}
@Test
- public void testBucketExists() throws IOException {
+ public void testCreateBucket() throws IOException {
+ GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+ GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+ Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
+ Storage mockStorage = Mockito.mock(Storage.class);
+ gcsUtil.setStorageClient(mockStorage);
+
+ Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class);
+
+ BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+
+ when(mockStorage.buckets()).thenReturn(mockStorageObjects);
+ when(mockStorageObjects.insert(
+ any(String.class), any(Bucket.class))).thenReturn(mockStorageInsert);
+ when(mockStorageInsert.execute())
+ .thenThrow(new SocketTimeoutException("SocketException"))
+ .thenReturn(new Bucket());
+
+ gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper());
+ }
+
+ @Test
+ public void testCreateBucketAccessErrors() throws IOException {
+ GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
+ GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
+
+ Storage mockStorage = Mockito.mock(Storage.class);
+ gcsUtil.setStorageClient(mockStorage);
+
+ Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
+ Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class);
+
+ BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+ GoogleJsonResponseException expectedException =
+ googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN,
+ "Waves hand mysteriously", "These aren't the buckets you're looking for");
+
+ when(mockStorage.buckets()).thenReturn(mockStorageObjects);
+ when(mockStorageObjects.insert(
+ any(String.class), any(Bucket.class))).thenReturn(mockStorageInsert);
+ when(mockStorageInsert.execute())
+ .thenThrow(expectedException);
+
+ thrown.expect(AccessDeniedException.class);
+
+ gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper());
+ }
+
+ @Test
+ public void testBucketAccessible() throws IOException {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
@@ -398,7 +450,7 @@ public class GcsUtilTest {
.thenThrow(new SocketTimeoutException("SocketException"))
.thenReturn(new Bucket());
- assertTrue(gcsUtil.bucketExists(GcsPath.fromComponents("testbucket", "testobject"),
+ assertTrue(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff, new FastNanoClockAndSleeper()));
}
@@ -416,14 +468,14 @@ public class GcsUtilTest {
BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
GoogleJsonResponseException expectedException =
googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN,
- "Waves hand mysteriously", "These aren't the buckets your looking for");
+ "Waves hand mysteriously", "These aren't the buckets you're looking for");
when(mockStorage.buckets()).thenReturn(mockStorageObjects);
when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
when(mockStorageGet.execute())
.thenThrow(expectedException);
- assertFalse(gcsUtil.bucketExists(GcsPath.fromComponents("testbucket", "testobject"),
+ assertFalse(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff, new FastNanoClockAndSleeper()));
}
@@ -446,11 +498,59 @@ public class GcsUtilTest {
.thenThrow(googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND,
"It don't exist", "Nothing here to see"));
- assertFalse(gcsUtil.bucketExists(GcsPath.fromComponents("testbucket", "testobject"),
+ assertFalse(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"),
+ mockBackOff, new FastNanoClockAndSleeper()));
+ }
+
+ @Test
+ public void testGetBucket() throws IOException {
+ GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+ BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+
+ Storage mockStorage = Mockito.mock(Storage.class);
+ Storage.Buckets mockBuckets = Mockito.mock(Storage.Buckets.class);
+ Storage.Buckets.Get mockGet = Mockito.mock(Storage.Buckets.Get.class);
+ gcsUtil.setStorageClient(mockStorage);
+
+ // The first get() times out; the second one returns a bucket.
+ when(mockStorage.buckets()).thenReturn(mockBuckets);
+ when(mockBuckets.get("testbucket")).thenReturn(mockGet);
+ when(mockGet.execute())
+ .thenThrow(new SocketTimeoutException("SocketException"))
+ .thenReturn(new Bucket());
+ GcsPath path = GcsPath.fromComponents("testbucket", "testobject");
+
+ // A transient socket timeout should be retried rather than surfaced.
+ assertNotNull(gcsUtil.getBucket(path,
mockBackOff, new FastNanoClockAndSleeper()));
}
@Test
+ public void testGetBucketNotExists() throws IOException {
+ GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+ BackOff backOff = FluentBackoff.DEFAULT.backoff();
+
+ Storage mockStorage = Mockito.mock(Storage.class);
+ Storage.Buckets mockBuckets = Mockito.mock(Storage.Buckets.class);
+ Storage.Buckets.Get mockGet = Mockito.mock(Storage.Buckets.Get.class);
+ gcsUtil.setStorageClient(mockStorage);
+
+ // Every get() fails with HTTP 404.
+ GoogleJsonResponseException notFound = googleJsonResponseException(
+ HttpStatusCodes.STATUS_CODE_NOT_FOUND,
+ "It don't exist", "Nothing here to see");
+ when(mockStorage.buckets()).thenReturn(mockBuckets);
+ when(mockBuckets.get("testbucket")).thenReturn(mockGet);
+ when(mockGet.execute()).thenThrow(notFound);
+ GcsPath path = GcsPath.fromComponents("testbucket", "testobject");
+
+ // A missing bucket is expected as FileNotFoundException whose message contains the error text.
+ thrown.expect(FileNotFoundException.class);
+ thrown.expectMessage("It don't exist");
+ gcsUtil.getBucket(path, backOff, new FastNanoClockAndSleeper());
+ }
+
+ @Test
public void testGCSChannelCloseIdempotent() throws IOException {
SeekableByteChannel channel =
new GoogleCloudStorageReadChannel(null, "dummybucket", "dummyobject", null,
[3/3] incubator-beam git commit: Add initial scaffolding for default
bucket (take 2)
Posted by lc...@apache.org.
Add initial scaffolding for default bucket (take 2)
This closes #1014
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2ee444d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2ee444d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2ee444d1
Branch: refs/heads/master
Commit: 2ee444d15a25a1020413196684a0b711d3d7758b
Parents: 202acd1 9d6e7c7
Author: Luke Cwik <lc...@google.com>
Authored: Mon Oct 3 08:20:39 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Oct 3 08:20:39 2016 -0700
----------------------------------------------------------------------
pom.xml | 13 +++
.../DataflowPipelineTranslatorTest.java | 2 +-
.../runners/dataflow/DataflowRunnerTest.java | 7 +-
sdks/java/core/pom.xml | 5 +
.../options/CloudResourceManagerOptions.java | 40 +++++++
.../apache/beam/sdk/util/GcpProjectUtil.java | 106 ++++++++++++++++++
.../apache/beam/sdk/util/GcsPathValidator.java | 2 +-
.../java/org/apache/beam/sdk/util/GcsUtil.java | 94 ++++++++++++++--
.../org/apache/beam/sdk/util/Transport.java | 17 +++
.../dataflow/util/GcsPathValidatorTest.java | 4 +-
.../apache/beam/sdk/util/ApiSurfaceTest.java | 1 +
.../beam/sdk/util/GcpProjectUtilTest.java | 76 +++++++++++++
.../org/apache/beam/sdk/util/GcsUtilTest.java | 112 ++++++++++++++++++-
13 files changed, 457 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-beam git commit: Add initial bucket stuff.
Posted by lc...@apache.org.
Add initial bucket stuff.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ccd9bad1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ccd9bad1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ccd9bad1
Branch: refs/heads/master
Commit: ccd9bad18dcc0a1df7ebc82f43e89ddec838c037
Parents: 202acd1
Author: sammcveety <sa...@gmail.com>
Authored: Sat Sep 17 21:19:53 2016 -0400
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Oct 3 08:20:12 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/util/GcsUtil.java | 67 +++++++++++++++++++-
1 file changed, 64 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ccd9bad1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 41c372e..4befb1a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -349,16 +349,45 @@ public class GcsUtil {
}
/**
+ * Returns the project number of the project which owns this bucket.
+ * If the bucket exists, it must be accessible otherwise the permissions
+ * exception will be propagated.
+ */
+ public long bucketOwner(GcsPath path) throws IOException {
+ return getBucket(
+ path,
+ BACKOFF_FACTORY.backoff(),
+ Sleeper.DEFAULT).getProjectNumber();
+ }
+
+ /**
+ * Creates the bucket named by {@code path} in project {@code projectNumber}; errors propagate.
+ */
+ public void createBucket(GcsPath path, long projectNumber) throws IOException {
+ createBucket(
+ path,
+ projectNumber,
+ BACKOFF_FACTORY.backoff(),
+ Sleeper.DEFAULT);
+ }
+
+ /**
* Returns whether the GCS bucket exists. This will return false if the bucket
* is inaccessible due to permissions.
*/
@VisibleForTesting
boolean bucketExists(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
+ return getBucket(path, backoff, sleeper) != null; // getBucket yields null on not-found/access-denied
+ }
+
+ @VisibleForTesting
+ @Nullable
+ Storage.Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
Storage.Buckets.Get getBucket =
storageClient.buckets().get(path.getBucket());
try {
- ResilientOperation.retry(
+ Storage.Bucket bucket = ResilientOperation.retry(
ResilientOperation.getGoogleRequestCallable(getBucket),
backoff,
new RetryDeterminer<IOException>() {
@@ -372,10 +401,11 @@ public class GcsUtil {
},
IOException.class,
sleeper);
- return true;
+
+ return bucket;
} catch (GoogleJsonResponseException e) {
if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) {
- return false;
+ return null;
}
throw e;
} catch (InterruptedException e) {
@@ -386,6 +416,37 @@ public class GcsUtil {
}
}
+ @VisibleForTesting
+ void createBucket(GcsPath path, long projectNumber, BackOff backoff, Sleeper sleeper)
+ throws IOException {
+ // buckets().insert takes the owning project plus the Bucket resource to create.
+ Storage.Buckets.Insert insertBucket = storageClient.buckets().insert(
+ String.valueOf(projectNumber),
+ new com.google.api.services.storage.model.Bucket().setName(path.getBucket()));
+
+ try {
+ ResilientOperation.retry(
+ ResilientOperation.getGoogleRequestCallable(insertBucket),
+ backoff,
+ new RetryDeterminer<IOException>() {
+ @Override
+ public boolean shouldRetry(IOException e) {
+ if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) {
+ return false;
+ }
+ return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e);
+ }
+ },
+ IOException.class,
+ sleeper);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(
+ String.format("Error while attempting to create bucket gs://%s for project %s",
+ path.getBucket(), projectNumber), e);
+ }
+ }
+
private static void executeBatches(List<BatchRequest> batches) throws IOException {
ListeningExecutorService executor = MoreExecutors.listeningDecorator(
MoreExecutors.getExitingExecutorService(