You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/04/18 16:32:13 UTC

[1/2] beam git commit: Clean up DirectRunner Clock and TransformResult

Repository: beam
Updated Branches:
  refs/heads/master 4f0146a7e -> d4ce94f4e


Clean up DirectRunner Clock and TransformResult


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4d98d43e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4d98d43e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4d98d43e

Branch: refs/heads/master
Commit: 4d98d43e5f3bb7b112986a7e3807a3fc2812b641
Parents: 4f0146a
Author: Charles Chen <cc...@google.com>
Authored: Tue Apr 18 15:18:40 2017 +0800
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Apr 18 09:32:00 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/direct/clock.py |  9 ++--
 .../runners/direct/evaluation_context.py        | 11 ++---
 .../apache_beam/runners/direct/executor.py      |  4 +-
 .../runners/direct/transform_result.py          | 45 +++++---------------
 .../runners/direct/watermark_manager.py         |  4 +-
 5 files changed, 23 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4d98d43e/sdks/python/apache_beam/runners/direct/clock.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/clock.py b/sdks/python/apache_beam/runners/direct/clock.py
index 11e49cd..dd1800a 100644
--- a/sdks/python/apache_beam/runners/direct/clock.py
+++ b/sdks/python/apache_beam/runners/direct/clock.py
@@ -24,8 +24,7 @@ import time
 
 class Clock(object):
 
-  @property
-  def now(self):
+  def time(self):
     """Returns the number of milliseconds since epoch."""
     return int(time.time() * 1000)
 
@@ -36,12 +35,10 @@ class MockClock(Clock):
   def __init__(self, now_in_ms):
     self._now_in_ms = now_in_ms
 
-  @property
-  def now(self):
+  def time(self):
     return self._now_in_ms
 
-  @now.setter
-  def now(self, value_in_ms):
+  def set_time(self, value_in_ms):
     assert value_in_ms >= self._now_in_ms
     self._now_in_ms = value_in_ms
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4d98d43e/sdks/python/apache_beam/runners/direct/evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 2169c7c..68d99d3 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -199,20 +199,21 @@ class EvaluationContext(object):
       the committed bundles contained within the handled result.
     """
     with self._lock:
-      committed_bundles = self._commit_bundles(result.output_bundles)
+      committed_bundles = self._commit_bundles(
+          result.uncommitted_output_bundles)
       self._watermark_manager.update_watermarks(
           completed_bundle, result.transform, completed_timers,
           committed_bundles, result.watermark_hold)
 
       self._metrics.commit_logical(completed_bundle,
-                                   result.logical_metric_updates())
+                                   result.logical_metric_updates)
 
       # If the result is for a view, update side inputs container.
-      if (result.output_bundles
-          and result.output_bundles[0].pcollection
+      if (result.uncommitted_output_bundles
+          and result.uncommitted_output_bundles[0].pcollection
           in self._pcollection_to_views):
         for view in self._pcollection_to_views[
-            result.output_bundles[0].pcollection]:
+            result.uncommitted_output_bundles[0].pcollection]:
           for committed_bundle in committed_bundles:
             # side_input must be materialized.
             self._side_inputs_container.add_values(

http://git-wip-us.apache.org/repos/asf/beam/blob/4d98d43e/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index f6a1d7f..da06158 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -292,10 +292,10 @@ class TransformExecutor(ExecutorService.CallableTask):
 
       with scoped_metrics_container:
         result = evaluator.finish_bundle()
-        result.metric_updates = metrics_container.get_cumulative()
+        result.logical_metric_updates = metrics_container.get_cumulative()
 
       if self._evaluation_context.has_cache:
-        for uncommitted_bundle in result.output_bundles:
+        for uncommitted_bundle in result.uncommitted_output_bundles:
           self._evaluation_context.append_to_cache(
               self._applied_transform, uncommitted_bundle.tag,
               uncommitted_bundle.get_elements_iterable())

http://git-wip-us.apache.org/repos/asf/beam/blob/4d98d43e/sdks/python/apache_beam/runners/direct/transform_result.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_result.py b/sdks/python/apache_beam/runners/direct/transform_result.py
index 59597dc..8ae0aea 100644
--- a/sdks/python/apache_beam/runners/direct/transform_result.py
+++ b/sdks/python/apache_beam/runners/direct/transform_result.py
@@ -26,39 +26,14 @@ class TransformResult(object):
   def __init__(self, applied_ptransform, uncommitted_output_bundles, state,
                timer_update, counters, watermark_hold,
                undeclared_tag_values=None):
-    self._applied_ptransform = applied_ptransform
-    self._uncommitted_output_bundles = uncommitted_output_bundles
-    self._state = state
-    self._timer_update = timer_update
-    self._counters = counters
-    self._watermark_hold = watermark_hold
+    self.transform = applied_ptransform
+    self.uncommitted_output_bundles = uncommitted_output_bundles
+    self.state = state
+    # TODO: timer update is currently unused.
+    self.timer_update = timer_update
+    self.counters = counters
+    self.watermark_hold = watermark_hold
     # Only used when caching (materializing) all values is requested.
-    self._undeclared_tag_values = undeclared_tag_values
-    self.metric_updates = None
-
-  def logical_metric_updates(self):
-    return self.metric_updates
-
-  @property
-  def transform(self):
-    return self._applied_ptransform
-
-  @property
-  def output_bundles(self):
-    return self._uncommitted_output_bundles
-
-  @property
-  def state(self):
-    return self._state
-
-  @property
-  def counters(self):
-    return self._counters
-
-  @property
-  def watermark_hold(self):
-    return self._watermark_hold
-
-  @property
-  def undeclared_tag_values(self):
-    return self._undeclared_tag_values
+    self.undeclared_tag_values = undeclared_tag_values
+    # Populated by the TransformExecutor.
+    self.logical_metric_updates = None

http://git-wip-us.apache.org/repos/asf/beam/blob/4d98d43e/sdks/python/apache_beam/runners/direct/watermark_manager.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py
index 1f18ef6..19d9085 100644
--- a/sdks/python/apache_beam/runners/direct/watermark_manager.py
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -210,12 +210,12 @@ class TransformWatermarks(object):
 
   @property
   def synchronized_processing_output_time(self):
-    return self._clock.now
+    return self._clock.time()
 
   def extract_fired_timers(self):
     with self._lock:
       if self._fired_timers:
-        return  False
+        return False
 
       should_fire = (
           self._earliest_hold < WatermarkManager.WATERMARK_POS_INF and


[2/2] beam git commit: This closes #2570

Posted by al...@apache.org.
This closes #2570


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d4ce94f4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d4ce94f4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d4ce94f4

Branch: refs/heads/master
Commit: d4ce94f4e3aac12b78b48c34e6567f19f3eb53b2
Parents: 4f0146a 4d98d43
Author: Ahmet Altay <al...@google.com>
Authored: Tue Apr 18 09:32:04 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Apr 18 09:32:04 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/direct/clock.py |  9 ++--
 .../runners/direct/evaluation_context.py        | 11 ++---
 .../apache_beam/runners/direct/executor.py      |  4 +-
 .../runners/direct/transform_result.py          | 45 +++++---------------
 .../runners/direct/watermark_manager.py         |  4 +-
 5 files changed, 23 insertions(+), 50 deletions(-)
----------------------------------------------------------------------