You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/06/08 17:09:22 UTC
[1/2] beam git commit: Avoid flakiness in data channel for empty
streams.
Repository: beam
Updated Branches:
refs/heads/master e066a9d6d -> d81ed2172
Avoid flakiness in data channel for empty streams.
Because an empty stream is used as the end-of-stream marker, never send
one as the data itself.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4ebebfdb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4ebebfdb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4ebebfdb
Branch: refs/heads/master
Commit: 4ebebfdb34de3e209c033de15e32cf67ab346d44
Parents: e066a9d
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Wed Jun 7 23:00:43 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Jun 8 10:06:17 2017 -0700
----------------------------------------------------------------------
.../python/apache_beam/runners/worker/data_plane.py | 16 +++++++++++-----
1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/4ebebfdb/sdks/python/apache_beam/runners/worker/data_plane.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py
index 5edd0b4..7365db6 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane.py
@@ -167,12 +167,18 @@ class _GrpcDataChannel(DataChannel):
yield data
def output_stream(self, instruction_id, target):
+ # TODO: Return an output stream that sends data
+ # to the Runner once a fixed size buffer is full.
+ # Currently we buffer all the data before sending
+ # any messages.
def add_to_send_queue(data):
- self._to_send.put(
- beam_fn_api_pb2.Elements.Data(
- instruction_reference=instruction_id,
- target=target,
- data=data))
+ if data:
+ self._to_send.put(
+ beam_fn_api_pb2.Elements.Data(
+ instruction_reference=instruction_id,
+ target=target,
+ data=data))
+ # End of stream marker.
self._to_send.put(
beam_fn_api_pb2.Elements.Data(
instruction_reference=instruction_id,
[2/2] beam git commit: Avoid flakiness in data channel for empty
streams.
Posted by lc...@apache.org.
Avoid flakiness in data channel for empty streams.
This closes #3324
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d81ed217
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d81ed217
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d81ed217
Branch: refs/heads/master
Commit: d81ed21726cb50962bc4fcfb57d50e11eacdcf3b
Parents: e066a9d 4ebebfd
Author: Luke Cwik <lc...@google.com>
Authored: Thu Jun 8 10:09:12 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Jun 8 10:09:12 2017 -0700
----------------------------------------------------------------------
.../python/apache_beam/runners/worker/data_plane.py | 16 +++++++++++-----
1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------