You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain text version.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/05/15 20:53:37 UTC
[beam] branch master updated: Change TestStreamImpl to a producer/consumer pattern
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4a69c9a Change TestStreamImpl to a producer/consumer pattern
new 578694b Merge pull request #11634 from Change TestStreamImpl to a producer/consumer pattern
4a69c9a is described below
commit 4a69c9a5cdc06b61583c7a2d09eb773fb2f8c240
Author: Sam Rohde <ro...@gmail.com>
AuthorDate: Thu May 7 14:46:22 2020 -0700
Change TestStreamImpl to a producer/consumer pattern
gRPC streaming RPCs are blocking, with no non-blocking API. This changes TestStreamImpl from reading the RPC stream directly to a producer/consumer design, with a timeout on reading from the producer queue.
Change-Id: Ib6b8dad0a22db7cb4c3971e550cef003ef035562
---
.../apache_beam/runners/direct/test_stream_impl.py | 70 ++++++++++++++++++----
1 file changed, 58 insertions(+), 12 deletions(-)
diff --git a/sdks/python/apache_beam/runners/direct/test_stream_impl.py b/sdks/python/apache_beam/runners/direct/test_stream_impl.py
index 318b2a3..6f1516b 100644
--- a/sdks/python/apache_beam/runners/direct/test_stream_impl.py
+++ b/sdks/python/apache_beam/runners/direct/test_stream_impl.py
@@ -30,6 +30,9 @@ from __future__ import print_function
import itertools
import logging
+from queue import Empty as EmptyException
+from queue import Queue
+from threading import Thread
from apache_beam import ParDo
from apache_beam import coders
@@ -61,6 +64,10 @@ except ImportError:
_LOGGER = logging.getLogger(__name__)
+class _EndOfStream:
+ pass
+
+
class _WatermarkController(PTransform):
"""A runner-overridable PTransform Primitive to control the watermark.
@@ -263,8 +270,12 @@ class _TestStream(PTransform):
return itertools.chain(events)
@staticmethod
- def events_from_rpc(endpoint, output_tags, coder):
+ def _stream_events_from_rpc(endpoint, output_tags, coder, channel, is_alive):
"""Yields the events received from the given endpoint.
+
+ This is the producer thread that reads events from the TestStreamService and
+ puts them onto the shared queue. At the end of the stream, an _EndOfStream
+ is placed on the channel to signify a successful end.
"""
stub_channel = grpc.insecure_channel(endpoint)
stub = beam_runner_api_pb2_grpc.TestStreamServiceStub(stub_channel)
@@ -273,20 +284,55 @@ class _TestStream(PTransform):
event_request = beam_runner_api_pb2.EventsRequest(
output_ids=[str(tag) for tag in output_tags])
- event_stream = stub.Events(event_request, timeout=30)
- try:
- while True:
- yield _TestStream.test_stream_payload_to_events(
- next(event_stream), coder)
- except StopIteration:
- return
- except grpc.RpcError as e:
- if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
+ event_stream = stub.Events(event_request)
+ for e in event_stream:
+ channel.put(_TestStream.test_stream_payload_to_events(e, coder))
+ if not is_alive():
+ return
+ channel.put(_EndOfStream())
+
+ @staticmethod
+ def events_from_rpc(endpoint, output_tags, coder):
+ """Yields the events received from the given endpoint.
+
+ This method starts a new thread that reads from the TestStreamService and
+ puts the events onto a shared queue. This method then yields all elements
+ from the queue. Unfortunately, this is necessary because the GRPC API does
+ not allow for non-blocking calls when utilizing a streaming RPC. It is
+ officially suggested from the docs to use a producer/consumer pattern to
+ handle streaming RPCs. By doing so, this gives this method control over when
+ to cancel reading from the RPC if the server takes too long to respond.
+ """
+ # Shared variable with the producer queue. This shuts down the producer if
+ # the consumer exits early.
+ is_alive = True
+
+ # The shared queue that allows the producer and consumer to communicate.
+ channel = Queue() # type: Queue[Union[test_stream.Event, _EndOfStream]]
+ event_stream = Thread(
+ target=_TestStream._stream_events_from_rpc,
+ args=(endpoint, output_tags, coder, channel, lambda: is_alive))
+ event_stream.setDaemon(True)
+ event_stream.start()
+
+ # This pumps the shared queue for events until the _EndOfStream sentinel is
+ # reached. If the TestStreamService takes longer than expected, the queue
+ # will timeout and an EmptyException will be raised. This also sets the
+ # shared is_alive sentinel to shut down the producer.
+ while True:
+ try:
+ # Raise an EmptyException if there are no events during the last timeout
+ # period.
+ event = channel.get(timeout=30)
+ if isinstance(event, _EndOfStream):
+ break
+ yield event
+ except EmptyException as e:
_LOGGER.warning(
'TestStream timed out waiting for new events from service.'
' Stopping pipeline.')
- return
- raise e
+ is_alive = False
+ raise e
@staticmethod
def test_stream_payload_to_events(payload, coder):