You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/10/03 15:33:54 UTC

[1/3] incubator-beam git commit: Add default bucket scaffolding.

Repository: incubator-beam
Updated Branches:
  refs/heads/master 202acd1d6 -> 2ee444d15


Add default bucket scaffolding.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9d6e7c71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9d6e7c71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9d6e7c71

Branch: refs/heads/master
Commit: 9d6e7c71b6863a3a87eb3b4ad7b4a5ce75707955
Parents: ccd9bad
Author: sammcveety <sa...@gmail.com>
Authored: Fri Sep 30 21:24:58 2016 -0400
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Oct 3 08:20:12 2016 -0700

----------------------------------------------------------------------
 pom.xml                                         |  13 +++
 .../DataflowPipelineTranslatorTest.java         |   2 +-
 .../runners/dataflow/DataflowRunnerTest.java    |   7 +-
 sdks/java/core/pom.xml                          |   5 +
 .../options/CloudResourceManagerOptions.java    |  40 +++++++
 .../apache/beam/sdk/util/GcpProjectUtil.java    | 106 ++++++++++++++++++
 .../apache/beam/sdk/util/GcsPathValidator.java  |   2 +-
 .../java/org/apache/beam/sdk/util/GcsUtil.java  |  67 ++++++-----
 .../org/apache/beam/sdk/util/Transport.java     |  17 +++
 .../dataflow/util/GcsPathValidatorTest.java     |   4 +-
 .../apache/beam/sdk/util/ApiSurfaceTest.java    |   1 +
 .../beam/sdk/util/GcpProjectUtilTest.java       |  76 +++++++++++++
 .../org/apache/beam/sdk/util/GcsUtilTest.java   | 112 ++++++++++++++++++-
 13 files changed, 413 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3cd5255..7295261 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,7 @@
     <!-- If updating dependencies, please update any relevant javadoc offlineLinks -->
     <avro.version>1.8.1</avro.version>
     <bigquery.version>v2-rev295-1.22.0</bigquery.version>
+    <cloudresourcemanager.version>v1-rev6-1.22.0</cloudresourcemanager.version>
     <pubsubgrpc.version>0.1.0</pubsubgrpc.version>
     <clouddebugger.version>v2-rev8-1.22.0</clouddebugger.version>
     <dataflow.version>v1b3-rev36-1.22.0</dataflow.version>
@@ -483,6 +484,18 @@
 
       <dependency>
         <groupId>com.google.apis</groupId>
+        <artifactId>google-api-services-cloudresourcemanager</artifactId>
+        <version>${cloudresourcemanager.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava-jdk5</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+
+      <dependency>
+        <groupId>com.google.apis</groupId>
         <artifactId>google-api-services-pubsub</artifactId>
         <version>${pubsub.version}</version>
         <exclusions>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 2b7013d..98d2fb0 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -155,7 +155,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
         return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
       }
     });
-    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true);
+    when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
     when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
 
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 58a01aa..b0ee231 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -187,9 +187,9 @@ public class DataflowRunnerTest {
    * Build a mock {@link GcsUtil} with return values.
    *
    * @param bucketExist first return value
-   * @param bucketExists next return values
+   * @param bucketAccessible next return values
    */
-  private GcsUtil buildMockGcsUtil(Boolean bucketExist, Boolean... bucketExists)
+  private GcsUtil buildMockGcsUtil(Boolean bucketExist, Boolean... bucketAccessible)
       throws IOException {
     GcsUtil mockGcsUtil = mock(GcsUtil.class);
     when(mockGcsUtil.create(any(GcsPath.class), anyString()))
@@ -209,7 +209,8 @@ public class DataflowRunnerTest {
         return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
       }
     });
