Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/11/29 23:46:18 UTC

[GitHub] [pinot] ankitsultana commented on a diff in pull request #9870: [multistage][hotfix] shuffle rewrite

ankitsultana commented on code in PR #9870:
URL: https://github.com/apache/pinot/pull/9870#discussion_r1035392306


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java:
##########
@@ -150,25 +145,14 @@ public Set<Integer> visitMailboxSend(MailboxSendNode node, Void context) {
   @Override
   public Set<Integer> visitProject(ProjectNode node, Void context) {
     Set<Integer> oldPartitionKeys = node.getInputs().get(0).visit(this, context);
-
-    // all inputs carry over if they're still in the projection result
-    Set<Integer> partitionKeys = new HashSet<>();
-    for (int i = 0; i < node.getProjects().size(); i++) {
-      RexExpression rex = node.getProjects().get(i);
-      if (rex instanceof RexExpression.InputRef) {
-        if (oldPartitionKeys.contains(((RexExpression.InputRef) rex).getIndex())) {
-          partitionKeys.add(i);
-        }
-      }
-    }
-
-    return partitionKeys;
+    return deriveNewPartitionKeysFromRexExpressions(node.getProjects(), oldPartitionKeys);
   }
 
   @Override
   public Set<Integer> visitSort(SortNode node, Void context) {
-    // sort doesn't change the partition keys
-    return node.getInputs().get(0).visit(this, context);
+    // with sort shuffling reorder could change, so we can't directly return the old partition keys.

Review Comment:
   Can you share how this happens? My understanding is that sort runs within a stage and doesn't rename or remove any columns, so we should be able to carry over the partition keys unchanged.
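A minimal sketch of the reasoning in the comment above, assuming rows are plain `int[]` tuples and a row's partition is a simple hash of its key column modulo the worker count. The class and method names (`SortPreservesPartitioning`, `partitionOf`, `sortWithinPartition`) are hypothetical and not Pinot code. It shows that sorting one worker's rows permutes the rows but leaves every column in place, so each row still hashes to the same partition under the same key index.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch, not Pinot code: rows are int[] tuples and a row's
// partition is hash(row[keyIndex]) mod numPartitions.
class SortPreservesPartitioning {

  static int partitionOf(int[] row, int keyIndex, int numPartitions) {
    return Math.floorMod(Integer.hashCode(row[keyIndex]), numPartitions);
  }

  // Sorts one worker's rows by the column at sortIndex. Rows are reordered,
  // but each row's columns (including the key at keyIndex) are untouched.
  static List<int[]> sortWithinPartition(List<int[]> rows, int sortIndex) {
    List<int[]> sorted = new ArrayList<>(rows);
    sorted.sort(Comparator.comparingInt(r -> r[sortIndex]));
    return sorted;
  }

  // True if every row still hashes to the expected partition.
  static boolean allInPartition(List<int[]> rows, int keyIndex, int numPartitions, int expected) {
    for (int[] row : rows) {
      if (partitionOf(row, keyIndex, numPartitions) != expected) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // Rows already shuffled to partition 0 of 2: key column 0 holds even values.
    List<int[]> partition0 = List.of(new int[] {4, 30}, new int[] {2, 10}, new int[] {6, 20});
    List<int[]> sorted = sortWithinPartition(partition0, 1);
    System.out.println(allInPartition(sorted, 0, 2, 0)); // true
  }
}
```

This only covers a sort that stays inside one worker; whether a particular sort plan introduces an exchange is a separate question for the planner.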



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java:
##########
@@ -189,4 +173,27 @@ private static boolean canSkipShuffle(Set<Integer> partitionKeys, KeySelector<Ob
     }
     return false;
   }
+
+  private static Set<Integer> deriveNewPartitionKeysFromRexExpressions(List<RexExpression> rexExpressionList,
+      Set<Integer> oldPartitionKeys) {
+    Map<Integer, Integer> partitionKeyMap = new HashMap<>();
+    for (int i = 0; i < rexExpressionList.size(); i++) {
+      RexExpression rex = rexExpressionList.get(i);
+      if (rex instanceof RexExpression.InputRef) {
+        // put the old-index to new-index mapping

Review Comment:
   I arrived at something similar in this PR: https://github.com/apache/pinot/pull/9873/files
   
   That PR is still being tested; we can sync up on it later.
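A minimal sketch of the old-index to new-index mapping idea for a projection, under stated assumptions. It is not the code from either PR. `ProjectionKeyRemap` and `remapPartitionKeys` are hypothetical names. A projection is modeled as an `int[]` where entry `i` is the input column referenced by output column `i`, or `-1` when output column `i` is a computed (non-`InputRef`) expression. Two design choices are the sketch's own: if an input column appears more than once, the first output position wins, and if any partition-key column is projected away, no partition keys are reported.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; not a Pinot API. projection[i] is the input column
// referenced by output column i, or -1 for a computed expression.
class ProjectionKeyRemap {

  static Set<Integer> remapPartitionKeys(int[] projection, Set<Integer> oldPartitionKeys) {
    // Old input index -> first output index that references it.
    Map<Integer, Integer> oldToNew = new HashMap<>();
    for (int i = 0; i < projection.length; i++) {
      if (projection[i] >= 0) {
        oldToNew.putIfAbsent(projection[i], i);
      }
    }
    Set<Integer> newPartitionKeys = new HashSet<>();
    for (int oldKey : oldPartitionKeys) {
      Integer newKey = oldToNew.get(oldKey);
      if (newKey == null) {
        // Conservative assumption: a partial compound key is not usable,
        // so report no partition keys at all.
        return new HashSet<>();
      }
      newPartitionKeys.add(newKey);
    }
    return newPartitionKeys;
  }

  public static void main(String[] args) {
    // Roughly SELECT col2, col0 + 1, col0 FROM t, with t partitioned on col0.
    int[] projection = {2, -1, 0};
    System.out.println(remapPartitionKeys(projection, Set.of(0))); // [2]
    System.out.println(remapPartitionKeys(projection, Set.of(1))); // []
  }
}
```

Keeping one output position per key keeps the number of keys unchanged, which makes the result easier to compare against a downstream key selector.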



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: commits-help@pinot.apache.org