You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/22 21:57:03 UTC

[1/3] incubator-beam git commit: Fix the flaky test_model_multiple_pcollections_partition test

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk b4187bd91 -> 3b5cd0efc


Fix the flaky test_model_multiple_pcollections_partition test

_NativeWriteEvaluator should ignore empty bundles that arrive after the
write. The write happens once the last bundle containing data has been processed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/102e6773
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/102e6773
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/102e6773

Branch: refs/heads/python-sdk
Commit: 102e677375869386cc927ba649fddf6736455307
Parents: b83f12b
Author: Ahmet Altay <al...@google.com>
Authored: Thu Nov 17 17:48:13 2016 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Thu Nov 17 17:48:13 2016 -0800

----------------------------------------------------------------------
 .../runners/direct/transform_evaluator.py       | 27 +++++++++++++-------
 1 file changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/102e6773/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 093f183..5a79ab2 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -513,6 +513,11 @@ class _NativeWriteEvaluator(_TransformEvaluator):
     return (self._execution_context.watermarks.input_watermark
             == WatermarkManager.WATERMARK_POS_INF)
 
+  @property
+  def _has_already_produced_output(self):
+    return (self._execution_context.watermarks.output_watermark
+            == WatermarkManager.WATERMARK_POS_INF)
+
   def start_bundle(self):
     # state: [values]
     self.state = (self._execution_context.existing_state
@@ -524,15 +529,19 @@ class _NativeWriteEvaluator(_TransformEvaluator):
   def finish_bundle(self):
     # TODO(altay): Do not wait until the last bundle to write in a single shard.
     if self._is_final_bundle:
-      if isinstance(self._sink, io.fileio.NativeTextFileSink):
-        assert self._sink.num_shards in (0, 1)
-        if self._sink.shard_name_template:
-          self._sink.file_path += '-00000-of-00001'
-          self._sink.file_path += self._sink.file_name_suffix
-      self._sink.pipeline_options = self._evaluation_context.pipeline_options
-      with self._sink.writer() as writer:
-        for v in self.state:
-          writer.Write(v.value)
+      if not self._has_already_produced_output:
+        if isinstance(self._sink, io.fileio.NativeTextFileSink):
+          assert self._sink.num_shards in (0, 1)
+          if self._sink.shard_name_template:
+            self._sink.file_path += '-00000-of-00001'
+            self._sink.file_path += self._sink.file_name_suffix
+        self._sink.pipeline_options = self._evaluation_context.pipeline_options
+        with self._sink.writer() as writer:
+          for v in self.state:
+            writer.Write(v.value)
+      else:
+        # Ignore empty bundles that arrive after the output is produced.
+        assert self.state == []
 
       state = None
       hold = WatermarkManager.WATERMARK_POS_INF


[3/3] incubator-beam git commit: This closes #1384

Posted by ke...@apache.org.
This closes #1384


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3b5cd0ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3b5cd0ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3b5cd0ef

Branch: refs/heads/python-sdk
Commit: 3b5cd0efc5f5e4b8fad34ee0d976e5e6ba501065
Parents: b4187bd d5a68ea
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 22 13:56:37 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Nov 22 13:56:37 2016 -0800

----------------------------------------------------------------------
 .../runners/direct/transform_evaluator.py       | 33 ++++++++++++++------
 1 file changed, 23 insertions(+), 10 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-beam git commit: fixing reviewer comments

Posted by ke...@apache.org.
fixing reviewer comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d5a68ea8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d5a68ea8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d5a68ea8

Branch: refs/heads/python-sdk
Commit: d5a68ea839eda941b615e9bfe50107d9c9d7a53c
Parents: 102e677
Author: Ahmet Altay <al...@google.com>
Authored: Mon Nov 21 22:27:24 2016 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Mon Nov 21 22:27:24 2016 -0800

----------------------------------------------------------------------
 .../apache_beam/runners/direct/transform_evaluator.py | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d5a68ea8/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 5a79ab2..7a9a31f 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -527,9 +527,17 @@ class _NativeWriteEvaluator(_TransformEvaluator):
     self.state.append(element)
 
   def finish_bundle(self):
+    # finish_bundle will append incoming bundles in memory until all the bundles
+    # carrying data are processed. This is done to produce only a single output
+    # shard (some tests depend on this behavior). It is possible to have
+    # incoming empty bundles after the output is produced; these bundles will be
+    # ignored and will not generate additional output files.
     # TODO(altay): Do not wait until the last bundle to write in a single shard.
     if self._is_final_bundle:
-      if not self._has_already_produced_output:
+      if self._has_already_produced_output:
+        # Ignore empty bundles that arrive after the output is produced.
+        assert self.state == []
+      else:
         if isinstance(self._sink, io.fileio.NativeTextFileSink):
           assert self._sink.num_shards in (0, 1)
           if self._sink.shard_name_template:
@@ -539,10 +547,6 @@ class _NativeWriteEvaluator(_TransformEvaluator):
         with self._sink.writer() as writer:
           for v in self.state:
             writer.Write(v.value)
-      else:
-        # Ignore empty bundles that arrive after the output is produced.
-        assert self.state == []
-
       state = None
       hold = WatermarkManager.WATERMARK_POS_INF
     else: