You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by nh...@apache.org on 2016/01/20 21:16:11 UTC

[1/4] incubator-hawq git commit: HAWQ-161. Port GPDB planner fixes to HAWQ SIGSEGV happened in HashJoin.

Repository: incubator-hawq
Updated Branches:
  refs/heads/master 1b11926fe -> b95ed2513


HAWQ-161. Port GPDB planner fixes to HAWQ
SIGSEGV happened in HashJoin.

This issue was caused by a previous fix for MPP-12637, which led to a wrong
plan being generated for queries like:

select c.relname from pg_class c where c.relname in (select b.table_name from
information_schema.tables b where c.relname like '%id%');

The root cause (RC) of MPP-25724 is that the planner believes the subquery
should not be pulled up because it contains a CoerceToDomain node, whereas the
subquery actually MUST be pulled up; otherwise HashJoin would fail.

However, after I made the planner pull up that subquery, the problem in
MPP-12637 happened again, i.e., for a query like the one below, the planner
would error out.

select * FROM (select attnum::information_schema.cardinal_number from
pg_attribute where attnum > 0) q where attnum = 4;

The key problem here is the order of quals, including the pulled-up ones, i.e.,
the order of "attnum = 4" and "attnum > 0". My solution is to replace the
RangeTblRef of the pulled-up subquery with the resulting FromExpr, which is
consistent with PostgreSQL's style, instead of separately appending the fromlist
and quals of the resulting FromExpr to the parent query as GPDB did previously.

The final problem is in the function deconstruct_jointree(). The previous
assumption was that quals must be compatible with the relids of the same level;
however, for query #1 mentioned above, we have to break this assumption, so I
introduced the PostponedQual feature of PostgreSQL. Meanwhile, we should make
some corresponding changes in pull_up_simple_subquery so that the ResolveNew
procedure is now done appropriately.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/b95ed251
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/b95ed251
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/b95ed251

Branch: refs/heads/master
Commit: b95ed2513983ad36651beccc721ef51bf2e70494
Parents: 0f6beb6
Author: Kenan Yao <ky...@pivotal.io>
Authored: Thu Nov 12 15:36:37 2015 -0800
Committer: Noa Horn <nh...@pivotal.io>
Committed: Wed Jan 20 12:15:52 2016 -0800

----------------------------------------------------------------------
 src/backend/optimizer/path/pathkeys.c           |   3 +-
 src/backend/optimizer/plan/initsplan.c          | 143 ++++++++++++++++---
 src/backend/optimizer/prep/prepjointree.c       |  55 ++-----
 src/include/optimizer/planmain.h                |   3 +-
 src/test/regress/expected/gp_optimizer.out      |  76 ++++++++++
 .../regress/expected/information_schema.out     |  34 +++++
 .../expected/information_schema_optimizer.out   |  34 +++++
 src/test/regress/sql/gp_optimizer.sql           |  63 ++++++++
 src/test/regress/sql/information_schema.sql     |  22 +++
 9 files changed, 371 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b95ed251/src/backend/optimizer/path/pathkeys.c
----------------------------------------------------------------------
diff --git a/src/backend/optimizer/path/pathkeys.c b/src/backend/optimizer/path/pathkeys.c
index 3276b67..f8c096a 100644
--- a/src/backend/optimizer/path/pathkeys.c
+++ b/src/backend/optimizer/path/pathkeys.c
@@ -443,7 +443,8 @@ static List *gen_implied_qual(PlannerInfo *root,
 				new_qualscope, /* qualscope */
 				NULL, /* ojscope */
 				NULL, /* outerjoin_nonnullable */
-				NULL /* local equi join scope */
+				NULL, /* local equi join scope */
+				NULL /* postponed_qual_list */
 		);
 		relevant_clauses = lappend(relevant_clauses, new_clause);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b95ed251/src/backend/optimizer/plan/initsplan.c
----------------------------------------------------------------------
diff --git a/src/backend/optimizer/plan/initsplan.c b/src/backend/optimizer/plan/initsplan.c
index b3c726e..18e34c8 100644
--- a/src/backend/optimizer/plan/initsplan.c
+++ b/src/backend/optimizer/plan/initsplan.c
@@ -55,10 +55,20 @@
 int			from_collapse_limit;
 int			join_collapse_limit;
 
