You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/08/03 20:53:25 UTC

[1/3] incubator-beam git commit: Implement add_input for all CombineFns.

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk e834fa82b -> 65152cab8


Implement add_input for all CombineFns.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3ebf28c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3ebf28c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3ebf28c6

Branch: refs/heads/python-sdk
Commit: 3ebf28c6e0d17af3720076e33f88a0f126a89059
Parents: e834fa8
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Jul 26 01:15:55 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Aug 2 15:52:28 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/combiners.py | 16 ++++++++--------
 sdks/python/apache_beam/transforms/core.py      |  6 +++---
 2 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3ebf28c6/sdks/python/apache_beam/transforms/combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index 155dcc6..c3f0da1 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -132,6 +132,9 @@ class CountCombineFn(core.CombineFn):
   def create_accumulator(self):
     return 0
 
+  def add_input(self, accumulator, element):
+    return accumulator + 1
+
   def add_inputs(self, accumulator, elements):
     return accumulator + len(elements)
 
@@ -425,9 +428,9 @@ class _TupleCombineFnBase(core.CombineFn):
 
 class TupleCombineFn(_TupleCombineFnBase):
 
-  def add_inputs(self, accumulator, elements):
-    return [c.add_inputs(a, e)
-            for c, a, e in zip(self._combiners, accumulator, zip(*elements))]
+  def add_input(self, accumulator, element):
+    return [c.add_input(a, e)
+            for c, a, e in zip(self._combiners, accumulator, element)]
 
   def with_common_input(self):
     return SingleInputTupleCombineFn(*self._combiners)
@@ -435,8 +438,8 @@ class TupleCombineFn(_TupleCombineFnBase):
 
 class SingleInputTupleCombineFn(_TupleCombineFnBase):
 
-  def add_inputs(self, accumulator, elements):
-    return [c.add_inputs(a, elements)
+  def add_input(self, accumulator, element):
+    return [c.add_input(a, element)
             for c, a in zip(self._combiners, accumulator)]
 
 
@@ -522,9 +525,6 @@ def curry_combine_fn(fn, args, kwargs):
       def add_input(self, accumulator, element):
         return fn.add_input(accumulator, element, *args, **kwargs)
 
-      def add_inputs(self, accumulator, elements):
-        return fn.add_inputs(accumulator, elements, *args, **kwargs)
-
       def merge_accumulators(self, accumulators):
         return fn.merge_accumulators(accumulators, *args, **kwargs)
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3ebf28c6/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 38b9cd2..da26205 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -270,7 +270,7 @@ class CombineFn(WithTypeHints):
   1. Input values are partitioned into one or more batches.
   2. For each batch, the create_accumulator method is invoked to create a fresh
      initial "accumulator" value representing the combination of zero values.
-  3. For each input value in the batch, the add_inputs method is invoked to
+  3. For each input value in the batch, the add_input method is invoked to
      combine more values with the accumulator for that batch.
   4. The merge_accumulators method is invoked to combine accumulators from
      separate batches into a single combined output accumulator value, once all
@@ -296,7 +296,7 @@ class CombineFn(WithTypeHints):
   def add_input(self, accumulator, element, *args, **kwargs):
     """Return result of folding element into accumulator.
 
-    CombineFn implementors must override either add_input or add_inputs.
+    CombineFn implementors must override add_input.
 
     Args:
       accumulator: the current accumulator
@@ -420,7 +420,7 @@ class CallableWrapperCombineFn(CombineFn):
     if accumulator is self._EMPTY:
       return self._fn(elements, *args, **kwargs)
     elif isinstance(elements, (list, tuple)):
-      return self._fn([accumulator] + elements, *args, **kwargs)
+      return self._fn([accumulator] + list(elements), *args, **kwargs)
     else:
       def union():
         yield accumulator


[3/3] incubator-beam git commit: Closes #770

Posted by ro...@apache.org.
Closes #770


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/65152cab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/65152cab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/65152cab

Branch: refs/heads/python-sdk
Commit: 65152cab8ea9be39c1e1a3caf25887eed6ed5c85
Parents: e834fa8 4a2239d
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Aug 3 13:53:03 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Aug 3 13:53:03 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/combiners.py | 28 ++++++++++++++------
 sdks/python/apache_beam/transforms/core.py      |  6 ++---
 2 files changed, 23 insertions(+), 11 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-beam git commit: Document TupleCombineFns

Posted by ro...@apache.org.
Document TupleCombineFns


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4a2239d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4a2239d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4a2239d3

Branch: refs/heads/python-sdk
Commit: 4a2239d3701e13622998c71107d263c8966e73e1
Parents: 3ebf28c
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Aug 3 13:52:36 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Aug 3 13:52:36 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/combiners.py | 12 ++++++++++++
 1 file changed, 12 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4a2239d3/sdks/python/apache_beam/transforms/combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index c3f0da1..a0604b8 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -427,6 +427,12 @@ class _TupleCombineFnBase(core.CombineFn):
 
 
 class TupleCombineFn(_TupleCombineFnBase):
+  """A combiner for combining tuples via a tuple of combiners.
+
+  Takes as input a tuple of N CombineFns and combines N-tuples by
+  combining the k-th element of each tuple with the k-th CombineFn,
+  outputting a new N-tuple of combined values.
+  """
 
   def add_input(self, accumulator, element):
     return [c.add_input(a, e)
@@ -437,6 +443,12 @@ class TupleCombineFn(_TupleCombineFnBase):
 
 
 class SingleInputTupleCombineFn(_TupleCombineFnBase):
+  """A combiner for combining a single value via a tuple of combiners.
+
+  Takes as input a tuple of N CombineFns and combines elements by
+  applying every CombineFn to each input element, producing an N-tuple
+  whose k-th entry is the output of the k-th CombineFn.
+  """
 
   def add_input(self, accumulator, element):
     return [c.add_input(a, element)