You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/20 20:06:51 UTC
[5/5] incubator-beam git commit: Add Cython DoFnContext and Receiver stubs.
Add Cython DoFnContext and Receiver stubs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8efc231c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8efc231c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8efc231c
Branch: refs/heads/python-sdk
Commit: 8efc231c867fcf5267ba21a8a33ee306b87d9945
Parents: dc42467
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Jul 19 12:22:08 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Jul 20 13:06:21 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/runners/common.pxd | 20 ++++++++++++---
sdks/python/apache_beam/runners/common.py | 34 ++++++++++++++++++++++++-
2 files changed, 50 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8efc231c/sdks/python/apache_beam/runners/common.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index 480c056..e855376 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -15,17 +15,31 @@
# limitations under the License.
#
-cdef type SideOutputValue, TimestampedValue, WindowedValue
+from apache_beam.utils.windowed_value cimport WindowedValue
+
+cdef type SideOutputValue, TimestampedValue
+
cdef class DoFnRunner(object):  # C-level attribute/method declarations; presumably augments DoFnRunner in common.py (class body not shown in this diff - confirm)
cdef object dofn
cdef object window_fn
- cdef object context
+ cdef object context # TODO(robertwb): Make this a DoFnContext
cdef object tagged_receivers
cdef object logger
cdef object step_name
- cdef object main_receivers
+ cdef object main_receivers # TODO(robertwb): Make this a Receiver
cpdef _process_outputs(self, element, results)  # cpdef: declared for both C-level and Python-level callers
+
+
+cdef class DoFnContext(object):  # Cython declarations for the DoFnContext class this commit adds to common.py
+ cdef object label  # untyped Python object
+ cdef object state  # untyped Python object
+ cdef WindowedValue windowed_value  # typed via the cimport from apache_beam.utils.windowed_value
+ cdef set_element(self, WindowedValue windowed_value)  # cdef method: C-level signature taking a typed WindowedValue
+
+
+cdef class Receiver(object):  # Cython declarations for the Receiver stub this commit adds to common.py
+ cdef receive(self, WindowedValue windowed_value)  # cdef method: C-level signature taking a typed WindowedValue
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8efc231c/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 3c0c3f6..059359c 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -25,8 +25,8 @@ from apache_beam.internal import util
from apache_beam.pvalue import SideOutputValue
from apache_beam.transforms import core
from apache_beam.transforms.window import TimestampedValue
-from apache_beam.transforms.window import WindowedValue
from apache_beam.transforms.window import WindowFn
+from apache_beam.utils.windowed_value import WindowedValue
class FakeLogger(object):
@@ -188,3 +188,35 @@ class DoFnState(object):
"""Looks up the counter for this aggregator, creating one if necessary."""
return self._counter_factory.get_aggregator_counter(
self.step_name, aggregator)
+
+
+class DoFnContext(object):  # Holds a label, a state object and the current windowed value; exposes the latter's fields as properties
+
+ def __init__(self, label, element=None, state=None):  # element, when given, is stored through set_element()
+ self.label = label
+ self.state = state
+ if element is not None:  # NOTE(review): with element=None nothing assigns windowed_value here, so element/timestamp/windows fail until set_element() is called
+ self.set_element(element)
+
+ def set_element(self, windowed_value):  # Replaces the object that element/timestamp/windows read from
+ self.windowed_value = windowed_value
+
+ @property
+ def element(self):  # Returns the .value attribute of the current windowed value
+ return self.windowed_value.value
+
+ @property
+ def timestamp(self):  # Returns the .timestamp attribute of the current windowed value
+ return self.windowed_value.timestamp
+
+ @property
+ def windows(self):  # Returns the .windows attribute of the current windowed value
+ return self.windowed_value.windows
+
+ def aggregate_to(self, aggregator, input_value):  # Passes input_value to update() on the counter self.state returns for aggregator
+ self.state.counter_for(aggregator).update(input_value)  # NOTE(review): state defaults to None in __init__; a non-None state is required here
+
+
+class Receiver(object):  # Stub class: receive() has no implementation here
+ def receive(self, windowed_value):  # Takes one windowed value; always raises NotImplementedError in this class
+ raise NotImplementedError