+/* a structure to be used in deconstruct_recurse(), records the qual clauses
+ * which cannot be distributed to specific relations immediately at that recurse
+ * level */
+typedef struct PostponedQual
+{
+	Node	*qual;		/* a qual clause waiting to be processed */
+	Relids	relids;		/* the set of baserels it references */
+} PostponedQual;
+
 static List *deconstruct_recurse(PlannerInfo *root, Node *jtnode,
 					bool below_outer_join,
 					Relids *qualscope, Relids *inner_join_rels,
-					List **ptrToLocalEquiKeyList);
+					List **ptrToLocalEquiKeyList,
+					List **postponed_qual_list);
 static OuterJoinInfo *make_outerjoininfo(PlannerInfo *root,
 				   Relids left_rels, Relids right_rels,
 				   Relids inner_join_rels,
@@ -228,15 +238,24 @@ add_IN_vars_to_tlists(PlannerInfo *root)
 List *
 deconstruct_jointree(PlannerInfo *root)
 {
+	List		*result = NIL;
 	Relids		qualscope;
 	Relids		inner_join_rels;
+	List		*postponed_qual_list = NIL;
 
 	/* Start recursion at top of jointree */
 	Assert(root->parse->jointree != NULL &&
 		   IsA(root->parse->jointree, FromExpr));
 
-	return deconstruct_recurse(root, (Node *) root->parse->jointree, false,
-							   &qualscope, &inner_join_rels, NULL);
+	result = deconstruct_recurse(root, (Node *) root->parse->jointree, false,
+					&qualscope, &inner_join_rels, NULL, &postponed_qual_list);
+
+	if (postponed_qual_list != NIL)
+	{
+		elog(ERROR, "JOIN qualification may not refer to other relations.");
+	}
+
+	return result;
 }
 
 /*
@@ -259,13 +278,16 @@ deconstruct_jointree(PlannerInfo *root)
  *      values under the nullable side of an outer join are local equikeys
  *      but not global equikeys)
  *	Return value is the appropriate joinlist for this jointree node
+ *	*postponed_qual_list gets the list of qual clauses which cannot be
+ *	distributed to relids of current recurse level, and should be postponed to
+ *	upper level to be handled
  *
  * In addition, entries will be added to root->oj_info_list for outer joins.
  */
 static List *
 deconstruct_recurse(PlannerInfo *root, Node *jtnode, bool below_outer_join,
 					Relids *qualscope, Relids *inner_join_rels,
-					List **ptrToLocalEquiKeyList)
+					List **ptrToLocalEquiKeyList, List **postponed_qual_list)
 {
 	List	   *joinlist;
 
@@ -290,6 +312,7 @@ deconstruct_recurse(PlannerInfo *root, Node *jtnode, bool below_outer_join,
 		FromExpr   *f = (FromExpr *) jtnode;
 		int			remaining;
 		ListCell   *l;
+		List	   *child_postponed_quals = NIL;
 
 		/*
 		 * First, recurse to handle child joins.  We collapse subproblems into
@@ -311,7 +334,8 @@ deconstruct_recurse(PlannerInfo *root, Node *jtnode, bool below_outer_join,
 											   below_outer_join,
 											   &sub_qualscope,
 											   inner_join_rels,
-											   ptrToLocalEquiKeyList);
+											   ptrToLocalEquiKeyList,
+											   &child_postponed_quals);
 			*qualscope = bms_add_members(*qualscope, sub_qualscope);
 			sub_members = list_length(sub_joinlist);
 			remaining--;
@@ -332,6 +356,31 @@ deconstruct_recurse(PlannerInfo *root, Node *jtnode, bool below_outer_join,
 		if (list_length(f->fromlist) > 1)
 			*inner_join_rels = *qualscope;
 
+		/* Try to process any quals postponed by children. If they need further
+		 * postponement, add them to my output postponed_qual_list */
+		foreach(l, child_postponed_quals)
+		{
+			PostponedQual *pq = (PostponedQual *) lfirst(l);
+
+			if (bms_is_subset(pq->relids, *qualscope))
+			{
+				distribute_qual_to_rels(root, pq->qual,
+										false, false, below_outer_join,
+										*qualscope, NULL, NULL,
+										ptrToLocalEquiKeyList,
+										NULL);
+				pfree(pq);
+			}
+			else
+			{
+				*postponed_qual_list = lappend(*postponed_qual_list, pq);
+			}
+		}
+		if (child_postponed_quals != NIL)
+		{
+			pfree(child_postponed_quals);
+		}
+
 		/*
 		 * Now process the top-level quals.
 		 */
@@ -339,7 +388,8 @@ deconstruct_recurse(PlannerInfo *root, Node *jtnode, bool below_outer_join,
 			distribute_qual_to_rels(root, (Node *) lfirst(l),
 									false, false, below_outer_join,
 									*qualscope, NULL, NULL,
-									ptrToLocalEquiKeyList);
+									ptrToLocalEquiKeyList,
+									postponed_qual_list);
 	}
 	else if (IsA(jtnode, JoinExpr))
 	{
@@ -358,6 +408,8 @@ deconstruct_recurse(PlannerInfo *root, Node *jtnode, bool below_outer_join,
 
         List *localLeftEquiKeyList = NIL;
         List *localRightEquiKeyList = NIL;
+
+		List *child_postponed_quals = NIL;
 		/*
 		 * Order of operations here is subtle and critical.  First we recurse
 		 * to handle sub-JOINs.  Their join quals will be placed without
@@ -376,11 +428,13 @@ deconstruct_recurse(PlannerInfo *root, Node *jtnode, bool below_outer_join,
 				leftjoinlist = deconstruct_recurse(root, j->larg,
 												   below_outer_join,
 												   &leftids, &left_inners,
-												   ptrToLocalEquiKeyList);
+												   ptrToLocalEquiKeyList,
+												   &child_postponed_quals);
 				rightjoinlist = deconstruct_recurse(root, j->rarg,
 													below_outer_join,
 													&rightids, &right_inners,
-													ptrToLocalEquiKeyList);
+													ptrToLocalEquiKeyList,
+													&child_postponed_quals);
 				*qualscope = bms_union(leftids, rightids);
 				*inner_join_rels = bms_copy(*qualscope);
 				/* Inner join adds no restrictions for quals */
@@ -392,11 +446,13 @@ deconstruct_recurse(PlannerInfo *root, Node *jtnode, bool below_outer_join,
 				leftjoinlist = deconstruct_recurse(root, j->larg,
 												   below_outer_join,
 												   &leftids, &left_inners,
-												   ptrToLocalEquiKeyList);
+												   ptrToLocalEquiKeyList,
+												   &child_postponed_quals);
 				rightjoinlist = deconstruct_recurse(root, j->rarg,
 													true,
 													&rightids, &right_inners,
-													&localRightEquiKeyList);
+													&localRightEquiKeyList,
+													&child_postponed_quals);
 				*qualscope = bms_union(leftids, rightids);
 				*inner_join_rels = bms_union(left_inners, right_inners);
 				nonnullable_rels = leftids;
@@ -405,11 +461,13 @@ deconstruct_recurse(PlannerInfo *root, Node *jtnode, bool below_outer_join,
 				leftjoinlist = deconstruct_recurse(root, j->larg,
 												   true,
 												   &leftids, &left_inners,
-													&localLeftEquiKeyList);
+													&localLeftEquiKeyList,
+													&child_postponed_quals);
 				rightjoinlist = deconstruct_recurse(root, j->rarg,
 													true,
 													&rightids, &right_inners,
-													&localRightEquiKeyList);
+													&localRightEquiKeyList,
+													&child_postponed_quals);
 				*qualscope = bms_union(leftids, rightids);
 				*inner_join_rels = bms_union(left_inners, right_inners);
 				/* each side is both outer and inner */
@@ -420,11 +478,13 @@ deconstruct_recurse(PlannerInfo *root, Node *jtnode, bool below_outer_join,
 				leftjoinlist = deconstruct_recurse(root, j->larg,
 												   true,
 												   &rightids, &right_inners,
-												   &localRightEquiKeyList);
+												   &localRightEquiKeyList,
+												   &child_postponed_quals);
 				rightjoinlist = deconstruct_recurse(root, j->rarg,
 													below_outer_join,
 													&leftids, &left_inners,
-													ptrToLocalEquiKeyList);
+													ptrToLocalEquiKeyList,
+													&child_postponed_quals);
 				*qualscope = bms_union(leftids, rightids);
 				*inner_join_rels = bms_union(left_inners, right_inners);
 				nonnullable_rels = leftids;
