You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/08 03:11:25 UTC

[1/3] incubator-beam git commit: Register debuggee with Cloud Debugger prior to job submission to Cloud Dataflow

Repository: incubator-beam
Updated Branches:
  refs/heads/master 0be5a977d -> b864ce82a


Register debuggee with Cloud Debugger prior to job submission to Cloud Dataflow


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/532a7920
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/532a7920
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/532a7920

Branch: refs/heads/master
Commit: 532a79202110d06d48db3863ef41a6714cb32b6a
Parents: 0be5a97
Author: bchambers <bc...@google.com>
Authored: Wed Mar 2 17:38:49 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Mar 7 17:54:01 2016 -0800

----------------------------------------------------------------------
 pom.xml                                         |  1 +
 sdk/pom.xml                                     | 14 +++++
 .../sdk/options/CloudDebuggerOptions.java       |  9 +++-
 .../sdk/runners/DataflowPipelineRunner.java     | 57 ++++++++++++++++++--
 .../cloud/dataflow/sdk/util/Transport.java      |  9 ++++
 5 files changed, 85 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/532a7920/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f9dbab7..7e5e078 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,7 @@
     <avro.version>1.7.7</avro.version>
     <bigquery.version>v2-rev248-1.21.0</bigquery.version>
     <bigtable.version>0.2.3</bigtable.version>
+    <clouddebugger.version>v2-rev6-1.21.0</clouddebugger.version>
     <dataflow.version>v1b3-rev22-1.21.0</dataflow.version>
     <dataflow.proto.version>0.5.160222</dataflow.proto.version>
     <datastore.version>v1beta2-rev1-4.0.0</datastore.version>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/532a7920/sdk/pom.xml
----------------------------------------------------------------------
diff --git a/sdk/pom.xml b/sdk/pom.xml
index 8e7b463..f782b78 100644
--- a/sdk/pom.xml
+++ b/sdk/pom.xml
@@ -498,6 +498,20 @@
 
     <dependency>
       <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-clouddebugger</artifactId>
+      <version>${clouddebugger.version}</version>
+      <exclusions>
+        <!-- Exclude an old version of guava that is being pulled
+             in by a transitive dependency of google-api-client -->
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.apis</groupId>
       <artifactId>google-api-services-pubsub</artifactId>
       <version>${pubsub.version}</version>
       <exclusions>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/532a7920/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
index 62be4c9..2e1ad94 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
@@ -16,8 +16,11 @@
 
 package com.google.cloud.dataflow.sdk.options;
 
+import com.google.api.services.clouddebugger.v2.model.Debuggee;
 import com.google.cloud.dataflow.sdk.annotations.Experimental;
 
+import javax.annotation.Nullable;
+
 /**
  * Options for controlling Cloud Debugger.
  */
@@ -32,5 +35,9 @@ public interface CloudDebuggerOptions {
   @Description("Whether to enable the Cloud Debugger snapshot agent for the current job.")
   boolean getEnableCloudDebugger();
   void setEnableCloudDebugger(boolean enabled);
-}
 
+  @Description("The Cloud Debugger debugee to associate with. This should not be set directly.")
+  @Hidden
+  @Nullable Debuggee getDebuggee();
+  void setDebuggee(Debuggee debuggee);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/532a7920/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
index 6eb6c2f..0612cca 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
@@ -23,6 +23,10 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.services.clouddebugger.v2.Clouddebugger;
+import com.google.api.services.clouddebugger.v2.model.Debuggee;
+import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeRequest;
+import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse;
 import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.model.DataflowPackage;
 import com.google.api.services.dataflow.model.Job;
