You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/20 20:06:51 UTC

[5/5] incubator-beam git commit: Add Cython DoFnContext and Receiver stubs.

Add Cython DoFnContext and Receiver stubs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8efc231c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8efc231c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8efc231c

Branch: refs/heads/python-sdk
Commit: 8efc231c867fcf5267ba21a8a33ee306b87d9945
Parents: dc42467
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Jul 19 12:22:08 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Jul 20 13:06:21 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/common.pxd | 20 ++++++++++++---
 sdks/python/apache_beam/runners/common.py  | 34 ++++++++++++++++++++++++-
 2 files changed, 50 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8efc231c/sdks/python/apache_beam/runners/common.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index 480c056..e855376 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -15,17 +15,31 @@
 # limitations under the License.
 #
 
-cdef type SideOutputValue, TimestampedValue, WindowedValue
+from apache_beam.utils.windowed_value cimport WindowedValue
+
+cdef type SideOutputValue, TimestampedValue
+
 
 cdef class DoFnRunner(object):
 
   cdef object dofn
   cdef object window_fn
-  cdef object context
+  cdef object context   # TODO(robertwb): Make this a DoFnContext
   cdef object tagged_receivers
   cdef object logger
   cdef object step_name
 
-  cdef object main_receivers
+  cdef object main_receivers   # TODO(robertwb): Make this a Receiver
 
   cpdef _process_outputs(self, element, results)
+
+
+cdef class DoFnContext(object):
+  cdef object label
+  cdef object state
+  cdef WindowedValue windowed_value
+  cdef set_element(self, WindowedValue windowed_value)
+
+
+cdef class Receiver(object):
+  cdef receive(self, WindowedValue windowed_value)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8efc231c/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 3c0c3f6..059359c 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -25,8 +25,8 @@ from apache_beam.internal import util
 from apache_beam.pvalue import SideOutputValue
 from apache_beam.transforms import core
 from apache_beam.transforms.window import TimestampedValue
-from apache_beam.transforms.window import WindowedValue
 from apache_beam.transforms.window import WindowFn
+from apache_beam.utils.windowed_value import WindowedValue
 
 
 class FakeLogger(object):
@@ -188,3 +188,35 @@ class DoFnState(object):
     """Looks up the counter for this aggregator, creating one if necessary."""
     return self._counter_factory.get_aggregator_counter(
         self.step_name, aggregator)
+
+
+class DoFnContext(object):
+
+  def __init__(self, label, element=None, state=None):
+    self.label = label
+    self.state = state
+    if element is not None:
+      self.set_element(element)
+
+  def set_element(self, windowed_value):
+    self.windowed_value = windowed_value
+
+  @property
+  def element(self):
+    return self.windowed_value.value
+
+  @property
+  def timestamp(self):
+    return self.windowed_value.timestamp
+
+  @property
+  def windows(self):
+    return self.windowed_value.windows
+
+  def aggregate_to(self, aggregator, input_value):
+    self.state.counter_for(aggregator).update(input_value)
+
+
+class Receiver(object):
+  def receive(self, windowed_value):
+    raise NotImplementedError