You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/12 16:55:34 UTC
[19/50] [abbrv] beam git commit: Refine Python DirectRunner watermark advancement behavior
Refine Python DirectRunner watermark advancement behavior
This change helps prepare for streaming pipeline execution.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3e049020
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3e049020
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3e049020
Branch: refs/heads/gearpump-runner
Commit: 3e04902008b410269b23179dc2146623ff1fbd0a
Parents: d81ed21
Author: Charles Chen <cc...@google.com>
Authored: Wed Jun 7 17:46:36 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu Jun 8 10:55:44 2017 -0700
----------------------------------------------------------------------
.../runners/direct/watermark_manager.py | 20 +++++++++++++++++---
sdks/python/apache_beam/utils/timestamp.py | 5 +++++
2 files changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3e049020/sdks/python/apache_beam/runners/direct/watermark_manager.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py
index 3a13539..0d7cd4f 100644
--- a/sdks/python/apache_beam/runners/direct/watermark_manager.py
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -25,6 +25,7 @@ from apache_beam import pipeline
from apache_beam import pvalue
from apache_beam.utils.timestamp import MAX_TIMESTAMP
from apache_beam.utils.timestamp import MIN_TIMESTAMP
+from apache_beam.utils.timestamp import TIME_GRANULARITY
class WatermarkManager(object):
@@ -193,9 +194,22 @@ class _TransformWatermarks(object):
def refresh(self):
with self._lock:
- pending_holder = (WatermarkManager.WATERMARK_NEG_INF
- if self._pending else
- WatermarkManager.WATERMARK_POS_INF)
+ min_pending_timestamp = WatermarkManager.WATERMARK_POS_INF
+ has_pending_elements = False
+ for input_bundle in self._pending:
+ # TODO(ccy): we can have the Bundle class keep track of the minimum
+ # timestamp so we don't have to do an iteration here.
+ for wv in input_bundle.get_elements_iterable():
+ has_pending_elements = True
+ if wv.timestamp < min_pending_timestamp:
+ min_pending_timestamp = wv.timestamp
+
+ # If there is a pending element with a certain timestamp, we can at most
+ # advance our watermark to the maximum timestamp less than that
+ # timestamp.
+ pending_holder = WatermarkManager.WATERMARK_POS_INF
+ if has_pending_elements:
+ pending_holder = min_pending_timestamp - TIME_GRANULARITY
input_watermarks = [
tw.output_watermark for tw in self._input_transform_watermarks]
http://git-wip-us.apache.org/repos/asf/beam/blob/3e049020/sdks/python/apache_beam/utils/timestamp.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/timestamp.py b/sdks/python/apache_beam/utils/timestamp.py
index 5d1b48c..b3e840e 100644
--- a/sdks/python/apache_beam/utils/timestamp.py
+++ b/sdks/python/apache_beam/utils/timestamp.py
@@ -208,3 +208,8 @@ class Duration(object):
def __mod__(self, other):
other = Duration.of(other)
return Duration(micros=self.micros % other.micros)
+
+
+# The minimum granularity / interval expressible in a Timestamp / Duration
+# object.
+TIME_GRANULARITY = Duration(micros=1)