@@ -168,6 +172,8 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
+import javax.annotation.Nullable;
+
 /**
  * A {@link PipelineRunner} that executes the operations in the
  * pipeline by first translating them to the Dataflow representation
@@ -420,6 +426,43 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
     return super.apply(new AssignWindows<>(transform), input);
   }
 
+  private void maybeRegisterDebuggee(DataflowPipelineOptions options, String uniquifier) {
+    if (!options.getEnableCloudDebugger()) {
+      return;
+    }
+    
+    if (options.getDebuggee() != null) {
+      throw new RuntimeException("Should not specify the debuggee");
+    }
+
+    Clouddebugger debuggerClient = Transport.newClouddebuggerClient(options).build();
+    Debuggee debuggee = registerDebuggee(debuggerClient, uniquifier);
+    options.setDebuggee(debuggee);
+  }
+
+  private Debuggee registerDebuggee(Clouddebugger debuggerClient, String uniquifier) {
+    RegisterDebuggeeRequest registerReq = new RegisterDebuggeeRequest();
+    registerReq.setDebuggee(new Debuggee()
+        .setProject(options.getProject())
+        .setUniquifier(uniquifier)
+        .setDescription(uniquifier)
+        .setAgentVersion("google.com/cloud-dataflow-java/v1"));
+
+    try {
+      RegisterDebuggeeResponse registerResponse =
+          debuggerClient.controller().debuggees().register(registerReq).execute();
+      Debuggee debuggee = registerResponse.getDebuggee();
+      if (debuggee.getStatus() != null && debuggee.getStatus().getIsError()) {
+        throw new RuntimeException("Unable to register with the debugger: " +
+            debuggee.getStatus().getDescription().getFormat());
+      }
+
+      return debuggee;
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to register with the debugger: ", e);
+    }
+  }
+
   @Override
   public DataflowPipelineJob run(Pipeline pipeline) {
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
@@ -428,9 +471,7 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
         + "related to Google Compute Engine usage and other Google Cloud Services.");
 
     List<DataflowPackage> packages = options.getStager().stageFiles();
-    JobSpecification jobSpecification =
-        translator.translate(pipeline, this, packages);
-    Job newJob = jobSpecification.getJob();
+
 
     // Set a unique client_request_id in the CreateJob request.
     // This is used to ensure idempotence of job creation across retried
@@ -442,6 +483,15 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
     int randomNum = new Random().nextInt(9000) + 1000;
     String requestId = DateTimeFormat.forPattern("YYYYMMddHHmmssmmm").withZone(DateTimeZone.UTC)
         .print(DateTimeUtils.currentTimeMillis()) + "_" + randomNum;
+
+    // Try to create a debuggee ID. This must happen before the job is translated since it may
+    // update the options.
+    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+    maybeRegisterDebuggee(dataflowOptions, requestId);
+
+    JobSpecification jobSpecification =
+        translator.translate(pipeline, this, packages);
+    Job newJob = jobSpecification.getJob();
     newJob.setClientRequestId(requestId);
 
     String version = DataflowReleaseInfo.getReleaseInfo().getVersion();
@@ -450,7 +500,6 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
     newJob.getEnvironment().setUserAgent(DataflowReleaseInfo.getReleaseInfo());
     // The Dataflow Service may write to the temporary directory directly, so
     // must be verified.
-    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
     if (!Strings.isNullOrEmpty(options.getTempLocation())) {
       newJob.getEnvironment().setTempStoragePrefix(
           dataflowOptions.getPathValidator().verifyPath(options.getTempLocation()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/532a7920/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
index 7735a9e..15fe286 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
@@ -23,6 +23,7 @@ import com.google.api.client.http.HttpTransport;
 import com.google.api.client.json.JsonFactory;
 import com.google.api.client.json.jackson2.JacksonFactory;
 import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.clouddebugger.v2.Clouddebugger;
 import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.pubsub.Pubsub;
 import com.google.api.services.storage.Storage;
@@ -148,6 +149,14 @@ public class Transport {
         .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
   }
 
+  public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) {
+    return new Clouddebugger.Builder(getTransport(),
+        getJsonFactory(),
+        chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer()))
+        .setApplicationName(options.getAppName())
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+  }
+
   /**
    * Returns a Dataflow client that does not automatically retry failed
    * requests.


[3/3] incubator-beam git commit: This closes #15

Posted by da...@apache.org.
This closes #15


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b864ce82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b864ce82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b864ce82

Branch: refs/heads/master
Commit: b864ce82a2f2046de6acf064e2b92aa132a9310a
Parents: 0be5a97 890e3a0
Author: Davor Bonaci <da...@google.com>
Authored: Mon Mar 7 18:10:55 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Mar 7 18:10:55 2016 -0800

----------------------------------------------------------------------
 pom.xml                                         |  1 +
 sdk/pom.xml                                     | 14 +++++
 .../sdk/coders/protobuf/ProtoCoder.java         |  2 -
 .../sdk/options/CloudDebuggerOptions.java       |  9 +++-
 .../sdk/runners/DataflowPipelineRunner.java     | 55 ++++++++++++++++++--
 .../cloud/dataflow/sdk/util/Transport.java      |  9 ++++
 6 files changed, 83 insertions(+), 7 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-beam git commit: Fixup CL

Posted by da...@apache.org.
Fixup CL


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/890e3a00
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/890e3a00
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/890e3a00

Branch: refs/heads/master
Commit: 890e3a004392fdb7bc30a7976edd57d26669bb76
Parents: 532a792
Author: Davor Bonaci <da...@google.com>
Authored: Mon Mar 7 18:03:09 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Mar 7 18:03:09 2016 -0800

----------------------------------------------------------------------
 .../google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java    | 2 --
 .../cloud/dataflow/sdk/runners/DataflowPipelineRunner.java       | 4 +---
 2 files changed, 1 insertion(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/890e3a00/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java
index 9e4bb50..111c24d 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java
@@ -17,8 +17,6 @@ package com.google.cloud.dataflow.sdk.coders.protobuf;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.api.services.datastore.DatastoreV1;
-import com.google.api.services.datastore.DatastoreV1.Entity;
 import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
 import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
 import com.google.cloud.dataflow.sdk.coders.Coder;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/890e3a00/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
index 0612cca..c90b904 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
@@ -172,8 +172,6 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
-import javax.annotation.Nullable;
-
 /**
  * A {@link PipelineRunner} that executes the operations in the
  * pipeline by first translating them to the Dataflow representation
@@ -430,7 +428,7 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
     if (!options.getEnableCloudDebugger()) {
       return;
     }
-    
+
     if (options.getDebuggee() != null) {
       throw new RuntimeException("Should not specify the debuggee");
     }