You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tv...@apache.org on 2022/05/11 13:29:42 UTC
[beam] branch master updated: [BEAM-5492] Python Dataflow integration tests should export the pipeline console output to Jenkins Test Result section (#17530)
This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a1810537c45 [BEAM-5492] Python Dataflow integration tests should export the pipeline console output to Jenkins Test Result section (#17530)
a1810537c45 is described below
commit a1810537c454ea8bec51d83274b08311d2a5048f
Author: andoni-guzman <89...@users.noreply.github.com>
AuthorDate: Wed May 11 08:29:32 2022 -0500
[BEAM-5492] Python Dataflow integration tests should export the pipeline console output to Jenkins Test Result section (#17530)
---
sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 9 +++++++--
sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py | 8 +++++++-
2 files changed, 14 insertions(+), 3 deletions(-)
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 13cbec6dc02..49f7251c055 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -1640,7 +1640,10 @@ class DataflowPipelineResult(PipelineResult):
if not self.is_in_terminal_state():
if not self.has_job:
raise IOError('Failed to get the Dataflow job id.')
-
+ consoleUrl = (
+ "Console URL: https://console.cloud.google.com/"
+ f"dataflow/jobs/<RegionId>/{self.job_id()}"
+ "?project=<ProjectId>")
thread = threading.Thread(
target=DataflowRunner.poll_for_job_completion,
args=(self._runner, self, duration))
@@ -1657,13 +1660,15 @@ class DataflowPipelineResult(PipelineResult):
# is_in_terminal_state.
terminated = self.is_in_terminal_state()
assert duration or terminated, (
- 'Job did not reach to a terminal state after waiting indefinitely.')
+ 'Job did not reach to a terminal state after waiting indefinitely. '
+ '{}'.format(consoleUrl))
# TODO(BEAM-14291): Also run this check if wait_until_finish was called
# after the pipeline completed.
if terminated and self.state != PipelineState.DONE:
# TODO(BEAM-1290): Consider converting this to an error log based on
# theresolution of the issue.
+ _LOGGER.error(consoleUrl)
raise DataflowRuntimeException(
'Dataflow pipeline failed. State: %s, Error:\n%s' %
(self.state, getattr(self._runner, 'last_error_msg', None)),
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index d4743a558f3..58bc05c3950 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -55,6 +55,8 @@ class TestDataflowRunner(DataflowRunner):
# TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
# in some cases.
print('Worker logs: %s' % self.build_console_url(options))
+ _LOGGER.info('Console log: ')
+ _LOGGER.info(self.build_console_url(options))
try:
self.wait_until_in_state(PipelineState.RUNNING)
@@ -84,7 +86,11 @@ class TestDataflowRunner(DataflowRunner):
def wait_until_in_state(self, expected_state, timeout=WAIT_IN_STATE_TIMEOUT):
"""Wait until Dataflow pipeline enters a certain state."""
+ consoleUrl = (
+ "Console URL: https://console.cloud.google.com/dataflow/"
+ f"<regionId>/{self.result.job_id()}?project=<projectId>")
if not self.result.has_job:
+ _LOGGER.error(consoleUrl)
raise IOError('Failed to get the Dataflow job id.')
start_time = time.time()
@@ -93,7 +99,7 @@ class TestDataflowRunner(DataflowRunner):
if self.result.is_in_terminal_state() or job_state == expected_state:
return job_state
time.sleep(5)
-
+ _LOGGER.error(consoleUrl)
raise RuntimeError(
'Timeout after %d seconds while waiting for job %s '
'enters expected state %s. Current state is %s.' %