You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/04/18 16:32:13 UTC
[1/2] beam git commit: Clean up DirectRunner Clock and TransformResult
Repository: beam
Updated Branches:
refs/heads/master 4f0146a7e -> d4ce94f4e
Clean up DirectRunner Clock and TransformResult
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4d98d43e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4d98d43e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4d98d43e
Branch: refs/heads/master
Commit: 4d98d43e5f3bb7b112986a7e3807a3fc2812b641
Parents: 4f0146a
Author: Charles Chen <cc...@google.com>
Authored: Tue Apr 18 15:18:40 2017 +0800
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Apr 18 09:32:00 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/runners/direct/clock.py | 9 ++--
.../runners/direct/evaluation_context.py | 11 ++---
.../apache_beam/runners/direct/executor.py | 4 +-
.../runners/direct/transform_result.py | 45 +++++---------------
.../runners/direct/watermark_manager.py | 4 +-
5 files changed, 23 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/4d98d43e/sdks/python/apache_beam/runners/direct/clock.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/clock.py b/sdks/python/apache_beam/runners/direct/clock.py
index 11e49cd..dd1800a 100644
--- a/sdks/python/apache_beam/runners/direct/clock.py
+++ b/sdks/python/apache_beam/runners/direct/clock.py
@@ -24,8 +24,7 @@ import time
class Clock(object):
- @property
- def now(self):
+ def time(self):
"""Returns the number of milliseconds since epoch."""
return int(time.time() * 1000)
@@ -36,12 +35,10 @@ class MockClock(Clock):
def __init__(self, now_in_ms):
self._now_in_ms = now_in_ms
- @property
- def now(self):
+ def time(self):
return self._now_in_ms
- @now.setter
- def now(self, value_in_ms):
+ def set_time(self, value_in_ms):
assert value_in_ms >= self._now_in_ms
self._now_in_ms = value_in_ms
http://git-wip-us.apache.org/repos/asf/beam/blob/4d98d43e/sdks/python/apache_beam/runners/direct/evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 2169c7c..68d99d3 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -199,20 +199,21 @@ class EvaluationContext(object):
the committed bundles contained within the handled result.
"""
with self._lock:
- committed_bundles = self._commit_bundles(result.output_bundles)
+ committed_bundles = self._commit_bundles(
+ result.uncommitted_output_bundles)
self._watermark_manager.update_watermarks(
completed_bundle, result.transform, completed_timers,
committed_bundles, result.watermark_hold)
self._metrics.commit_logical(completed_bundle,
- result.logical_metric_updates())
+ result.logical_metric_updates)
# If the result is for a view, update side inputs container.
- if (result.output_bundles
- and result.output_bundles[0].pcollection
+ if (result.uncommitted_output_bundles
+ and result.uncommitted_output_bundles[0].pcollection
in self._pcollection_to_views):
for view in self._pcollection_to_views[
- result.output_bundles[0].pcollection]:
+ result.uncommitted_output_bundles[0].pcollection]:
for committed_bundle in committed_bundles:
# side_input must be materialized.
self._side_inputs_container.add_values(
http://git-wip-us.apache.org/repos/asf/beam/blob/4d98d43e/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index f6a1d7f..da06158 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -292,10 +292,10 @@ class TransformExecutor(ExecutorService.CallableTask):
with scoped_metrics_container:
result = evaluator.finish_bundle()
- result.metric_updates = metrics_container.get_cumulative()
+ result.logical_metric_updates = metrics_container.get_cumulative()
if self._evaluation_context.has_cache:
- for uncommitted_bundle in result.output_bundles:
+ for uncommitted_bundle in result.uncommitted_output_bundles:
self._evaluation_context.append_to_cache(
self._applied_transform, uncommitted_bundle.tag,
uncommitted_bundle.get_elements_iterable())
http://git-wip-us.apache.org/repos/asf/beam/blob/4d98d43e/sdks/python/apache_beam/runners/direct/transform_result.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_result.py b/sdks/python/apache_beam/runners/direct/transform_result.py
index 59597dc..8ae0aea 100644
--- a/sdks/python/apache_beam/runners/direct/transform_result.py
+++ b/sdks/python/apache_beam/runners/direct/transform_result.py
@@ -26,39 +26,14 @@ class TransformResult(object):
def __init__(self, applied_ptransform, uncommitted_output_bundles, state,
timer_update, counters, watermark_hold,
undeclared_tag_values=None):
- self._applied_ptransform = applied_ptransform
- self._uncommitted_output_bundles = uncommitted_output_bundles
- self._state = state
- self._timer_update = timer_update
- self._counters = counters
- self._watermark_hold = watermark_hold
+ self.transform = applied_ptransform
+ self.uncommitted_output_bundles = uncommitted_output_bundles
+ self.state = state
+ # TODO: timer update is currently unused.
+ self.timer_update = timer_update
+ self.counters = counters
+ self.watermark_hold = watermark_hold
# Only used when caching (materializing) all values is requested.
- self._undeclared_tag_values = undeclared_tag_values
- self.metric_updates = None
-
- def logical_metric_updates(self):
- return self.metric_updates
-
- @property
- def transform(self):
- return self._applied_ptransform
-
- @property
- def output_bundles(self):
- return self._uncommitted_output_bundles
-
- @property
- def state(self):
- return self._state
-
- @property
- def counters(self):
- return self._counters
-
- @property
- def watermark_hold(self):
- return self._watermark_hold
-
- @property
- def undeclared_tag_values(self):
- return self._undeclared_tag_values
+ self.undeclared_tag_values = undeclared_tag_values
+ # Populated by the TransformExecutor.
+ self.logical_metric_updates = None
http://git-wip-us.apache.org/repos/asf/beam/blob/4d98d43e/sdks/python/apache_beam/runners/direct/watermark_manager.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py
index 1f18ef6..19d9085 100644
--- a/sdks/python/apache_beam/runners/direct/watermark_manager.py
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -210,12 +210,12 @@ class TransformWatermarks(object):
@property
def synchronized_processing_output_time(self):
- return self._clock.now
+ return self._clock.time()
def extract_fired_timers(self):
with self._lock:
if self._fired_timers:
- return False
+ return False
should_fire = (
self._earliest_hold < WatermarkManager.WATERMARK_POS_INF and
[2/2] beam git commit: This closes #2570
Posted by al...@apache.org.
This closes #2570
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d4ce94f4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d4ce94f4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d4ce94f4
Branch: refs/heads/master
Commit: d4ce94f4e3aac12b78b48c34e6567f19f3eb53b2
Parents: 4f0146a 4d98d43
Author: Ahmet Altay <al...@google.com>
Authored: Tue Apr 18 09:32:04 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Apr 18 09:32:04 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/runners/direct/clock.py | 9 ++--
.../runners/direct/evaluation_context.py | 11 ++---
.../apache_beam/runners/direct/executor.py | 4 +-
.../runners/direct/transform_result.py | 45 +++++---------------
.../runners/direct/watermark_manager.py | 4 +-
5 files changed, 23 insertions(+), 50 deletions(-)
----------------------------------------------------------------------