You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/03/01 19:39:24 UTC

[GitHub] [beam] pabloem commented on a change in pull request #14108: Adding a warning to use multi-workers on FnApiRunner

pabloem commented on a change in pull request #14108:
URL: https://github.com/apache/beam/pull/14108#discussion_r584997094



##########
File path: sdks/python/apache_beam/runners/common.py
##########
@@ -1511,3 +1512,41 @@ def windows(self):
       raise AttributeError('windows not accessible in this context')
     else:
       return self.windowed_value.windows
+
+
+def group_by_key_input_visitor(deterministic_key_coders):
+  # Importing here to avoid a circular dependency
+  from apache_beam.pipeline import PipelineVisitor
+
+  class GroupByKeyInputVisitor(PipelineVisitor):
+    """A visitor that replaces `Any` element type for input `PCollection` of
+    a `GroupByKey` with a `KV` type.
+
+    TODO(BEAM-115): Once Python SDk is compatible with the new Runner API,

Review comment:
       done

##########
File path: sdks/python/apache_beam/runners/common.py
##########
@@ -1511,3 +1512,41 @@ def windows(self):
       raise AttributeError('windows not accessible in this context')
     else:
       return self.windowed_value.windows
+
+
+def group_by_key_input_visitor(deterministic_key_coders):
+  # Importing here to avoid a circular dependency
+  from apache_beam.pipeline import PipelineVisitor
+
+  class GroupByKeyInputVisitor(PipelineVisitor):
+    """A visitor that replaces `Any` element type for input `PCollection` of
+    a `GroupByKey` with a `KV` type.
+
+    TODO(BEAM-115): Once Python SDk is compatible with the new Runner API,
+    we could directly replace the coder instead of mutating the element type.
+    """
+    def __init__(self, deterministic_key_coders=True):
+      self.deterministic_key_coders = deterministic_key_coders
+
+    def enter_composite_transform(self, transform_node):
+      self.visit_transform(transform_node)
+
+    def visit_transform(self, transform_node):
+      # Imported here to avoid circular dependencies.
+      # pylint: disable=wrong-import-order, wrong-import-position
+      from apache_beam.transforms.core import GroupByKey

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org