You are viewing a plain text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@beam.apache.org by bh...@apache.org on 2022/05/25 17:48:14 UTC

[beam] branch master updated: [BEAM-9324] Fix incompatibility of direct runner with cython (#17728)

This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a73f32d5a07 [BEAM-9324] Fix incompatibility of direct runner with cython (#17728)
a73f32d5a07 is described below

commit a73f32d5a075a145d643f4311c55274374692f4e
Author: Yi Hu <ya...@google.com>
AuthorDate: Wed May 25 13:48:07 2022 -0400

    [BEAM-9324] Fix incompatibility of direct runner with cython (#17728)
    
    * [BEAM-9324] Fix incompatibility of direct runner with cython
    
    * Document the workaround for future cleanup
---
 .../apache_beam/runners/worker/operations.py       | 36 +++++++++++++++-------
 1 file changed, 25 insertions(+), 11 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index f46a4a18369..09706fe7187 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -78,9 +78,7 @@ try:
 except ImportError:
 
   class FakeCython(object):
-    @staticmethod
-    def cast(type, value):
-      return value
+    compiled = False
 
   globals()['cython'] = FakeCython()
 
@@ -93,6 +91,22 @@ SdfSplitResultsPrimary = Tuple['DoOperation', 'SplitResultPrimary']
 SdfSplitResultsResidual = Tuple['DoOperation', 'SplitResultResidual']
 
 
+# TODO(BEAM-9324) Remove these workarounds once upgraded to Cython 3
+def _cast_to_operation(value):
+  if cython.compiled:
+    return cython.cast(Operation, value)
+  else:
+    return value
+
+
+# TODO(BEAM-9324) Remove these workarounds once upgraded to Cython 3
+def _cast_to_receiver(value):
+  if cython.compiled:
+    return cython.cast(Receiver, value)
+  else:
+    return value
+
+
 class ConsumerSet(Receiver):
   """A ConsumerSet represents a graph edge between two Operation nodes.
 
@@ -307,7 +321,7 @@ class GeneralPurposeConsumerSet(ConsumerSet):
     self.update_counters_start(windowed_value)
 
     for consumer in self.element_consumers:
-      cython.cast(Operation, consumer).process(windowed_value)
+      _cast_to_operation(consumer).process(windowed_value)
 
     # TODO: Do this branching when contstructing ConsumerSet
     if self.has_batch_consumers:
@@ -324,10 +338,10 @@ class GeneralPurposeConsumerSet(ConsumerSet):
       for wv in windowed_batch.as_windowed_values(
           self.producer_batch_converter.explode_batch):
         for consumer in self.element_consumers:
-          cython.cast(Operation, consumer).process(wv)
+          _cast_to_operation(consumer).process(wv)
 
     for consumer in self.passthrough_batch_consumers:
-      cython.cast(Operation, consumer).process_batch(windowed_batch)
+      _cast_to_operation(consumer).process_batch(windowed_batch)
 
     for (consumer_batch_converter,
          consumers) in self.other_batch_consumers.items():
@@ -342,7 +356,7 @@ class GeneralPurposeConsumerSet(ConsumerSet):
             "This is very inefficient, consider re-structuring your pipeline "
             "or adding a DoFn to directly convert between these types.",
             InefficientExecutionWarning)
-        cython.cast(Operation, consumer).process_batch(
+        _cast_to_operation(consumer).process_batch(
             windowed_batch.with_values(
                 consumer_batch_converter.produce_batch(
                     self.producer_batch_converter.explode_batch(
@@ -358,13 +372,13 @@ class GeneralPurposeConsumerSet(ConsumerSet):
       for windowed_batch in WindowedBatch.from_windowed_values(
           self._batched_elements, produce_fn=batch_converter.produce_batch):
         for consumer in consumers:
-          cython.cast(Operation, consumer).process_batch(windowed_batch)
+          _cast_to_operation(consumer).process_batch(windowed_batch)
 
     for consumer in self.passthrough_batch_consumers:
       for windowed_batch in WindowedBatch.from_windowed_values(
           self._batched_elements,
           produce_fn=self.producer_batch_converter.produce_batch):
-        cython.cast(Operation, consumer).process_batch(windowed_batch)
+        _cast_to_operation(consumer).process_batch(windowed_batch)
 
     self._batched_elements = []
 
@@ -495,7 +509,7 @@ class Operation(object):
 
     """Finish operation."""
     for receiver in self.receivers:
-      cython.cast(Receiver, receiver).flush()
+      _cast_to_receiver(receiver).flush()
 
   def teardown(self):
     # type: () -> None
@@ -511,7 +525,7 @@ class Operation(object):
 
   def output(self, windowed_value, output_index=0):
     # type: (WindowedValue, int) -> None
-    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
+    _cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
 
   def add_receiver(self, operation, output_index=0):
     # type: (Operation, int) -> None