Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/07/25 19:17:00 UTC

[GitHub] [pinot] walterddr opened a new pull request, #9100: Carry Partition Scheme

walterddr opened a new pull request, #9100:
URL: https://github.com/apache/pinot/pull/9100

   During multi-stage query engine planning, we know exactly which data partitions each server holds by looking at the exchange logic. We should use this knowledge to optimize data shuffling.
   
   This PR
   - adds partition key analysis
   - propagates partition keys through `InputRef`s across nodes
   - compares the existing partition keys against the desired partitioning to eliminate unnecessary shuffling
   
   NOT in this PR
   - skipping the data shuffle entirely; this can be done later by implementing the direct transferable block util
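
The partition key analysis described above can be illustrated with a minimal sketch. This is not the code from the PR: the class name `PartitionKeySketch` and the method signature are hypothetical, and it assumes partition keys are tracked as column indices. The idea is that if the data is already partitioned on a non-empty set of keys that are all contained in the desired hash keys, rows that agree on the desired keys are already co-located, so a reshuffle adds nothing.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PartitionKeySketch {
  /**
   * Hypothetical check: returns true when the existing partition keys are a
   * non-empty subset of the desired hash keys, so rows with equal desired
   * keys already live on the same server.
   */
  public static boolean canSkipShuffle(Set<Integer> existingPartitionKeys, List<Integer> desiredKeys) {
    if (existingPartitionKeys.isEmpty() || desiredKeys == null || desiredKeys.isEmpty()) {
      // Unknown partitioning, or no hash keys requested: the shuffle is required.
      return false;
    }
    return new HashSet<>(desiredKeys).containsAll(existingPartitionKeys);
  }

  public static void main(String[] args) {
    Set<Integer> partitionedOnCol0 = new HashSet<>(Arrays.asList(0));
    Set<Integer> partitionedOnCol0And1 = new HashSet<>(Arrays.asList(0, 1));

    // Partitioned on column 0, hash on columns (0, 1): already co-located.
    System.out.println(canSkipShuffle(partitionedOnCol0, Arrays.asList(0, 1)));   // true
    // Partitioned on columns (0, 1), hash on column 0 only: must reshuffle.
    System.out.println(canSkipShuffle(partitionedOnCol0And1, Arrays.asList(0)));  // false
  }
}
```

The subset direction matters: partitioning on fewer keys than requested is safe, while partitioning on more keys can split rows that share the requested key across servers.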


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] siddharthteotia commented on a diff in pull request #9100: [multistage] carry partition scheme for optimization

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #9100:
URL: https://github.com/apache/pinot/pull/9100#discussion_r937204103


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java:
##########
@@ -105,30 +113,128 @@ private StageNode walkRelPlan(RelNode node, int currentStageId) {
       RelDistribution.Type exchangeType = distribution.getType();
 
       // 2. make an exchange sender and receiver node pair
-      StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getDataSchema(),
-          nextStageRoot.getStageId(), exchangeType);
-      StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), nextStageRoot.getDataSchema(),
-          mailboxReceiver.getStageId(), exchangeType, exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
-          ? new FieldSelectionKeySelector(distributionKeys) : null);
+      KeySelector<Object[], Object[]> keySelector = exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
+          ? new FieldSelectionKeySelector(distributionKeys) : null;

Review Comment:
   It might help to add a comment here explaining the rationale. Something like this?
   
   We currently support two kinds of exchanges (`BROADCAST` and `HASH_DISTRIBUTE`) via hints. For now, only the latter offers a chance to optimize the exchange and consider skipping it. If the exchange type is `BROADCAST`, we can't avoid the exchange, and this situation won't arise, because otherwise it would become a sort of colocated partitioned join (which I think will be added later and should use a different exchange type through a different hint). This is why the keySelector is null in the first scenario.
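
A minimal sketch of the rationale in this comment, using stand-in types rather than Pinot's own classes: the `ExchangeType` enum, the `keySelectorFor` method, and the use of `java.util.function.Function` in place of `KeySelector` are all assumptions made for illustration. Only a hash exchange routes rows by key, so only that case produces a selector; every other exchange type gets `null`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class KeySelectorSketch {
  /** Stand-in for the two hint-driven exchange kinds mentioned in the review. */
  public enum ExchangeType { HASH_DISTRIBUTED, BROADCAST_DISTRIBUTED }

  /**
   * Hypothetical helper: builds a selector that extracts the key columns from
   * a row, or returns null when the exchange does not route rows by key.
   */
  public static Function<Object[], List<Object>> keySelectorFor(ExchangeType type, int[] keyColumns) {
    if (type != ExchangeType.HASH_DISTRIBUTED) {
      // Broadcast copies every row to every receiver, so there is no key to
      // select and no partition-based shortcut to consider.
      return null;
    }
    return row -> {
      List<Object> key = new ArrayList<>();
      for (int column : keyColumns) {
        key.add(row[column]);
      }
      return key;
    };
  }

  public static void main(String[] args) {
    Object[] row = {"a", 7};
    System.out.println(keySelectorFor(ExchangeType.HASH_DISTRIBUTED, new int[] {1}).apply(row));
    System.out.println(keySelectorFor(ExchangeType.BROADCAST_DISTRIBUTED, new int[] {1}) == null);
  }
}
```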





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #9100: [multistage] carry partition scheme for optimization

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #9100:
URL: https://github.com/apache/pinot/pull/9100#discussion_r937191524


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java:
##########
@@ -105,30 +113,128 @@ private StageNode walkRelPlan(RelNode node, int currentStageId) {
       RelDistribution.Type exchangeType = distribution.getType();
 
       // 2. make an exchange sender and receiver node pair
-      StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getDataSchema(),
-          nextStageRoot.getStageId(), exchangeType);
-      StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), nextStageRoot.getDataSchema(),
-          mailboxReceiver.getStageId(), exchangeType, exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
-          ? new FieldSelectionKeySelector(distributionKeys) : null);
+      KeySelector<Object[], Object[]> keySelector = exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
+          ? new FieldSelectionKeySelector(distributionKeys) : null;
+
+      StageNode mailboxReceiver;
+      StageNode mailboxSender;
+      if (canSkipShuffle(nextStageRoot, keySelector)) {
+        mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getDataSchema(),
+            nextStageRoot.getStageId(), RelDistribution.Type.SINGLETON, keySelector);
+        mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), nextStageRoot.getDataSchema(),
+            mailboxReceiver.getStageId(), RelDistribution.Type.SINGLETON, keySelector);

Review Comment:
   (nit) Suggest adding a comment to explain that SINGLETON here means local. Otherwise it may confuse readers: during our discussion I thought SINGLETON meant a single RECEIVER (which is probably what Calcite implies), but we are using it differently.
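
To make the distinction in this comment concrete, here is a small sketch of how each exchange type could map a sender to its receivers. It follows the interpretation described in the comment (SINGLETON as a local hand-off) and is not the runtime's actual routing code; the `ExchangeRoutingSketch` class, its enum, and the `targets` method are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ExchangeRoutingSketch {
  public enum ExchangeType { SINGLETON, BROADCAST_DISTRIBUTED, HASH_DISTRIBUTED }

  /** Hypothetical routing: which server ids receive a row sent by senderId. */
  public static List<Integer> targets(ExchangeType type, int senderId, int numServers, int keyHash) {
    switch (type) {
      case SINGLETON:
        // "Local" meaning used in this PR: the row stays on the sending server.
        return Collections.singletonList(senderId);
      case BROADCAST_DISTRIBUTED: {
        // Every server receives a copy of the row.
        List<Integer> all = new ArrayList<>();
        for (int server = 0; server < numServers; server++) {
          all.add(server);
        }
        return all;
      }
      case HASH_DISTRIBUTED:
        // The key hash picks exactly one receiver; floorMod handles negative hashes.
        return Collections.singletonList(Math.floorMod(keyHash, numServers));
      default:
        throw new IllegalArgumentException("Unsupported exchange type: " + type);
    }
  }

  public static void main(String[] args) {
    System.out.println(targets(ExchangeType.SINGLETON, 2, 4, 99));
    System.out.println(targets(ExchangeType.BROADCAST_DISTRIBUTED, 2, 4, 99));
    System.out.println(targets(ExchangeType.HASH_DISTRIBUTED, 2, 4, -1));
  }
}
```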





[GitHub] [pinot] walterddr commented on a diff in pull request #9100: [multistage] carry partition scheme for optimization

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9100:
URL: https://github.com/apache/pinot/pull/9100#discussion_r937215921


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java:
##########
@@ -105,30 +113,128 @@ private StageNode walkRelPlan(RelNode node, int currentStageId) {
       RelDistribution.Type exchangeType = distribution.getType();
 
       // 2. make an exchange sender and receiver node pair
-      StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getDataSchema(),
-          nextStageRoot.getStageId(), exchangeType);
-      StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), nextStageRoot.getDataSchema(),
-          mailboxReceiver.getStageId(), exchangeType, exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
-          ? new FieldSelectionKeySelector(distributionKeys) : null);
+      KeySelector<Object[], Object[]> keySelector = exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
+          ? new FieldSelectionKeySelector(distributionKeys) : null;

Review Comment:
   Good point. I will add the comments.





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #9100: [multistage] carry partition scheme for optimization

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #9100:
URL: https://github.com/apache/pinot/pull/9100#discussion_r937195527


##########
pinot-query-planner/src/test/java/org/apache/pinot/query/QueryPlannerSpecialFeatureTest.java:
##########
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.planner.PlannerUtils;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.stage.AbstractStageNode;
+import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.StageNode;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class QueryPlannerSpecialFeatureTest extends QueryEnvironmentTestBase {

Review Comment:
   I don't think this is needed. At this point, my suggestion would be to keep all multi-stage query planning / compilation tests (including parsing, testing rules, etc.) together in one file and multi-stage query execution tests in another. I believe these tests can be incorporated into `QueryCompilationTest.java`?





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #9100: [multistage] carry partition scheme for optimization

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #9100:
URL: https://github.com/apache/pinot/pull/9100#discussion_r937325529


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotJoinExchangeNodeInsertRule.java:
##########
@@ -70,17 +70,17 @@ public void onMatch(RelOptRuleCall call) {
     RelNode leftExchange;
     RelNode rightExchange;
     List<RelHint> hints = join.getHints();
-    if (hints.contains(PinotRelationalHints.USE_HASH_DISTRIBUTE)) {
+    if (hints.contains(PinotRelationalHints.USE_BROADCAST_DISTRIBUTE)) {
+      leftExchange = LogicalExchange.create(leftInput, RelDistributions.RANDOM_DISTRIBUTED);
+      rightExchange = LogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);

Review Comment:
   I see. Maybe we can do that as a special optimization for the BROADCAST exchange rather than making it the default BROADCAST exchange behavior?





[GitHub] [pinot] walterddr commented on a diff in pull request #9100: [multistage] carry partition scheme for optimization

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9100:
URL: https://github.com/apache/pinot/pull/9100#discussion_r937214640


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotJoinExchangeNodeInsertRule.java:
##########
@@ -70,17 +70,17 @@ public void onMatch(RelOptRuleCall call) {
     RelNode leftExchange;
     RelNode rightExchange;
     List<RelHint> hints = join.getHints();
-    if (hints.contains(PinotRelationalHints.USE_HASH_DISTRIBUTE)) {
+    if (hints.contains(PinotRelationalHints.USE_BROADCAST_DISTRIBUTE)) {
+      leftExchange = LogicalExchange.create(leftInput, RelDistributions.RANDOM_DISTRIBUTED);
+      rightExchange = LogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);

Review Comment:
   This is not true. For example, to leverage the full potential of all the server instances, we need to randomly distribute the payload to all servers on the left, but broadcast the right table to the same set of servers.
   
   Forcing the left-hand side to stay put means we can't do a scale-up join operation, which takes a lot of memory/CPU.
   
   This would be an optimization related to the server instance scheduling issue that we plan to improve in the future.
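
The exchange pattern described here can be checked with a toy simulation: spread the left rows over all servers and give every server a full copy of the right table. The sketch below is an illustration only, with hypothetical names (`BroadcastJoinSketch`, `distributedJoinCount`), rows reduced to integer join keys, and round-robin used as a deterministic stand-in for random distribution. Because each left row lands on exactly one server and that server sees the whole right side, the per-server matches add up to the single-node join result.

```java
import java.util.ArrayList;
import java.util.List;

public class BroadcastJoinSketch {
  /** Reference result: number of matching key pairs when joining on one node. */
  public static int singleNodeJoinCount(int[] leftKeys, int[] rightKeys) {
    int matches = 0;
    for (int left : leftKeys) {
      for (int right : rightKeys) {
        if (left == right) {
          matches++;
        }
      }
    }
    return matches;
  }

  /** Left side spread across servers, right side broadcast to each of them. */
  public static int distributedJoinCount(int[] leftKeys, int[] rightKeys, int numServers) {
    List<List<Integer>> leftPartitions = new ArrayList<>();
    for (int server = 0; server < numServers; server++) {
      leftPartitions.add(new ArrayList<>());
    }
    // Round-robin stands in for RANDOM_DISTRIBUTED: any assignment works as
    // long as each left row goes to exactly one server.
    for (int i = 0; i < leftKeys.length; i++) {
      leftPartitions.get(i % numServers).add(leftKeys[i]);
    }
    int matches = 0;
    for (List<Integer> partition : leftPartitions) {
      // Each server joins its slice of the left side against the full right table.
      for (int left : partition) {
        for (int right : rightKeys) {
          if (left == right) {
            matches++;
          }
        }
      }
    }
    return matches;
  }

  public static void main(String[] args) {
    int[] left = {1, 2, 2, 3, 5};
    int[] right = {2, 3, 3, 4};
    System.out.println(singleNodeJoinCount(left, right));
    System.out.println(distributedJoinCount(left, right, 3));
  }
}
```

The trade-off is that the right table is copied once per server, while the join work on the left side is divided among them.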





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #9100: [multistage] carry partition scheme for optimization

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #9100:
URL: https://github.com/apache/pinot/pull/9100#discussion_r937325529


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotJoinExchangeNodeInsertRule.java:
##########
@@ -70,17 +70,17 @@ public void onMatch(RelOptRuleCall call) {
     RelNode leftExchange;
     RelNode rightExchange;
     List<RelHint> hints = join.getHints();
-    if (hints.contains(PinotRelationalHints.USE_HASH_DISTRIBUTE)) {
+    if (hints.contains(PinotRelationalHints.USE_BROADCAST_DISTRIBUTE)) {
+      leftExchange = LogicalExchange.create(leftInput, RelDistributions.RANDOM_DISTRIBUTED);
+      rightExchange = LogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);

Review Comment:
   I see. Maybe we can do that as a special optimization for the BROADCAST exchange rather than making it the default BROADCAST exchange behavior? We can also discuss this offline. Merging this. cc @walterddr





[GitHub] [pinot] walterddr commented on a diff in pull request #9100: [multistage] carry partition scheme for optimization

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9100:
URL: https://github.com/apache/pinot/pull/9100#discussion_r937215715


##########
pinot-query-planner/src/test/java/org/apache/pinot/query/QueryPlannerSpecialFeatureTest.java:
##########
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.planner.PlannerUtils;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.stage.AbstractStageNode;
+import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.StageNode;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class QueryPlannerSpecialFeatureTest extends QueryEnvironmentTestBase {

Review Comment:
   Sure, we can merge them. The file is getting longer, so I've been planning to split it up.





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #9100: [multistage] carry partition scheme for optimization

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #9100:
URL: https://github.com/apache/pinot/pull/9100#discussion_r937194466


##########
pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java:
##########
@@ -65,44 +58,6 @@ public void testQueryWithException(String query, String exceptionSnippet) {
     }
   }
 
-  @Test

Review Comment:
   Why delete this?





[GitHub] [pinot] siddharthteotia commented on a diff in pull request #9100: [multistage] carry partition scheme for optimization

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #9100:
URL: https://github.com/apache/pinot/pull/9100#discussion_r937194170


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotJoinExchangeNodeInsertRule.java:
##########
@@ -70,17 +70,17 @@ public void onMatch(RelOptRuleCall call) {
     RelNode leftExchange;
     RelNode rightExchange;
     List<RelHint> hints = join.getHints();
-    if (hints.contains(PinotRelationalHints.USE_HASH_DISTRIBUTE)) {
+    if (hints.contains(PinotRelationalHints.USE_BROADCAST_DISTRIBUTE)) {
+      leftExchange = LogicalExchange.create(leftInput, RelDistributions.RANDOM_DISTRIBUTED);
+      rightExchange = LogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);

Review Comment:
   I don't follow this. If the hint indicates `BROADCAST_EXCHANGE`, why are we setting up exchanges for both sides of the JOIN? In a broadcast, only one side (either left or right) should be exchanged / moved while the other side stays in place, as opposed to a full shuffle-based JOIN, where data movement involves both sides.





[GitHub] [pinot] siddharthteotia merged pull request #9100: [multistage] carry partition scheme for optimization

Posted by GitBox <gi...@apache.org>.
siddharthteotia merged PR #9100:
URL: https://github.com/apache/pinot/pull/9100




[GitHub] [pinot] codecov-commenter commented on pull request #9100: [multistage] carry partition scheme for optimization

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #9100:
URL: https://github.com/apache/pinot/pull/9100#issuecomment-1195879142

   # [Codecov](https://codecov.io/gh/apache/pinot/pull/9100?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#9100](https://codecov.io/gh/apache/pinot/pull/9100?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8ed4871) into [master](https://codecov.io/gh/apache/pinot/commit/b03b5e434f8eda78cbca3007d285453f1d48e1c4?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (b03b5e4) will **decrease** coverage by `54.64%`.
   > The diff coverage is `87.06%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #9100       +/-   ##
   =============================================
   - Coverage     70.02%   15.37%   -54.65%     
   + Complexity     4981      170     -4811     
   =============================================
     Files          1838     1790       -48     
     Lines         97262    95267     -1995     
     Branches      14652    14450      -202     
   =============================================
   - Hits          68108    14652    -53456     
   - Misses        24430    79563    +55133     
   + Partials       4724     1052     -3672     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `?` | |
   | unittests1 | `?` | |
   | unittests2 | `15.37% <87.06%> (+0.07%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/9100?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...che/pinot/query/planner/stage/MailboxSendNode.java](https://codecov.io/gh/apache/pinot/pull/9100/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcGxhbm5lci9zdGFnZS9NYWlsYm94U2VuZE5vZGUuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...t/query/rules/PinotJoinExchangeNodeInsertRule.java](https://codecov.io/gh/apache/pinot/pull/9100/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVsZXMvUGlub3RKb2luRXhjaGFuZ2VOb2RlSW5zZXJ0UnVsZS5qYXZh) | `72.72% <0.00%> (+16.84%)` | :arrow_up: |
   | [...rg/apache/pinot/query/service/QueryDispatcher.java](https://codecov.io/gh/apache/pinot/pull/9100/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvc2VydmljZS9RdWVyeURpc3BhdGNoZXIuamF2YQ==) | `73.77% <ø> (ø)` | |
   | [...query/runtime/operator/MailboxReceiveOperator.java](https://codecov.io/gh/apache/pinot/pull/9100/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9NYWlsYm94UmVjZWl2ZU9wZXJhdG9yLmphdmE=) | `76.78% <84.61%> (+1.23%)` | :arrow_up: |
   | [...ot/query/runtime/operator/MailboxSendOperator.java](https://codecov.io/gh/apache/pinot/pull/9100/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9NYWlsYm94U2VuZE9wZXJhdG9yLmphdmE=) | `85.71% <84.61%> (-5.43%)` | :arrow_down: |
   | [...ache/pinot/query/planner/logical/StagePlanner.java](https://codecov.io/gh/apache/pinot/pull/9100/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcGxhbm5lci9sb2dpY2FsL1N0YWdlUGxhbm5lci5qYXZh) | `92.98% <89.33%> (-7.02%)` | :arrow_down: |
   | [...e/pinot/query/planner/stage/AbstractStageNode.java](https://codecov.io/gh/apache/pinot/pull/9100/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcGxhbm5lci9zdGFnZS9BYnN0cmFjdFN0YWdlTm9kZS5qYXZh) | `100.00% <100.00%> (ø)` | |
   | [...pache/pinot/query/planner/stage/AggregateNode.java](https://codecov.io/gh/apache/pinot/pull/9100/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcGxhbm5lci9zdGFnZS9BZ2dyZWdhdGVOb2RlLmphdmE=) | `100.00% <100.00%> (ø)` | |
   | [.../pinot/query/planner/stage/MailboxReceiveNode.java](https://codecov.io/gh/apache/pinot/pull/9100/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcGxhbm5lci9zdGFnZS9NYWlsYm94UmVjZWl2ZU5vZGUuamF2YQ==) | `100.00% <100.00%> (+12.50%)` | :arrow_up: |
   | [...ry/rules/PinotAggregateExchangeNodeInsertRule.java](https://codecov.io/gh/apache/pinot/pull/9100/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVsZXMvUGlub3RBZ2dyZWdhdGVFeGNoYW5nZU5vZGVJbnNlcnRSdWxlLmphdmE=) | `91.07% <100.00%> (ø)` | |
   | ... and [1407 more](https://codecov.io/gh/apache/pinot/pull/9100/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   




[GitHub] [pinot] siddharthteotia commented on a diff in pull request #9100: [multistage] carry partition scheme for optimization

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #9100:
URL: https://github.com/apache/pinot/pull/9100#discussion_r937204103


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java:
##########
@@ -105,30 +113,128 @@ private StageNode walkRelPlan(RelNode node, int currentStageId) {
       RelDistribution.Type exchangeType = distribution.getType();
 
       // 2. make an exchange sender and receiver node pair
-      StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getDataSchema(),
-          nextStageRoot.getStageId(), exchangeType);
-      StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), nextStageRoot.getDataSchema(),
-          mailboxReceiver.getStageId(), exchangeType, exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
-          ? new FieldSelectionKeySelector(distributionKeys) : null);
+      KeySelector<Object[], Object[]> keySelector = exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
+          ? new FieldSelectionKeySelector(distributionKeys) : null;

Review Comment:
   Might want to add a comment here to explain the rationale. Something like this?
   
   We currently support two kinds of exchanges (BROADCAST and HASH_DISTRIBUTE) via hints. As of now, only the latter leaves room for optimizing the exchange and possibly skipping it. If the exchange type is BROADCAST, we can't avoid the exchange, and this situation won't arise, because otherwise it would become a sort of colocated partitioned join (which I think will be added later and should use a different exchange type through a different hint). This is why the keySelector is null in the first scenario.
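
The rationale above can be illustrated with a small standalone sketch. It does not use Pinot or Calcite classes: `KeySelectorSketch`, its `ExchangeType` enum, and `keySelectorFor` are hypothetical stand-ins, and the selector is modeled as a plain `Function<Object[], Object[]>`. It only shows the idea that a key selector exists for hash exchanges and is null otherwise.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class KeySelectorSketch {
  // Hypothetical stand-in for the exchange types discussed in the review.
  enum ExchangeType { HASH_DISTRIBUTED, BROADCAST_DISTRIBUTED }

  // Returns a function that extracts the key columns from a row,
  // or null when the exchange is not hash-distributed.
  static Function<Object[], Object[]> keySelectorFor(ExchangeType type, List<Integer> keyColumns) {
    if (type != ExchangeType.HASH_DISTRIBUTED) {
      return null; // broadcast: every receiver gets every row, so no key is needed
    }
    List<Integer> columns = new ArrayList<>(keyColumns); // defensive copy
    return row -> {
      Object[] key = new Object[columns.size()];
      for (int i = 0; i < key.length; i++) {
        key[i] = row[columns.get(i)];
      }
      return key;
    };
  }

  public static void main(String[] args) {
    Function<Object[], Object[]> hashSelector =
        keySelectorFor(ExchangeType.HASH_DISTRIBUTED, Arrays.asList(0, 2));
    // Prints the values of columns 0 and 2 of the sample row.
    System.out.println(Arrays.toString(hashSelector.apply(new Object[] {"a", 1, "b"})));
    // The broadcast case yields no selector at all.
    System.out.println(keySelectorFor(ExchangeType.BROADCAST_DISTRIBUTED, Arrays.asList(0)) == null);
  }
}
```

A null selector is then a cheap signal that the exchange is not a candidate for the skip-shuffle check.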




[GitHub] [pinot] siddharthteotia commented on a diff in pull request #9100: [multistage] carry partition scheme for optimization

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #9100:
URL: https://github.com/apache/pinot/pull/9100#discussion_r937191524


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java:
##########
@@ -105,30 +113,128 @@ private StageNode walkRelPlan(RelNode node, int currentStageId) {
       RelDistribution.Type exchangeType = distribution.getType();
 
       // 2. make an exchange sender and receiver node pair
-      StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getDataSchema(),
-          nextStageRoot.getStageId(), exchangeType);
-      StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), nextStageRoot.getDataSchema(),
-          mailboxReceiver.getStageId(), exchangeType, exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
-          ? new FieldSelectionKeySelector(distributionKeys) : null);
+      KeySelector<Object[], Object[]> keySelector = exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
+          ? new FieldSelectionKeySelector(distributionKeys) : null;
+
+      StageNode mailboxReceiver;
+      StageNode mailboxSender;
+      if (canSkipShuffle(nextStageRoot, keySelector)) {
+        mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getDataSchema(),
+            nextStageRoot.getStageId(), RelDistribution.Type.SINGLETON, keySelector);
+        mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), nextStageRoot.getDataSchema(),
+            mailboxReceiver.getStageId(), RelDistribution.Type.SINGLETON, keySelector);

Review Comment:
   (nit) Suggest adding a comment to explain that `SINGLETON` means local here. Otherwise it may confuse readers: during our discussion I thought `SINGLETON` meant a single receiver (which is probably what Calcite implies), but we are using it differently.
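
To make the `SINGLETON`-as-local convention concrete, here is a minimal standalone sketch. It is not the PR's `canSkipShuffle` implementation: the class `SkipShuffleSketch`, its enum, and the rule used (skip only when the upstream partition keys exactly match the desired hash keys) are simplifying assumptions for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class SkipShuffleSketch {
  // Hypothetical stand-in for the two exchange types in the diff above.
  // SINGLETON is used in the "local, no shuffle" sense, not "single receiver".
  enum ExchangeType { HASH_DISTRIBUTED, SINGLETON }

  // Assumed rule: the shuffle can be skipped only when the data is already
  // partitioned on exactly the columns the exchange wants to hash on.
  static boolean canSkipShuffle(Set<Integer> upstreamPartitionKeys, List<Integer> desiredKeys) {
    if (desiredKeys == null || desiredKeys.isEmpty()) {
      return false; // no hash keys (e.g. broadcast): nothing to match against
    }
    return upstreamPartitionKeys.equals(new HashSet<>(desiredKeys));
  }

  // Picks the exchange type for the sender/receiver pair.
  static ExchangeType chooseExchange(Set<Integer> upstreamPartitionKeys, List<Integer> desiredKeys) {
    return canSkipShuffle(upstreamPartitionKeys, desiredKeys)
        ? ExchangeType.SINGLETON        // rows stay on the server that produced them
        : ExchangeType.HASH_DISTRIBUTED; // rows are re-hashed across servers
  }

  public static void main(String[] args) {
    Set<Integer> partitionedOnCol0 = new HashSet<>(Arrays.asList(0));
    System.out.println(chooseExchange(partitionedOnCol0, Arrays.asList(0)));
    System.out.println(chooseExchange(partitionedOnCol0, Arrays.asList(1)));
  }
}
```

A real check would also need to confirm that both sides use the same hash function and partition count, which this sketch leaves out.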




[GitHub] [pinot] walterddr commented on a diff in pull request #9100: [multistage] carry partition scheme for optimization

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9100:
URL: https://github.com/apache/pinot/pull/9100#discussion_r937214640


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotJoinExchangeNodeInsertRule.java:
##########
@@ -70,17 +70,17 @@ public void onMatch(RelOptRuleCall call) {
     RelNode leftExchange;
     RelNode rightExchange;
     List<RelHint> hints = join.getHints();
-    if (hints.contains(PinotRelationalHints.USE_HASH_DISTRIBUTE)) {
+    if (hints.contains(PinotRelationalHints.USE_BROADCAST_DISTRIBUTE)) {
+      leftExchange = LogicalExchange.create(leftInput, RelDistributions.RANDOM_DISTRIBUTED);
+      rightExchange = LogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);

Review Comment:
   This is not true. For example, to leverage the full potential of the server instances, we need to randomly distribute the left-side payload across all servers while broadcasting the right table to the same set of servers.
   
   Limiting the left-hand side to stay put means we can't scale up a join operation that takes a lot of memory/CPU.
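
The point about spreading the left side can be checked with a toy simulation. This is a sketch under stated assumptions, not Pinot code: rows are `int[]{key, value}`, round-robin assignment stands in for random distribution, and `BroadcastJoinSketch` and its methods are hypothetical names. Each simulated worker receives a slice of the left table plus a full copy of the right table and joins locally; the union of the local results matches a single-node join regardless of the worker count.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BroadcastJoinSketch {
  // Splits the left rows across workers; round-robin stands in for random distribution.
  static List<List<int[]>> distribute(List<int[]> rows, int numWorkers) {
    List<List<int[]>> parts = new ArrayList<>();
    for (int w = 0; w < numWorkers; w++) {
      parts.add(new ArrayList<>());
    }
    for (int i = 0; i < rows.size(); i++) {
      parts.get(i % numWorkers).add(rows.get(i));
    }
    return parts;
  }

  // Local hash join on the key column; each output is "key:leftValue:rightValue".
  static List<String> localJoin(List<int[]> left, List<int[]> right) {
    Map<Integer, List<Integer>> build = new HashMap<>();
    for (int[] r : right) {
      build.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
    }
    List<String> out = new ArrayList<>();
    for (int[] l : left) {
      for (int rightValue : build.getOrDefault(l[0], Collections.<Integer>emptyList())) {
        out.add(l[0] + ":" + l[1] + ":" + rightValue);
      }
    }
    return out;
  }

  // Every worker gets one slice of the left table and the whole right table.
  static List<String> broadcastJoin(List<int[]> left, List<int[]> right, int numWorkers) {
    List<String> out = new ArrayList<>();
    for (List<int[]> leftSlice : distribute(left, numWorkers)) {
      out.addAll(localJoin(leftSlice, right));
    }
    Collections.sort(out); // sort so results from different worker counts are comparable
    return out;
  }

  public static void main(String[] args) {
    List<int[]> left = Arrays.asList(new int[] {1, 10}, new int[] {2, 20}, new int[] {1, 11}, new int[] {3, 30});
    List<int[]> right = Arrays.asList(new int[] {1, 100}, new int[] {2, 200}, new int[] {1, 101});
    System.out.println(broadcastJoin(left, right, 1).equals(broadcastJoin(left, right, 3)));
  }
}
```

Since the left rows can land on any worker without affecting the result, the join work can be spread over all servers rather than only those that hold the left table.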






[GitHub] [pinot] walterddr commented on a diff in pull request #9100: [multistage] carry partition scheme for optimization

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9100:
URL: https://github.com/apache/pinot/pull/9100#discussion_r937215266


##########
pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java:
##########
@@ -65,44 +58,6 @@ public void testQueryWithException(String query, String exceptionSnippet) {
     }
   }
 
-  @Test

Review Comment:
   These tests are moved to QueryPlannerSpecialfeatureTest so that we can have special asserts for them.






[GitHub] [pinot] siddharthteotia commented on a diff in pull request #9100: [multistage] carry partition scheme for optimization

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on code in PR #9100:
URL: https://github.com/apache/pinot/pull/9100#discussion_r937325529


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/rules/PinotJoinExchangeNodeInsertRule.java:
##########
@@ -70,17 +70,17 @@ public void onMatch(RelOptRuleCall call) {
     RelNode leftExchange;
     RelNode rightExchange;
     List<RelHint> hints = join.getHints();
-    if (hints.contains(PinotRelationalHints.USE_HASH_DISTRIBUTE)) {
+    if (hints.contains(PinotRelationalHints.USE_BROADCAST_DISTRIBUTE)) {
+      leftExchange = LogicalExchange.create(leftInput, RelDistributions.RANDOM_DISTRIBUTED);
+      rightExchange = LogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);

Review Comment:
   I see. Maybe we can do that as a special optimization for BROADCAST exchange rather than making it the default BROADCAST exchange behavior? We can also discuss this offline. Merging this @walterddr


