You are viewing a plain-text version of this content. The canonical link to it (originally given as "here") was not preserved in this copy.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/07/31 17:54:44 UTC

[beam] branch master updated: Gracefully stop the TestStream RPC if the job stops.

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b66d82  Gracefully stop the TestStream RPC if the job stops.
     new a72ec0a  Merge pull request #12410 from [BEAM-10603] Gracefully stop the TestStream RPC if the job stops.
3b66d82 is described below

commit 3b66d8286ab798d25112609e041dbc2cf57e8a4f
Author: Sam Rohde <sr...@google.com>
AuthorDate: Mon Jul 27 10:53:39 2020 -0700

    Gracefully stop the TestStream RPC if the job stops.
    
    Change-Id: I1b25694c2f8f52d5335da889610ae1871deb0917
---
 sdks/python/apache_beam/runners/direct/evaluation_context.py  |  4 ++++
 sdks/python/apache_beam/runners/direct/executor.py            |  2 ++
 sdks/python/apache_beam/runners/direct/test_stream_impl.py    | 11 +++++++----
 sdks/python/apache_beam/runners/direct/transform_evaluator.py |  5 ++++-
 4 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 100b0e5..48a99bd 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -275,6 +275,7 @@ class EvaluationContext(object):
     self._metrics = DirectMetrics()
 
     self._lock = threading.Lock()
+    self.shutdown_requested = False
 
   def _initialize_keyed_states(self, root_transforms, value_to_consumers):
     """Initialize user state dicts.
@@ -453,6 +454,9 @@ class EvaluationContext(object):
     return self._side_inputs_container.get_value_or_block_until_ready(
         side_input, task, block_until)
 
+  def shutdown(self):
+    self.shutdown_requested = True
+
 
 class DirectUnmergedState(InMemoryUnmergedState):
   """UnmergedState implementation for the DirectRunner."""
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index b23f385..1c5dff5 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -492,6 +492,8 @@ class _ExecutorServiceParallelExecutor(object):
 
   def request_shutdown(self):
     self.executor_service.shutdown()
+    self.executor_service.await_completion()
+    self.evaluation_context.shutdown()
 
   def schedule_consumers(self, committed_bundle):
     # type: (_Bundle) -> None
diff --git a/sdks/python/apache_beam/runners/direct/test_stream_impl.py b/sdks/python/apache_beam/runners/direct/test_stream_impl.py
index a32a114..2897f30 100644
--- a/sdks/python/apache_beam/runners/direct/test_stream_impl.py
+++ b/sdks/python/apache_beam/runners/direct/test_stream_impl.py
@@ -283,7 +283,7 @@ class _TestStream(PTransform):
     channel.put(_EndOfStream())
 
   @staticmethod
-  def events_from_rpc(endpoint, output_tags, coder):
+  def events_from_rpc(endpoint, output_tags, coder, evaluation_context):
     """Yields the events received from the given endpoint.
 
     This method starts a new thread that reads from the TestStreamService and
@@ -296,13 +296,16 @@ class _TestStream(PTransform):
     """
     # Shared variable with the producer queue. This shuts down the producer if
     # the consumer exits early.
-    is_alive = True
+    shutdown_requested = False
+
+    def is_alive():
+      return not (shutdown_requested or evaluation_context.shutdown_requested)
 
     # The shared queue that allows the producer and consumer to communicate.
     channel = Queue()  # type: Queue[Union[test_stream.Event, _EndOfStream]]
     event_stream = Thread(
         target=_TestStream._stream_events_from_rpc,
-        args=(endpoint, output_tags, coder, channel, lambda: is_alive))
+        args=(endpoint, output_tags, coder, channel, is_alive))
     event_stream.setDaemon(True)
     event_stream.start()
 
@@ -322,7 +325,7 @@ class _TestStream(PTransform):
         _LOGGER.warning(
             'TestStream timed out waiting for new events from service.'
             ' Stopping pipeline.')
-        is_alive = False
+        shutdown_requested = True
         raise e
 
   @staticmethod
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 89106ad..e7b969e 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -221,7 +221,10 @@ class _TestStreamRootBundleProvider(RootBundleProvider):
     # TestStreamService.
     if test_stream.endpoint:
       _TestStreamEvaluator.event_stream = _TestStream.events_from_rpc(
-          test_stream.endpoint, test_stream.output_tags, test_stream.coder)
+          test_stream.endpoint,
+          test_stream.output_tags,
+          test_stream.coder,
+          self._evaluation_context)
     else:
       _TestStreamEvaluator.event_stream = (
           _TestStream.events_from_script(test_stream._events))