You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by an...@apache.org on 2023/07/13 14:51:19 UTC
[beam] 01/01: Revert "[Python] use get_buffer to fetch buffer when the buffer is None (#27373)"
This is an automated email from the ASF dual-hosted git repository.
anandinguva pushed a commit to branch revert-27373-direct_runner_bug
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 6bfc208ba152d4de07d83e99e3a4968f22102eeb
Author: Anand Inguva <34...@users.noreply.github.com>
AuthorDate: Thu Jul 13 10:51:12 2023 -0400
Revert "[Python] use get_buffer to fetch buffer when the buffer is None (#27373)"
This reverts commit 63d5171b70fd05ea8c79cc549d1ec94b3eb4c57d.
---
CHANGES.md | 2 +-
.../runners/portability/fn_api_runner/fn_runner.py | 16 +++++-----------
.../runners/portability/fn_api_runner/fn_runner_test.py | 9 ---------
3 files changed, 6 insertions(+), 21 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index d6d46507f8a..7ed75c7ee05 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -78,7 +78,7 @@
## Bugfixes
-* Fixed DirectRunner bug in Python SDK where GroupByKey gets empty PCollection and fails when pipeline option `direct_num_workers!=1`. ([#27373](https://github.com/apache/beam/pull/27373))
+* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
## Known Issues
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index be7f99dc61f..8d957068d08 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -825,11 +825,10 @@ class FnApiRunner(runner.PipelineRunner):
buffers_to_clean = set()
known_consumers = set()
- for transform_id, buffer_id in (
- bundle_context_manager.stage_data_outputs.items()):
- for (consuming_stage_name, consuming_transform
- ) in runner_execution_context.buffer_id_to_consumer_pairs.get(
- buffer_id, []):
+ for _, buffer_id in bundle_context_manager.stage_data_outputs.items():
+ for (consuming_stage_name, consuming_transform) in \
+ runner_execution_context.buffer_id_to_consumer_pairs.get(buffer_id,
+ []):
buffer = runner_execution_context.pcoll_buffers.get(buffer_id, None)
if (buffer_id in runner_execution_context.pcoll_buffers and
@@ -841,11 +840,6 @@ class FnApiRunner(runner.PipelineRunner):
# so we create a copy of the buffer for every new stage.
runner_execution_context.pcoll_buffers[buffer_id] = buffer.copy()
buffer = runner_execution_context.pcoll_buffers[buffer_id]
- # When the buffer is not in the pcoll_buffers, it means that the
- # it could be an empty PCollection. In this case, get the buffer using
- # the buffer id and transform id
- if buffer is None:
- buffer = bundle_context_manager.get_buffer(buffer_id, transform_id)
# If the buffer has already been added to be consumed by
# (stage, transform), then we don't need to add it again. This case
@@ -860,7 +854,7 @@ class FnApiRunner(runner.PipelineRunner):
# MAX_TIMESTAMP for the downstream stage.
runner_execution_context.queues.watermark_pending_inputs.enque(
((consuming_stage_name, timestamp.MAX_TIMESTAMP),
- DataInput({consuming_transform: buffer}, {})))
+ DataInput({consuming_transform: buffer}, {}))) # type: ignore
for bid in buffers_to_clean:
if bid in runner_execution_context.pcoll_buffers:
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index b55c7162aea..ed09bb8f223 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -1831,15 +1831,6 @@ class FnApiRunnerTestWithGrpcAndMultiWorkers(FnApiRunnerTest):
p._options.view_as(DebugOptions).experiments.remove('beam_fn_api')
return p
- def test_group_by_key_with_empty_pcoll_elements(self):
- with self.create_pipeline() as p:
- res = (
- p
- | beam.Create([('test_key', 'test_value')])
- | beam.Filter(lambda x: False)
- | beam.GroupByKey())
- assert_that(res, equal_to([]))
-
def test_metrics(self):
raise unittest.SkipTest("This test is for a single worker only.")