You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2020/07/01 19:30:21 UTC

[beam] branch master updated: Change GroupIntoBatches to group for real

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 46276b5  Change GroupIntoBatches to group for real
     new 8f86cec  Merge pull request #12129 from aaltay/gib
46276b5 is described below

commit 46276b5191bd21359c05e03b692dbcb40e688e02
Author: Ahmet Altay <al...@google.com>
AuthorDate: Mon Jun 29 19:58:12 2020 -0700

    Change GroupIntoBatches to group for real
---
 CHANGES.md                                                   |  4 +++-
 .../snippets/transforms/aggregation/groupintobatches_test.py | 12 ++++++------
 sdks/python/apache_beam/transforms/util.py                   |  9 +++++++--
 3 files changed, 16 insertions(+), 9 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index a8aceeb..7586734 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -77,7 +77,9 @@
   `{"foo": "bar", "baz": null}`, whereas an implicit null like `{"foo": "bar"}` would raise an
   exception. Now both JSON strings will yield the same result by default. This behavior can be
  overridden with `RowJson.RowJsonDeserializer#withNullBehavior`.
-
+* Fixed a bug in `GroupIntoBatches` experimental transform in Python to actually group batches by key. 
+  This changes the output type for this transform ([BEAM-6696](https://issues.apache.org/jira/browse/BEAM-6696)).
+* X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 
 ## Deprecations
 
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupintobatches_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupintobatches_test.py
index 374401d..76c94da 100644
--- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupintobatches_test.py
+++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupintobatches_test.py
@@ -31,14 +31,14 @@ from . import groupintobatches
 
 def check_batches_with_keys(actual):
   expected = '''[START batches_with_keys]
-[('spring', '🍓'), ('spring', '🥕'), ('spring', '🍆')]
-[('summer', '🥕'), ('summer', '🍅'), ('summer', '🌽')]
-[('spring', '🍅')]
-[('fall', '🥕'), ('fall', '🍅')]
-[('winter', '🍆')]
+('spring', ['🍓', '🥕', '🍆'])
+('summer', ['🥕', '🍅', '🌽'])
+('spring', ['🍅'])
+('fall', ['🥕', '🍅'])
+('winter', ['🍆'])
 [END batches_with_keys]'''.splitlines()[1:-1]
   assert_matches_stdout(
-      actual, expected, lambda batch: (batch[0][0], len(batch)))
+      actual, expected, lambda batch: (batch[0], len(batch[1])))
 
 
 @mock.patch('apache_beam.Pipeline', TestPipeline)
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index afb4192..2f68d08 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -741,6 +741,7 @@ def WithKeys(pcoll, k):
 
 @experimental()
 @typehints.with_input_types(Tuple[K, V])
+@typehints.with_output_types(Tuple[K, Iterable[V]])
 class GroupIntoBatches(PTransform):
   """PTransform that batches the input into desired batch size. Elements are
   buffered until they are equal to batch size provided in the argument at which
@@ -786,7 +787,9 @@ def _pardo_group_into_batches(batch_size, input_coder):
       count = count_state.read()
       if count >= batch_size:
         batch = [element for element in element_state.read()]
-        yield batch
+        key, _ = batch[0]
+        batch_values = [v for (k, v) in batch]
+        yield (key, batch_values)
         element_state.clear()
         count_state.clear()
 
@@ -797,7 +800,9 @@ def _pardo_group_into_batches(batch_size, input_coder):
         count_state=DoFn.StateParam(COUNT_STATE)):
       batch = [element for element in element_state.read()]
       if batch:
-        yield batch
+        key, _ = batch[0]
+        batch_values = [v for (k, v) in batch]
+        yield (key, batch_values)
         element_state.clear()
         count_state.clear()