Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/11/29 17:13:33 UTC

[GitHub] [pinot] walterddr opened a new pull request, #9870: [multistage][hotfix] shuffle rewrite

walterddr opened a new pull request, #9870:
URL: https://github.com/apache/pinot/pull/9870

   Shuffle exchange rewrite rules are not carrying partition keys correctly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] 61yao commented on a diff in pull request #9870: [multistage][hotfix] shuffle rewrite

Posted by GitBox <gi...@apache.org>.
61yao commented on code in PR #9870:
URL: https://github.com/apache/pinot/pull/9870#discussion_r1035482458


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java:
##########
@@ -189,4 +172,27 @@ private static boolean canSkipShuffle(Set<Integer> partitionKeys, KeySelector<Ob
     }
     return false;
   }
+
+  private static Set<Integer> deriveNewPartitionKeysFromRexExpressions(List<RexExpression> rexExpressionList,
+      Set<Integer> oldPartitionKeys) {
+    Map<Integer, Integer> partitionKeyMap = new HashMap<>();
+    for (int i = 0; i < rexExpressionList.size(); i++) {
+      RexExpression rex = rexExpressionList.get(i);
+      if (rex instanceof RexExpression.InputRef) {
+        // put the old-index to new-index mapping
+        // TODO: it doesn't handle duplicate references. e.g. if the same old partition key is referred twice. it will
+        // only keep the second one. (see JOIN handling on left/right as another example)
+        partitionKeyMap.put(((RexExpression.InputRef) rex).getIndex(), i);

Review Comment:
   Is there any case other than join where this duplicate reference can happen?





[GitHub] [pinot] walterddr commented on a diff in pull request #9870: [multistage][hotfix] shuffle rewrite

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9870:
URL: https://github.com/apache/pinot/pull/9870#discussion_r1035398938


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java:
##########
@@ -150,25 +145,14 @@ public Set<Integer> visitMailboxSend(MailboxSendNode node, Void context) {
   @Override
   public Set<Integer> visitProject(ProjectNode node, Void context) {
     Set<Integer> oldPartitionKeys = node.getInputs().get(0).visit(this, context);
-
-    // all inputs carry over if they're still in the projection result
-    Set<Integer> partitionKeys = new HashSet<>();
-    for (int i = 0; i < node.getProjects().size(); i++) {
-      RexExpression rex = node.getProjects().get(i);
-      if (rex instanceof RexExpression.InputRef) {
-        if (oldPartitionKeys.contains(((RexExpression.InputRef) rex).getIndex())) {
-          partitionKeys.add(i);
-        }
-      }
-    }
-
-    return partitionKeys;
+    return deriveNewPartitionKeysFromRexExpressions(node.getProjects(), oldPartitionKeys);
   }
 
   @Override
   public Set<Integer> visitSort(SortNode node, Void context) {
-    // sort doesn't change the partition keys
-    return node.getInputs().get(0).visit(this, context);
+    // with sort shuffling reorder could change, so we can't directly return the old partition keys.

Review Comment:
   CC @agavra ^





[GitHub] [pinot] walterddr commented on a diff in pull request #9870: [multistage][hotfix] shuffle rewrite

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9870:
URL: https://github.com/apache/pinot/pull/9870#discussion_r1035529816


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java:
##########
@@ -189,4 +172,27 @@ private static boolean canSkipShuffle(Set<Integer> partitionKeys, KeySelector<Ob
     }
     return false;
   }
+
+  private static Set<Integer> deriveNewPartitionKeysFromRexExpressions(List<RexExpression> rexExpressionList,
+      Set<Integer> oldPartitionKeys) {
+    Map<Integer, Integer> partitionKeyMap = new HashMap<>();
+    for (int i = 0; i < rexExpressionList.size(); i++) {
+      RexExpression rex = rexExpressionList.get(i);
+      if (rex instanceof RexExpression.InputRef) {
+        // put the old-index to new-index mapping
+        // TODO: it doesn't handle duplicate references. e.g. if the same old partition key is referred twice. it will
+        // only keep the second one. (see JOIN handling on left/right as another example)
+        partitionKeyMap.put(((RexExpression.InputRef) rex).getIndex(), i);

Review Comment:
   Technically this is not a "bug", because the partition key generated by this rewrite is in fact correct.
   
   Both the join case and the project that selects the same key more than once are considered "equivalent" partitions, and they affect the "canSkipShuffle" conditions. Other such cases include:
   
   1. a transformed version of a key instead of a plain inputRef, such as `1 - partitionKey1`
   2. a mix of transforms that includes all partition keys and is guaranteed to be unique, such as `makeListUDF(partitionKey1, partitionKey2)`
   3. ...
   
   There can be many more; we can iteratively add support for these semantics.
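
To make the duplicate-reference case concrete, here is a minimal sketch. It is not the code in this PR: projection expressions are modeled as plain integers (a non-negative value stands for inputRef(index), `NOT_A_REF` for anything else), and the class and method names are hypothetical. `lastWins` mirrors the `partitionKeyMap.put(...)` loop quoted above; `keepAll` is one assumed way a follow-up could remember every equivalent output position.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class DuplicateRefSketch {
  // Hypothetical marker for an expression that is not a plain input reference.
  static final int NOT_A_REF = -1;

  // Mirrors the quoted loop: a later reference to the same old index
  // overwrites the earlier one, so only the last output position is kept.
  static Map<Integer, Integer> lastWins(List<Integer> projects) {
    Map<Integer, Integer> oldToNew = new HashMap<>();
    for (int i = 0; i < projects.size(); i++) {
      int ref = projects.get(i);
      if (ref != NOT_A_REF) {
        oldToNew.put(ref, i);
      }
    }
    return oldToNew;
  }

  // Assumed alternative: keep every output position that refers to an old index,
  // so equivalent keys are not lost.
  static Map<Integer, Set<Integer>> keepAll(List<Integer> projects) {
    Map<Integer, Set<Integer>> oldToNew = new HashMap<>();
    for (int i = 0; i < projects.size(); i++) {
      int ref = projects.get(i);
      if (ref != NOT_A_REF) {
        oldToNew.computeIfAbsent(ref, k -> new TreeSet<>()).add(i);
      }
    }
    return oldToNew;
  }
}
```

For a select list of [inputRef(0), inputRef(0), inputRef(1)], `lastWins` maps old index 0 only to output position 1, while `keepAll` maps it to both positions 0 and 1. Either result describes a valid partitioning; the second just preserves more information for a later skip-shuffle decision.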





[GitHub] [pinot] walterddr commented on a diff in pull request #9870: [multistage][hotfix] shuffle rewrite

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9870:
URL: https://github.com/apache/pinot/pull/9870#discussion_r1035284833


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java:
##########
@@ -150,19 +147,11 @@ public Set<Integer> visitMailboxSend(MailboxSendNode node, Void context) {
   @Override
   public Set<Integer> visitProject(ProjectNode node, Void context) {
     Set<Integer> oldPartitionKeys = node.getInputs().get(0).visit(this, context);
-
-    // all inputs carry over if they're still in the projection result
-    Set<Integer> partitionKeys = new HashSet<>();
-    for (int i = 0; i < node.getProjects().size(); i++) {
-      RexExpression rex = node.getProjects().get(i);
-      if (rex instanceof RexExpression.InputRef) {
-        if (oldPartitionKeys.contains(((RexExpression.InputRef) rex).getIndex())) {
-          partitionKeys.add(i);
-        }
-      }
+    if (rexExpressionListContainsAllPartitionKey(node.getProjects(), oldPartitionKeys)) {

Review Comment:
   Actually, there's another bug. Fixing.





[GitHub] [pinot] walterddr commented on a diff in pull request #9870: [multistage][hotfix] shuffle rewrite

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9870:
URL: https://github.com/apache/pinot/pull/9870#discussion_r1035040562


##########
pinot-query-runtime/src/test/resources/queries/SelectExpressions.json:
##########
@@ -36,7 +36,8 @@
       { "sql": "SELECT intCol as \"value\", doubleCol + floatCol AS \"sum\" FROM {tbl1}"},
       { "sql": "SELECT intCol as \"from\" FROM {tbl1}"},
       { "sql": "SELECT intCol as key, SUM(doubleCol + floatCol) AS aggSum FROM {tbl1} GROUP BY intCol"},
-      { "sql": "SELECT a.intCol as key, SUM(a.doubleCol + b.intCol) AS aggSum FROM {tbl1} AS a JOIN {tbl2} AS b ON a.intCol = b.intCol GROUP BY a.intCol"}
+      { "sql": "SELECT intCol, SUM(avgVal) FROM (SELECT strCol, intCol, AVG(doubleCol) AS avgVal FROM {tbl1} GROUP BY intCol, strCol) GROUP BY intCol"},
+      { "sql": "SELECT strCol, MAX(sumVal), MAX(sumVal + avgVal) AS transVal FROM (SELECT strCol, intCol, SUM(floatCol + 2 * intCol) AS sumVal, AVG(doubleCol) AS avgVal FROM {tbl1} GROUP BY strCol, intCol) GROUP BY strCol ORDER BY MAX(sumVal)" }

Review Comment:
   These 2 statements previously caused the project and agg node visits to not return the right partition keys. This is now fixed. CC @ankitsultana





[GitHub] [pinot] walterddr commented on a diff in pull request #9870: [multistage][hotfix] shuffle rewrite

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9870:
URL: https://github.com/apache/pinot/pull/9870#discussion_r1035275130


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java:
##########
@@ -150,19 +147,11 @@ public Set<Integer> visitMailboxSend(MailboxSendNode node, Void context) {
   @Override
   public Set<Integer> visitProject(ProjectNode node, Void context) {
     Set<Integer> oldPartitionKeys = node.getInputs().get(0).visit(this, context);
-
-    // all inputs carry over if they're still in the projection result
-    Set<Integer> partitionKeys = new HashSet<>();
-    for (int i = 0; i < node.getProjects().size(); i++) {
-      RexExpression rex = node.getProjects().get(i);
-      if (rex instanceof RexExpression.InputRef) {
-        if (oldPartitionKeys.contains(((RexExpression.InputRef) rex).getIndex())) {
-          partitionKeys.add(i);
-        }
-      }
+    if (rexExpressionListContainsAllPartitionKey(node.getProjects(), oldPartitionKeys)) {

Review Comment:
   A project can make partitioned data become unpartitioned. For example:
   ```
   - Aggregation [ GroupSet = (0, 1, 2), AggCall = (...) ]
     - Project [ SelectList = [inputRef(0), inputRef(1), func(xx)], PartitionKeys = [] ]
       - MailboxReceived [ PartitionKeys = [inputRef(0), inputRef(1), inputRef(2)] ]
   ```
   We need to remove the PartitionKeys (they are no longer inputRef [0, 1, 2]) because even though the group set is still (0, 1, 2), those indices no longer refer to the same columns they referred to at the MailboxReceived level. So a simple pass-through doesn't work.
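
As a rough illustration of the plan above (a sketch under stated assumptions, not the code in this PR): projection expressions are modeled as plain integers, where a non-negative value stands for inputRef(index) and `NOT_A_REF` stands for any computed expression such as func(xx). The class and method names are hypothetical, and the all-or-nothing rule in `remapOrDrop` is an assumption inferred from the explanation here, since the quoted diff does not show the full method body.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class PartitionKeyRemapSketch {
  // Hypothetical marker for a computed expression that is not a plain input reference.
  static final int NOT_A_REF = -1;

  // Models the removed code: keep every output position that references an old key,
  // even when some of the old keys did not survive the projection.
  static Set<Integer> passThroughSurvivors(List<Integer> projects, Set<Integer> oldKeys) {
    Set<Integer> keys = new HashSet<>();
    for (int i = 0; i < projects.size(); i++) {
      if (oldKeys.contains(projects.get(i))) {
        keys.add(i);
      }
    }
    return keys;
  }

  // Assumed stricter rule: remap old keys to their new positions only when
  // every old key is still referenced; otherwise report "not partitioned".
  static Set<Integer> remapOrDrop(List<Integer> projects, Set<Integer> oldKeys) {
    Map<Integer, Integer> oldToNew = new HashMap<>();
    for (int i = 0; i < projects.size(); i++) {
      int ref = projects.get(i);
      if (ref != NOT_A_REF) {
        oldToNew.put(ref, i);
      }
    }
    if (!oldToNew.keySet().containsAll(oldKeys)) {
      return new HashSet<>();
    }
    Set<Integer> newKeys = new HashSet<>();
    for (int oldKey : oldKeys) {
      newKeys.add(oldToNew.get(oldKey));
    }
    return newKeys;
  }
}
```

With the select list [inputRef(0), inputRef(1), func(xx)] and old keys {0, 1, 2}, `passThroughSurvivors` reports {0, 1}, whereas `remapOrDrop` returns an empty set because old key 2 is no longer projected. When the projection only reorders columns, `remapOrDrop` returns the keys at their new positions.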
   





[GitHub] [pinot] walterddr merged pull request #9870: [multistage][hotfix] shuffle rewrite

Posted by GitBox <gi...@apache.org>.
walterddr merged PR #9870:
URL: https://github.com/apache/pinot/pull/9870




[GitHub] [pinot] walterddr commented on pull request #9870: [multistage][hotfix] shuffle rewrite

Posted by GitBox <gi...@apache.org>.
walterddr commented on PR #9870:
URL: https://github.com/apache/pinot/pull/9870#issuecomment-1332524377

   @61yao merging this first to unblock some other PRs. Please let me know if anything else requires follow-up.
   
   > Can we also add a query option to disable shuffle if needed? That way we can 1) know the performance gain from this and 2) remove the shuffle when we see an issue.
   
   Yes, this can be done in follow-up PRs.
   
   




[GitHub] [pinot] walterddr commented on a diff in pull request #9870: [multistage][hotfix] shuffle rewrite

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9870:
URL: https://github.com/apache/pinot/pull/9870#discussion_r1035398938


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java:
##########
@@ -150,25 +145,14 @@ public Set<Integer> visitMailboxSend(MailboxSendNode node, Void context) {
   @Override
   public Set<Integer> visitProject(ProjectNode node, Void context) {
     Set<Integer> oldPartitionKeys = node.getInputs().get(0).visit(this, context);
-
-    // all inputs carry over if they're still in the projection result
-    Set<Integer> partitionKeys = new HashSet<>();
-    for (int i = 0; i < node.getProjects().size(); i++) {
-      RexExpression rex = node.getProjects().get(i);
-      if (rex instanceof RexExpression.InputRef) {
-        if (oldPartitionKeys.contains(((RexExpression.InputRef) rex).getIndex())) {
-          partitionKeys.add(i);
-        }
-      }
-    }
-
-    return partitionKeys;
+    return deriveNewPartitionKeysFromRexExpressions(node.getProjects(), oldPartitionKeys);
   }
 
   @Override
   public Set<Integer> visitSort(SortNode node, Void context) {
-    // sort doesn't change the partition keys
-    return node.getInputs().get(0).visit(this, context);
+    // with sort shuffling reorder could change, so we can't directly return the old partition keys.

Review Comment:
   CC @agavra ^ please correct me if I'm wrong





[GitHub] [pinot] walterddr commented on a diff in pull request #9870: [multistage][hotfix] shuffle rewrite

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9870:
URL: https://github.com/apache/pinot/pull/9870#discussion_r1035275130


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java:
##########
@@ -150,19 +147,11 @@ public Set<Integer> visitMailboxSend(MailboxSendNode node, Void context) {
   @Override
   public Set<Integer> visitProject(ProjectNode node, Void context) {
     Set<Integer> oldPartitionKeys = node.getInputs().get(0).visit(this, context);
-
-    // all inputs carry over if they're still in the projection result
-    Set<Integer> partitionKeys = new HashSet<>();
-    for (int i = 0; i < node.getProjects().size(); i++) {
-      RexExpression rex = node.getProjects().get(i);
-      if (rex instanceof RexExpression.InputRef) {
-        if (oldPartitionKeys.contains(((RexExpression.InputRef) rex).getIndex())) {
-          partitionKeys.add(i);
-        }
-      }
+    if (rexExpressionListContainsAllPartitionKey(node.getProjects(), oldPartitionKeys)) {

Review Comment:
   A project can make partitioned data become unpartitioned. For example:
   ```
   - Aggregation [ GroupSet = (0, 1, 2), AggCall = (...) ]
     - Project [ SelectList = [inputRef(0), inputRef(1), func(xx)], PartitionKeys = [] ]
       - MailboxReceived [ PartitionKeys = [inputRef(0), inputRef(1), inputRef(2)] ]
   ```
   We need to remove the PartitionKeys (they are no longer inputRef [0, 1, 2]) because even though the group set is still (0, 1, 2), those indices no longer refer to the same columns they were referring to.
   





[GitHub] [pinot] agavra commented on a diff in pull request #9870: [multistage][hotfix] shuffle rewrite

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9870:
URL: https://github.com/apache/pinot/pull/9870#discussion_r1035233750


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java:
##########
@@ -150,19 +147,11 @@ public Set<Integer> visitMailboxSend(MailboxSendNode node, Void context) {
   @Override
   public Set<Integer> visitProject(ProjectNode node, Void context) {
     Set<Integer> oldPartitionKeys = node.getInputs().get(0).visit(this, context);
-
-    // all inputs carry over if they're still in the projection result
-    Set<Integer> partitionKeys = new HashSet<>();
-    for (int i = 0; i < node.getProjects().size(); i++) {
-      RexExpression rex = node.getProjects().get(i);
-      if (rex instanceof RexExpression.InputRef) {
-        if (oldPartitionKeys.contains(((RexExpression.InputRef) rex).getIndex())) {
-          partitionKeys.add(i);
-        }
-      }
+    if (rexExpressionListContainsAllPartitionKey(node.getProjects(), oldPartitionKeys)) {

Review Comment:
   why is this necessary for project? projects shouldn't change partitioning, correct?





[GitHub] [pinot] walterddr commented on a diff in pull request #9870: [multistage][hotfix] shuffle rewrite

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9870:
URL: https://github.com/apache/pinot/pull/9870#discussion_r1035064863


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java:
##########
@@ -54,7 +55,7 @@ public class ShuffleRewriteVisitor implements StageNodeVisitor<Set<Integer>, Voi
    * @param root the root node of the tree to rewrite
    */
   public static void optimizeShuffles(StageNode root) {
-    root.visit(new ShuffleRewriteVisitor(), null);
+     root.visit(new ShuffleRewriteVisitor(), null);

Review Comment:
   ```suggestion
       root.visit(new ShuffleRewriteVisitor(), null);
   ```





[GitHub] [pinot] walterddr commented on a diff in pull request #9870: [multistage][hotfix] shuffle rewrite

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9870:
URL: https://github.com/apache/pinot/pull/9870#discussion_r1035415921


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java:
##########
@@ -150,25 +145,14 @@ public Set<Integer> visitMailboxSend(MailboxSendNode node, Void context) {
   @Override
   public Set<Integer> visitProject(ProjectNode node, Void context) {
     Set<Integer> oldPartitionKeys = node.getInputs().get(0).visit(this, context);
-
-    // all inputs carry over if they're still in the projection result
-    Set<Integer> partitionKeys = new HashSet<>();
-    for (int i = 0; i < node.getProjects().size(); i++) {
-      RexExpression rex = node.getProjects().get(i);
-      if (rex instanceof RexExpression.InputRef) {
-        if (oldPartitionKeys.contains(((RexExpression.InputRef) rex).getIndex())) {
-          partitionKeys.add(i);
-        }
-      }
-    }
-
-    return partitionKeys;
+    return deriveNewPartitionKeysFromRexExpressions(node.getProjects(), oldPartitionKeys);
   }
 
   @Override
   public Set<Integer> visitSort(SortNode node, Void context) {
-    // sort doesn't change the partition keys
-    return node.getInputs().get(0).visit(this, context);
+    // with sort shuffling reorder could change, so we can't directly return the old partition keys.

Review Comment:
   Synced offline. It turns out that if there's any reordering, a project will always be added, so there's no need to handle this.





[GitHub] [pinot] 61yao commented on a diff in pull request #9870: [multistage][hotfix] shuffle rewrite

Posted by GitBox <gi...@apache.org>.
61yao commented on code in PR #9870:
URL: https://github.com/apache/pinot/pull/9870#discussion_r1035483859


##########
pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java:
##########
@@ -79,13 +79,39 @@ public void testQueryWithException(String query, String exceptionSnippet) {
   }
 
   @Test
-  public void testQueryGroupByAfterJoinShouldNotDoDataShuffle()
+  public void testQueryGroupByAfterJoinShouldProperlyRewriteShuffle()
       throws Exception {
     String query = "SELECT a.col1, a.col2, AVG(b.col3) FROM a JOIN b ON a.col1 = b.col2 "
         + " WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0 GROUP BY a.col1, a.col2";
     QueryPlan queryPlan = _queryEnvironment.planQuery(query);
-    Assert.assertEquals(queryPlan.getQueryStageMap().size(), 5);
-    Assert.assertEquals(queryPlan.getStageMetadataMap().size(), 5);
+    assertGroupBySingletonAfterJoin(queryPlan, true);
+
+    // same query with selection list re-odering should also work
+    query = "SELECT a.col2, a.col1, AVG(b.col3) FROM a JOIN b ON a.col1 = b.col2 "
+        + " WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0 GROUP BY a.col2, a.col1";
+    queryPlan = _queryEnvironment.planQuery(query);
+    assertGroupBySingletonAfterJoin(queryPlan, true);
+
+    // exact same group key should also work
+    query = "SELECT a.col1, a.col2, AVG(b.col3) FROM a JOIN b ON a.col1 = b.col2 AND a.col2 = b.col2"
+        + " WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0 GROUP BY a.col1, a.col2";
+    queryPlan = _queryEnvironment.planQuery(query);
+    assertGroupBySingletonAfterJoin(queryPlan, true);
+
+    // shrinking group key should not rewrite into singleton
+    query = "SELECT a.col1, AVG(b.col3) FROM a JOIN b ON a.col1 = b.col2 AND a.col2 = b.col2"
+        + " WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0 GROUP BY a.col1";
+    queryPlan = _queryEnvironment.planQuery(query);
+    assertGroupBySingletonAfterJoin(queryPlan, false);
+
+    // mismatched group key should not rewrite into singleton
+    query = "SELECT a.col3, a.col1, AVG(b.col3) FROM a JOIN b ON a.col1 = b.col2 AND a.col2 = b.col2"
+        + " WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0 GROUP BY a.col1, a.col3";
+    queryPlan = _queryEnvironment.planQuery(query);
+    assertGroupBySingletonAfterJoin(queryPlan, false);

Review Comment:
   Can we add a test case for expanding group key? 
   
    query = "SELECT a.col1, a.col2, AVG(b.col3) FROM a JOIN b ON a.col1 = b.col2 GROUP BY a.col1, a.col2";





[GitHub] [pinot] 61yao commented on pull request #9870: [multistage][hotfix] shuffle rewrite

Posted by GitBox <gi...@apache.org>.
61yao commented on PR #9870:
URL: https://github.com/apache/pinot/pull/9870#issuecomment-1331586890

   Can we also add a query option to disable shuffle if needed? That way we can 1) know the performance gain from this and 2) remove the shuffle when we see an issue.




[GitHub] [pinot] ankitsultana commented on a diff in pull request #9870: [multistage][hotfix] shuffle rewrite

Posted by GitBox <gi...@apache.org>.
ankitsultana commented on code in PR #9870:
URL: https://github.com/apache/pinot/pull/9870#discussion_r1035392306


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java:
##########
@@ -150,25 +145,14 @@ public Set<Integer> visitMailboxSend(MailboxSendNode node, Void context) {
   @Override
   public Set<Integer> visitProject(ProjectNode node, Void context) {
     Set<Integer> oldPartitionKeys = node.getInputs().get(0).visit(this, context);
-
-    // all inputs carry over if they're still in the projection result
-    Set<Integer> partitionKeys = new HashSet<>();
-    for (int i = 0; i < node.getProjects().size(); i++) {
-      RexExpression rex = node.getProjects().get(i);
-      if (rex instanceof RexExpression.InputRef) {
-        if (oldPartitionKeys.contains(((RexExpression.InputRef) rex).getIndex())) {
-          partitionKeys.add(i);
-        }
-      }
-    }
-
-    return partitionKeys;
+    return deriveNewPartitionKeysFromRexExpressions(node.getProjects(), oldPartitionKeys);
   }
 
   @Override
   public Set<Integer> visitSort(SortNode node, Void context) {
-    // sort doesn't change the partition keys
-    return node.getInputs().get(0).visit(this, context);
+    // with sort shuffling reorder could change, so we can't directly return the old partition keys.

Review Comment:
   Can you share how this happens? My understanding is that sort happens within a stage and doesn't rename or remove any columns, so we should be able to carry over the keys.



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java:
##########
@@ -189,4 +173,27 @@ private static boolean canSkipShuffle(Set<Integer> partitionKeys, KeySelector<Ob
     }
     return false;
   }
+
+  private static Set<Integer> deriveNewPartitionKeysFromRexExpressions(List<RexExpression> rexExpressionList,
+      Set<Integer> oldPartitionKeys) {
+    Map<Integer, Integer> partitionKeyMap = new HashMap<>();
+    for (int i = 0; i < rexExpressionList.size(); i++) {
+      RexExpression rex = rexExpressionList.get(i);
+      if (rex instanceof RexExpression.InputRef) {
+        // put the old-index to new-index mapping

Review Comment:
   I have arrived at something similar in https://github.com/apache/pinot/pull/9873/files
   
   That PR is still being tested; we can sync up on it later.





[GitHub] [pinot] walterddr commented on a diff in pull request #9870: [multistage][hotfix] shuffle rewrite

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9870:
URL: https://github.com/apache/pinot/pull/9870#discussion_r1035398836


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java:
##########
@@ -150,25 +145,14 @@ public Set<Integer> visitMailboxSend(MailboxSendNode node, Void context) {
   @Override
   public Set<Integer> visitProject(ProjectNode node, Void context) {
     Set<Integer> oldPartitionKeys = node.getInputs().get(0).visit(this, context);
-
-    // all inputs carry over if they're still in the projection result
-    Set<Integer> partitionKeys = new HashSet<>();
-    for (int i = 0; i < node.getProjects().size(); i++) {
-      RexExpression rex = node.getProjects().get(i);
-      if (rex instanceof RexExpression.InputRef) {
-        if (oldPartitionKeys.contains(((RexExpression.InputRef) rex).getIndex())) {
-          partitionKeys.add(i);
-        }
-      }
-    }
-
-    return partitionKeys;
+    return deriveNewPartitionKeysFromRexExpressions(node.getProjects(), oldPartitionKeys);
   }
 
   @Override
   public Set<Integer> visitSort(SortNode node, Void context) {
-    // sort doesn't change the partition keys
-    return node.getInputs().get(0).visit(this, context);
+    // with sort shuffling reorder could change, so we can't directly return the old partition keys.

Review Comment:
   The SortNode result record type puts the collation columns in front of the rest of the selections, so it essentially acts as a "project". It should therefore follow similar rules and remap the partition keys to their new InputRef indices.
   
   Since this is a hotfix PR, I am not including that change here.
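
A minimal sketch of the remapping described in this comment. It is not code from the PR. It assumes the sort output lists collation columns first and the remaining columns in input order, and it models each output column as the input index it references (or null for a computed expression). The class and method names (`PartitionKeyRemapSketch`, `remapPartitionKeys`, `sortOutputLayout`) are hypothetical. Returning no keys when any key is dropped is a conservative choice made for this sketch, since data partitioned on several keys is not necessarily partitioned on a subset of them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; none of these names exist in Pinot.
final class PartitionKeyRemapSketch {

  // outputToInput.get(i) is the input column index that output column i refers to,
  // or null when output column i is a computed expression rather than a plain reference.
  static Set<Integer> remapPartitionKeys(List<Integer> outputToInput, Set<Integer> oldPartitionKeys) {
    Map<Integer, Integer> oldToNew = new HashMap<>();
    for (int newIndex = 0; newIndex < outputToInput.size(); newIndex++) {
      Integer oldIndex = outputToInput.get(newIndex);
      if (oldIndex != null && oldPartitionKeys.contains(oldIndex)) {
        // Keep the first output position that references this key.
        oldToNew.putIfAbsent(oldIndex, newIndex);
      }
    }
    // Assumption of this sketch: if any key was dropped, report no partition keys at all.
    if (oldToNew.size() != oldPartitionKeys.size()) {
      return new HashSet<>();
    }
    return new HashSet<>(oldToNew.values());
  }

  // Models the assumed sort output: collation columns first, then the remaining
  // input columns in their original order.
  static List<Integer> sortOutputLayout(int inputWidth, List<Integer> collationColumns) {
    List<Integer> layout = new ArrayList<>(collationColumns);
    for (int i = 0; i < inputWidth; i++) {
      if (!collationColumns.contains(i)) {
        layout.add(i);
      }
    }
    return layout;
  }

  public static void main(String[] args) {
    // Input has 3 columns, partitioned on column 0, sorted on column 2.
    List<Integer> layout = sortOutputLayout(3, Arrays.asList(2));
    Set<Integer> newKeys = remapPartitionKeys(layout, new HashSet<>(Arrays.asList(0)));
    System.out.println("layout=" + layout + " newKeys=" + newKeys);
  }
}
```

In this example the partition key moves from input index 0 to output index 1, which is why returning the old key set unchanged would point at the wrong column.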



[GitHub] [pinot] walterddr commented on a diff in pull request #9870: [multistage][hotfix] shuffle rewrite

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9870:
URL: https://github.com/apache/pinot/pull/9870#discussion_r1035530196


##########
pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java:
##########
@@ -79,13 +79,39 @@ public void testQueryWithException(String query, String exceptionSnippet) {
   }
 
   @Test
-  public void testQueryGroupByAfterJoinShouldNotDoDataShuffle()
+  public void testQueryGroupByAfterJoinShouldProperlyRewriteShuffle()
       throws Exception {
     String query = "SELECT a.col1, a.col2, AVG(b.col3) FROM a JOIN b ON a.col1 = b.col2 "
         + " WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0 GROUP BY a.col1, a.col2";
     QueryPlan queryPlan = _queryEnvironment.planQuery(query);
-    Assert.assertEquals(queryPlan.getQueryStageMap().size(), 5);
-    Assert.assertEquals(queryPlan.getStageMetadataMap().size(), 5);
+    assertGroupBySingletonAfterJoin(queryPlan, true);
+
+    // same query with selection list re-odering should also work
+    query = "SELECT a.col2, a.col1, AVG(b.col3) FROM a JOIN b ON a.col1 = b.col2 "
+        + " WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0 GROUP BY a.col2, a.col1";
+    queryPlan = _queryEnvironment.planQuery(query);
+    assertGroupBySingletonAfterJoin(queryPlan, true);
+
+    // exact same group key should also work
+    query = "SELECT a.col1, a.col2, AVG(b.col3) FROM a JOIN b ON a.col1 = b.col2 AND a.col2 = b.col2"
+        + " WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0 GROUP BY a.col1, a.col2";
+    queryPlan = _queryEnvironment.planQuery(query);
+    assertGroupBySingletonAfterJoin(queryPlan, true);
+
+    // shrinking group key should not rewrite into singleton
+    query = "SELECT a.col1, AVG(b.col3) FROM a JOIN b ON a.col1 = b.col2 AND a.col2 = b.col2"
+        + " WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0 GROUP BY a.col1";
+    queryPlan = _queryEnvironment.planQuery(query);
+    assertGroupBySingletonAfterJoin(queryPlan, false);
+
+    // mismatched group key should not rewrite into singleton
+    query = "SELECT a.col3, a.col1, AVG(b.col3) FROM a JOIN b ON a.col1 = b.col2 AND a.col2 = b.col2"
+        + " WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0 GROUP BY a.col1, a.col3";
+    queryPlan = _queryEnvironment.planQuery(query);
+    assertGroupBySingletonAfterJoin(queryPlan, false);

Review Comment:
   I am not sure I understand the example; this exact query (without the WHERE clause) is already included.


