You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/28 23:15:01 UTC

[GitHub] [beam] ibzib commented on a change in pull request #11556: [BEAM-9841] Support finite wait on PortableRunner's PipelineResult

ibzib commented on a change in pull request #11556:
URL: https://github.com/apache/beam/pull/11556#discussion_r416974908



##########
File path: sdks/python/apache_beam/runners/portability/portable_runner.py
##########
@@ -433,13 +435,12 @@ def run_pipeline(self, pipeline, options):
         state_stream,
         cleanup_callbacks)
     if cleanup_callbacks:
-      # We wait here to ensure that we run the cleanup callbacks.
+      # Register an exit handler to ensure cleanup on exit.
+      atexit.register(functools.partial(result._cleanup, on_exit=True))
       _LOGGER.info(
-          'Waiting until the pipeline has finished because the '
-          'environment "%s" has started a component necessary for the '
-          'execution.',
+          'Environment "%s" has started a component necessary for the '
+          'execution. Be sure to call wait_until_finish()',

Review comment:
       We prefer the use of Python's `with` statement (running the pipeline as a context manager) instead of calling `wait_until_finish` explicitly.

##########
File path: sdks/python/apache_beam/runners/portability/portable_runner.py
##########
@@ -535,7 +537,7 @@ def _last_error_message(self):
     else:
       return 'unknown error'
 
-  def wait_until_finish(self):
+  def wait_until_finish(self, duration=None):

Review comment:
       A comment explaining the units for duration (milliseconds?) and that `duration=None` actually means "wait forever" would be helpful. (I find this naming somewhat counter-intuitive, but it's too late to change now.)

##########
File path: sdks/python/apache_beam/runners/portability/portable_runner.py
##########
@@ -557,23 +559,47 @@ def read_messages():
     t.daemon = True
     t.start()
 
+    if duration:
+      t2 = threading.Thread(

Review comment:
       Please give `t` and `t2` descriptive variable names.

##########
File path: sdks/python/apache_beam/runners/portability/portable_runner.py
##########
@@ -557,23 +559,47 @@ def read_messages():
     t.daemon = True
     t.start()
 
+    if duration:
+      t2 = threading.Thread(
+          target=functools.partial(self._observe, t),
+          name='wait_until_finish_state_observer')
+      t2.daemon = True
+      t2.start()
+      start_time = time.time()
+      duration_secs = duration / 1000
+      while time.time() - start_time < duration_secs and t2.is_alive():
+        time.sleep(1)
+    else:
+      self._observe(t)
+
+    if self._runtime_exception:
+      raise self._runtime_exception
+
+    return self._state
+
+  def _observe(self, message_thread):
     try:
       for state_response in self._state_stream:
         self._state = self._runner_api_state_to_pipeline_state(
             state_response.state)
         if state_response.state in TERMINAL_STATES:
           # Wait for any last messages.
-          t.join(10)
+          message_thread.join(10)
           break
       if self._state != runner.PipelineState.DONE:
-        raise RuntimeError(
+        self._runtime_exception = RuntimeError(
             'Pipeline %s failed in state %s: %s' %
             (self._job_id, self._state, self._last_error_message()))
-      return self._state
+    except Exception as e:
+      self._runtime_exception = e
     finally:
       self._cleanup()
 
-  def _cleanup(self):
+  def _cleanup(self, on_exit=False):
+    if on_exit and self._cleanup_callbacks:
+      _LOGGER.info(
+          'Running cleanup on exit. If your local pipeline should continue '
+          'running, be sure to call pipeline.run().wait_until_finish().')

Review comment:
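       This log message should likewise recommend the `with` statement rather than `pipeline.run().wait_until_finish()` (see the comment above).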




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org