-    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(bucketExist, bucketExists);
+    when(mockGcsUtil.bucketAccessible(any(GcsPath.class)))
+        .thenReturn(bucketExist, bucketAccessible);
     return mockGcsUtil;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index c4d3e64..aa0ad09 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -369,6 +369,11 @@
 
     <dependency>
       <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-cloudresourcemanager</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.apis</groupId>
       <artifactId>google-api-services-pubsub</artifactId>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java
new file mode 100644
index 0000000..ed532db
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CloudResourceManagerOptions.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.options;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.beam.sdk.util.GcpProjectUtil;
+
+/**
+ * Properties needed when using CloudResourceManager with the Beam SDK.
+ */
+@Description("Options that are used to configure CloudResourceManager. See "
+    + "https://cloud.google.com/resource-manager/ for details on CloudResourceManager.")
+public interface CloudResourceManagerOptions extends ApplicationNameOptions, GcpOptions,
+    PipelineOptions, StreamingOptions {
+  /**
+   * The GcpProjectUtil instance used to communicate with Google Cloud Resource Manager.
+   */
+  @JsonIgnore
+  @Description("The GcpProjectUtil instance that should be used to communicate"
+               + " with Google Cloud Resource Manager.")
+  @Default.InstanceFactory(GcpProjectUtil.GcpProjectUtilFactory.class)
+  @Hidden
+  GcpProjectUtil getGcpProjectUtil();
+  void setGcpProjectUtil(GcpProjectUtil value);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
new file mode 100644
index 0000000..beac4e4
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.cloudresourcemanager.CloudResourceManager;
+import com.google.api.services.cloudresourcemanager.model.Project;
+import com.google.cloud.hadoop.util.ResilientOperation;
+import com.google.cloud.hadoop.util.RetryDeterminer;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import org.apache.beam.sdk.options.CloudResourceManagerOptions;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides operations on Google Cloud Platform Projects.
+ */
+public class GcpProjectUtil {
+  /**
+   * A {@link DefaultValueFactory} able to create a {@link GcpProjectUtil} using
+   * any transport flags specified on the {@link PipelineOptions}.
+   */
+  public static class GcpProjectUtilFactory implements DefaultValueFactory<GcpProjectUtil> {
+    /**
+     * Returns an instance of {@link GcpProjectUtil} based on the
+     * {@link PipelineOptions}.
+     */
+    @Override
+    public GcpProjectUtil create(PipelineOptions options) {
+      LOG.debug("Creating new GcpProjectUtil");
+      CloudResourceManagerOptions crmOptions = options.as(CloudResourceManagerOptions.class);
+      return new GcpProjectUtil(
+          Transport.newCloudResourceManagerClient(crmOptions).build());
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(GcpProjectUtil.class);
+
+  private static final FluentBackoff BACKOFF_FACTORY =
+      FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200));
+
+  /** Client for the CRM API. */
+  private CloudResourceManager crmClient;
+
+  private GcpProjectUtil(CloudResourceManager crmClient) {
+    this.crmClient = crmClient;
+  }
+
+  // Use this only for testing purposes.
+  @VisibleForTesting
+  void setCrmClient(CloudResourceManager crmClient) {
+    this.crmClient = crmClient;
+  }
+
+  /**
+   * Returns the project number, or throws an exception if the project does not
+   * exist or cannot be accessed.
+   */
+  long getProjectNumber(String projectId) throws IOException {
+    return getProjectNumber(
+      projectId,
+      BACKOFF_FACTORY.backoff(),
+      Sleeper.DEFAULT);
+  }
+
+  /**
+   * Returns the project number, or throws an {@link IOException} if the project
+   * does not exist or cannot be accessed.
+   */
+  @VisibleForTesting
+  long getProjectNumber(String projectId, BackOff backoff, Sleeper sleeper) throws IOException {
+      CloudResourceManager.Projects.Get getProject =
+          crmClient.projects().get(projectId);
+      try {
+        Project project = ResilientOperation.retry(
+            ResilientOperation.getGoogleRequestCallable(getProject),
+            backoff,
+            RetryDeterminer.SOCKET_ERRORS,
+            IOException.class,
+            sleeper);
+        return project.getProjectNumber();
+      } catch (Exception e) {
+        throw new IOException("Unable to get project number", e);
+     }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
index 89363ce..c8da4d8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
@@ -75,7 +75,7 @@ public class GcsPathValidator implements PathValidator {
   private void verifyPathIsAccessible(String path, String errorMessage) {
     GcsPath gcsPath = getGcsPath(path);
     try {
-      checkArgument(gcpOptions.getGcsUtil().bucketExists(gcsPath),
+      checkArgument(gcpOptions.getGcsUtil().bucketAccessible(gcsPath),
         errorMessage, path);
     } catch (IOException e) {
       throw new RuntimeException(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 4befb1a..ce4604b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -28,6 +28,7 @@ import com.google.api.client.http.HttpHeaders;
 import com.google.api.client.util.BackOff;
 import com.google.api.client.util.Sleeper;
 import com.google.api.services.storage.Storage;
+import com.google.api.services.storage.model.Bucket;
 import com.google.api.services.storage.model.Objects;
 import com.google.api.services.storage.model.StorageObject;
 import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel;
@@ -49,6 +50,8 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.channels.SeekableByteChannel;
 import java.nio.channels.WritableByteChannel;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.FileAlreadyExistsException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -338,11 +341,10 @@ public class GcsUtil {
   }
 
   /**
-   * Returns whether the GCS bucket exists. If the bucket exists, it must
-   * be accessible otherwise the permissions exception will be propagated.
+   * Returns whether the GCS bucket exists and is accessible.
    */
-  public boolean bucketExists(GcsPath path) throws IOException {
-    return bucketExists(
+  public boolean bucketAccessible(GcsPath path) throws IOException {
+    return bucketAccessible(
         path,
         BACKOFF_FACTORY.backoff(),
         Sleeper.DEFAULT);
@@ -351,24 +353,23 @@ public class GcsUtil {
   /**
    * Returns the project number of the project which owns this bucket.
    * If the bucket exists, it must be accessible otherwise the permissions
-   * exception will be propagated.
+   * exception will be propagated.  If the bucket does not exist, an exception
+   * will be thrown.
    */
   public long bucketOwner(GcsPath path) throws IOException {
     return getBucket(
         path,
         BACKOFF_FACTORY.backoff(),
-        Sleeper.DEFAULT).getProjectNumber();
+        Sleeper.DEFAULT).getProjectNumber().longValue();
   }
 
   /**
-   * Creates a bucket for the provided project or propagates an error.
+   * Creates a {@link Bucket} under the specified project in Cloud Storage or
+   * propagates an exception.
    */
-  public void createBucket(GcsPath path, long projectNumber) throws IOException {
-    return createBucket(
-        path,
-        projectNumber,
-        BACKOFF_FACTORY.backoff(),
-        Sleeper.DEFAULT);
+  public void createBucket(String projectId, Bucket bucket) throws IOException {
+    createBucket(
+        projectId, bucket, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
   }
 
   /**
@@ -376,18 +377,22 @@ public class GcsUtil {
    * is inaccessible due to permissions.
    */
   @VisibleForTesting
-  boolean bucketExists(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
-    return getBucket(path, backoff, sleeper) != null;
+  boolean bucketAccessible(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
+    try {
+      return getBucket(path, backoff, sleeper) != null;
+    } catch (AccessDeniedException | FileNotFoundException e) {
+      return false;
+    }
   }
 
   @VisibleForTesting
   @Nullable
-  Storage.Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
+  Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
     Storage.Buckets.Get getBucket =
         storageClient.buckets().get(path.getBucket());
 
       try {
-        Storage.Bucket bucket = ResilientOperation.retry(
+        Bucket bucket = ResilientOperation.retry(
             ResilientOperation.getGoogleRequestCallable(getBucket),
             backoff,
             new RetryDeterminer<IOException>() {
@@ -401,11 +406,14 @@ public class GcsUtil {
             },
             IOException.class,
             sleeper);
-        
+
         return bucket;
       } catch (GoogleJsonResponseException e) {
-        if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) {
-          return null;
+        if (errorExtractor.accessDenied(e)) {
+          throw new AccessDeniedException(path.toString(), null, e.getMessage());
+        }
+        if (errorExtractor.itemNotFound(e)) {
+          throw new FileNotFoundException(e.getMessage());
         }
         throw e;
       } catch (InterruptedException e) {
@@ -417,11 +425,10 @@ public class GcsUtil {
   }
 
   @VisibleForTesting
-  void createBucket(GcsPath path, long projectNumber, BackOff backoff, Sleeper sleeper)
+  void createBucket(String projectId, Bucket bucket, BackOff backoff, Sleeper sleeper)
         throws IOException {
     Storage.Buckets.Insert insertBucket =
-        storageClient.buckets().insert(path.getBucket());
-    insertBucket.setProject(String.valueOf(projectNumber));
+      storageClient.buckets().insert(projectId, bucket);
 
     try {
       ResilientOperation.retry(
@@ -429,8 +436,8 @@ public class GcsUtil {
         backoff,
         new RetryDeterminer<IOException>() {
           @Override
-            public boolean shouldRetry(IOException e) {
-            if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) {
+          public boolean shouldRetry(IOException e) {
+            if (errorExtractor.itemAlreadyExists(e) || errorExtractor.accessDenied(e)) {
               return false;
             }
             return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e);
@@ -439,11 +446,19 @@ public class GcsUtil {
         IOException.class,
         sleeper);
       return;
+    } catch (GoogleJsonResponseException e) {
+      if (errorExtractor.accessDenied(e)) {
+        throw new AccessDeniedException(bucket.getName(), null, e.getMessage());
+      }
+      if (errorExtractor.itemAlreadyExists(e)) {
+        throw new FileAlreadyExistsException(bucket.getName(), null, e.getMessage());
+      }
+      throw e;
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new IOException(
         String.format("Error while attempting to create bucket gs://%s for rproject %s",
-                      path.getBucket(), projectNumber), e);
+                      bucket.getName(), projectId), e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
index d824207..1f61299 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java
@@ -24,6 +24,7 @@ import com.google.api.client.http.HttpTransport;
 import com.google.api.client.json.JsonFactory;
 import com.google.api.client.json.jackson2.JacksonFactory;
 import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.cloudresourcemanager.CloudResourceManager;
 import com.google.api.services.pubsub.Pubsub;
 import com.google.api.services.storage.Storage;
 import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
@@ -33,6 +34,7 @@ import java.net.MalformedURLException;
 import java.net.URL;
 import java.security.GeneralSecurityException;
 import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.CloudResourceManagerOptions;
 import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.options.PubsubOptions;
 
@@ -121,6 +123,21 @@ public class Transport {
   }
 
   /**
+   * Returns a CloudResourceManager client builder using the specified
+   * {@link CloudResourceManagerOptions}.
+   */
+  public static CloudResourceManager.Builder
+      newCloudResourceManagerClient(CloudResourceManagerOptions options) {
+    return new CloudResourceManager.Builder(getTransport(), getJsonFactory(),
+        chainHttpRequestInitializer(
+            options.getGcpCredential(),
+            // Do not log 404. It clutters the output and is possibly even required by the caller.
+            new RetryHttpRequestInitializer(ImmutableList.of(404))))
+        .setApplicationName(options.getAppName())
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+  }
+
+  /**
    * Returns a Cloud Storage client builder using the specified {@link GcsOptions}.
    */
   public static Storage.Builder

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
index 398fa63..adf4fc2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java
@@ -57,7 +57,7 @@ public class GcsPathValidatorTest {
   @Before
   public void setUp() throws Exception {
     MockitoAnnotations.initMocks(this);
-    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true);
+    when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true);
     when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod();
     GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
     options.setRunner(FakeRunner.class);
@@ -81,7 +81,7 @@ public class GcsPathValidatorTest {
 
   @Test
   public void testWhenBucketDoesNotExist() throws Exception {
-    when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(false);
+    when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(false);
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage(
         "Could not find file gs://non-existent-bucket/location");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java
index 4b76277..ea771b4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java
@@ -76,6 +76,7 @@ public class ApiSurfaceTest {
           inPackage("org.apache.beam"),
           inPackage("com.google.api.client"),
           inPackage("com.google.api.services.bigquery"),
+          inPackage("com.google.api.services.cloudresourcemanager"),
           inPackage("com.google.api.services.dataflow"),
           inPackage("com.google.api.services.pubsub"),
           inPackage("com.google.api.services.storage"),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java
new file mode 100644
index 0000000..23f0418
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.services.cloudresourcemanager.CloudResourceManager;
+import com.google.api.services.cloudresourcemanager.model.Project;
+import java.net.SocketTimeoutException;
+import org.apache.beam.sdk.options.CloudResourceManagerOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/** Test case for {@link GcpProjectUtil}. */
+@RunWith(JUnit4.class)
+public class GcpProjectUtilTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private static CloudResourceManagerOptions crmOptionsWithTestCredential() {
+    CloudResourceManagerOptions pipelineOptions =
+        PipelineOptionsFactory.as(CloudResourceManagerOptions.class);
+    pipelineOptions.setGcpCredential(new TestCredential());
+    return pipelineOptions;
+  }
+
+  @Test
+  public void testGetProjectNumber() throws Exception {
+    CloudResourceManagerOptions pipelineOptions = crmOptionsWithTestCredential();
+    GcpProjectUtil projectUtil = pipelineOptions.getGcpProjectUtil();
+
+    CloudResourceManager.Projects mockProjects = Mockito.mock(
+        CloudResourceManager.Projects.class);
+    CloudResourceManager mockCrm = Mockito.mock(CloudResourceManager.class);
+    projectUtil.setCrmClient(mockCrm);
+
+    CloudResourceManager.Projects.Get mockProjectsGet =
+        Mockito.mock(CloudResourceManager.Projects.Get.class);
+
+    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+    Project project = new Project();
+    project.setProjectNumber(5L);
+
+    when(mockCrm.projects()).thenReturn(mockProjects);
+    when(mockProjects.get(any(String.class))).thenReturn(mockProjectsGet);
+    when(mockProjectsGet.execute())
+      .thenThrow(new SocketTimeoutException("SocketException"))
+      .thenReturn(project);
+
+    assertEquals(5L, projectUtil.getProjectNumber(
+        "foo", mockBackOff, new FastNanoClockAndSleeper()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
index 9504b4c..df3bf6e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.when;
 
 import com.google.api.client.googleapis.batch.BatchRequest;
@@ -58,6 +59,7 @@ import java.io.IOException;
 import java.math.BigInteger;
 import java.net.SocketTimeoutException;
 import java.nio.channels.SeekableByteChannel;
+import java.nio.file.AccessDeniedException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -305,7 +307,7 @@ public class GcsUtilTest {
     GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/accessdeniedfile");
     GoogleJsonResponseException expectedException =
         googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN,
-            "Waves hand mysteriously", "These aren't the buckets your looking for");
+            "Waves hand mysteriously", "These aren't the buckets you're looking for");
 
     when(mockStorage.objects()).thenReturn(mockStorageObjects);
     when(mockStorageObjects.get(pattern.getBucket(), pattern.getObject())).thenReturn(
@@ -380,7 +382,57 @@ public class GcsUtilTest {
   }
 
   @Test
-  public void testBucketExists() throws IOException {
+  public void testCreateBucket() throws IOException {
+    // The first insert attempt fails with a socket timeout; createBucket should retry it
+    // and complete without throwing once the second attempt returns a bucket.
+    Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class);
+    when(mockStorageInsert.execute())
+        .thenThrow(new SocketTimeoutException("SocketException"))
+        .thenReturn(new Bucket());
+
+    Storage.Buckets mockBuckets = Mockito.mock(Storage.Buckets.class);
+    when(mockBuckets.insert(any(String.class), any(Bucket.class)))
+        .thenReturn(mockStorageInsert);
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    when(mockStorage.buckets()).thenReturn(mockBuckets);
+
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+    gcsUtil.setStorageClient(mockStorage);
+
+    BackOff backOff = FluentBackoff.DEFAULT.backoff();
+    gcsUtil.createBucket("a", new Bucket(), backOff, new FastNanoClockAndSleeper());
+  }
+
+  @Test
+  public void testCreateBucketAccessErrors() throws IOException {
+    // Every insert attempt fails with HTTP 403; createBucket is expected to translate that
+    // into java.nio.file.AccessDeniedException rather than a raw GoogleJsonResponseException.
+    GoogleJsonResponseException forbidden =
+        googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN,
+            "Waves hand mysteriously", "These aren't the buckets you're looking for");
+
+    Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class);
+    when(mockStorageInsert.execute()).thenThrow(forbidden);
+
+    Storage.Buckets mockBuckets = Mockito.mock(Storage.Buckets.class);
+    when(mockBuckets.insert(any(String.class), any(Bucket.class)))
+        .thenReturn(mockStorageInsert);
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    when(mockStorage.buckets()).thenReturn(mockBuckets);
+
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+    gcsUtil.setStorageClient(mockStorage);
+
+    BackOff backOff = FluentBackoff.DEFAULT.backoff();
+
+    thrown.expect(AccessDeniedException.class);
+    gcsUtil.createBucket("a", new Bucket(), backOff, new FastNanoClockAndSleeper());
+  }
+
+  @Test
+  public void testBucketAccessible() throws IOException {
     GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
     GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
 
@@ -398,7 +450,7 @@ public class GcsUtilTest {
         .thenThrow(new SocketTimeoutException("SocketException"))
         .thenReturn(new Bucket());
 
-    assertTrue(gcsUtil.bucketExists(GcsPath.fromComponents("testbucket", "testobject"),
+    assertTrue(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"),
         mockBackOff, new FastNanoClockAndSleeper()));
   }
 
@@ -416,14 +468,14 @@ public class GcsUtilTest {
     BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
     GoogleJsonResponseException expectedException =
         googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN,
-            "Waves hand mysteriously", "These aren't the buckets your looking for");
+            "Waves hand mysteriously", "These aren't the buckets you're looking for");
 
     when(mockStorage.buckets()).thenReturn(mockStorageObjects);
     when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
     when(mockStorageGet.execute())
         .thenThrow(expectedException);
 
-    assertFalse(gcsUtil.bucketExists(GcsPath.fromComponents("testbucket", "testobject"),
+    assertFalse(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"),
         mockBackOff, new FastNanoClockAndSleeper()));
   }
 
@@ -446,11 +498,59 @@ public class GcsUtilTest {
         .thenThrow(googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND,
             "It don't exist", "Nothing here to see"));
 
-    assertFalse(gcsUtil.bucketExists(GcsPath.fromComponents("testbucket", "testobject"),
+    assertFalse(gcsUtil.bucketAccessible(GcsPath.fromComponents("testbucket", "testobject"),
+        mockBackOff, new FastNanoClockAndSleeper()));
+  }
+
+  @Test
+  public void testGetBucket() throws IOException {
+    // The first get() times out; getBucket should retry and return a non-null bucket.
+    Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
+    when(mockStorageGet.execute())
+        .thenThrow(new SocketTimeoutException("SocketException"))
+        .thenReturn(new Bucket());
+
+    Storage.Buckets mockBuckets = Mockito.mock(Storage.Buckets.class);
+    when(mockBuckets.get("testbucket")).thenReturn(mockStorageGet);
+
+    Storage mockStorage = Mockito.mock(Storage.class);
+    when(mockStorage.buckets()).thenReturn(mockBuckets);
+
+    GcsUtil util = gcsOptionsWithTestCredential().getGcsUtil();
+    util.setStorageClient(mockStorage);
+
+    BackOff mockBackOff = FluentBackoff.DEFAULT.backoff();
+
+    assertNotNull(util.getBucket(GcsPath.fromComponents("testbucket", "testobject"),
         mockBackOff, new FastNanoClockAndSleeper()));
   }
 
   @Test
+  public void testGetBucketNotExists() throws IOException {
+    // A 404 from the get call should surface as FileNotFoundException carrying the
+    // server-provided message.
+    GoogleJsonResponseException notFound =
+        googleJsonResponseException(HttpStatusCodes.STATUS_CODE_NOT_FOUND,
+            "It don't exist", "Nothing here to see");
+    Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
+    when(mockStorageGet.execute()).thenThrow(notFound);
+
+    Storage.Buckets mockBuckets = Mockito.mock(Storage.Buckets.class);
+    when(mockBuckets.get("testbucket")).thenReturn(mockStorageGet);
+    Storage mockStorage = Mockito.mock(Storage.class);
+    when(mockStorage.buckets()).thenReturn(mockBuckets);
+
+    GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
+    gcsUtil.setStorageClient(mockStorage);
+    BackOff backOff = FluentBackoff.DEFAULT.backoff();
+    GcsPath path = GcsPath.fromComponents("testbucket", "testobject");
+
+    thrown.expect(FileNotFoundException.class);
+    thrown.expectMessage("It don't exist");
+    gcsUtil.getBucket(path, backOff, new FastNanoClockAndSleeper());
+  }
+
+  @Test
   public void testGCSChannelCloseIdempotent() throws IOException {
     SeekableByteChannel channel =
         new GoogleCloudStorageReadChannel(null, "dummybucket", "dummyobject", null,



[3/3] incubator-beam git commit: Add initial scaffolding for default bucket (take 2)

Posted by lc...@apache.org.
Add initial scaffolding for default bucket (take 2)

This closes #1014


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2ee444d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2ee444d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2ee444d1

Branch: refs/heads/master
Commit: 2ee444d15a25a1020413196684a0b711d3d7758b
Parents: 202acd1 9d6e7c7
Author: Luke Cwik <lc...@google.com>
Authored: Mon Oct 3 08:20:39 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Oct 3 08:20:39 2016 -0700

----------------------------------------------------------------------
 pom.xml                                         |  13 +++
 .../DataflowPipelineTranslatorTest.java         |   2 +-
 .../runners/dataflow/DataflowRunnerTest.java    |   7 +-
 sdks/java/core/pom.xml                          |   5 +
 .../options/CloudResourceManagerOptions.java    |  40 +++++++
 .../apache/beam/sdk/util/GcpProjectUtil.java    | 106 ++++++++++++++++++
 .../apache/beam/sdk/util/GcsPathValidator.java  |   2 +-
 .../java/org/apache/beam/sdk/util/GcsUtil.java  |  94 ++++++++++++++--
 .../org/apache/beam/sdk/util/Transport.java     |  17 +++
 .../dataflow/util/GcsPathValidatorTest.java     |   4 +-
 .../apache/beam/sdk/util/ApiSurfaceTest.java    |   1 +
 .../beam/sdk/util/GcpProjectUtilTest.java       |  76 +++++++++++++
 .../org/apache/beam/sdk/util/GcsUtilTest.java   | 112 ++++++++++++++++++-
 13 files changed, 457 insertions(+), 22 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-beam git commit: Add initial bucket stuff.

Posted by lc...@apache.org.
Add initial bucket stuff.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ccd9bad1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ccd9bad1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ccd9bad1

Branch: refs/heads/master
Commit: ccd9bad18dcc0a1df7ebc82f43e89ddec838c037
Parents: 202acd1
Author: sammcveety <sa...@gmail.com>
Authored: Sat Sep 17 21:19:53 2016 -0400
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Oct 3 08:20:12 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/util/GcsUtil.java  | 67 +++++++++++++++++++-
 1 file changed, 64 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ccd9bad1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 41c372e..4befb1a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -349,16 +349,45 @@ public class GcsUtil {
   }
 
   /**
+   * Returns the owning project's number; throws FileNotFoundException if none is accessible.
+   */
+  public long bucketOwner(GcsPath path) throws IOException {
+    com.google.api.services.storage.model.Bucket bucket =
+        getBucket(path, BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
+    if (bucket == null) {  // getBucket maps not-found and access-denied to null.
+      throw new java.io.FileNotFoundException("No accessible bucket " + path.getBucket());
+    }
+    return bucket.getProjectNumber().longValue();
+  }
+
+  /**
+   * Creates the bucket named by {@code path} in project {@code projectNumber}.
+   *
+   * <p>Retries use {@code BACKOFF_FACTORY} and {@link Sleeper#DEFAULT}; errors propagate.
+   */
+  public void createBucket(GcsPath path, long projectNumber) throws IOException {
+    createBucket(
+        path, projectNumber,
+        BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
+  }
+
+  /**
    * Returns whether the GCS bucket exists. This will return false if the bucket
    * is inaccessible due to permissions.
    */
   @VisibleForTesting
   boolean bucketExists(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
+    return getBucket(path, backoff, sleeper) != null;
+  }
+
+  @VisibleForTesting
+  @Nullable
+  Storage.Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException {
     Storage.Buckets.Get getBucket =
         storageClient.buckets().get(path.getBucket());
 
       try {
-        ResilientOperation.retry(
+        Storage.Bucket bucket = ResilientOperation.retry(
             ResilientOperation.getGoogleRequestCallable(getBucket),
             backoff,
             new RetryDeterminer<IOException>() {
@@ -372,10 +401,11 @@ public class GcsUtil {
             },
             IOException.class,
             sleeper);
-        return true;
+        
+        return bucket;
       } catch (GoogleJsonResponseException e) {
         if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) {
-          return false;
+          return null;
         }
         throw e;
       } catch (InterruptedException e) {
@@ -386,6 +416,37 @@ public class GcsUtil {
      }
   }
 
+  /** Creates {@code path}'s bucket in project {@code projectNumber}; retries socket errors. */
+  @VisibleForTesting
+  void createBucket(GcsPath path, long projectNumber, BackOff backoff, Sleeper sleeper)
+        throws IOException {
+    // Buckets.insert takes (project, bucket body); the bucket name belongs in the body.
+    Storage.Buckets.Insert insertBucket = storageClient.buckets().insert(
+        String.valueOf(projectNumber),
+        new com.google.api.services.storage.model.Bucket().setName(path.getBucket()));
+    try {
+      ResilientOperation.retry(
+        ResilientOperation.getGoogleRequestCallable(insertBucket),
+        backoff,
+        new RetryDeterminer<IOException>() {
+          @Override
+          public boolean shouldRetry(IOException e) {
+            if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) {
+              return false;  // Not transient: retrying will not change the outcome.
+            }
+            return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e);
+          }
+        },
+        IOException.class,
+        sleeper);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException(String.format(
+          "Error while attempting to create bucket gs://%s for project %s",
+          path.getBucket(), projectNumber), e);
+    }
+  }
+
   private static void executeBatches(List<BatchRequest> batches) throws IOException {
     ListeningExecutorService executor = MoreExecutors.listeningDecorator(
         MoreExecutors.getExitingExecutorService(