@@ -476,7 +536,8 @@ deconstruct_recurse(PlannerInfo *root, Node *jtnode, bool below_outer_join,
                                                     (j->jointype != JOIN_INNER),
 										       &sub_qualscope,
                                                &sub_inners,
-                                               localEquiKeyList
+                                               localEquiKeyList,
+											   &child_postponed_quals
                                                );
 		    rightids = bms_add_members(rightids, sub_qualscope);
             *qualscope = bms_add_members(*qualscope, sub_qualscope);
@@ -518,12 +579,38 @@ deconstruct_recurse(PlannerInfo *root, Node *jtnode, bool below_outer_join,
 			ojscope = NULL;
 		}
 
+		/* Try to process any quals postponed by children. If they need
+		 * further postponement, add them to my output postponed_qual_list */
+		foreach(qual, child_postponed_quals)
+		{
+			PostponedQual *pq = (PostponedQual *) lfirst(qual);
+
+			if (bms_is_subset(pq->relids, *qualscope))
+			{
+				distribute_qual_to_rels(root, pq->qual,
+										false, false, below_outer_join,
+										*qualscope, ojscope, nonnullable_rels,
+										ptrToLocalEquiKeyList,
+										NULL);
+				pfree(pq);
+			}
+			else
+			{
+				*postponed_qual_list = lappend(*postponed_qual_list, pq);
+			}
+		}
+		if (child_postponed_quals != NIL)
+		{
+			pfree(child_postponed_quals);
+		}
+
 		/* Process the qual clauses */
 		foreach(qual, (List *) j->quals)
 			distribute_qual_to_rels(root, (Node *) lfirst(qual),
 									false, false, below_outer_join,
 									*qualscope, ojscope, nonnullable_rels,
-									ptrToLocalEquiKeyList);
+									ptrToLocalEquiKeyList,
+									postponed_qual_list);
 
 		/* Now we can add the OuterJoinInfo to oj_info_list */
 		if (ojinfo)
