You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 19:53:25 UTC
[29/50] [abbrv] beam git commit: Accept Region in Dataflow Monitoring Page URL
Accept Region in Dataflow Monitoring Page URL
Update Google Cloud Dataflow FE URLs from the Dataflow Runners to
regionalized paths.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/111603a9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/111603a9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/111603a9
Branch: refs/heads/DSL_SQL
Commit: 111603a9952f415fa1386046f7a2d3bde5b6532d
Parents: 2d5b6d7
Author: Robert Burke <ro...@frantil.com>
Authored: Tue Jun 27 15:41:56 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Jul 18 14:49:56 2017 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowPipelineJob.java | 14 ++++++++++++--
.../beam/runners/dataflow/DataflowRunner.java | 3 ++-
.../beam/runners/dataflow/util/MonitoringUtil.java | 16 +++++++++++++---
.../dataflow/BatchStatefulParDoOverridesTest.java | 1 +
.../dataflow/DataflowPipelineTranslatorTest.java | 1 +
.../runners/dataflow/internal/apiclient.py | 7 +++++--
.../runners/dataflow/test_dataflow_runner.py | 5 +++--
7 files changed, 37 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/111603a9/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index e30d426..e736373 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -169,6 +169,13 @@ public class DataflowPipelineJob implements PipelineResult {
}
/**
+ * Get the region this job exists in.
+ */
+ public String getRegion() {
+ return dataflowOptions.getRegion();
+ }
+
+ /**
* Returns a new {@link DataflowPipelineJob} for the job that replaced this one, if applicable.
*
* @throws IllegalStateException if called before the job has terminated or if the job terminated
@@ -344,7 +351,9 @@ public class DataflowPipelineJob implements PipelineResult {
getJobId(),
getReplacedByJob().getJobId(),
MonitoringUtil.getJobMonitoringPageURL(
- getReplacedByJob().getProjectId(), getReplacedByJob().getJobId()));
+ getReplacedByJob().getProjectId(),
+ getRegion(),
+ getReplacedByJob().getJobId()));
break;
default:
LOG.info("Job {} failed with status {}.", getJobId(), state);
@@ -422,7 +431,8 @@ public class DataflowPipelineJob implements PipelineResult {
"Failed to cancel job in state %s, "
+ "please go to the Developers Console to cancel it manually: %s",
state,
- MonitoringUtil.getJobMonitoringPageURL(getProjectId(), getJobId()));
+ MonitoringUtil.getJobMonitoringPageURL(
+ getProjectId(), getRegion(), getJobId()));
LOG.warn(errorMsg);
throw new IOException(errorMsg, e);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/111603a9/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 8935759..57a5ea5 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -679,7 +679,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
LOG.info("To access the Dataflow monitoring console, please navigate to {}",
- MonitoringUtil.getJobMonitoringPageURL(options.getProject(), jobResult.getId()));
+ MonitoringUtil.getJobMonitoringPageURL(
+ options.getProject(), options.getRegion(), jobResult.getId()));
System.out.println("Submitted job: " + jobResult.getId());
LOG.info("To cancel the job using the 'gcloud' tool, run:\n> {}",
http://git-wip-us.apache.org/repos/asf/beam/blob/111603a9/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
index 759387c..780a979 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java
@@ -180,14 +180,24 @@ public class MonitoringUtil {
return allMessages;
}
+ /**
+ * @deprecated this method defaults the region to "us-central1". Prefer using the overload with
+ * an explicit regionId parameter.
+ */
+ @Deprecated
public static String getJobMonitoringPageURL(String projectName, String jobId) {
+ return getJobMonitoringPageURL(projectName, "us-central1", jobId);
+ }
+
+ public static String getJobMonitoringPageURL(String projectName, String regionId, String jobId) {
try {
// Project name is allowed in place of the project id: the user will be redirected to a URL
// that has the project name replaced with project id.
return String.format(
- "https://console.developers.google.com/project/%s/dataflow/job/%s",
- URLEncoder.encode(projectName, "UTF-8"),
- URLEncoder.encode(jobId, "UTF-8"));
+ "https://console.cloud.google.com/dataflow/jobsDetail/locations/%s/jobs/%s?project=%s",
+ URLEncoder.encode(regionId, "UTF-8"),
+ URLEncoder.encode(jobId, "UTF-8"),
+ URLEncoder.encode(projectName, "UTF-8"));
} catch (UnsupportedEncodingException e) {
// Should never happen.
throw new AssertionError("UTF-8 encoding is not supported by the environment", e);
http://git-wip-us.apache.org/repos/asf/beam/blob/111603a9/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
index d2ab357..e62a8b8 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java
@@ -161,6 +161,7 @@ public class BatchStatefulParDoOverridesTest implements Serializable {
options.setGcpCredential(new TestCredential());
options.setJobName("some-job-name");
options.setProject("some-project");
+ options.setRegion("some-region");
options.setTempLocation(GcsPath.fromComponents("somebucket", "some/path").toString());
options.setFilesToStage(new LinkedList<String>());
options.setGcsUtil(mockGcsUtil);
http://git-wip-us.apache.org/repos/asf/beam/blob/111603a9/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 43b2788..9a0bdf8 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -200,6 +200,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
options.setGcpCredential(new TestCredential());
options.setJobName("some-job-name");
options.setProject("some-project");
+ options.setRegion("some-region");
options.setTempLocation(GcsPath.fromComponents("somebucket", "some/path").toString());
options.setFilesToStage(new LinkedList<String>());
options.setDataflowClient(buildMockDataflow(new IsValidCreateRequest()));
http://git-wip-us.apache.org/repos/asf/beam/blob/111603a9/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 33dfe19..dcaf74e 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -500,8 +500,11 @@ class DataflowApplicationClient(object):
logging.info('Created job with id: [%s]', response.id)
logging.info(
'To access the Dataflow monitoring console, please navigate to '
- 'https://console.developers.google.com/project/%s/dataflow/job/%s',
- self.google_cloud_options.project, response.id)
+ 'https://console.cloud.google.com/dataflow/jobsDetail'
+ '/locations/%s/jobs/%s?project=%s',
+ self.google_cloud_options.region,
+ response.id,
+ self.google_cloud_options.project)
return response
http://git-wip-us.apache.org/repos/asf/beam/blob/111603a9/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index b339882..96e6a66 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -38,12 +38,13 @@ class TestDataflowRunner(DataflowRunner):
self.result = super(TestDataflowRunner, self).run(pipeline)
if self.result.has_job:
project = pipeline._options.view_as(GoogleCloudOptions).project
+ region_id = pipeline._options.view_as(GoogleCloudOptions).region
job_id = self.result.job_id()
# TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
# in some cases.
print (
- 'Found: https://console.cloud.google.com/dataflow/job/%s?project=%s' %
- (job_id, project))
+ 'Found: https://console.cloud.google.com/dataflow/jobsDetail'
+ '/locations/%s/jobs/%s?project=%s' % (region_id, job_id, project))
self.result.wait_until_finish()
if on_success_matcher: