You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/08/17 11:52:13 UTC
[1/2] flink git commit: [FLINK-4411] [py] Properly propagate chained dual input children
Repository: flink
Updated Branches:
refs/heads/master b19648eb4 -> 84d28ba00
[FLINK-4411] [py] Properly propagate chained dual input children
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10508477
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10508477
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10508477
Branch: refs/heads/master
Commit: 1050847787b416399a6c03c0568969df93ed4822
Parents: b19648e
Author: zentol <ch...@apache.org>
Authored: Wed Aug 17 12:15:37 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Aug 17 13:51:55 2016 +0200
----------------------------------------------------------------------
.../flink/python/api/flink/plan/Environment.py | 15 ++++++++++++---
.../python/org/apache/flink/python/api/test_main2.py | 4 ++--
2 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/10508477/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index 9d08baf..a54dac8 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -219,15 +219,21 @@ class Environment(object):
dual_input = set([_Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT, _Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST, _Identifier.COGROUP, _Identifier.UNION])
x = len(self._sets) - 1
while x > -1:
+ # CHAIN(parent -> child) -> grand_child
+ # for all intents and purposes the child set ceases to exist; it is merged into the parent
child = self._sets[x]
child_type = child.identifier
if child_type in chainable:
parent = child.parent
+ # we can only chain to an actual python udf (=> operator is not None)
+ # we may only chain if the parent has only 1 child
if parent.operator is not None and len(parent.children) == 1 and len(parent.sinks) == 0:
parent.chained_info = child
parent.name += " -> " + child.name
parent.types = child.types
+ # grand_children now belong to the parent
for grand_child in child.children:
+ # dual_input operations have 2 parents; hence we have to change the correct one
if grand_child.identifier in dual_input:
if grand_child.parent.id == child.id:
grand_child.parent = parent
@@ -235,15 +241,18 @@ class Environment(object):
grand_child.other = parent
else:
grand_child.parent = parent
- parent.children.append(grand_child)
- parent.children.remove(child)
+ parent.children.append(grand_child)
+ # child sinks now belong to the parent
for sink in child.sinks:
sink.parent = parent
parent.sinks.append(sink)
+ # child broadcast variables now belong to the parent
for bcvar in child.bcvars:
bcvar.parent = parent
parent.bcvars.append(bcvar)
- self._remove_set((child))
+ # remove child set as it has been merged into the parent
+ parent.children.remove(child)
+ self._remove_set(child)
x -= 1
def _remove_set(self, set):
http://git-wip-us.apache.org/repos/asf/flink/blob/10508477/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
index ceb26d0..f1d40e1 100644
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
@@ -22,14 +22,14 @@ from flink.functions.CrossFunction import CrossFunction
from flink.functions.JoinFunction import JoinFunction
from flink.functions.CoGroupFunction import CoGroupFunction
from flink.functions.Aggregation import Max, Min, Sum
-from utils import Verify, Verify2
+from utils import Verify, Verify2, Id
if __name__ == "__main__":
env = get_environment()
d1 = env.from_elements(1, 6, 12)
- d2 = env.from_elements((1, 0.5, "hello", True), (2, 0.4, "world", False))
+ d2 = env.from_elements((1, 0.5, "hello", True), (2, 0.4, "world", False)).map(Id()).map(Id()) # force map chaining
d3 = env.from_elements(("hello",), ("world",))
[2/2] flink git commit: [FLINK-4412] [py] Chaining properly handles broadcast variables
Posted by ch...@apache.org.
[FLINK-4412] [py] Chaining properly handles broadcast variables
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84d28ba0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84d28ba0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84d28ba0
Branch: refs/heads/master
Commit: 84d28ba00f3e63a83132c1666c8dc8deec7800ba
Parents: 1050847
Author: zentol <ch...@apache.org>
Authored: Wed Aug 17 13:47:30 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Aug 17 13:52:02 2016 +0200
----------------------------------------------------------------------
.../org/apache/flink/python/api/flink/plan/DataSet.py | 1 -
.../apache/flink/python/api/flink/plan/Environment.py | 14 ++++++++++++--
2 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/84d28ba0/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
index caa4ae7..06557ca 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
@@ -669,7 +669,6 @@ class OperatorSet(DataSet):
child.other = set._info
child.name = name
self._info.bcvars.append(child)
- set._info.children.append(child)
self._env._broadcast.append(child)
return self
http://git-wip-us.apache.org/repos/asf/flink/blob/84d28ba0/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index a54dac8..1e4ba1a 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -227,7 +227,9 @@ class Environment(object):
parent = child.parent
# we can only chain to an actual python udf (=> operator is not None)
# we may only chain if the parent has only 1 child
- if parent.operator is not None and len(parent.children) == 1 and len(parent.sinks) == 0:
+ # we may only chain if the parent is not used as a broadcast variable
+ # we may only chain if the parent does not use the child as a broadcast variable
+ if parent.operator is not None and len(parent.children) == 1 and len(parent.sinks) == 0 and parent not in self._broadcast and child not in parent.bcvars:
parent.chained_info = child
parent.name += " -> " + child.name
parent.types = child.types
@@ -242,6 +244,14 @@ class Environment(object):
else:
grand_child.parent = parent
parent.children.append(grand_child)
+ # if child is used as a broadcast variable the parent must now be used instead
+ for s in self._sets:
+ if child in s.bcvars:
+ s.bcvars.remove(child)
+ s.bcvars.append(parent)
+ for bcvar in self._broadcast:
+ if bcvar.other.id == child.id:
+ bcvar.other = parent
# child sinks now belong to the parent
for sink in child.sinks:
sink.parent = parent
@@ -256,7 +266,7 @@ class Environment(object):
x -= 1
def _remove_set(self, set):
- self._sets[:] = [s for s in self._sets if s.id!=set.id]
+ self._sets[:] = [s for s in self._sets if s.id != set.id]
def _send_plan(self):
self._send_parameters()