You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tv...@apache.org on 2022/05/11 13:29:42 UTC

[beam] branch master updated: [BEAM-5492] Python Dataflow integration tests should export the pipeline console output to Jenkins Test Result section (#17530)

This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a1810537c45 [BEAM-5492] Python Dataflow integration tests should export the pipeline console output to Jenkins Test Result section (#17530)
a1810537c45 is described below

commit a1810537c454ea8bec51d83274b08311d2a5048f
Author: andoni-guzman <89...@users.noreply.github.com>
AuthorDate: Wed May 11 08:29:32 2022 -0500

    [BEAM-5492] Python Dataflow integration tests should export the pipeline console output to Jenkins Test Result section (#17530)
---
 sdks/python/apache_beam/runners/dataflow/dataflow_runner.py      | 9 +++++++--
 sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py | 8 +++++++-
 2 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 13cbec6dc02..49f7251c055 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -1640,7 +1640,10 @@ class DataflowPipelineResult(PipelineResult):
     if not self.is_in_terminal_state():
       if not self.has_job:
         raise IOError('Failed to get the Dataflow job id.')
-
+      consoleUrl = (
+          "Console URL: https://console.cloud.google.com/"
+          f"dataflow/jobs/<RegionId>/{self.job_id()}"
+          "?project=<ProjectId>")
       thread = threading.Thread(
           target=DataflowRunner.poll_for_job_completion,
           args=(self._runner, self, duration))
@@ -1657,13 +1660,15 @@ class DataflowPipelineResult(PipelineResult):
       # is_in_terminal_state.
       terminated = self.is_in_terminal_state()
       assert duration or terminated, (
-          'Job did not reach to a terminal state after waiting indefinitely.')
+          'Job did not reach to a terminal state after waiting indefinitely. '
+          '{}'.format(consoleUrl))
 
       # TODO(BEAM-14291): Also run this check if wait_until_finish was called
       # after the pipeline completed.
       if terminated and self.state != PipelineState.DONE:
         # TODO(BEAM-1290): Consider converting this to an error log based on
        # the resolution of the issue.
+        _LOGGER.error(consoleUrl)
         raise DataflowRuntimeException(
             'Dataflow pipeline failed. State: %s, Error:\n%s' %
             (self.state, getattr(self._runner, 'last_error_msg', None)),
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index d4743a558f3..58bc05c3950 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -55,6 +55,8 @@ class TestDataflowRunner(DataflowRunner):
      # TODO(markflyhigh)(BEAM-1890): Use print since Nose doesn't show logs
       # in some cases.
       print('Worker logs: %s' % self.build_console_url(options))
+      _LOGGER.info('Console log: ')
+      _LOGGER.info(self.build_console_url(options))
 
     try:
       self.wait_until_in_state(PipelineState.RUNNING)
@@ -84,7 +86,11 @@ class TestDataflowRunner(DataflowRunner):
 
   def wait_until_in_state(self, expected_state, timeout=WAIT_IN_STATE_TIMEOUT):
     """Wait until Dataflow pipeline enters a certain state."""
+    consoleUrl = (
+        "Console URL: https://console.cloud.google.com/dataflow/"
+        f"<regionId>/{self.result.job_id()}?project=<projectId>")
     if not self.result.has_job:
+      _LOGGER.error(consoleUrl)
       raise IOError('Failed to get the Dataflow job id.')
 
     start_time = time.time()
@@ -93,7 +99,7 @@ class TestDataflowRunner(DataflowRunner):
       if self.result.is_in_terminal_state() or job_state == expected_state:
         return job_state
       time.sleep(5)
-
+    _LOGGER.error(consoleUrl)
     raise RuntimeError(
         'Timeout after %d seconds while waiting for job %s '
         'enters expected state %s. Current state is %s.' %