You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2020/07/01 19:30:21 UTC
[beam] branch master updated: Change GroupIntoBatches to group for real
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 46276b5 Change GroupIntoBatches to group for real
new 8f86cec Merge pull request #12129 from aaltay/gib
46276b5 is described below
commit 46276b5191bd21359c05e03b692dbcb40e688e02
Author: Ahmet Altay <al...@google.com>
AuthorDate: Mon Jun 29 19:58:12 2020 -0700
Change GroupIntoBatches to group for real
---
CHANGES.md | 4 +++-
.../snippets/transforms/aggregation/groupintobatches_test.py | 12 ++++++------
sdks/python/apache_beam/transforms/util.py | 9 +++++++--
3 files changed, 16 insertions(+), 9 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index a8aceeb..7586734 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -77,7 +77,9 @@
`{"foo": "bar", "baz": null}`, whereas an implicit null like `{"foo": "bar"}` would raise an
exception. Now both JSON strings will yield the same result by default. This behavior can be
overridden with `RowJson.RowJsonDeserializer#withNullBehavior`.
-
+* Fixed a bug in `GroupIntoBatches` experimental transform in Python to actually group batches by key.
+ This changes the output type for this transform ([BEAM-6696](https://issues.apache.org/jira/browse/BEAM-6696)).
+* X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
## Deprecations
diff --git a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupintobatches_test.py b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupintobatches_test.py
index 374401d..76c94da 100644
--- a/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupintobatches_test.py
+++ b/sdks/python/apache_beam/examples/snippets/transforms/aggregation/groupintobatches_test.py
@@ -31,14 +31,14 @@ from . import groupintobatches
def check_batches_with_keys(actual):
expected = '''[START batches_with_keys]
-[('spring', '🍓'), ('spring', '🥕'), ('spring', '🍆')]
-[('summer', '🥕'), ('summer', '🍅'), ('summer', '🌽')]
-[('spring', '🍅')]
-[('fall', '🥕'), ('fall', '🍅')]
-[('winter', '🍆')]
+('spring', ['🍓', '🥕', '🍆'])
+('summer', ['🥕', '🍅', '🌽'])
+('spring', ['🍅'])
+('fall', ['🥕', '🍅'])
+('winter', ['🍆'])
[END batches_with_keys]'''.splitlines()[1:-1]
assert_matches_stdout(
- actual, expected, lambda batch: (batch[0][0], len(batch)))
+ actual, expected, lambda batch: (batch[0], len(batch[1])))
@mock.patch('apache_beam.Pipeline', TestPipeline)
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index afb4192..2f68d08 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -741,6 +741,7 @@ def WithKeys(pcoll, k):
@experimental()
@typehints.with_input_types(Tuple[K, V])
+@typehints.with_output_types(Tuple[K, Iterable[V]])
class GroupIntoBatches(PTransform):
"""PTransform that batches the input into desired batch size. Elements are
buffered until they are equal to batch size provided in the argument at which
@@ -786,7 +787,9 @@ def _pardo_group_into_batches(batch_size, input_coder):
count = count_state.read()
if count >= batch_size:
batch = [element for element in element_state.read()]
- yield batch
+ key, _ = batch[0]
+ batch_values = [v for (k, v) in batch]
+ yield (key, batch_values)
element_state.clear()
count_state.clear()
@@ -797,7 +800,9 @@ def _pardo_group_into_batches(batch_size, input_coder):
count_state=DoFn.StateParam(COUNT_STATE)):
batch = [element for element in element_state.read()]
if batch:
- yield batch
+ key, _ = batch[0]
+ batch_values = [v for (k, v) in batch]
+ yield (key, batch_values)
element_state.clear()
count_state.clear()