@@ -801,7 +888,8 @@ distribute_qual_to_rels(PlannerInfo *root, Node *clause,
 						Relids qualscope,
 						Relids ojscope,
 						Relids outerjoin_nonnullable,
-						List **ptrToLocalEquiKeyList)
+						List **ptrToLocalEquiKeyList,
+						List **postponed_qual_list)
 {
 	Relids		relids;
 	bool		is_pushed_down;
@@ -824,7 +912,22 @@ distribute_qual_to_rels(PlannerInfo *root, Node *clause,
 	 * Otherwise the parser messed up.
 	 */
 	if (!bms_is_subset(relids, qualscope))
-		elog(ERROR, "JOIN qualification may not refer to other relations");
+	{
+		if (postponed_qual_list == NULL)
+		{
+			elog(ERROR, "JOIN qualification may not refer to other relations");
+		}
+		else
+		{
+			PostponedQual *pq = (PostponedQual *) palloc(sizeof(PostponedQual));
+
+			Assert(!is_deduced);
+			pq->qual = clause;
+			pq->relids = relids;
+			*postponed_qual_list = lappend(*postponed_qual_list, pq);
+			return;
+		}
+	}
 	if (ojscope && !bms_is_subset(relids, ojscope))
 		elog(ERROR, "JOIN qualification may not refer to other relations");
 
@@ -1366,11 +1469,11 @@ process_implied_equality(PlannerInfo *root,
 	 */
 	distribute_qual_to_rels(root, (Node *) clause,
 							true, true, false, relids, NULL, NULL,
-							NULL 
+							NULL,
 							/* NULL is okay for local equi list because
 							 *  we are recording a global equivalence
 							 */
-							);
+							NULL);
 }
 
 /*

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b95ed251/src/backend/optimizer/prep/prepjointree.c
----------------------------------------------------------------------
diff --git a/src/backend/optimizer/prep/prepjointree.c b/src/backend/optimizer/prep/prepjointree.c
index 6e4cd4e..e2c1707 100644
--- a/src/backend/optimizer/prep/prepjointree.c
+++ b/src/backend/optimizer/prep/prepjointree.c
@@ -61,7 +61,6 @@ typedef struct reduce_outer_joins_state
 
 static void pull_up_fromlist_subqueries(PlannerInfo    *root,
                                         List          **inout_fromlist,
-                                        Node          **inout_quals,
 				                        bool            below_outer_join);
 static Node *pull_up_simple_subquery(PlannerInfo *root, Node *jtnode,
 						RangeTblEntry *rte,
@@ -174,8 +173,7 @@ pull_up_subqueries(PlannerInfo *root, Node *jtnode,
 		FromExpr   *f = (FromExpr *) jtnode;
 
 		Assert(!append_rel_member);
-        pull_up_fromlist_subqueries(root, &f->fromlist, &f->quals,
-                                    below_outer_join);
+        pull_up_fromlist_subqueries(root, &f->fromlist, below_outer_join);
 	}
 	else if (IsA(jtnode, JoinExpr))
 	{
@@ -226,7 +224,7 @@ pull_up_subqueries(PlannerInfo *root, Node *jtnode,
          * side of the JOIN (right side of LEFT JOIN).
          */
         if (j->subqfromlist)
-            pull_up_fromlist_subqueries(root, &j->subqfromlist, &j->quals,
+            pull_up_fromlist_subqueries(root, &j->subqfromlist,
                                         below_outer_join || (j->jointype != JOIN_INNER));
 	}
 	else
@@ -243,7 +241,6 @@ pull_up_subqueries(PlannerInfo *root, Node *jtnode,
 static void
 pull_up_fromlist_subqueries(PlannerInfo    *root,
                             List          **inout_fromlist,
-                            Node          **inout_quals,
 				            bool            below_outer_join)
 {
     ListCell   *l;
@@ -254,30 +251,7 @@ pull_up_fromlist_subqueries(PlannerInfo    *root,
         Node   *newkid = pull_up_subqueries(root, oldkid,
 											below_outer_join, false);
 
-        /* CDB: Collapse subquery FROM list into current FROM list,
-         * so correlated refs from subquery will be in the right scope.
-         * Otherwise deconstruct_jointree() complains about them.
-         */
-        if (IsA(oldkid, RangeTblRef) &&
-            IsA(newkid, FromExpr))
-        {
-            FromExpr   *fkid = (FromExpr *)newkid;
-            ListCell   *lkid;
-
-            /* Replace RangeTblRef with subquery FROM list. */
-            foreach(lkid, fkid->fromlist)
-            {
-                if (lfirst(l) == oldkid)
-                    lfirst(l) = lfirst(lkid);
-                else
-                    l = lappend_cell(*inout_fromlist, l, lfirst(lkid));
-            }
-
-            /* Conjoin subquery WHERE clause with current search condition. */
-            *inout_quals = make_and_qual(*inout_quals, fkid->quals);
-        }
-        else
-            lfirst(l) = newkid;
+        lfirst(l) = newkid;
     }
 }                               /* pull_up_fromlist_subqueries */
 
@@ -487,6 +461,18 @@ pull_up_simple_subquery(PlannerInfo *root, Node *jtnode, RangeTblEntry *rte,
 				ResolveNew((Node *) otherrte->joinaliasvars,
 						   varno, 0, rte,
 						   subtlist, CMD_SELECT, 0);
+
+		else if (otherrte->rtekind == RTE_SUBQUERY && rte != otherrte)
+		{
+			otherrte->subquery = (Query *)
+				ResolveNew((Node *) otherrte->subquery,
+							varno, 1, rte, /* here the sublevels_up can only be 1, because if larger than 1,
+											  then the sublink is multilevel correlated, and cannot be pulled
+											  up to be a subquery range table; while on the other hand, we
+											  cannot directly put a subquery which refer to other relations
+											  of the same level after FROM. */
+							subtlist, CMD_SELECT, 0);
+		}
 	}
 
 	/*
@@ -634,17 +620,6 @@ is_simple_subquery(PlannerInfo *root, Query *subquery)
 		return false;
 
 	/*
-	 * Don't pull up a subquery that coerces values to a different domain, since
-	 * those may be used incorrectly before they are coerced by operators in 
-	 * the tree above
-	 */
-	List *corce_to_domain_list = extract_nodes(root->glob, (Node *) subquery, T_CoerceToDomain);
-	if (NIL != corce_to_domain_list)
-	{
-		return false;
-	}
-	
-	/*
 	 * Hack: don't try to pull up a subquery with an empty jointree.
 	 * query_planner() will correctly generate a Result plan for a jointree
 	 * that's totally empty, but I don't think the right things happen if an

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b95ed251/src/include/optimizer/planmain.h
----------------------------------------------------------------------
diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h
index 4bbbbab..c2d72ba 100644
--- a/src/include/optimizer/planmain.h
+++ b/src/include/optimizer/planmain.h
@@ -246,7 +246,8 @@ extern void distribute_qual_to_rels(PlannerInfo *root, Node *clause,
 						Relids qualscope,
 						Relids ojscope,
 						Relids outerjoin_nonnullable,
-						List **ptrToLocalEquiKeyList);
+						List **ptrToLocalEquiKeyList,
+						List **postponed_qual_list);
 
 
 /*

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b95ed251/src/test/regress/expected/gp_optimizer.out
----------------------------------------------------------------------
diff --git a/src/test/regress/expected/gp_optimizer.out b/src/test/regress/expected/gp_optimizer.out
index 79b82bf..23b1e5a 100644
--- a/src/test/regress/expected/gp_optimizer.out
+++ b/src/test/regress/expected/gp_optimizer.out
@@ -8602,6 +8602,82 @@ where c.cid = s.cid and s.date_sk = d.date_sk and
 (18 rows)
 
 reset optimizer_segments;
+--
+-- apply parallelization for subplan MPP-24563
+--
+create table t1_mpp_24563 (id int, value int) distributed by (id);
+insert into t1_mpp_24563 values (1, 3);
+create table t2_mpp_24563 (id int, value int, seq int) distributed by (id);
+insert into t2_mpp_24563 values (1, 7, 5);
+set optimizer = off;
+select row_number() over (order by seq asc) as id, foo.cnt
+from
+(select seq, (select count(*) from t1_mpp_24563 t1 where t1.id = t2.id) cnt from
+  t2_mpp_24563 t2 where value = 7) foo;
+ id | cnt
+----+-----
+  1 |   1
+(1 row)
+
+set optimizer = on;
+select row_number() over (order by seq asc) as id, foo.cnt
+from
+(select seq, (select count(*) from t1_mpp_24563 t1 where t1.id = t2.id) cnt from
+  t2_mpp_24563 t2 where value = 7) foo;
+ id | cnt
+----+-----
+  1 |   1
+(1 row)
+
+drop table t1_mpp_24563;
+drop table t2_mpp_24563;
+--
+-- MPP-20470 update the flow of node after parallelizing subplan.
+--
+CREATE TABLE t_mpp_20470 (
+    col_date timestamp without time zone,
+    col_name character varying(6),
+    col_expiry date
+) DISTRIBUTED BY (col_date) PARTITION BY RANGE(col_date)
+(
+START ('2013-05-10 00:00:00'::timestamp without time zone) END ('2013-05-11
+  00:00:00'::timestamp without time zone) WITH (tablename='t_mpp_20470_ptr1'),
+START ('2013-05-24 00:00:00'::timestamp without time zone) END ('2013-05-25
+  00:00:00'::timestamp without time zone) WITH (tablename='t_mpp_20470_ptr2')
+);
+NOTICE:  CREATE TABLE will create partition "t_mpp_20470_ptr1" for table "t_mpp_20470"
+NOTICE:  CREATE TABLE will create partition "t_mpp_20470_ptr2" for table "t_mpp_20470"
+COPY t_mpp_20470 from STDIN delimiter '|' null '';
+create view v1_mpp_20470 as
+SELECT
+CASE
+  WHEN  b.col_name::text = 'FUTCUR'::text
+  THEN  ( SELECT count(a.col_expiry) AS count FROM t_mpp_20470 a WHERE
+    a.col_name::text = b.col_name::text)::text
+  ELSE 'Q2'::text END  AS  cc,  1 AS nn
+FROM t_mpp_20470 b;
+set optimizer = off;
+SELECT  cc, sum(nn) over() FROM v1_mpp_20470;
+ cc | sum
+----+-----
+ Q2 |   4
+ Q2 |   4
+ 1  |   4
+ Q2 |   4
+(4 rows)
+
+set optimizer = on;
+SELECT  cc, sum(nn) over() FROM v1_mpp_20470;
+ cc | sum
+----+-----
+ Q2 |   4
+ Q2 |   4
+ 1  |   4
+ Q2 |   4
+(4 rows)
+
+drop view v1_mpp_20470;
+drop table t_mpp_20470;
 -- clean up
 drop schema orca cascade;
 NOTICE:  drop cascades to table orca.bm_dyn_test_onepart_1_prt_part5

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b95ed251/src/test/regress/expected/information_schema.out
----------------------------------------------------------------------
diff --git a/src/test/regress/expected/information_schema.out b/src/test/regress/expected/information_schema.out
index e3a1634..3084620 100644
--- a/src/test/regress/expected/information_schema.out
+++ b/src/test/regress/expected/information_schema.out
@@ -64,4 +64,38 @@ where ordinal_position = 20;
  information_schema | routines      | collation_catalog  |               20
 (13 rows)
 
+-- MPP-25724
+select a.column_name
+from information_schema.columns a
+where a.table_name
+in
+(select b.table_name from information_schema.tables b where
+	a.column_name like 'b') and a.table_name = 'r';
+ column_name
+-------------
+ b
+(1 row)
+
+select c.relname
+from pg_class c
+where c.relname
+in
+(select b.table_name from information_schema.tables b where
+	c.relname like 'r');
+ relname
+---------
+ r
+(1 row)
+
+select a.table_name
+from information_schema.tables a
+where a.table_name
+in
+(select b.relname from pg_class b where
+	a.table_name like 'r');
+ table_name
+------------
+ r
+(1 row)
+
 drop table r;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b95ed251/src/test/regress/expected/information_schema_optimizer.out
----------------------------------------------------------------------
diff --git a/src/test/regress/expected/information_schema_optimizer.out b/src/test/regress/expected/information_schema_optimizer.out
index e3a1634..3084620 100644
--- a/src/test/regress/expected/information_schema_optimizer.out
+++ b/src/test/regress/expected/information_schema_optimizer.out
@@ -64,4 +64,38 @@ where ordinal_position = 20;
  information_schema | routines      | collation_catalog  |               20
 (13 rows)
 
+-- MPP-25724
+select a.column_name
+from information_schema.columns a
+where a.table_name
+in
+(select b.table_name from information_schema.tables b where
+	a.column_name like 'b') and a.table_name = 'r';
+ column_name
+-------------
+ b
+(1 row)
+
+select c.relname
+from pg_class c
+where c.relname
+in
+(select b.table_name from information_schema.tables b where
+	c.relname like 'r');
+ relname
+---------
+ r
+(1 row)
+
+select a.table_name
+from information_schema.tables a
+where a.table_name
+in
+(select b.relname from pg_class b where
+	a.table_name like 'r');
+ table_name
+------------
+ r
+(1 row)
+
 drop table r;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b95ed251/src/test/regress/sql/gp_optimizer.sql
----------------------------------------------------------------------
diff --git a/src/test/regress/sql/gp_optimizer.sql b/src/test/regress/sql/gp_optimizer.sql
index 2f7e570..9d02ae5 100644
--- a/src/test/regress/sql/gp_optimizer.sql
+++ b/src/test/regress/sql/gp_optimizer.sql
@@ -713,5 +713,68 @@ from cust c, sales s, datedim d
 where c.cid = s.cid and s.date_sk = d.date_sk and
       ((d.year = 2001 and lower(s.type) = 't1' and plusone(d.moy) = 5) or (d.moy = 4 and upper(s.type) = 'T2'));
 reset optimizer_segments;
+
+--
+-- apply parallelization for subplan MPP-24563
+--
+create table t1_mpp_24563 (id int, value int) distributed by (id);
+insert into t1_mpp_24563 values (1, 3);
+
+create table t2_mpp_24563 (id int, value int, seq int) distributed by (id);
+insert into t2_mpp_24563 values (1, 7, 5);
+
+set optimizer = off;
+select row_number() over (order by seq asc) as id, foo.cnt
+from
+(select seq, (select count(*) from t1_mpp_24563 t1 where t1.id = t2.id) cnt from
+	t2_mpp_24563 t2 where value = 7) foo;
+
+set optimizer = on;
+select row_number() over (order by seq asc) as id, foo.cnt
+from
+(select seq, (select count(*) from t1_mpp_24563 t1 where t1.id = t2.id) cnt from
+	t2_mpp_24563 t2 where value = 7) foo;
+
+drop table t1_mpp_24563;
+drop table t2_mpp_24563;
+
+--
+-- MPP-20470 update the flow of node after parallelizing subplan.
+--
+CREATE TABLE t_mpp_20470 (
+    col_date timestamp without time zone,
+    col_name character varying(6),
+    col_expiry date
+) DISTRIBUTED BY (col_date) PARTITION BY RANGE(col_date)
+(
+START ('2013-05-10 00:00:00'::timestamp without time zone) END ('2013-05-11
+	00:00:00'::timestamp without time zone) WITH (tablename='t_mpp_20470_ptr1'),
+START ('2013-05-24 00:00:00'::timestamp without time zone) END ('2013-05-25
+	00:00:00'::timestamp without time zone) WITH (tablename='t_mpp_20470_ptr2')
+);
+
+COPY t_mpp_20470 from STDIN delimiter '|' null '';
+2013-05-10 00:00:00|OPTCUR|2013-05-29
+2013-05-10 04:35:20|OPTCUR|2013-05-29
+2013-05-24 03:10:30|FUTCUR|2014-04-28
+2013-05-24 05:32:34|OPTCUR|2013-05-29
+\.
+
+create view v1_mpp_20470 as
+SELECT
+CASE
+	WHEN  b.col_name::text = 'FUTCUR'::text
+	THEN  ( SELECT count(a.col_expiry) AS count FROM t_mpp_20470 a WHERE
+		a.col_name::text = b.col_name::text)::text
+	ELSE 'Q2'::text END  AS  cc,  1 AS nn
+FROM t_mpp_20470 b;
+
+set optimizer = off;
+SELECT  cc, sum(nn) over() FROM v1_mpp_20470;
+set optimizer = on;
+SELECT  cc, sum(nn) over() FROM v1_mpp_20470;
+
+drop view v1_mpp_20470;
+drop table t_mpp_20470;
 -- clean up
 drop schema orca cascade;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/b95ed251/src/test/regress/sql/information_schema.sql
----------------------------------------------------------------------
diff --git a/src/test/regress/sql/information_schema.sql b/src/test/regress/sql/information_schema.sql
index ff8b06a..ef2db03 100644
--- a/src/test/regress/sql/information_schema.sql
+++ b/src/test/regress/sql/information_schema.sql
@@ -32,4 +32,26 @@ select table_schema, table_name,column_name,ordinal_position
 from information_schema.columns
 where ordinal_position = 20;
 
+-- MPP-25724
+select a.column_name
+from information_schema.columns a
+where a.table_name
+in
+(select b.table_name from information_schema.tables b where
+	a.column_name like 'b') and a.table_name = 'r';
+
+select c.relname
+from pg_class c
+where c.relname
+in
+(select b.table_name from information_schema.tables b where
+	c.relname like 'r');
+
+select a.table_name
+from information_schema.tables a
+where a.table_name
+in
+(select b.relname from pg_class b where
+	a.table_name like 'r');
+
 drop table r;


[4/4] incubator-hawq git commit: HAWQ-161. Port GPDB planner fixes to HAWQ Query crashed with segmentation fault with optimizer=off.

Posted by nh...@apache.org.
HAWQ-161. Port GPDB planner fixes to HAWQ
Query crashed with segmentation fault with optimizer=off.

The crash happened in the executor, but the root cause is in the planner. The
planner mistakenly believed that when both the root node of the subplan and the
corresponding main plan node are executed on the QD, there is no need to call
the function ParallelizeCorrelatedSubPlan. In the situation presented in the
JIRA, however, both the main plan node Window and the subplan node Agg are
executed on the QD, yet we still need to do the subplan parallelization;
otherwise, the SeqScan node cannot get the parameter from the main plan,
because they are in different slices.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/6e2846fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/6e2846fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/6e2846fa

Branch: refs/heads/master
Commit: 6e2846fafe40c4259a5e81e2f5c187ff04bb8c07
Parents: 1b11926
Author: Kenan Yao <ky...@pivotal.io>
Authored: Mon Nov 24 11:31:47 2014 +0800
Committer: Noa Horn <nh...@pivotal.io>
Committed: Wed Jan 20 12:15:52 2016 -0800

----------------------------------------------------------------------
 src/backend/cdb/cdbllize.c | 37 ++++++++++++++++++++++++++++++++++++-
 1 file changed, 36 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/6e2846fa/src/backend/cdb/cdbllize.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbllize.c b/src/backend/cdb/cdbllize.c
index 4aa8795..8e02f3c 100644
--- a/src/backend/cdb/cdbllize.c
+++ b/src/backend/cdb/cdbllize.c
@@ -329,6 +329,7 @@ typedef struct ParallelizeCorrelatedPlanWalkerContext
 	Movement movement; /* What is the final movement necessary? Is it gather or broadcast */
 	List *rtable; /* rtable from the global context */
 	bool subPlanDistributed; /* is original subplan distributed */
+	bool subPlanHasMotion;/* is original subplan has motion already  */
 } ParallelizeCorrelatedPlanWalkerContext;
 
 /**
@@ -462,6 +463,20 @@ static Node* ParallelizeCorrelatedSubPlanMutator(Node *node, ParallelizeCorrelat
 		|| IsA(node, ShareInputScan))
 	{
 		Plan *scanPlan = (Plan *) node;
+		/**
+		 * If original subplan has no motion, we double check whether the scan
+		 * node is on catalog table or not. If catalog, no need to apply
+		 * parallelization.
+		 * This is for case like:
+		 * SELECT array(select case when p.oid in (select
+		 * unnest(array[typoutput, typsend]) from pg_type) then 'upg_catalog.'
+		 * else 'pg_catalog.' end) FROM pg_proc p;
+		 **/
+		if(!ctx->subPlanHasMotion)
+		{
+			if(!scanPlan->flow || scanPlan->flow->flotype == FLOW_REPLICATED)
+				return (Node *)node;
+		}
 
 		/**
 		 * Steps:
@@ -595,6 +610,19 @@ static Node* ParallelizeCorrelatedSubPlanMutator(Node *node, ParallelizeCorrelat
 		{
 			return node;
 		}
+
+		/**
+		 * Pull up the parallelization of subplan to here, because we want to
+		 * set the is_parallelized flag. We don't want to apply double
+		 * parallelization for nested subplan.
+		 * This is for case like:
+		 * select A.i, A.j, (select sum(C.j) from C where C.j = A.j and C.i =
+		 * (select A.i from A where A.i = C.i)) from A;
+		 **/
+		SubPlan* new_sp = (SubPlan *)plan_tree_mutator(node, ParallelizeCorrelatedSubPlanMutator, ctx);
+		Assert(new_sp);
+		new_sp->is_parallelized = true;
+		return (Node *)new_sp;
 	}
 
 	/**
@@ -603,6 +631,7 @@ static Node* ParallelizeCorrelatedSubPlanMutator(Node *node, ParallelizeCorrelat
 	 */
 	if (IsA(node, Motion))
 	{
+		ctx->subPlanHasMotion = true;
 		Plan *plan = (Plan *) node;
 		node = (Node *) plan->lefttree;
 		Assert(node);
@@ -621,6 +650,7 @@ Plan* ParallelizeCorrelatedSubPlan(PlannerInfo *root, SubPlan *spExpr, Plan *pla
 	ctx.base.node = (Node *) root;
 	ctx.movement = m;
 	ctx.subPlanDistributed = subPlanDistributed;
+	ctx.subPlanHasMotion = false;
 	ctx.sp = spExpr;
 	ctx.rtable = root->glob->finalrtable;
 	return (Plan *) ParallelizeCorrelatedSubPlanMutator((Node *) plan, &ctx);
@@ -655,6 +685,7 @@ void ParallelizeSubplan(SubPlan *spExpr, PlanProfile *context)
 
 	bool containingPlanDistributed = (context->currentPlanFlow && context->currentPlanFlow->flotype == FLOW_PARTITIONED);
 	bool subPlanDistributed = (origPlan->flow && origPlan->flow->flotype == FLOW_PARTITIONED);
+	bool hasParParam = (list_length(spExpr->parParam) > 0);
 
 	/**
 	 * If containing plan is distributed then we must know the flow of the subplan.
@@ -693,7 +724,11 @@ void ParallelizeSubplan(SubPlan *spExpr, PlanProfile *context)
 
 		newPlan = materialize_subplan(context->root, newPlan);
 	}
-	else if (containingPlanDistributed || subPlanDistributed)
+	/* *
+	 * [JIRA: MPP-24563] Adding hasParParam check here, for the kind of cases in
+	 * JIRA, which has both focused parent plan and subplan.
+	 * */
+	else if(containingPlanDistributed || subPlanDistributed || hasParParam)
 	{
 		Movement reqMove = containingPlanDistributed ? MOVEMENT_BROADCAST : MOVEMENT_FOCUS;
 


[3/4] incubator-hawq git commit: HAWQ-161. Port GPDB planner fixes to HAWQ Planner generated unnecessary motion node for nested subplan which scan catalog table.

Posted by nh...@apache.org.
HAWQ-161. Port GPDB planner fixes to HAWQ
Planner generated unnecessary motion node for nested subplan which scan catalog table.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/0f6beb62
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/0f6beb62
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/0f6beb62

Branch: refs/heads/master
Commit: 0f6beb62fa1b14a8ca1e8054c75345625cfb4464
Parents: ada7919
Author: Kenan Yao <ky...@pivotal.io>
Authored: Thu Nov 12 15:13:16 2015 -0800
Committer: Noa Horn <nh...@pivotal.io>
Committed: Wed Jan 20 12:15:52 2016 -0800

----------------------------------------------------------------------
 src/backend/cdb/cdbllize.c           | 21 +++++++++++++--------
 src/backend/optimizer/plan/setrefs.c |  1 +
 2 files changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0f6beb62/src/backend/cdb/cdbllize.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbllize.c b/src/backend/cdb/cdbllize.c
index 4d58b6b..59a59d8 100644
--- a/src/backend/cdb/cdbllize.c
+++ b/src/backend/cdb/cdbllize.c
@@ -329,7 +329,6 @@ typedef struct ParallelizeCorrelatedPlanWalkerContext
 	Movement movement; /* What is the final movement necessary? Is it gather or broadcast */
 	List *rtable; /* rtable from the global context */
 	bool subPlanDistributed; /* is original subplan distributed */
-	bool subPlanHasMotion;/* is original subplan has motion already  */
 } ParallelizeCorrelatedPlanWalkerContext;
 
 /**
@@ -416,12 +415,20 @@ Plan *materialize_subplan(PlannerInfo *root, Plan *subplan)
 	return mat;
 }
 
+/* *
+ * Not listing every node type here, if further bug found due to not updating flow
+ * after cdbparallelize, just simply add that node type here.
+ * */
 static Node *ParallelizeCorrelatedSubPlanUpdateFlowMutator(Node *node)
 {
 	Assert(is_plan_node(node));
 	switch (nodeTag(node))
 	{
 		case T_Agg:
+		case T_Window:
+		case T_Sort:
+		case T_Material:
+		case T_Limit:
 		case T_Result:
 		{
 			if(((Plan *)node)->lefttree && ((Plan *)node)->lefttree->flow)
@@ -501,11 +508,8 @@ static Node* ParallelizeCorrelatedSubPlanMutator(Node *node, ParallelizeCorrelat
 		 * unnest(array[typoutput, typsend]) from pg_type) then 'upg_catalog.'
 		 * else 'pg_catalog.' end) FROM pg_proc p;
 		 **/
-		if(!ctx->subPlanHasMotion)
-		{
-			if(!scanPlan->flow || scanPlan->flow->flotype == FLOW_REPLICATED)
-				return (Node *)node;
-		}
+		if(scanPlan->flow && (scanPlan->flow->locustype == CdbLocusType_Entry))
+			return (Node *)node;
 
 		/**
 		 * Steps:
@@ -663,7 +667,6 @@ static Node* ParallelizeCorrelatedSubPlanMutator(Node *node, ParallelizeCorrelat
 	 */
 	if (IsA(node, Motion))
 	{
-		ctx->subPlanHasMotion = true;
 		Plan *plan = (Plan *) node;
 		node = (Node *) plan->lefttree;
 		Assert(node);
@@ -671,6 +674,9 @@ static Node* ParallelizeCorrelatedSubPlanMutator(Node *node, ParallelizeCorrelat
 	}
 
 	Node * result = plan_tree_mutator(node, ParallelizeCorrelatedSubPlanMutator, ctx);
+	/* *
+	 * Update the flow of the current plan node.
+	 * */
 	if(is_plan_node(node))
 		return ParallelizeCorrelatedSubPlanUpdateFlowMutator(result);
 	return result;
@@ -685,7 +691,6 @@ Plan* ParallelizeCorrelatedSubPlan(PlannerInfo *root, SubPlan *spExpr, Plan *pla
 	ctx.base.node = (Node *) root;
 	ctx.movement = m;
 	ctx.subPlanDistributed = subPlanDistributed;
-	ctx.subPlanHasMotion = false;
 	ctx.sp = spExpr;
 	ctx.rtable = root->glob->finalrtable;
 	return (Plan *) ParallelizeCorrelatedSubPlanMutator((Node *) plan, &ctx);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/0f6beb62/src/backend/optimizer/plan/setrefs.c
----------------------------------------------------------------------
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index afce5a5..2f70516 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -2528,6 +2528,7 @@ cdb_insert_result_node(PlannerGlobal *glob, Plan *plan, int rtoffset)
 	
     /* Reattach the Flow node. */
     resultplan->flow = flow;
+    plan->flow = flow;
 	
     return resultplan;
 }                               /* cdb_insert_result_node */


[2/4] incubator-hawq git commit: HAWQ-161. Port GPDB planner fixes to HAWQ Query Crashes with "Error on receive...server closed the connection unexpectedly".

Posted by nh...@apache.org.
HAWQ-161. Port GPDB planner fixes to HAWQ
Query Crashes with "Error on receive...server closed the connection unexpectedly".

Same issue as MPP-24563; after applying the fix for MPP-24563, this case still
failed because of an assert failure in apply_shareinput_xslice. The root cause is
that cdbparallelize changes the plan but does not update the flow of the upper
nodes; therefore, once ParallelizeCorrelatedSubPlanMutator takes action, we cannot
guarantee the correctness of the upper nodes' flow. The fix is to update the flow
after transforming the lower nodes. Note that the function
ParallelizeCorrelatedSubPlanUpdateFlowMutator currently only handles the three
known situations; if more cases of this kind are found, tweaking that function
should solve the problem.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/ada79191
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/ada79191
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/ada79191

Branch: refs/heads/master
Commit: ada7919105cc1dd2042e1d675a6a6690ef7b51a1
Parents: 6e2846f
Author: Kenan Yao <ky...@pivotal.io>
Authored: Wed Dec 24 14:17:24 2014 +0800
Committer: Noa Horn <nh...@pivotal.io>
Committed: Wed Jan 20 12:15:52 2016 -0800

----------------------------------------------------------------------
 src/backend/cdb/cdbllize.c | 37 ++++++++++++++++++++++++++++++++++++-
 1 file changed, 36 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ada79191/src/backend/cdb/cdbllize.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbllize.c b/src/backend/cdb/cdbllize.c
index 8e02f3c..4d58b6b 100644
--- a/src/backend/cdb/cdbllize.c
+++ b/src/backend/cdb/cdbllize.c
@@ -350,6 +350,7 @@ static void ParallelizeSubplan(SubPlan *spExpr, PlanProfile *context);
 static Plan* ParallelizeCorrelatedSubPlan(PlannerInfo *root, SubPlan *spExpr, Plan *plan, Movement m, bool subPlanDistributed);
 static Node* MapVarsMutator(Node *expr, MapVarsMutatorContext *ctx);
 static Node* ParallelizeCorrelatedSubPlanMutator(Node *node, ParallelizeCorrelatedPlanWalkerContext *ctx);
+static Node *ParallelizeCorrelatedSubPlanUpdateFlowMutator(Node *node);
 
 /**
  * Does an expression contain a parameter?
@@ -415,6 +416,34 @@ Plan *materialize_subplan(PlannerInfo *root, Plan *subplan)
 	return mat;
 }
 
+static Node *ParallelizeCorrelatedSubPlanUpdateFlowMutator(Node *node)
+{
+	Assert(is_plan_node(node));
+	switch (nodeTag(node))
+	{
+		case T_Agg:
+		case T_Result:
+		{
+			if(((Plan *)node)->lefttree && ((Plan *)node)->lefttree->flow)
+				((Plan *)node)->flow = pull_up_Flow((Plan *)node, ((Plan *)node)->lefttree, true);
+			break;
+		}
+		case T_Append:
+		{
+			List *append_list = (List *) ((Append *)node)->appendplans;
+			if(append_list == NULL)
+				break;
+			Plan *first_append = (Plan *) (lfirst(list_head(append_list)));
+			Assert(first_append && first_append->flow);
+			bool with_sort = list_length(append_list) == 1 ? true : false;
+			((Plan *)node)->flow = pull_up_Flow((Plan *)node, first_append, with_sort);
+			break;
+		}
+		default:
+			break;
+	}
+	return node;
+}
 
 /**
  * This is the workhorse method that transforms a plan containing correlated references
@@ -587,6 +616,9 @@ static Node* ParallelizeCorrelatedSubPlanMutator(Node *node, ParallelizeCorrelat
 		((Plan *) res)->allParam = saveAllParam;
 		((Plan *) res)->extParam = saveExtParam;
 
+		Assert(((Plan *) res)->lefttree && ((Plan *) res)->lefttree->flow);
+		((Plan *) res)->flow = pull_up_Flow((Plan *) res, ((Plan *)res)->lefttree, true);
+
 		/**
 		 * It is possible that there is an additional level of correlation in the result node.
 		 * we will need to recurse in these structures again.
@@ -638,7 +670,10 @@ static Node* ParallelizeCorrelatedSubPlanMutator(Node *node, ParallelizeCorrelat
 		return ParallelizeCorrelatedSubPlanMutator(node, ctx);
 	}
 
-	return plan_tree_mutator(node, ParallelizeCorrelatedSubPlanMutator, ctx);
+	Node * result = plan_tree_mutator(node, ParallelizeCorrelatedSubPlanMutator, ctx);
+	if(is_plan_node(node))
+		return ParallelizeCorrelatedSubPlanUpdateFlowMutator(result);
+	return result;
 }
 
 /**