You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bh...@apache.org on 2022/05/25 17:48:14 UTC
[beam] branch master updated: [BEAM-9324] Fix incompatibility of direct runner with cython (#17728)
This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a73f32d5a07 [BEAM-9324] Fix incompatibility of direct runner with cython (#17728)
a73f32d5a07 is described below
commit a73f32d5a075a145d643f4311c55274374692f4e
Author: Yi Hu <ya...@google.com>
AuthorDate: Wed May 25 13:48:07 2022 -0400
[BEAM-9324] Fix incompatibility of direct runner with cython (#17728)
* [BEAM-9324] Fix incompatibility of direct runner with cython
* documented workaround for future cleanup
---
.../apache_beam/runners/worker/operations.py | 36 +++++++++++++++-------
1 file changed, 25 insertions(+), 11 deletions(-)
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index f46a4a18369..09706fe7187 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -78,9 +78,7 @@ try:
except ImportError:
class FakeCython(object):
- @staticmethod
- def cast(type, value):
- return value
+ compiled = False
globals()['cython'] = FakeCython()
@@ -93,6 +91,22 @@ SdfSplitResultsPrimary = Tuple['DoOperation', 'SplitResultPrimary']
SdfSplitResultsResidual = Tuple['DoOperation', 'SplitResultResidual']
+# TODO(BEAM-9324) Remove these workarounds once upgraded to Cython 3
+def _cast_to_operation(value):
+ if cython.compiled:
+ return cython.cast(Operation, value)
+ else:
+ return value
+
+
+# TODO(BEAM-9324) Remove these workarounds once upgraded to Cython 3
+def _cast_to_receiver(value):
+ if cython.compiled:
+ return cython.cast(Receiver, value)
+ else:
+ return value
+
+
class ConsumerSet(Receiver):
"""A ConsumerSet represents a graph edge between two Operation nodes.
@@ -307,7 +321,7 @@ class GeneralPurposeConsumerSet(ConsumerSet):
self.update_counters_start(windowed_value)
for consumer in self.element_consumers:
- cython.cast(Operation, consumer).process(windowed_value)
+ _cast_to_operation(consumer).process(windowed_value)
# TODO: Do this branching when contstructing ConsumerSet
if self.has_batch_consumers:
@@ -324,10 +338,10 @@ class GeneralPurposeConsumerSet(ConsumerSet):
for wv in windowed_batch.as_windowed_values(
self.producer_batch_converter.explode_batch):
for consumer in self.element_consumers:
- cython.cast(Operation, consumer).process(wv)
+ _cast_to_operation(consumer).process(wv)
for consumer in self.passthrough_batch_consumers:
- cython.cast(Operation, consumer).process_batch(windowed_batch)
+ _cast_to_operation(consumer).process_batch(windowed_batch)
for (consumer_batch_converter,
consumers) in self.other_batch_consumers.items():
@@ -342,7 +356,7 @@ class GeneralPurposeConsumerSet(ConsumerSet):
"This is very inefficient, consider re-structuring your pipeline "
"or adding a DoFn to directly convert between these types.",
InefficientExecutionWarning)
- cython.cast(Operation, consumer).process_batch(
+ _cast_to_operation(consumer).process_batch(
windowed_batch.with_values(
consumer_batch_converter.produce_batch(
self.producer_batch_converter.explode_batch(
@@ -358,13 +372,13 @@ class GeneralPurposeConsumerSet(ConsumerSet):
for windowed_batch in WindowedBatch.from_windowed_values(
self._batched_elements, produce_fn=batch_converter.produce_batch):
for consumer in consumers:
- cython.cast(Operation, consumer).process_batch(windowed_batch)
+ _cast_to_operation(consumer).process_batch(windowed_batch)
for consumer in self.passthrough_batch_consumers:
for windowed_batch in WindowedBatch.from_windowed_values(
self._batched_elements,
produce_fn=self.producer_batch_converter.produce_batch):
- cython.cast(Operation, consumer).process_batch(windowed_batch)
+ _cast_to_operation(consumer).process_batch(windowed_batch)
self._batched_elements = []
@@ -495,7 +509,7 @@ class Operation(object):
"""Finish operation."""
for receiver in self.receivers:
- cython.cast(Receiver, receiver).flush()
+ _cast_to_receiver(receiver).flush()
def teardown(self):
# type: () -> None
@@ -511,7 +525,7 @@ class Operation(object):
def output(self, windowed_value, output_index=0):
# type: (WindowedValue, int) -> None
- cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
+ _cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
def add_receiver(self, operation, output_index=0):
# type: (Operation, int) -> None