You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by an...@apache.org on 2023/07/13 14:51:18 UTC

[beam] branch revert-27373-direct_runner_bug created (now 6bfc208ba15)

This is an automated email from the ASF dual-hosted git repository.

anandinguva pushed a change to branch revert-27373-direct_runner_bug
in repository https://gitbox.apache.org/repos/asf/beam.git


      at 6bfc208ba15 Revert "[Python] use get_buffer to fetch buffer when the buffer is None (#27373)"

This branch includes the following new commits:

     new 6bfc208ba15 Revert "[Python] use get_buffer to fetch buffer when the buffer is None (#27373)"

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: Revert "[Python] use get_buffer to fetch buffer when the buffer is None (#27373)"

Posted by an...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

anandinguva pushed a commit to branch revert-27373-direct_runner_bug
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6bfc208ba152d4de07d83e99e3a4968f22102eeb
Author: Anand Inguva <34...@users.noreply.github.com>
AuthorDate: Thu Jul 13 10:51:12 2023 -0400

    Revert "[Python] use get_buffer to fetch buffer when the buffer is None (#27373)"
    
    This reverts commit 63d5171b70fd05ea8c79cc549d1ec94b3eb4c57d.
---
 CHANGES.md                                               |  2 +-
 .../runners/portability/fn_api_runner/fn_runner.py       | 16 +++++-----------
 .../runners/portability/fn_api_runner/fn_runner_test.py  |  9 ---------
 3 files changed, 6 insertions(+), 21 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index d6d46507f8a..7ed75c7ee05 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -78,7 +78,7 @@
 
 ## Bugfixes
 
-* Fixed DirectRunner bug in Python SDK where GroupByKey gets empty PCollection and fails when pipeline option `direct_num_workers!=1`. ([#27373](https://github.com/apache/beam/pull/27373))
+* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 
 ## Known Issues
 
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index be7f99dc61f..8d957068d08 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -825,11 +825,10 @@ class FnApiRunner(runner.PipelineRunner):
 
     buffers_to_clean = set()
     known_consumers = set()
-    for transform_id, buffer_id in (
-      bundle_context_manager.stage_data_outputs.items()):
-      for (consuming_stage_name, consuming_transform
-           ) in runner_execution_context.buffer_id_to_consumer_pairs.get(
-               buffer_id, []):
+    for _, buffer_id in bundle_context_manager.stage_data_outputs.items():
+      for (consuming_stage_name, consuming_transform) in \
+          runner_execution_context.buffer_id_to_consumer_pairs.get(buffer_id,
+                                                                   []):
         buffer = runner_execution_context.pcoll_buffers.get(buffer_id, None)
 
         if (buffer_id in runner_execution_context.pcoll_buffers and
@@ -841,11 +840,6 @@ class FnApiRunner(runner.PipelineRunner):
           # so we create a copy of the buffer for every new stage.
           runner_execution_context.pcoll_buffers[buffer_id] = buffer.copy()
           buffer = runner_execution_context.pcoll_buffers[buffer_id]
-        # When the buffer is not in the pcoll_buffers, it means that the
-        # it could be an empty PCollection. In this case, get the buffer using
-        # the buffer id and transform id
-        if buffer is None:
-          buffer = bundle_context_manager.get_buffer(buffer_id, transform_id)
 
         # If the buffer has already been added to be consumed by
         # (stage, transform), then we don't need to add it again. This case
@@ -860,7 +854,7 @@ class FnApiRunner(runner.PipelineRunner):
         # MAX_TIMESTAMP for the downstream stage.
         runner_execution_context.queues.watermark_pending_inputs.enque(
             ((consuming_stage_name, timestamp.MAX_TIMESTAMP),
-             DataInput({consuming_transform: buffer}, {})))
+             DataInput({consuming_transform: buffer}, {})))  # type: ignore
 
     for bid in buffers_to_clean:
       if bid in runner_execution_context.pcoll_buffers:
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index b55c7162aea..ed09bb8f223 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -1831,15 +1831,6 @@ class FnApiRunnerTestWithGrpcAndMultiWorkers(FnApiRunnerTest):
     p._options.view_as(DebugOptions).experiments.remove('beam_fn_api')
     return p
 
-  def test_group_by_key_with_empty_pcoll_elements(self):
-    with self.create_pipeline() as p:
-      res = (
-          p
-          | beam.Create([('test_key', 'test_value')])
-          | beam.Filter(lambda x: False)
-          | beam.GroupByKey())
-      assert_that(res, equal_to([]))
-
   def test_metrics(self):
     raise unittest.SkipTest("This test is for a single worker only.")