You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/08/03 20:53:25 UTC
[1/3] incubator-beam git commit: Implement add_input for all
CombineFns.
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk e834fa82b -> 65152cab8
Implement add_input for all CombineFns.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3ebf28c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3ebf28c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3ebf28c6
Branch: refs/heads/python-sdk
Commit: 3ebf28c6e0d17af3720076e33f88a0f126a89059
Parents: e834fa8
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Jul 26 01:15:55 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Aug 2 15:52:28 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/transforms/combiners.py | 16 ++++++++--------
sdks/python/apache_beam/transforms/core.py | 6 +++---
2 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3ebf28c6/sdks/python/apache_beam/transforms/combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index 155dcc6..c3f0da1 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -132,6 +132,9 @@ class CountCombineFn(core.CombineFn):
def create_accumulator(self):
return 0
+ def add_input(self, accumulator, element):
+ return accumulator + 1
+
def add_inputs(self, accumulator, elements):
return accumulator + len(elements)
@@ -425,9 +428,9 @@ class _TupleCombineFnBase(core.CombineFn):
class TupleCombineFn(_TupleCombineFnBase):
- def add_inputs(self, accumulator, elements):
- return [c.add_inputs(a, e)
- for c, a, e in zip(self._combiners, accumulator, zip(*elements))]
+ def add_input(self, accumulator, element):
+ return [c.add_input(a, e)
+ for c, a, e in zip(self._combiners, accumulator, element)]
def with_common_input(self):
return SingleInputTupleCombineFn(*self._combiners)
@@ -435,8 +438,8 @@ class TupleCombineFn(_TupleCombineFnBase):
class SingleInputTupleCombineFn(_TupleCombineFnBase):
- def add_inputs(self, accumulator, elements):
- return [c.add_inputs(a, elements)
+ def add_input(self, accumulator, element):
+ return [c.add_input(a, element)
for c, a in zip(self._combiners, accumulator)]
@@ -522,9 +525,6 @@ def curry_combine_fn(fn, args, kwargs):
def add_input(self, accumulator, element):
return fn.add_input(accumulator, element, *args, **kwargs)
- def add_inputs(self, accumulator, elements):
- return fn.add_inputs(accumulator, elements, *args, **kwargs)
-
def merge_accumulators(self, accumulators):
return fn.merge_accumulators(accumulators, *args, **kwargs)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3ebf28c6/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 38b9cd2..da26205 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -270,7 +270,7 @@ class CombineFn(WithTypeHints):
1. Input values are partitioned into one or more batches.
2. For each batch, the create_accumulator method is invoked to create a fresh
initial "accumulator" value representing the combination of zero values.
- 3. For each input value in the batch, the add_inputs method is invoked to
+ 3. For each input value in the batch, the add_input method is invoked to
combine more values with the accumulator for that batch.
4. The merge_accumulators method is invoked to combine accumulators from
separate batches into a single combined output accumulator value, once all
@@ -296,7 +296,7 @@ class CombineFn(WithTypeHints):
def add_input(self, accumulator, element, *args, **kwargs):
"""Return result of folding element into accumulator.
- CombineFn implementors must override either add_input or add_inputs.
+ CombineFn implementors must override add_input.
Args:
accumulator: the current accumulator
@@ -420,7 +420,7 @@ class CallableWrapperCombineFn(CombineFn):
if accumulator is self._EMPTY:
return self._fn(elements, *args, **kwargs)
elif isinstance(elements, (list, tuple)):
- return self._fn([accumulator] + elements, *args, **kwargs)
+ return self._fn([accumulator] + list(elements), *args, **kwargs)
else:
def union():
yield accumulator
[3/3] incubator-beam git commit: Closes #770
Posted by ro...@apache.org.
Closes #770
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/65152cab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/65152cab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/65152cab
Branch: refs/heads/python-sdk
Commit: 65152cab8ea9be39c1e1a3caf25887eed6ed5c85
Parents: e834fa8 4a2239d
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Aug 3 13:53:03 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Aug 3 13:53:03 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/transforms/combiners.py | 28 ++++++++++++++------
sdks/python/apache_beam/transforms/core.py | 6 ++---
2 files changed, 23 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-beam git commit: Document TupleCombineFns
Posted by ro...@apache.org.
Document TupleCombineFns
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4a2239d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4a2239d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4a2239d3
Branch: refs/heads/python-sdk
Commit: 4a2239d3701e13622998c71107d263c8966e73e1
Parents: 3ebf28c
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Aug 3 13:52:36 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Aug 3 13:52:36 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/transforms/combiners.py | 12 ++++++++++++
1 file changed, 12 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4a2239d3/sdks/python/apache_beam/transforms/combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index c3f0da1..a0604b8 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -427,6 +427,12 @@ class _TupleCombineFnBase(core.CombineFn):
class TupleCombineFn(_TupleCombineFnBase):
+ """A combiner for combining tuples via a tuple of combiners.
+
+ Takes as input a tuple of N CombineFns and combines N-tuples by
+ combining the k-th element of each tuple with the k-th CombineFn,
+ outputting a new N-tuple of combined values.
+ """
def add_input(self, accumulator, element):
return [c.add_input(a, e)
@@ -437,6 +443,12 @@ class TupleCombineFn(_TupleCombineFnBase):
class SingleInputTupleCombineFn(_TupleCombineFnBase):
+ """A combiner for combining a single value via a tuple of combiners.
+
+ Takes as input a tuple of N CombineFns and combines elements by
+ applying each CombineFn to each input, producing an N-tuple of
+ the outputs corresponding to each of the N CombineFn's outputs.
+ """
def add_input(self, accumulator, element):
return [c.add_input(a, element)