You are viewing a plain-text version of this content; the hyperlink to its canonical version was lost in the plain-text conversion.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/08 03:11:25 UTC
[1/3] incubator-beam git commit: Register debuggee with Cloud Debugger prior to job submission to Cloud Dataflow
Repository: incubator-beam
Updated Branches:
refs/heads/master 0be5a977d -> b864ce82a
Register debuggee with Cloud Debugger prior to job submission to Cloud Dataflow
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/532a7920
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/532a7920
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/532a7920
Branch: refs/heads/master
Commit: 532a79202110d06d48db3863ef41a6714cb32b6a
Parents: 0be5a97
Author: bchambers <bc...@google.com>
Authored: Wed Mar 2 17:38:49 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Mar 7 17:54:01 2016 -0800
----------------------------------------------------------------------
pom.xml | 1 +
sdk/pom.xml | 14 +++++
.../sdk/options/CloudDebuggerOptions.java | 9 +++-
.../sdk/runners/DataflowPipelineRunner.java | 57 ++++++++++++++++++--
.../cloud/dataflow/sdk/util/Transport.java | 9 ++++
5 files changed, 85 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/532a7920/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f9dbab7..7e5e078 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,7 @@
<avro.version>1.7.7</avro.version>
<bigquery.version>v2-rev248-1.21.0</bigquery.version>
<bigtable.version>0.2.3</bigtable.version>
+ <clouddebugger.version>v2-rev6-1.21.0</clouddebugger.version>
<dataflow.version>v1b3-rev22-1.21.0</dataflow.version>
<dataflow.proto.version>0.5.160222</dataflow.proto.version>
<datastore.version>v1beta2-rev1-4.0.0</datastore.version>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/532a7920/sdk/pom.xml
----------------------------------------------------------------------
diff --git a/sdk/pom.xml b/sdk/pom.xml
index 8e7b463..f782b78 100644
--- a/sdk/pom.xml
+++ b/sdk/pom.xml
@@ -498,6 +498,20 @@
<dependency>
<groupId>com.google.apis</groupId>
+ <artifactId>google-api-services-clouddebugger</artifactId>
+ <version>${clouddebugger.version}</version>
+ <exclusions>
+ <!-- Exclude an old version of guava that is being pulled
+ in by a transitive dependency of google-api-client -->
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava-jdk5</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.apis</groupId>
<artifactId>google-api-services-pubsub</artifactId>
<version>${pubsub.version}</version>
<exclusions>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/532a7920/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
index 62be4c9..2e1ad94 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
@@ -16,8 +16,11 @@
package com.google.cloud.dataflow.sdk.options;
+import com.google.api.services.clouddebugger.v2.model.Debuggee;
import com.google.cloud.dataflow.sdk.annotations.Experimental;
+import javax.annotation.Nullable;
+
/**
* Options for controlling Cloud Debugger.
*/
@@ -32,5 +35,9 @@ public interface CloudDebuggerOptions {
@Description("Whether to enable the Cloud Debugger snapshot agent for the current job.")
boolean getEnableCloudDebugger();
void setEnableCloudDebugger(boolean enabled);
-}
+ /**
+ * The Cloud Debugger debuggee to associate this job with. Populated by DataflowPipelineRunner
+ * when {@link #getEnableCloudDebugger()} is true; users should not set it directly.
+ * Returns {@code null} when no debuggee has been registered.
+ */
+ @Description("The Cloud Debugger debuggee to associate with. This should not be set directly.")
+ @Hidden
+ @Nullable Debuggee getDebuggee();
+ void setDebuggee(Debuggee debuggee);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/532a7920/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
index 6eb6c2f..0612cca 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
@@ -23,6 +23,10 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.services.clouddebugger.v2.Clouddebugger;
+import com.google.api.services.clouddebugger.v2.model.Debuggee;
+import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeRequest;
+import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.DataflowPackage;
import com.google.api.services.dataflow.model.Job;
@@ -168,6 +172,8 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import javax.annotation.Nullable;
+
/**
* A {@link PipelineRunner} that executes the operations in the
* pipeline by first translating them to the Dataflow representation
@@ -420,6 +426,43 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
return super.apply(new AssignWindows<>(transform), input);
}
+ private void maybeRegisterDebuggee(DataflowPipelineOptions options, String uniquifier) { // No-op unless the Cloud Debugger is enabled; run() passes the CreateJob client request id as uniquifier.
+ if (!options.getEnableCloudDebugger()) {
+ return;
+ }
+
+ if (options.getDebuggee() != null) {
+ throw new RuntimeException("Should not specify the debuggee");
+ }
+ // A pre-set debuggee is rejected above because this method is what populates the option.
+ Clouddebugger debuggerClient = Transport.newClouddebuggerClient(options).build(); // Transport chains the GCP credential with a RetryHttpRequestInitializer.
+ Debuggee debuggee = registerDebuggee(debuggerClient, uniquifier); // Throws RuntimeException if registration fails.
+ options.setDebuggee(debuggee); // Must happen before translation, which may read the updated options.
+ }
+
+ /**
+ * Registers a new debuggee for this job with the Cloud Debugger service.
+ *
+ * <p>The project is read from the runner's {@code options} field.
+ *
+ * @param debuggerClient client used to issue the register request
+ * @param uniquifier value used as both the debuggee's uniquifier and its description
+ * @return the debuggee returned by the service
+ * @throws RuntimeException if the request fails, the response carries no debuggee, or the
+ * service reports an error status for the debuggee
+ */
+ private Debuggee registerDebuggee(Clouddebugger debuggerClient, String uniquifier) {
+ RegisterDebuggeeRequest registerReq = new RegisterDebuggeeRequest();
+ registerReq.setDebuggee(new Debuggee()
+ .setProject(options.getProject())
+ .setUniquifier(uniquifier)
+ .setDescription(uniquifier)
+ .setAgentVersion("google.com/cloud-dataflow-java/v1"));
+
+ RegisterDebuggeeResponse registerResponse;
+ try {
+ registerResponse =
+ debuggerClient.controller().debuggees().register(registerReq).execute();
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to register with the debugger: ", e);
+ }
+
+ Debuggee debuggee = registerResponse.getDebuggee();
+ if (debuggee == null) {
+ throw new RuntimeException("Unable to register with the debugger: no debuggee returned");
+ }
+ // isError is a boxed Boolean and may be unset; compare against TRUE to avoid an unboxing NPE.
+ if (debuggee.getStatus() != null && Boolean.TRUE.equals(debuggee.getStatus().getIsError())) {
+ String detail = debuggee.getStatus().getDescription() == null
+ ? "(no description provided)"
+ : debuggee.getStatus().getDescription().getFormat();
+ throw new RuntimeException("Unable to register with the debugger: " + detail);
+ }
+
+ return debuggee;
+ }
+
@Override
public DataflowPipelineJob run(Pipeline pipeline) {
logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
@@ -428,9 +471,7 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
+ "related to Google Compute Engine usage and other Google Cloud Services.");
List<DataflowPackage> packages = options.getStager().stageFiles();
- JobSpecification jobSpecification =
- translator.translate(pipeline, this, packages);
- Job newJob = jobSpecification.getJob();
+
// Set a unique client_request_id in the CreateJob request.
// This is used to ensure idempotence of job creation across retried
@@ -442,6 +483,15 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
int randomNum = new Random().nextInt(9000) + 1000;
String requestId = DateTimeFormat.forPattern("YYYYMMddHHmmssmmm").withZone(DateTimeZone.UTC)
.print(DateTimeUtils.currentTimeMillis()) + "_" + randomNum;
+
+ // Try to create a debuggee ID. This must happen before the job is translated since it may
+ // update the options.
+ DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+ maybeRegisterDebuggee(dataflowOptions, requestId);
+
+ JobSpecification jobSpecification =
+ translator.translate(pipeline, this, packages);
+ Job newJob = jobSpecification.getJob();
newJob.setClientRequestId(requestId);
String version = DataflowReleaseInfo.getReleaseInfo().getVersion();
@@ -450,7 +500,6 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
newJob.getEnvironment().setUserAgent(DataflowReleaseInfo.getReleaseInfo());
// The Dataflow Service may write to the temporary directory directly, so
// must be verified.
- DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
if (!Strings.isNullOrEmpty(options.getTempLocation())) {
newJob.getEnvironment().setTempStoragePrefix(
dataflowOptions.getPathValidator().verifyPath(options.getTempLocation()));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/532a7920/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
index 7735a9e..15fe286 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
@@ -23,6 +23,7 @@ import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.clouddebugger.v2.Clouddebugger;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.storage.Storage;
@@ -148,6 +149,14 @@ public class Transport {
.setGoogleClientRequestInitializer(options.getGoogleApiTrace());
}
+ public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) {
+ return new Clouddebugger.Builder(getTransport(),
+ getJsonFactory(),
+ chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer()))
+ .setApplicationName(options.getAppName())
+ .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+ }
+
/**
* Returns a Dataflow client that does not automatically retry failed
* requests.
[3/3] incubator-beam git commit: This closes #15
Posted by da...@apache.org.
This closes #15
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b864ce82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b864ce82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b864ce82
Branch: refs/heads/master
Commit: b864ce82a2f2046de6acf064e2b92aa132a9310a
Parents: 0be5a97 890e3a0
Author: Davor Bonaci <da...@google.com>
Authored: Mon Mar 7 18:10:55 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Mar 7 18:10:55 2016 -0800
----------------------------------------------------------------------
pom.xml | 1 +
sdk/pom.xml | 14 +++++
.../sdk/coders/protobuf/ProtoCoder.java | 2 -
.../sdk/options/CloudDebuggerOptions.java | 9 +++-
.../sdk/runners/DataflowPipelineRunner.java | 55 ++++++++++++++++++--
.../cloud/dataflow/sdk/util/Transport.java | 9 ++++
6 files changed, 83 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-beam git commit: Fixup CL
Posted by da...@apache.org.
Fixup CL
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/890e3a00
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/890e3a00
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/890e3a00
Branch: refs/heads/master
Commit: 890e3a004392fdb7bc30a7976edd57d26669bb76
Parents: 532a792
Author: Davor Bonaci <da...@google.com>
Authored: Mon Mar 7 18:03:09 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Mar 7 18:03:09 2016 -0800
----------------------------------------------------------------------
.../google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java | 2 --
.../cloud/dataflow/sdk/runners/DataflowPipelineRunner.java | 4 +---
2 files changed, 1 insertion(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/890e3a00/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java
index 9e4bb50..111c24d 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/ProtoCoder.java
@@ -17,8 +17,6 @@ package com.google.cloud.dataflow.sdk.coders.protobuf;
import static com.google.common.base.Preconditions.checkArgument;
-import com.google.api.services.datastore.DatastoreV1;
-import com.google.api.services.datastore.DatastoreV1.Entity;
import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
import com.google.cloud.dataflow.sdk.coders.Coder;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/890e3a00/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
index 0612cca..c90b904 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
@@ -172,8 +172,6 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
-import javax.annotation.Nullable;
-
/**
* A {@link PipelineRunner} that executes the operations in the
* pipeline by first translating them to the Dataflow representation
@@ -430,7 +428,7 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
if (!options.getEnableCloudDebugger()) {
return;
}
-
+
if (options.getDebuggee() != null) {
throw new RuntimeException("Should not specify the debuggee");
}