You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/04/06 00:22:00 UTC

[1/2] beam git commit: Replace Any type with a KV type for inputs of a GroupByKey step

Repository: beam
Updated Branches:
  refs/heads/master 697b19fe5 -> cc5f78dd0


Replace Any type with a KV type for inputs of a GroupByKey step


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7fad7391
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7fad7391
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7fad7391

Branch: refs/heads/master
Commit: 7fad7391548b6399066c0d9e79949707d7a5914e
Parents: 697b19f
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Sun Apr 2 20:46:55 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Apr 5 17:21:14 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/runner.py | 29 ++++++++++++++++++++++++++
 1 file changed, 29 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7fad7391/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index b203c8b..528b03f 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -119,6 +119,35 @@ class PipelineRunner(object):
           logging.error('Error while visiting %s', transform_node.full_label)
           raise
 
+    class GroupByKeyInputVisitor(PipelineVisitor):
+      """A visitor that replaces `Any` element type for input `PCollection` of
+      a `GroupByKey` with a `KV` type.
+
+      TODO(BEAM-115): Once Python SDk is compatible with the new Runner API,
+      we could directly replace the coder instead of mutating the element type.
+      """
+      def visit_transform(self, transform_node):
+        # Imported here to avoid circular dependencies.
+        # pylint: disable=wrong-import-order, wrong-import-position
+        from apache_beam import GroupByKey
+        from apache_beam import typehints
+        if isinstance(transform_node.transform, GroupByKey):
+          pcoll = transform_node.inputs[0]
+          input_type = pcoll.element_type
+          if not isinstance(input_type, typehints.TupleHint.TupleConstraint):
+            if isinstance(input_type, typehints.AnyTypeConstraint):
+              # `Any` type needs to be replaced with a KV[Any, Any] to
+              # force a KV coder as the main output coder for the pcollection
+              # preceding a GroupByKey.
+              pcoll.element_type = typehints.KV[typehints.Any, typehints.Any]
+            else:
+              # TODO: Handle other valid types,
+              # e.g. Union[KV[str, int], KV[str, float]]
+              raise ValueError(
+                  "Input to GroupByKey must be of Tuple or Any type. "
+                  "Found %s for %s" % (input_type, pcoll))
+
+    pipeline.visit(GroupByKeyInputVisitor())
     pipeline.visit(RunVisitor(self))
 
   def clear(self, pipeline, node=None):


[2/2] beam git commit: This closes #2409

Posted by al...@apache.org.
This closes #2409


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cc5f78dd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cc5f78dd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cc5f78dd

Branch: refs/heads/master
Commit: cc5f78dd01395c24e1164d88a3d5b5b39256b4ff
Parents: 697b19f 7fad739
Author: Ahmet Altay <al...@google.com>
Authored: Wed Apr 5 17:21:49 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Apr 5 17:21:49 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/runner.py | 29 ++++++++++++++++++++++++++
 1 file changed, 29 insertions(+)
----------------------------------------------------------------------