Posted to commits@beam.apache.org by lc...@apache.org on 2017/06/08 17:09:22 UTC

[1/2] beam git commit: Avoid flakiness in data channel for empty streams.

Repository: beam
Updated Branches:
  refs/heads/master e066a9d6d -> d81ed2172


Avoid flakiness in data channel for empty streams.

As an empty stream is used as the end-of-stream marker, never send
one as the data itself.
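
The convention described above can be illustrated with a minimal sketch.
It is not the Beam implementation: a plain queue.Queue stands in for the
gRPC send queue, tuples stand in for Elements.Data messages, and the names
END_OF_STREAM, send_stream, read_stream and demo are hypothetical.

```python
import queue

# Assumption for this sketch: an empty payload marks the end of a stream.
END_OF_STREAM = b""


def send_stream(to_send, stream_id, payload):
    """Queue one buffered payload, then the end-of-stream marker."""
    if payload:
        # Skip empty data: on the wire it would look exactly like the marker.
        to_send.put((stream_id, payload))
    to_send.put((stream_id, END_OF_STREAM))


def read_stream(to_send):
    """Collect payloads until the first empty payload is seen."""
    chunks = []
    while True:
        _, data = to_send.get()
        if not data:
            return chunks
        chunks.append(data)


def demo():
    q = queue.Queue()
    send_stream(q, "a", b"")      # empty stream: only the marker is queued
    send_stream(q, "b", b"xyz")   # non-empty stream: data, then the marker
    first = read_stream(q)
    second = read_stream(q)
    return first, second, q.empty()
```

In this sketch, dropping the `if payload:` guard would queue two empty
messages for the empty stream, and the reader could not tell which one was
data and which one was the marker.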


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4ebebfdb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4ebebfdb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4ebebfdb

Branch: refs/heads/master
Commit: 4ebebfdb34de3e209c033de15e32cf67ab346d44
Parents: e066a9d
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Wed Jun 7 23:00:43 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Jun 8 10:06:17 2017 -0700

----------------------------------------------------------------------
 .../python/apache_beam/runners/worker/data_plane.py | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4ebebfdb/sdks/python/apache_beam/runners/worker/data_plane.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py
index 5edd0b4..7365db6 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane.py
@@ -167,12 +167,18 @@ class _GrpcDataChannel(DataChannel):
         yield data
 
   def output_stream(self, instruction_id, target):
+    # TODO: Return an output stream that sends data
+    # to the Runner once a fixed size buffer is full.
+    # Currently we buffer all the data before sending
+    # any messages.
     def add_to_send_queue(data):
-      self._to_send.put(
-          beam_fn_api_pb2.Elements.Data(
-              instruction_reference=instruction_id,
-              target=target,
-              data=data))
+      if data:
+        self._to_send.put(
+            beam_fn_api_pb2.Elements.Data(
+                instruction_reference=instruction_id,
+                target=target,
+                data=data))
+      # End of stream marker.
       self._to_send.put(
           beam_fn_api_pb2.Elements.Data(
               instruction_reference=instruction_id,


[2/2] beam git commit: Avoid flakiness in data channel for empty streams.

Posted by lc...@apache.org.
Avoid flakiness in data channel for empty streams.

This closes #3324


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d81ed217
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d81ed217
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d81ed217

Branch: refs/heads/master
Commit: d81ed21726cb50962bc4fcfb57d50e11eacdcf3b
Parents: e066a9d 4ebebfd
Author: Luke Cwik <lc...@google.com>
Authored: Thu Jun 8 10:09:12 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Jun 8 10:09:12 2017 -0700

----------------------------------------------------------------------
 .../python/apache_beam/runners/worker/data_plane.py | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------