You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by an...@apache.org on 2023/07/13 14:51:19 UTC

[beam] 01/01: Revert "[Python] use get_buffer to fetch buffer when the buffer is None (#27373)"

This is an automated email from the ASF dual-hosted git repository.

anandinguva pushed a commit to branch revert-27373-direct_runner_bug
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6bfc208ba152d4de07d83e99e3a4968f22102eeb
Author: Anand Inguva <34...@users.noreply.github.com>
AuthorDate: Thu Jul 13 10:51:12 2023 -0400

    Revert "[Python] use get_buffer to fetch buffer when the buffer is None (#27373)"
    
    This reverts commit 63d5171b70fd05ea8c79cc549d1ec94b3eb4c57d.
---
 CHANGES.md                                               |  2 +-
 .../runners/portability/fn_api_runner/fn_runner.py       | 16 +++++-----------
 .../runners/portability/fn_api_runner/fn_runner_test.py  |  9 ---------
 3 files changed, 6 insertions(+), 21 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index d6d46507f8a..7ed75c7ee05 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -78,7 +78,7 @@
 
 ## Bugfixes
 
-* Fixed DirectRunner bug in Python SDK where GroupByKey gets empty PCollection and fails when pipeline option `direct_num_workers!=1`. ([#27373](https://github.com/apache/beam/pull/27373))
+* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 
 ## Known Issues
 
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index be7f99dc61f..8d957068d08 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -825,11 +825,10 @@ class FnApiRunner(runner.PipelineRunner):
 
     buffers_to_clean = set()
     known_consumers = set()
-    for transform_id, buffer_id in (
-      bundle_context_manager.stage_data_outputs.items()):
-      for (consuming_stage_name, consuming_transform
-           ) in runner_execution_context.buffer_id_to_consumer_pairs.get(
-               buffer_id, []):
+    for _, buffer_id in bundle_context_manager.stage_data_outputs.items():
+      for (consuming_stage_name, consuming_transform) in \
+          runner_execution_context.buffer_id_to_consumer_pairs.get(buffer_id,
+                                                                   []):
         buffer = runner_execution_context.pcoll_buffers.get(buffer_id, None)
 
         if (buffer_id in runner_execution_context.pcoll_buffers and
@@ -841,11 +840,6 @@ class FnApiRunner(runner.PipelineRunner):
           # so we create a copy of the buffer for every new stage.
           runner_execution_context.pcoll_buffers[buffer_id] = buffer.copy()
           buffer = runner_execution_context.pcoll_buffers[buffer_id]
-        # When the buffer is not in the pcoll_buffers, it means that the
-        # it could be an empty PCollection. In this case, get the buffer using
-        # the buffer id and transform id
-        if buffer is None:
-          buffer = bundle_context_manager.get_buffer(buffer_id, transform_id)
 
         # If the buffer has already been added to be consumed by
         # (stage, transform), then we don't need to add it again. This case
@@ -860,7 +854,7 @@ class FnApiRunner(runner.PipelineRunner):
         # MAX_TIMESTAMP for the downstream stage.
         runner_execution_context.queues.watermark_pending_inputs.enque(
             ((consuming_stage_name, timestamp.MAX_TIMESTAMP),
-             DataInput({consuming_transform: buffer}, {})))
+             DataInput({consuming_transform: buffer}, {})))  # type: ignore
 
     for bid in buffers_to_clean:
       if bid in runner_execution_context.pcoll_buffers:
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index b55c7162aea..ed09bb8f223 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -1831,15 +1831,6 @@ class FnApiRunnerTestWithGrpcAndMultiWorkers(FnApiRunnerTest):
     p._options.view_as(DebugOptions).experiments.remove('beam_fn_api')
     return p
 
-  def test_group_by_key_with_empty_pcoll_elements(self):
-    with self.create_pipeline() as p:
-      res = (
-          p
-          | beam.Create([('test_key', 'test_value')])
-          | beam.Filter(lambda x: False)
-          | beam.GroupByKey())
-      assert_that(res, equal_to([]))
-
   def test_metrics(self):
     raise unittest.SkipTest("This test is for a single worker only.")