You are viewing a plain-text version of this content. The canonical link for it is "here" (the hyperlink was not preserved in this plain-text copy).
Posted to commits@flink.apache.org by ch...@apache.org on 2016/08/17 11:52:13 UTC

[1/2] flink git commit: [FLINK-4411] [py] Properly propagate chained dual input children

Repository: flink
Updated Branches:
  refs/heads/master b19648eb4 -> 84d28ba00


[FLINK-4411] [py] Properly propagate chained dual input children


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10508477
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10508477
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10508477

Branch: refs/heads/master
Commit: 1050847787b416399a6c03c0568969df93ed4822
Parents: b19648e
Author: zentol <ch...@apache.org>
Authored: Wed Aug 17 12:15:37 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Aug 17 13:51:55 2016 +0200

----------------------------------------------------------------------
 .../flink/python/api/flink/plan/Environment.py       | 15 ++++++++++++---
 .../python/org/apache/flink/python/api/test_main2.py |  4 ++--
 2 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/10508477/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index 9d08baf..a54dac8 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -219,15 +219,21 @@ class Environment(object):
         dual_input = set([_Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT, _Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST, _Identifier.COGROUP, _Identifier.UNION])
         x = len(self._sets) - 1
         while x > -1:
+            # CHAIN(parent -> child) -> grand_child
+            # for all intents and purposes the child set ceases to exist; it is merged into the parent
             child = self._sets[x]
             child_type = child.identifier
             if child_type in chainable:
                 parent = child.parent
+                # we can only chain to an actual python udf (=> operator is not None)
+                # we may only chain if the parent has only 1 child
                 if parent.operator is not None and len(parent.children) == 1 and len(parent.sinks) == 0:
                     parent.chained_info = child
                     parent.name += " -> " + child.name
                     parent.types = child.types
+                    # grand_children now belong to the parent
                     for grand_child in child.children:
+                        # dual_input operations have 2 parents; hence we have to change the correct one
                         if grand_child.identifier in dual_input:
                             if grand_child.parent.id == child.id:
                                 grand_child.parent = parent
@@ -235,15 +241,18 @@ class Environment(object):
                                 grand_child.other = parent
                         else:
                             grand_child.parent = parent
-                            parent.children.append(grand_child)
-                    parent.children.remove(child)
+                        parent.children.append(grand_child)
+                    # child sinks now belong to the parent
                     for sink in child.sinks:
                         sink.parent = parent
                         parent.sinks.append(sink)
+                    # child broadcast variables now belong to the parent
                     for bcvar in child.bcvars:
                         bcvar.parent = parent
                         parent.bcvars.append(bcvar)
-                    self._remove_set((child))
+                    # remove child set as it has been merged into the parent
+                    parent.children.remove(child)
+                    self._remove_set(child)
             x -= 1
 
     def _remove_set(self, set):

http://git-wip-us.apache.org/repos/asf/flink/blob/10508477/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
index ceb26d0..f1d40e1 100644
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
@@ -22,14 +22,14 @@ from flink.functions.CrossFunction import CrossFunction
 from flink.functions.JoinFunction import JoinFunction
 from flink.functions.CoGroupFunction import CoGroupFunction
 from flink.functions.Aggregation import Max, Min, Sum
-from utils import Verify, Verify2
+from utils import Verify, Verify2, Id
 
 if __name__ == "__main__":
     env = get_environment()
 
     d1 = env.from_elements(1, 6, 12)
 
-    d2 = env.from_elements((1, 0.5, "hello", True), (2, 0.4, "world", False))
+    d2 = env.from_elements((1, 0.5, "hello", True), (2, 0.4, "world", False)).map(Id()).map(Id())  # force map chaining
 
     d3 = env.from_elements(("hello",), ("world",))
 


[2/2] flink git commit: [FLINK-4412] [py] Chaining properly handles broadcast variables

Posted by ch...@apache.org.
[FLINK-4412] [py] Chaining properly handles broadcast variables


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84d28ba0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84d28ba0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84d28ba0

Branch: refs/heads/master
Commit: 84d28ba00f3e63a83132c1666c8dc8deec7800ba
Parents: 1050847
Author: zentol <ch...@apache.org>
Authored: Wed Aug 17 13:47:30 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Aug 17 13:52:02 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/python/api/flink/plan/DataSet.py |  1 -
 .../apache/flink/python/api/flink/plan/Environment.py | 14 ++++++++++++--
 2 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/84d28ba0/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
index caa4ae7..06557ca 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
@@ -669,7 +669,6 @@ class OperatorSet(DataSet):
         child.other = set._info
         child.name = name
         self._info.bcvars.append(child)
-        set._info.children.append(child)
         self._env._broadcast.append(child)
         return self
 

http://git-wip-us.apache.org/repos/asf/flink/blob/84d28ba0/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index a54dac8..1e4ba1a 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -227,7 +227,9 @@ class Environment(object):
                 parent = child.parent
                 # we can only chain to an actual python udf (=> operator is not None)
                 # we may only chain if the parent has only 1 child
-                if parent.operator is not None and len(parent.children) == 1 and len(parent.sinks) == 0:
+                # we may only chain if the parent is not used as a broadcast variable
+                # we may only chain if the parent does not use the child as a broadcast variable
+                if parent.operator is not None and len(parent.children) == 1 and len(parent.sinks) == 0 and parent not in self._broadcast and child not in parent.bcvars:
                     parent.chained_info = child
                     parent.name += " -> " + child.name
                     parent.types = child.types
@@ -242,6 +244,14 @@ class Environment(object):
                         else:
                             grand_child.parent = parent
                         parent.children.append(grand_child)
+                    # if child is used as a broadcast variable the parent must now be used instead
+                    for s in self._sets:
+                        if child in s.bcvars:
+                            s.bcvars.remove(child)
+                            s.bcvars.append(parent)
+                    for bcvar in self._broadcast:
+                        if bcvar.other.id == child.id:
+                            bcvar.other = parent
                     # child sinks now belong to the parent
                     for sink in child.sinks:
                         sink.parent = parent
@@ -256,7 +266,7 @@ class Environment(object):
             x -= 1
 
     def _remove_set(self, set):
-        self._sets[:] = [s for s in self._sets if s.id!=set.id]
+        self._sets[:] = [s for s in self._sets if s.id != set.id]
 
     def _send_plan(self):
         self._send_parameters()