You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/22 21:57:03 UTC
[1/3] incubator-beam git commit: Fix the flaky
test_model_multiple_pcollections_partition test
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk b4187bd91 -> 3b5cd0efc
Fix the flaky test_model_multiple_pcollections_partition test
_NativeWriteEvaluator should ignore empty bundles that arrive after the
write. The write happens once the last bundle containing data is processed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/102e6773
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/102e6773
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/102e6773
Branch: refs/heads/python-sdk
Commit: 102e677375869386cc927ba649fddf6736455307
Parents: b83f12b
Author: Ahmet Altay <al...@google.com>
Authored: Thu Nov 17 17:48:13 2016 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Thu Nov 17 17:48:13 2016 -0800
----------------------------------------------------------------------
.../runners/direct/transform_evaluator.py | 27 +++++++++++++-------
1 file changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/102e6773/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 093f183..5a79ab2 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -513,6 +513,11 @@ class _NativeWriteEvaluator(_TransformEvaluator):
return (self._execution_context.watermarks.input_watermark
== WatermarkManager.WATERMARK_POS_INF)
+ @property
+ def _has_already_produced_output(self):
+ return (self._execution_context.watermarks.output_watermark
+ == WatermarkManager.WATERMARK_POS_INF)
+
def start_bundle(self):
# state: [values]
self.state = (self._execution_context.existing_state
@@ -524,15 +529,19 @@ class _NativeWriteEvaluator(_TransformEvaluator):
def finish_bundle(self):
# TODO(altay): Do not wait until the last bundle to write in a single shard.
if self._is_final_bundle:
- if isinstance(self._sink, io.fileio.NativeTextFileSink):
- assert self._sink.num_shards in (0, 1)
- if self._sink.shard_name_template:
- self._sink.file_path += '-00000-of-00001'
- self._sink.file_path += self._sink.file_name_suffix
- self._sink.pipeline_options = self._evaluation_context.pipeline_options
- with self._sink.writer() as writer:
- for v in self.state:
- writer.Write(v.value)
+ if not self._has_already_produced_output:
+ if isinstance(self._sink, io.fileio.NativeTextFileSink):
+ assert self._sink.num_shards in (0, 1)
+ if self._sink.shard_name_template:
+ self._sink.file_path += '-00000-of-00001'
+ self._sink.file_path += self._sink.file_name_suffix
+ self._sink.pipeline_options = self._evaluation_context.pipeline_options
+ with self._sink.writer() as writer:
+ for v in self.state:
+ writer.Write(v.value)
+ else:
+ # Ignore empty bundles that arrive after the output is produced.
+ assert self.state == []
state = None
hold = WatermarkManager.WATERMARK_POS_INF
[3/3] incubator-beam git commit: This closes #1384
Posted by ke...@apache.org.
This closes #1384
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3b5cd0ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3b5cd0ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3b5cd0ef
Branch: refs/heads/python-sdk
Commit: 3b5cd0efc5f5e4b8fad34ee0d976e5e6ba501065
Parents: b4187bd d5a68ea
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 22 13:56:37 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Nov 22 13:56:37 2016 -0800
----------------------------------------------------------------------
.../runners/direct/transform_evaluator.py | 33 ++++++++++++++------
1 file changed, 23 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-beam git commit: fixing reviewer comments
Posted by ke...@apache.org.
fixing reviewer comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d5a68ea8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d5a68ea8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d5a68ea8
Branch: refs/heads/python-sdk
Commit: d5a68ea839eda941b615e9bfe50107d9c9d7a53c
Parents: 102e677
Author: Ahmet Altay <al...@google.com>
Authored: Mon Nov 21 22:27:24 2016 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Mon Nov 21 22:27:24 2016 -0800
----------------------------------------------------------------------
.../apache_beam/runners/direct/transform_evaluator.py | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d5a68ea8/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 5a79ab2..7a9a31f 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -527,9 +527,17 @@ class _NativeWriteEvaluator(_TransformEvaluator):
self.state.append(element)
def finish_bundle(self):
+ # finish_bundle will append incoming bundles in memory until all the bundles
+ # carrying data is processed. This is done to produce only a single output
+ # shard (some tests depends on this behavior). It is possible to have
+ # incoming empty bundles after the output is produced, these bundles will be
+ # ignored and would not generate additional output files.
# TODO(altay): Do not wait until the last bundle to write in a single shard.
if self._is_final_bundle:
- if not self._has_already_produced_output:
+ if self._has_already_produced_output:
+ # Ignore empty bundles that arrive after the output is produced.
+ assert self.state == []
+ else:
if isinstance(self._sink, io.fileio.NativeTextFileSink):
assert self._sink.num_shards in (0, 1)
if self._sink.shard_name_template:
@@ -539,10 +547,6 @@ class _NativeWriteEvaluator(_TransformEvaluator):
with self._sink.writer() as writer:
for v in self.state:
writer.Write(v.value)
- else:
- # Ignore empty bundles that arrive after the output is produced.
- assert self.state == []
-
state = None
hold = WatermarkManager.WATERMARK_POS_INF
else: