You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/07/31 17:54:44 UTC
[beam] branch master updated: Gracefully stop the TestStream RPC if the job stops.
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3b66d82 Gracefully stop the TestStream RPC if the job stops.
new a72ec0a Merge pull request #12410 from [BEAM-10603] Gracefully stop the TestStream RPC if the job stops.
3b66d82 is described below
commit 3b66d8286ab798d25112609e041dbc2cf57e8a4f
Author: Sam Rohde <sr...@google.com>
AuthorDate: Mon Jul 27 10:53:39 2020 -0700
Gracefully stop the TestStream RPC if the job stops.
Change-Id: I1b25694c2f8f52d5335da889610ae1871deb0917
---
sdks/python/apache_beam/runners/direct/evaluation_context.py | 4 ++++
sdks/python/apache_beam/runners/direct/executor.py | 2 ++
sdks/python/apache_beam/runners/direct/test_stream_impl.py | 11 +++++++----
sdks/python/apache_beam/runners/direct/transform_evaluator.py | 5 ++++-
4 files changed, 17 insertions(+), 5 deletions(-)
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 100b0e5..48a99bd 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -275,6 +275,7 @@ class EvaluationContext(object):
self._metrics = DirectMetrics()
self._lock = threading.Lock()
+ self.shutdown_requested = False
def _initialize_keyed_states(self, root_transforms, value_to_consumers):
"""Initialize user state dicts.
@@ -453,6 +454,9 @@ class EvaluationContext(object):
return self._side_inputs_container.get_value_or_block_until_ready(
side_input, task, block_until)
+ def shutdown(self):
+ self.shutdown_requested = True
+
class DirectUnmergedState(InMemoryUnmergedState):
"""UnmergedState implementation for the DirectRunner."""
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index b23f385..1c5dff5 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -492,6 +492,8 @@ class _ExecutorServiceParallelExecutor(object):
def request_shutdown(self):
self.executor_service.shutdown()
+ self.executor_service.await_completion()
+ self.evaluation_context.shutdown()
def schedule_consumers(self, committed_bundle):
# type: (_Bundle) -> None
diff --git a/sdks/python/apache_beam/runners/direct/test_stream_impl.py b/sdks/python/apache_beam/runners/direct/test_stream_impl.py
index a32a114..2897f30 100644
--- a/sdks/python/apache_beam/runners/direct/test_stream_impl.py
+++ b/sdks/python/apache_beam/runners/direct/test_stream_impl.py
@@ -283,7 +283,7 @@ class _TestStream(PTransform):
channel.put(_EndOfStream())
@staticmethod
- def events_from_rpc(endpoint, output_tags, coder):
+ def events_from_rpc(endpoint, output_tags, coder, evaluation_context):
"""Yields the events received from the given endpoint.
This method starts a new thread that reads from the TestStreamService and
@@ -296,13 +296,16 @@ class _TestStream(PTransform):
"""
# Shared variable with the producer queue. This shuts down the producer if
# the consumer exits early.
- is_alive = True
+ shutdown_requested = False
+
+ def is_alive():
+ return not (shutdown_requested or evaluation_context.shutdown_requested)
# The shared queue that allows the producer and consumer to communicate.
channel = Queue() # type: Queue[Union[test_stream.Event, _EndOfStream]]
event_stream = Thread(
target=_TestStream._stream_events_from_rpc,
- args=(endpoint, output_tags, coder, channel, lambda: is_alive))
+ args=(endpoint, output_tags, coder, channel, is_alive))
event_stream.setDaemon(True)
event_stream.start()
@@ -322,7 +325,7 @@ class _TestStream(PTransform):
_LOGGER.warning(
'TestStream timed out waiting for new events from service.'
' Stopping pipeline.')
- is_alive = False
+ shutdown_requested = True
raise e
@staticmethod
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 89106ad..e7b969e 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -221,7 +221,10 @@ class _TestStreamRootBundleProvider(RootBundleProvider):
# TestStreamService.
if test_stream.endpoint:
_TestStreamEvaluator.event_stream = _TestStream.events_from_rpc(
- test_stream.endpoint, test_stream.output_tags, test_stream.coder)
+ test_stream.endpoint,
+ test_stream.output_tags,
+ test_stream.coder,
+ self._evaluation_context)
else:
_TestStreamEvaluator.event_stream = (
_TestStream.events_from_script(test_stream._events))