You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/05/15 20:53:37 UTC

[beam] branch master updated: Change TestStreamImpl to a producer/consumer pattern

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a69c9a  Change TestStreamImpl to a producer/consumer pattern
     new 578694b  Merge pull request #11634 from Change TestStreamImpl to a producer/consumer pattern
4a69c9a is described below

commit 4a69c9a5cdc06b61583c7a2d09eb773fb2f8c240
Author: Sam Rohde <ro...@gmail.com>
AuthorDate: Thu May 7 14:46:22 2020 -0700

    Change TestStreamImpl to a producer/consumer pattern
    
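    gRPC streaming RPCs block the caller and offer no non-blocking read API. This changes TestStreamImpl from iterating the RPC stream directly to a producer/consumer design: a producer thread reads the stream into a shared queue, and the consumer reads from that queue with a timeout.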
    
    Change-Id: Ib6b8dad0a22db7cb4c3971e550cef003ef035562
---
 .../apache_beam/runners/direct/test_stream_impl.py | 70 ++++++++++++++++++----
 1 file changed, 58 insertions(+), 12 deletions(-)

diff --git a/sdks/python/apache_beam/runners/direct/test_stream_impl.py b/sdks/python/apache_beam/runners/direct/test_stream_impl.py
index 318b2a3..6f1516b 100644
--- a/sdks/python/apache_beam/runners/direct/test_stream_impl.py
+++ b/sdks/python/apache_beam/runners/direct/test_stream_impl.py
@@ -30,6 +30,9 @@ from __future__ import print_function
 
 import itertools
 import logging
+from queue import Empty as EmptyException
+from queue import Queue
+from threading import Thread
 
 from apache_beam import ParDo
 from apache_beam import coders
@@ -61,6 +64,10 @@ except ImportError:
 _LOGGER = logging.getLogger(__name__)
 
 
+class _EndOfStream:
+  pass
+
+
 class _WatermarkController(PTransform):
   """A runner-overridable PTransform Primitive to control the watermark.
 
@@ -263,8 +270,12 @@ class _TestStream(PTransform):
     return itertools.chain(events)
 
   @staticmethod
-  def events_from_rpc(endpoint, output_tags, coder):
+  def _stream_events_from_rpc(endpoint, output_tags, coder, channel, is_alive):
     """Yields the events received from the given endpoint.
+
+    This is the producer thread that reads events from the TestStreamService and
+    puts them onto the shared queue. At the end of the stream, an _EndOfStream
+    is placed on the channel to signify a successful end.
     """
     stub_channel = grpc.insecure_channel(endpoint)
     stub = beam_runner_api_pb2_grpc.TestStreamServiceStub(stub_channel)
@@ -273,20 +284,55 @@ class _TestStream(PTransform):
     event_request = beam_runner_api_pb2.EventsRequest(
         output_ids=[str(tag) for tag in output_tags])
 
-    event_stream = stub.Events(event_request, timeout=30)
-    try:
-      while True:
-        yield _TestStream.test_stream_payload_to_events(
-            next(event_stream), coder)
-    except StopIteration:
-      return
-    except grpc.RpcError as e:
-      if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
+    event_stream = stub.Events(event_request)
+    for e in event_stream:
+      channel.put(_TestStream.test_stream_payload_to_events(e, coder))
+      if not is_alive():
+        return
+    channel.put(_EndOfStream())
+
+  @staticmethod
+  def events_from_rpc(endpoint, output_tags, coder):
+    """Yields the events received from the given endpoint.
+
+    This method starts a new thread that reads from the TestStreamService and
+    puts the events onto a shared queue. This method then yields all elements
+    from the queue. Unfortunately, this is necessary because the GRPC API does
+    not allow for non-blocking calls when utilizing a streaming RPC. It is
+    officially suggested from the docs to use a producer/consumer pattern to
+    handle streaming RPCs. By doing so, this gives this method control over when
+    to cancel reading from the RPC if the server takes too long to respond.
+    """
+    # Shared variable with the producer queue. This shuts down the producer if
+    # the consumer exits early.
+    is_alive = True
+
+    # The shared queue that allows the producer and consumer to communicate.
+    channel = Queue()  # type: Queue[Union[test_stream.Event, _EndOfStream]]
+    event_stream = Thread(
+        target=_TestStream._stream_events_from_rpc,
+        args=(endpoint, output_tags, coder, channel, lambda: is_alive))
+    event_stream.setDaemon(True)
+    event_stream.start()
+
+    # This pumps the shared queue for events until the _EndOfStream sentinel is
+    # reached. If the TestStreamService takes longer than expected, the queue
+    # will timeout and an EmptyException will be raised. This also sets the
+    # shared is_alive sentinel to shut down the producer.
+    while True:
+      try:
+        # Raise an EmptyException if there are no events during the last timeout
+        # period.
+        event = channel.get(timeout=30)
+        if isinstance(event, _EndOfStream):
+          break
+        yield event
+      except EmptyException as e:
         _LOGGER.warning(
             'TestStream timed out waiting for new events from service.'
             ' Stopping pipeline.')
-        return
-      raise e
+        is_alive = False
+        raise e
 
   @staticmethod
   def test_stream_payload_to_events(payload, coder):