Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/28 14:17:26 UTC

[GitHub] [beam] mxm opened a new pull request #11556: [BEAM-9841] Support finite wait on PortableRunner's PipelineResult

mxm opened a new pull request #11556:
URL: https://github.com/apache/beam/pull/11556


   Other runners in the Python SDK support waiting for a finite amount of time on
   the PipelineResult, and so should PortableRunner.
   
   Previously, run_pipeline blocked until the pipeline finished for local
   executions so that the cleanup callbacks would run. Instead, we can register
   a shutdown hook that runs the cleanup on exit. This allows a finite wait even
   in local execution mode.
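
A minimal sketch of the shutdown-hook idea described above, not the code in this PR. `LocalResult`, `stopped`, and the 'job server' callback are hypothetical names used only for illustration; `atexit.register` and `functools.partial` are the standard-library calls that the diff further down in this thread also uses.

```python
import atexit
import functools


class LocalResult(object):
  """Hypothetical stand-in for a result that owns locally started services."""
  def __init__(self, cleanup_callbacks):
    self._cleanup_callbacks = list(cleanup_callbacks)

  def _cleanup(self, on_exit=False):
    # Take ownership of the callbacks first, so that a second call (for
    # example the exit hook firing after an explicit cleanup) does nothing.
    callbacks, self._cleanup_callbacks = self._cleanup_callbacks, []
    if on_exit and callbacks:
      print('Running cleanup on exit.')
    for callback in callbacks:
      callback()


stopped = []
result = LocalResult([lambda: stopped.append('job server')])
# The hook runs at normal interpreter shutdown, so run_pipeline no longer has
# to block just to guarantee that the callbacks are eventually invoked.
atexit.register(functools.partial(result._cleanup, on_exit=True))
```

Because `_cleanup` empties the callback list before running it, calling it once explicitly and once again from the exit hook runs each callback only once.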
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] mxm commented on pull request #11556: [BEAM-9841] Support finite wait on PortableRunner's PipelineResult

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11556:
URL: https://github.com/apache/beam/pull/11556#issuecomment-622977323


   retest this please





[GitHub] [beam] mxm commented on pull request #11556: [BEAM-9841] Support finite wait on PortableRunner's PipelineResult

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11556:
URL: https://github.com/apache/beam/pull/11556#issuecomment-623347018


   Run Python PreCommit





[GitHub] [beam] mxm commented on a change in pull request #11556: [BEAM-9841] Support finite wait on PortableRunner's PipelineResult

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11556:
URL: https://github.com/apache/beam/pull/11556#discussion_r418947573



##########
File path: sdks/python/apache_beam/runners/portability/portable_runner.py
##########
@@ -535,7 +537,7 @@ def _last_error_message(self):
     else:
       return 'unknown error'
 
-  def wait_until_finish(self):
+  def wait_until_finish(self, duration=None):

Review comment:
       I agree that the parameter name and default are not ideal. I have added a docstring.







[GitHub] [beam] mxm commented on pull request #11556: [BEAM-9841] Support finite wait on PortableRunner's PipelineResult

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11556:
URL: https://github.com/apache/beam/pull/11556#issuecomment-623347136


   Run Python2_PVR_Flink PreCommit





[GitHub] [beam] mxm commented on pull request #11556: [BEAM-9841] Support finite wait on PortableRunner's PipelineResult

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11556:
URL: https://github.com/apache/beam/pull/11556#issuecomment-622978296


   Run PythonDocker PreCommit





[GitHub] [beam] mxm commented on pull request #11556: [BEAM-9841] Support finite wait on PortableRunner's PipelineResult

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11556:
URL: https://github.com/apache/beam/pull/11556#issuecomment-623153923


   Failing tests are blocked on #11597.





[GitHub] [beam] ibzib commented on a change in pull request #11556: [BEAM-9841] Support finite wait on PortableRunner's PipelineResult

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #11556:
URL: https://github.com/apache/beam/pull/11556#discussion_r416974908



##########
File path: sdks/python/apache_beam/runners/portability/portable_runner.py
##########
@@ -433,13 +435,12 @@ def run_pipeline(self, pipeline, options):
         state_stream,
         cleanup_callbacks)
     if cleanup_callbacks:
-      # We wait here to ensure that we run the cleanup callbacks.
+      # Register an exit handler to ensure cleanup on exit.
+      atexit.register(functools.partial(result._cleanup, on_exit=True))
       _LOGGER.info(
-          'Waiting until the pipeline has finished because the '
-          'environment "%s" has started a component necessary for the '
-          'execution.',
+          'Environment "%s" has started a component necessary for the '
+          'execution. Be sure to call wait_until_finish()',

Review comment:
       We prefer the use of Python's `with` syntax instead of calling `wait_until_finish` explicitly.
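
For readers unfamiliar with the pattern, here is a rough sketch of what the `with` form buys: the context manager runs the pipeline and waits for it on exit, so the caller never has to remember `wait_until_finish`. `FakePipeline` and `FakeResult` are hypothetical stand-ins so the example runs without Beam installed; as far as I understand, `beam.Pipeline` behaves similarly, but this is not its implementation.

```python
class FakeResult(object):
  """Hypothetical result object that records whether it was waited on."""
  def __init__(self):
    self.waited = False

  def wait_until_finish(self, duration=None):
    self.waited = True
    return 'DONE'


class FakePipeline(object):
  """Hypothetical pipeline that runs and waits when the with block exits."""
  def __init__(self):
    self.result = None

  def run(self):
    self.result = FakeResult()
    return self.result

  def __enter__(self):
    return self

  def __exit__(self, exc_type, exc_value, traceback):
    # Only run the pipeline if constructing it did not raise.
    if exc_type is None:
      self.run().wait_until_finish()


with FakePipeline() as p:
  pass  # Transforms would be applied to p here.
```

After the block exits, `p.result.waited` is true without any explicit call from user code.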

##########
File path: sdks/python/apache_beam/runners/portability/portable_runner.py
##########
@@ -535,7 +537,7 @@ def _last_error_message(self):
     else:
       return 'unknown error'
 
-  def wait_until_finish(self):
+  def wait_until_finish(self, duration=None):

Review comment:
       A comment explaining the units for duration (milliseconds?) and that `duration=None` actually means "wait forever" would be helpful. (I find this naming somewhat counter-intuitive, but it's too late to change now.)
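
A sketch of the kind of docstring being asked for, attached to a hypothetical helper `wait_for` rather than to the real method. The millisecond unit is an assumption taken from the `duration / 1000` conversion in the diff below; `threading.Event.wait` is the standard-library call used for the timed wait.

```python
import threading


def wait_for(done_event, duration=None):
  """Blocks until done_event is set or the timeout elapses.

  Args:
    done_event: a threading.Event that is set once the job has reached a
      terminal state.
    duration: maximum time to wait, in milliseconds. None means wait
      indefinitely.

  Returns:
    True if the event was set, False if the wait timed out.
  """
  # Distinguish "no timeout" (None) from "zero timeout" (0) explicitly.
  timeout_secs = None if duration is None else duration / 1000.0
  return done_event.wait(timeout_secs)
```

Note that this helper treats `duration=0` as a zero-length wait, whereas an `if duration:` check treats 0 the same as `None`. The thread does not say which behaviour is intended.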

##########
File path: sdks/python/apache_beam/runners/portability/portable_runner.py
##########
@@ -557,23 +559,47 @@ def read_messages():
     t.daemon = True
     t.start()
 
+    if duration:
+      t2 = threading.Thread(

Review comment:
       Please give `t` and `t2` descriptive variable names.

##########
File path: sdks/python/apache_beam/runners/portability/portable_runner.py
##########
@@ -557,23 +559,47 @@ def read_messages():
     t.daemon = True
     t.start()
 
+    if duration:
+      t2 = threading.Thread(
+          target=functools.partial(self._observe, t),
+          name='wait_until_finish_state_observer')
+      t2.daemon = True
+      t2.start()
+      start_time = time.time()
+      duration_secs = duration / 1000
+      while time.time() - start_time < duration_secs and t2.is_alive():
+        time.sleep(1)
+    else:
+      self._observe(t)
+
+    if self._runtime_exception:
+      raise self._runtime_exception
+
+    return self._state
+
+  def _observe(self, message_thread):
     try:
       for state_response in self._state_stream:
         self._state = self._runner_api_state_to_pipeline_state(
             state_response.state)
         if state_response.state in TERMINAL_STATES:
           # Wait for any last messages.
-          t.join(10)
+          message_thread.join(10)
           break
       if self._state != runner.PipelineState.DONE:
-        raise RuntimeError(
+        self._runtime_exception = RuntimeError(
             'Pipeline %s failed in state %s: %s' %
             (self._job_id, self._state, self._last_error_message()))
-      return self._state
+    except Exception as e:
+      self._runtime_exception = e
     finally:
       self._cleanup()
 
-  def _cleanup(self):
+  def _cleanup(self, on_exit=False):
+    if on_exit and self._cleanup_callbacks:
+      _LOGGER.info(
+          'Running cleanup on exit. If your local pipeline should continue '
+          'running, be sure to call pipeline.run().wait_until_finish().')

Review comment:
       `with` (see above comment)
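
For comparison with the timed wait in the diff above, which polls `is_alive()` once per second, here is a rough sketch of the same idea using `Thread.join` with a timeout. This is an alternative for illustration, not what the PR does. `TimedWaiter`, `slow_job`, and the state strings are hypothetical; only the observer-thread structure and the deferred re-raise mirror the diff.

```python
import threading
import time


class TimedWaiter(object):
  """Hypothetical stand-in for the result object in the diff."""
  def __init__(self, work):
    self._work = work
    self._state = 'RUNNING'
    self._runtime_exception = None

  def _observe(self):
    # Store the exception instead of raising it, because an exception raised
    # in the observer thread would never reach the caller.
    try:
      self._state = self._work()
    except Exception as e:
      self._runtime_exception = e

  def wait_until_finish(self, duration=None):
    state_observer = threading.Thread(
        target=self._observe, name='wait_until_finish_state_observer')
    state_observer.daemon = True
    state_observer.start()
    # join(None) blocks until the thread ends; join(seconds) returns after at
    # most that long, without a polling loop.
    state_observer.join(None if duration is None else duration / 1000.0)
    if self._runtime_exception:
      raise self._runtime_exception
    return self._state


def slow_job():
  time.sleep(0.5)
  return 'DONE'


# The wait gives up after 50 ms while the job is still running.
state = TimedWaiter(slow_job).wait_until_finish(duration=50)
```

With `join`, the caller returns as soon as the observer finishes instead of up to a second later, and durations shorter than one second are honoured.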



