You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/11/29 22:54:01 UTC

[GitHub] [pinot] ankitsultana opened a new pull request, #9873: [Draft] [multistage] PoC for Shuffle Skip Algorithm

ankitsultana opened a new pull request, #9873:
URL: https://github.com/apache/pinot/pull/9873

   supersedes: https://github.com/apache/pinot/pull/9838


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #9873: [multistage] Add Greedy Shuffle Skip Optimizer for Colocated Joins

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9873:
URL: https://github.com/apache/pinot/pull/9873#discussion_r1068408637


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java:
##########
@@ -37,4 +37,9 @@
   OUT getKey(IN input);
 
   int computeHash(IN input);
+
+  /**
+   * @return the hash-algorithm used for distributing rows
+   */
+  String hashAlgorithm();

Review Comment:
   sounds good. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] 61yao commented on a diff in pull request #9873: [multistage] Add Pluggable Physical Optimizers + Greedy Shuffle Skip Optimizer

Posted by GitBox <gi...@apache.org>.
61yao commented on code in PR #9873:
URL: https://github.com/apache/pinot/pull/9873#discussion_r1039054341


##########
pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java:
##########
@@ -94,6 +94,11 @@ public Map<String, Object> getConfigOverrides() {
     return overrides;
   }
 
+  @Override
+  protected int getNumQuickstartRunnerServers() {
+    return 4;

Review Comment:
   FYI. this doesn't change the e2e test servers. just in case you were changing this for the e2e test



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #9873: [multistage] Add Greedy Shuffle Skip Optimizer for Colocated Joins

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9873:
URL: https://github.com/apache/pinot/pull/9873#discussion_r1067530814


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java:
##########
@@ -102,6 +111,15 @@ private StageNode walkRelPlan(RelNode node, int currentStageId) {
     }
   }
 
+  // TODO: Switch to Worker SPI to avoid multiple-places where workers are assigned.
+  private void runPhysicalOptimizers(QueryPlan queryPlan) {
+    StageNode globalStageRoot = queryPlan.getQueryStageMap().get(0);
+    if (_plannerContext.getOptions().getOrDefault("useColocatedJoin", "false").equals("true")) {
+      GreedyShuffleRewriteVisitor.optimizeShuffles(queryPlan, _tableCache);
+    } else {
+      ShuffleRewriteVisitor.optimizeShuffles(globalStageRoot);

Review Comment:
   default shuffleRewriteVisitor is no longer used. remove this branch



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java:
##########
@@ -79,11 +80,14 @@ public class QueryEnvironment {
   // Pinot extensions
   private final Collection<RelOptRule> _logicalRuleSet;
   private final WorkerManager _workerManager;
+  private final TableCache _tableCache;

Review Comment:
   passing in _tableCache seems to be a bit weird to me. but i don't have a better abstraction here. 
   can we add a TODO and indicate exactly what this tableCache is intended to be used and ensure that this is not being abused. 
   
   b/c all info that planner needs are already encapsulated inside the CalciteSchema. 



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java:
##########
@@ -37,4 +37,9 @@
   OUT getKey(IN input);
 
   int computeHash(IN input);
+
+  /**
+   * @return the hash-algorithm used for distributing rows
+   */
+  String hashAlgorithm();

Review Comment:
   are we using this at all in this moment? if not i would suggest add this API in the future



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/DefaultPostOrderTraversalVisitor.java:
##########
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.stage;
+
+public abstract class DefaultPostOrderTraversalVisitor<T, C> implements StageNodeVisitor<T, C> {

Review Comment:
   missing javadoc



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java:
##########
@@ -104,6 +112,16 @@ private StageNode walkRelPlan(RelNode node, int currentStageId) {
     }
   }
 
+  // Could run CBO here later
+  private void runPhysicalOptimizers(QueryPlan queryPlan) {

Review Comment:
   for now it is fine. let's consider the SPI for worker assignment and we can refactor later



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java:
##########
@@ -79,11 +80,14 @@ public class QueryEnvironment {
   // Pinot extensions
   private final Collection<RelOptRule> _logicalRuleSet;
   private final WorkerManager _workerManager;
+  private final TableCache _tableCache;

Review Comment:
   actually IMO this info should be encapsulated in `org.apache.pinot.query.catalog.PinotTable` but we don't have to fix this in this PR



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/PartitionKey.java:
##########
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.colocated;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import org.apache.calcite.rex.RexInputRef;
+
+
+/**
+ * PartitionKey describes how data is distributed in a given stage. It consists of a list of columns which are stored
+ * as a list of {@link RexInputRef#getIndex()}, the number of partitions and the hash-algorithm used. A given stage may
+ * have more than 1 PartitionKey, in which case one may use a {@link java.util.Set<PartitionKey>} to represent this
+ * behavior.
+ *
+ * <p>
+ *  In other words, when a StageNode has the schema: (user_uuid, col1, col2, ...), and the PartitionKey is
+ *  ([0], 8, murmur), then that means that the data for the StageNode is partitioned using the user_uuid column, into
+ *  8 partitions where the partitionId is computed using murmur(user_uuid) % 8.
+ *
+ *  For a join stage the data is partitioned by the senders using their respective join-keys. In that case, we may
+ *  have more than 1 PartitionKey applicable for the JoinNode, and it can be represented by a set as:
+ *  {([0], 8, murmur), ([leftSchemaSize + 0], 8, murmur)}, assuming both senders partition using Murmur into 8
+ *  partitions. Note that a set of PartitionKey means that the partition keys are independent and they don't have any
+ *  ordering, i.e. the data is partitioned by both the join-key of the left child and the join-key of the right child.
+ * </p>
+ */
+public class PartitionKey {

Review Comment:
   make this class package private. and suggest renaming it to ColocationKey



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] codecov-commenter commented on pull request #9873: [Draft] [multistage] PoC for Shuffle Skip Algorithm

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #9873:
URL: https://github.com/apache/pinot/pull/9873#issuecomment-1331455320

   # [Codecov](https://codecov.io/gh/apache/pinot/pull/9873?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#9873](https://codecov.io/gh/apache/pinot/pull/9873?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (4ff0307) into [master](https://codecov.io/gh/apache/pinot/commit/d50f9ee8b04f615abc57eab4c08a53ad4902f72a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d50f9ee) will **decrease** coverage by `6.35%`.
   > The diff coverage is `3.73%`.
   
   > :exclamation: Current head 4ff0307 differs from pull request most recent head b6b8009. Consider uploading reports for the commit b6b8009 to get more accurate results
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #9873      +/-   ##
   ============================================
   - Coverage     70.40%   64.04%   -6.36%     
   + Complexity     5012     4986      -26     
   ============================================
     Files          1973     1923      -50     
     Lines        105815   103627    -2188     
     Branches      16032    15783     -249     
   ============================================
   - Hits          74503    66372    -8131     
   - Misses        26131    32472    +6341     
   + Partials       5181     4783     -398     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `?` | |
   | unittests1 | `67.61% <3.73%> (-0.24%)` | :arrow_down: |
   | unittests2 | `15.77% <3.73%> (-0.03%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/9873?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...requesthandler/MultiStageBrokerRequestHandler.java](https://codecov.io/gh/apache/pinot/pull/9873/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcmVxdWVzdGhhbmRsZXIvTXVsdGlTdGFnZUJyb2tlclJlcXVlc3RIYW5kbGVyLmphdmE=) | `0.00% <ø> (-59.16%)` | :arrow_down: |
   | [...planner/logical/PhysicalStageTraversalVisitor.java](https://codecov.io/gh/apache/pinot/pull/9873/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcGxhbm5lci9sb2dpY2FsL1BoeXNpY2FsU3RhZ2VUcmF2ZXJzYWxWaXNpdG9yLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...lanner/partitioning/FieldSelectionKeySelector.java](https://codecov.io/gh/apache/pinot/pull/9873/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcGxhbm5lci9wYXJ0aXRpb25pbmcvRmllbGRTZWxlY3Rpb25LZXlTZWxlY3Rvci5qYXZh) | `64.00% <0.00%> (-2.67%)` | :arrow_down: |
   | [.../query/planner/physical/ShuffleRewriteVisitor.java](https://codecov.io/gh/apache/pinot/pull/9873/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcGxhbm5lci9waHlzaWNhbC9TaHVmZmxlUmV3cml0ZVZpc2l0b3IuamF2YQ==) | `94.44% <ø> (ø)` | |
   | [...y/planner/physical/SmartShuffleRewriteVisitor.java](https://codecov.io/gh/apache/pinot/pull/9873/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcGxhbm5lci9waHlzaWNhbC9TbWFydFNodWZmbGVSZXdyaXRlVmlzaXRvci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...lanner/stage/DefaultPostOrderTraversalVisitor.java](https://codecov.io/gh/apache/pinot/pull/9873/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcGxhbm5lci9zdGFnZS9EZWZhdWx0UG9zdE9yZGVyVHJhdmVyc2FsVmlzaXRvci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...ache/pinot/query/planner/logical/StagePlanner.java](https://codecov.io/gh/apache/pinot/pull/9873/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcGxhbm5lci9sb2dpY2FsL1N0YWdlUGxhbm5lci5qYXZh) | `95.55% <75.00%> (-4.45%)` | :arrow_down: |
   | [.../java/org/apache/pinot/query/QueryEnvironment.java](https://codecov.io/gh/apache/pinot/pull/9873/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvUXVlcnlFbnZpcm9ubWVudC5qYXZh) | `85.07% <100.00%> (+0.22%)` | :arrow_up: |
   | [...va/org/apache/pinot/common/config/NettyConfig.java](https://codecov.io/gh/apache/pinot/pull/9873/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vY29uZmlnL05ldHR5Q29uZmlnLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...a/org/apache/pinot/common/metrics/MinionMeter.java](https://codecov.io/gh/apache/pinot/pull/9873/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9NaW5pb25NZXRlci5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [439 more](https://codecov.io/gh/apache/pinot/pull/9873/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   :mega: We’re building smart automated test selection to slash your CI/CD build times. [Learn more](https://about.codecov.io/iterative-testing/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] yupeng9 commented on a diff in pull request #9873: [multistage] Add Greedy Shuffle Skip Optimizer for Colocated Joins

Posted by GitBox <gi...@apache.org>.
yupeng9 commented on code in PR #9873:
URL: https://github.com/apache/pinot/pull/9873#discussion_r1069928782


##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java:
##########
@@ -105,5 +106,10 @@ public Object[] getKey(Object[] input) {
     public int computeHash(Object[] input) {
       return _hashes.next();
     }
+
+    @Override
+    public String hashAlgorithm() {
+      return HASH_ALGORITHM;

Review Comment:
   better represent this as enum



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java:
##########
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.colocated;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SortNode;
+import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.stage.StageNodeVisitor;
+import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This shuffle optimizer can avoid shuffles by taking into account all of the following:
+ *
+ * 1. Servers assigned to the stages. The optimizer may also choose to change the server assignment if skipping
+ *    shuffles is possible.
+ * 2. The hash-algorithm and the number of partitions of the data in sender/receiver nodes. So for instance if we do a
+ *    join on two tables where the left table is partitioned using Murmur but the right table is partitioned using
+ *    hashCode, then this optimizer can detect this case and keep the shuffle.
+ *
+ * Also see: {@link ColocationKey} for its definition.
+ */
+public class GreedyShuffleRewriteVisitor
+    implements StageNodeVisitor<Set<ColocationKey>, GreedyShuffleRewriteContext> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(GreedyShuffleRewriteVisitor.class);
+
+  private final TableCache _tableCache;
+  private final Map<Integer, StageMetadata> _stageMetadataMap;
+  private boolean _canSkipShuffleForJoin;
+
+  public static void optimizeShuffles(QueryPlan queryPlan, TableCache tableCache) {
+    StageNode rootStageNode = queryPlan.getQueryStageMap().get(0);
+    Map<Integer, StageMetadata> stageMetadataMap = queryPlan.getStageMetadataMap();
+    GreedyShuffleRewriteContext context = GreedyShuffleRewritePreComputeVisitor.preComputeContext(rootStageNode);
+    // This assumes that if stageId(S1) > stageId(S2), then S1 is not an ancestor of S2.
+    // TODO: If this assumption is wrong, we can compute the reverse topological ordering explicitly.
+    for (int stageId = stageMetadataMap.size() - 1; stageId >= 0; stageId--) {
+      StageNode stageNode = context.getRootStageNode(stageId);
+      stageNode.visit(new GreedyShuffleRewriteVisitor(tableCache, stageMetadataMap), context);
+    }
+  }
+
+  private GreedyShuffleRewriteVisitor(TableCache tableCache, Map<Integer, StageMetadata> stageMetadataMap) {
+    _tableCache = tableCache;
+    _stageMetadataMap = stageMetadataMap;
+    _canSkipShuffleForJoin = false;
+  }
+
+  @Override
+  public Set<ColocationKey> visitAggregate(AggregateNode node, GreedyShuffleRewriteContext context) {
+    Set<ColocationKey> oldColocationKeys = node.getInputs().get(0).visit(this, context);
+
+    Map<Integer, Integer> oldToNewIndex = new HashMap<>();
+    for (int i = 0; i < node.getGroupSet().size(); i++) {
+      RexExpression rex = node.getGroupSet().get(i);
+      if (rex instanceof RexExpression.InputRef) {
+        int index = ((RexExpression.InputRef) rex).getIndex();
+        oldToNewIndex.put(index, i);
+      }
+    }
+
+    return computeNewColocationKeys(oldColocationKeys, oldToNewIndex);
+  }
+
+  @Override
+  public Set<ColocationKey> visitFilter(FilterNode node, GreedyShuffleRewriteContext context) {
+    // filters don't change partition keys
+    return node.getInputs().get(0).visit(this, context);
+  }
+
+  @Override
+  public Set<ColocationKey> visitJoin(JoinNode node, GreedyShuffleRewriteContext context) {
+    List<MailboxReceiveNode> innerLeafNodes = context.getLeafNodes(node.getStageId()).stream()
+        .map(x -> (MailboxReceiveNode) x).collect(Collectors.toList());
+    Preconditions.checkState(innerLeafNodes.size() == 2);
+
+    // Multiple checks need to be made to ensure that shuffle can be skipped for a join.
+    // Step-1: Join can be skipped only for equality joins.
+    boolean canColocate = canJoinBeColocated(node);
+    // Step-2: Only if the servers assigned to both left and right nodes are equal and the servers assigned to the join
+    //         stage are a superset of those servers, can we skip shuffles.
+    canColocate = canColocate && canServerAssignmentAllowShuffleSkip(node.getStageId(),
+        innerLeafNodes.get(0).getSenderStageId(), innerLeafNodes.get(1).getSenderStageId());
+    // Step-3: For both left/right MailboxReceiveNode/MailboxSendNode pairs, check whether the key partitioning can
+    //         allow shuffle skip.
+    canColocate = canColocate && partitionKeyConditionForJoin(innerLeafNodes.get(0),
+        (MailboxSendNode) innerLeafNodes.get(0).getSender(), context);
+    canColocate = canColocate && partitionKeyConditionForJoin(innerLeafNodes.get(1),
+        (MailboxSendNode) innerLeafNodes.get(1).getSender(), context);
+    // Step-4: Finally, ensure that the number of partitions and the hash algorithm is same for partition keys of both
+    //         children.
+    canColocate = canColocate && checkPartitionScheme(innerLeafNodes.get(0), innerLeafNodes.get(1), context);
+    if (canColocate) {
+      // If shuffle can be skipped, reassign servers.
+      _stageMetadataMap.get(node.getStageId()).setServerInstances(
+          _stageMetadataMap.get(innerLeafNodes.get(0).getSenderStageId()).getServerInstances());
+      _canSkipShuffleForJoin = true;
+    }
+
+    Set<ColocationKey> leftPKs = node.getInputs().get(0).visit(this, context);
+    Set<ColocationKey> rightPks = node.getInputs().get(1).visit(this, context);
+
+    int leftDataSchemaSize = node.getInputs().get(0).getDataSchema().size();
+    Set<ColocationKey> colocationKeys = new HashSet<>(leftPKs);
+
+    for (ColocationKey rightColocationKey : rightPks) {
+      ColocationKey newColocationKey = new ColocationKey(rightColocationKey.getNumPartitions(),
+          rightColocationKey.getHashAlgorithm());
+      for (Integer index : rightColocationKey.getIndices()) {
+        newColocationKey.addIndex(leftDataSchemaSize + index);
+      }
+      colocationKeys.add(newColocationKey);
+    }
+
+    return colocationKeys;
+  }
+
+  @Override
+  public Set<ColocationKey> visitMailboxReceive(MailboxReceiveNode node, GreedyShuffleRewriteContext context) {
+    KeySelector<Object[], Object[]> selector = node.getPartitionKeySelector();
+    Set<ColocationKey> oldColocationKeys = context.getColocationKeys(node.getSenderStageId());
+    // If the current stage is not a join-stage, then we already know sender's distribution
+    if (!context.isJoinStage(node.getStageId())) {
+      if (selector == null) {
+        return new HashSet<>();
+      } else if (colocationKeyCondition(oldColocationKeys, selector)
+          && areServersSuperset(node.getStageId(), node.getSenderStageId())) {
+        node.setExchangeType(RelDistribution.Type.SINGLETON);
+        _stageMetadataMap.get(node.getStageId()).setServerInstances(
+            _stageMetadataMap.get(node.getSenderStageId()).getServerInstances());
+        return oldColocationKeys;
+      }
+      // This means we can't skip shuffle and there's a partitioning enforced by receiver.
+      int numPartitions = _stageMetadataMap.get(node.getStageId()).getServerInstances().size();
+      List<ColocationKey> colocationKeys =
+          ((FieldSelectionKeySelector) selector).getColumnIndices().stream()
+              .map(x -> new ColocationKey(x, numPartitions, selector.hashAlgorithm())).collect(Collectors.toList());
+      return new HashSet<>(colocationKeys);
+    }
+    // If the current stage is a join-stage then we already know whether shuffle can be skipped.
+    if (_canSkipShuffleForJoin) {
+      node.setExchangeType(RelDistribution.Type.SINGLETON);
+      // For the join-case, servers are already re-assigned in visitJoin. Moreover, we haven't yet changed sender's
+      // distribution.
+      ((MailboxSendNode) node.getSender()).setExchangeType(RelDistribution.Type.SINGLETON);
+      return oldColocationKeys;
+    } else if (selector == null) {
+      return new HashSet<>();
+    }
+    // This means we can't skip shuffle and there's a partitioning enforced by receiver.
+    int numPartitions = _stageMetadataMap.get(node.getStageId()).getServerInstances().size();
+    List<ColocationKey> colocationKeys =
+        ((FieldSelectionKeySelector) selector).getColumnIndices().stream()
+            .map(x -> new ColocationKey(x, numPartitions, selector.hashAlgorithm())).collect(Collectors.toList());
+    return new HashSet<>(colocationKeys);
+  }
+
+  @Override
+  public Set<ColocationKey> visitMailboxSend(MailboxSendNode node, GreedyShuffleRewriteContext context) {
+    Set<ColocationKey> oldColocationKeys = node.getInputs().get(0).visit(this, context);
+    KeySelector<Object[], Object[]> selector = node.getPartitionKeySelector();
+
+    boolean canSkipShuffleBasic = colocationKeyCondition(oldColocationKeys, selector);
+    // If receiver is not a join-stage, then we can determine distribution type now.
+    if (!context.isJoinStage(node.getReceiverStageId())) {
+      Set<ColocationKey> colocationKeys;
+      if (canSkipShuffleBasic && areServersSuperset(node.getReceiverStageId(), node.getStageId())) {
+        // Servers are not re-assigned on sender-side. If needed, they are re-assigned on the receiver side.
+        node.setExchangeType(RelDistribution.Type.SINGLETON);
+        colocationKeys = oldColocationKeys;
+      } else {
+        colocationKeys = new HashSet<>();
+      }
+      context.setColocationKeys(node.getStageId(), colocationKeys);
+      return colocationKeys;
+    }
+    // If receiver is a join-stage, remember partition-keys of the child node of MailboxSendNode.
+    Set<ColocationKey> mailboxSendColocationKeys = canSkipShuffleBasic ? oldColocationKeys : new HashSet<>();
+    context.setColocationKeys(node.getStageId(), mailboxSendColocationKeys);
+    return mailboxSendColocationKeys;
+  }
+
+  @Override
+  public Set<ColocationKey> visitProject(ProjectNode node, GreedyShuffleRewriteContext context) {
+    // Project reorders or removes keys
+    Set<ColocationKey> oldColocationKeys = node.getInputs().get(0).visit(this, context);
+
+    Map<Integer, Integer> oldToNewIndex = new HashMap<>();
+    for (int i = 0; i < node.getProjects().size(); i++) {
+      RexExpression rex = node.getProjects().get(i);
+      if (rex instanceof RexExpression.InputRef) {
+        int index = ((RexExpression.InputRef) rex).getIndex();
+        oldToNewIndex.put(index, i);
+      }
+    }
+
+    return computeNewColocationKeys(oldColocationKeys, oldToNewIndex);
+  }
+
+  @Override
+  public Set<ColocationKey> visitSort(SortNode node, GreedyShuffleRewriteContext context) {
+    return node.getInputs().get(0).visit(this, context);
+  }
+
+  @Override
+  public Set<ColocationKey> visitTableScan(TableScanNode node, GreedyShuffleRewriteContext context) {
+    TableConfig tableConfig =
+        _tableCache.getTableConfig(node.getTableName());
+    if (tableConfig == null) {
+      LOGGER.warn("Couldn't find tableConfig for {}", node.getTableName());
+      return new HashSet<>();
+    }
+    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+    if (indexingConfig != null && indexingConfig.getSegmentPartitionConfig() != null) {
+      Map<String, ColumnPartitionConfig> columnPartitionMap =
+          indexingConfig.getSegmentPartitionConfig().getColumnPartitionMap();
+      if (columnPartitionMap != null) {
+        Set<String> partitionColumns = columnPartitionMap.keySet();
+        Set<ColocationKey> newColocationKeys = new HashSet<>();
+        for (int i = 0; i < node.getTableScanColumns().size(); i++) {
+          String columnName = node.getTableScanColumns().get(i);
+          if (partitionColumns.contains(node.getTableScanColumns().get(i))) {
+            int numPartitions = columnPartitionMap.get(columnName).getNumPartitions();
+            String hashAlgorithm = columnPartitionMap.get(columnName).getFunctionName();
+            newColocationKeys.add(new ColocationKey(i, numPartitions, hashAlgorithm));
+          }
+        }
+        return newColocationKeys;
+      }
+    }
+    return new HashSet<>();
+  }
+
+  @Override
+  public Set<ColocationKey> visitValue(ValueNode node, GreedyShuffleRewriteContext context) {
+    return new HashSet<>();
+  }
+
+  // TODO: Only equality joins can be colocated. We don't have join clause info available right now.

Review Comment:
   how do we plan to get the info?



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/ColocationKey.java:
##########
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.colocated;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import org.apache.calcite.rex.RexInputRef;
+
+
+/**
+ * ColocationKey describes how data is distributed in a given stage. It consists of a list of columns which are stored
+ * as a list of {@link RexInputRef#getIndex()}, the number of partitions and the hash-algorithm used. A given stage may
+ * have more than 1 ColocationKey, in which case one may use a {@link java.util.Set< ColocationKey >} to represent this

Review Comment:
   I assume one table or one operand of a stage has at most one colocationKey, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] ankitsultana commented on a diff in pull request #9873: [Draft] [multistage] PoC for Shuffle Skip Algorithm

Posted by GitBox <gi...@apache.org>.
ankitsultana commented on code in PR #9873:
URL: https://github.com/apache/pinot/pull/9873#discussion_r1035642443


##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java:
##########
@@ -98,6 +98,10 @@ protected List<Object[]> queryRunner(String sql) {
   protected List<Object[]> queryH2(String sql)
       throws Exception {
     Statement h2statement = _h2Connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    // TODO: Hack. Let's discuss how best to support this.
+    if (sql.contains("option")) {
+      sql = sql.replaceAll("option\\(useColocatedJoin\\=true\\)", "");

Review Comment:
   I added this to ensure basic sanity tests are passing. Proper tests for colocated join would be quite complex and I don't know if we have a ready to use framework.
   
   I can undo this change and pick up the tests in a separate PR since they'll take time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] 61yao commented on a diff in pull request #9873: [multistage] Add Pluggable Physical Optimizers + Greedy Shuffle Skip Optimizer

Posted by GitBox <gi...@apache.org>.
61yao commented on code in PR #9873:
URL: https://github.com/apache/pinot/pull/9873#discussion_r1039055333


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java:
##########
@@ -104,6 +112,16 @@ private StageNode walkRelPlan(RelNode node, int currentStageId) {
     }
   }
 
+  // Could run CBO here later
+  private void runPhysicalOptimizers(QueryPlan queryPlan) {

Review Comment:
   Should we wrap PhysicalOptimizer in a separate class if there is a lot more extension in the future? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] ankitsultana commented on a diff in pull request #9873: [multistage] Add Greedy Shuffle Skip Optimizer for Colocated Joins

Posted by GitBox <gi...@apache.org>.
ankitsultana commented on code in PR #9873:
URL: https://github.com/apache/pinot/pull/9873#discussion_r1071027499


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/ColocationKey.java:
##########
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.colocated;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import org.apache.calcite.rex.RexInputRef;
+
+
+/**
+ * ColocationKey describes how data is distributed in a given stage. It consists of a list of columns which are stored
+ * as a list of {@link RexInputRef#getIndex()}, the number of partitions and the hash-algorithm used. A given stage may
+ * have more than 1 ColocationKey, in which case one may use a {@link java.util.Set< ColocationKey >} to represent this

Review Comment:
   A TableScan stage is expected to have only 1 Colocation key.
   
   Other stages may have more than 1 colocation key. Such a state may be described as: data is partitioned by both colocationKeyA and colocationKeyB. Some examples where this may happen:
   
   1. Join-stage. If there's a join on A.col1 and B.col2, then the join-stage is considered to have the colocation key {[A.col1], [B.col2]}, indicating that the data is partitioned by both A.col1 and B.col2.
   2. Ancestors of a join-stage, assuming there's no shuffling done and the colocation key sustains.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] yupeng9 merged pull request #9873: [multistage] Add Greedy Shuffle Skip Optimizer for Colocated Joins

Posted by GitBox <gi...@apache.org>.
yupeng9 merged PR #9873:
URL: https://github.com/apache/pinot/pull/9873


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] ankitsultana commented on a diff in pull request #9873: [multistage] Add Pluggable Physical Optimizers + Greedy Shuffle Skip Optimizer

Posted by GitBox <gi...@apache.org>.
ankitsultana commented on code in PR #9873:
URL: https://github.com/apache/pinot/pull/9873#discussion_r1035655511


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/SmartShuffleRewriteVisitor.java:
##########
@@ -0,0 +1,468 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.logical.StageLayoutVisitor;
+import org.apache.pinot.query.planner.logical.StageLayoutVisitor.StageLayout;
+import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SortNode;
+import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.stage.StageNodeVisitor;
+import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This shuffle optimizer uses a {@link Set<PartitionKey>} to keep track of partition-keys for each inner stage node.
+ * If there are two partition keys P1 and P2 in the set for a given inner stage node, that means the data for that
+ * stage is partitioned by both P1 and P2 (independently). An example instance where this can happen is a join stage.
+ * In the join-stage, the data in that stage is partitioned by both a partition-key on the left and a partition-key
+ * on the right.
+ *
+ * If a sending stage has the keys {P1, P2} and the receiving stage needs data partitioned by P1, then a shuffle is
+ * not needed. Moreover, if P1 = [0, 1] and the keys are again {P1, P2} (value of P2 is arbitrary), if the receiving
+ * stage needs data partitioned by [0, 1, 2], then again no shuffle is needed.
+ *
+ * Also see: {@link PartitionKey} for its definition.
+ * TODO: Use a smarter and more appropriate name than "SmartShuffleRewriterVisitor".
+ */
+public class SmartShuffleRewriteVisitor
+    implements StageNodeVisitor<Set<SmartShuffleRewriteVisitor.PartitionKey>, StageLayout> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SmartShuffleRewriteVisitor.class);
+
+  private final TableCache _tableCache;
+  private final Map<Integer, StageMetadata> _stageMetadataMap;
+  private boolean _canSkipShuffleForJoin;
+
+  /**
+   * A shuffle optimizer that can avoid shuffles by taking into account all of the following:
+   * 1. Servers assigned to the stages. The optimizer may also choose to change the server assignment if skipping
+   *    shuffles is possible.
+   * 2. The hash-algorithm and physical number of partitions of the data in sender/receiver nodes
+   *    So for instance if we do a join on two tables where the left table is partitioned using Murmur but the
+   *    right table is partitioned using hashCode, then this optimizer can detect this case and keep the shuffle.
+   * 3. Equivalent partitioning introduced by doing a join. e.g. if we do a join on two tables on a user_uuid column,

Review Comment:
   This third point is no longer valid. Will remove in next revision.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] ankitsultana commented on a diff in pull request #9873: [multistage] Add Greedy Shuffle Skip Optimizer for Colocated Joins

Posted by GitBox <gi...@apache.org>.
ankitsultana commented on code in PR #9873:
URL: https://github.com/apache/pinot/pull/9873#discussion_r1071032075


##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java:
##########
@@ -105,5 +106,10 @@ public Object[] getKey(Object[] input) {
     public int computeHash(Object[] input) {
       return _hashes.next();
     }
+
+    @Override
+    public String hashAlgorithm() {
+      return HASH_ALGORITHM;

Review Comment:
   Representing as enum would be hard since each implementaiton of `KeySelector` will have their own names to identify themselves which may be across different packages.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] yupeng9 commented on a diff in pull request #9873: [multistage] Add Greedy Shuffle Skip Optimizer for Colocated Joins

Posted by GitBox <gi...@apache.org>.
yupeng9 commented on code in PR #9873:
URL: https://github.com/apache/pinot/pull/9873#discussion_r1071449118


##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/HashExchangeTest.java:
##########
@@ -105,5 +106,10 @@ public Object[] getKey(Object[] input) {
     public int computeHash(Object[] input) {
       return _hashes.next();
     }
+
+    @Override
+    public String hashAlgorithm() {
+      return HASH_ALGORITHM;

Review Comment:
   I see, so it'll be pluggable



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #9873: [multistage] Add Pluggable Physical Optimizers + Greedy Shuffle Skip Optimizer

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9873:
URL: https://github.com/apache/pinot/pull/9873#discussion_r1047408268


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java:
##########
@@ -221,7 +225,7 @@ private RelNode optimize(RelRoot relRoot, PlannerContext plannerContext) {
 
   private QueryPlan toDispatchablePlan(RelRoot relRoot, PlannerContext plannerContext, long requestId) {
     // 5. construct a dispatchable query plan.
-    StagePlanner queryStagePlanner = new StagePlanner(plannerContext, _workerManager, requestId);
+    StagePlanner queryStagePlanner = new StagePlanner(plannerContext, _workerManager, requestId, _tableCache);
     return queryStagePlanner.makePlan(relRoot);

Review Comment:
   my understanding of this POC is that. the end goal is to, within StagePlanner, assign each stage a parallelism and a set of servers to run those parallelism. is this correct?



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/SmartShuffleRewriteVisitor.java:
##########
@@ -0,0 +1,468 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.logical.StageLayoutVisitor;
+import org.apache.pinot.query.planner.logical.StageLayoutVisitor.StageLayout;
+import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SortNode;
+import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.stage.StageNodeVisitor;
+import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This shuffle optimizer uses a {@link Set<PartitionKey>} to keep track of partition-keys for each inner stage node.
+ * If there are two partition keys P1 and P2 in the set for a given inner stage node, that means the data for that
+ * stage is partitioned by both P1 and P2 (independently). An example instance where this can happen is a join stage.
+ * In the join-stage, the data in that stage is partitioned by both a partition-key on the left and a partition-key
+ * on the right.
+ *
+ * If a sending stage has the keys {P1, P2} and the receiving stage needs data partitioned by P1, then a shuffle is
+ * not needed. Moreover, if P1 = [0, 1] and the keys are again {P1, P2} (value of P2 is arbitrary), if the receiving
+ * stage needs data partitioned by [0, 1, 2], then again no shuffle is needed.
+ *
+ * Also see: {@link PartitionKey} for its definition.
+ * TODO: Use a smarter and more appropriate name than "SmartShuffleRewriterVisitor".
+ */
+public class SmartShuffleRewriteVisitor
+    implements StageNodeVisitor<Set<SmartShuffleRewriteVisitor.PartitionKey>, StageLayout> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SmartShuffleRewriteVisitor.class);
+
+  private final TableCache _tableCache;
+  private final Map<Integer, StageMetadata> _stageMetadataMap;
+  private boolean _canSkipShuffleForJoin;

Review Comment:
   IIUC, the idea here is that,
   1. there are some additional metadata that needs to be computed to run the colocated-join smart shuffle visitor. (e.g. the StageLayout context)
   2. then the smartShuffleVisitor is run and produces something locally private to the visitor itself. 
   
   with these 2 assumptions. we can probably make WorkerAssignmentVisitor as an SPI and load them dynamically based on query option. 
   
   I am thinking of something like this:
   
   
   QueryEnvironment adds 
   ```
   _workerAssignmentStrategyFactory;
   runWorkerAssignmentStrategy(QueryPlan, WorkerManager);
   ```
   
   added class `WorkerAssignmentStrategyFactory` with
   ```
   WorkerAssignmentStrategy strategy = WorkerAssignmentStrategyFactory.createStrategy(QueryPlan, WorkerManager);
   ```
    to create different worker assignment strategy based on QueryPlan
   
   another class `WorkerAssignmentStrategy` that
   ```
   WorkerAssignmentStrategy<Context, Layout> implements StageNodeVisitor<Context, Layout>
   ```
   



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java:
##########
@@ -84,6 +89,9 @@ public QueryPlan makePlan(RelRoot relRoot) {
       _workerManager.assignWorkerToStage(e.getKey(), e.getValue(), _requestId);
     }
 
+    // Run physical optimizations
+    runPhysicalOptimizers(queryPlan);
+

Review Comment:
   can we change this to `runWorkerAssignmentStrategy(queryPlan, _workerManager) and get rid of the 4 lines above ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] ankitsultana commented on a diff in pull request #9873: [multistage] Add Greedy Shuffle Skip Optimizer for Colocated Joins

Posted by GitBox <gi...@apache.org>.
ankitsultana commented on code in PR #9873:
URL: https://github.com/apache/pinot/pull/9873#discussion_r1071030592


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/GreedyShuffleRewriteVisitor.java:
##########
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.colocated;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SortNode;
+import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.stage.StageNodeVisitor;
+import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This shuffle optimizer can avoid shuffles by taking into account all of the following:
+ *
+ * 1. Servers assigned to the stages. The optimizer may also choose to change the server assignment if skipping
+ *    shuffles is possible.
+ * 2. The hash-algorithm and the number of partitions of the data in sender/receiver nodes. So for instance if we do a
+ *    join on two tables where the left table is partitioned using Murmur but the right table is partitioned using
+ *    hashCode, then this optimizer can detect this case and keep the shuffle.
+ *
+ * Also see: {@link ColocationKey} for its definition.
+ */
+public class GreedyShuffleRewriteVisitor
+    implements StageNodeVisitor<Set<ColocationKey>, GreedyShuffleRewriteContext> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(GreedyShuffleRewriteVisitor.class);
+
+  private final TableCache _tableCache;
+  private final Map<Integer, StageMetadata> _stageMetadataMap;
+  private boolean _canSkipShuffleForJoin;
+
+  public static void optimizeShuffles(QueryPlan queryPlan, TableCache tableCache) {
+    StageNode rootStageNode = queryPlan.getQueryStageMap().get(0);
+    Map<Integer, StageMetadata> stageMetadataMap = queryPlan.getStageMetadataMap();
+    GreedyShuffleRewriteContext context = GreedyShuffleRewritePreComputeVisitor.preComputeContext(rootStageNode);
+    // This assumes that if stageId(S1) > stageId(S2), then S1 is not an ancestor of S2.
+    // TODO: If this assumption is wrong, we can compute the reverse topological ordering explicitly.
+    for (int stageId = stageMetadataMap.size() - 1; stageId >= 0; stageId--) {
+      StageNode stageNode = context.getRootStageNode(stageId);
+      stageNode.visit(new GreedyShuffleRewriteVisitor(tableCache, stageMetadataMap), context);
+    }
+  }
+
+  private GreedyShuffleRewriteVisitor(TableCache tableCache, Map<Integer, StageMetadata> stageMetadataMap) {
+    _tableCache = tableCache;
+    _stageMetadataMap = stageMetadataMap;
+    _canSkipShuffleForJoin = false;
+  }
+
+  @Override
+  public Set<ColocationKey> visitAggregate(AggregateNode node, GreedyShuffleRewriteContext context) {
+    Set<ColocationKey> oldColocationKeys = node.getInputs().get(0).visit(this, context);
+
+    Map<Integer, Integer> oldToNewIndex = new HashMap<>();
+    for (int i = 0; i < node.getGroupSet().size(); i++) {
+      RexExpression rex = node.getGroupSet().get(i);
+      if (rex instanceof RexExpression.InputRef) {
+        int index = ((RexExpression.InputRef) rex).getIndex();
+        oldToNewIndex.put(index, i);
+      }
+    }
+
+    return computeNewColocationKeys(oldColocationKeys, oldToNewIndex);
+  }
+
+  @Override
+  public Set<ColocationKey> visitFilter(FilterNode node, GreedyShuffleRewriteContext context) {
+    // filters don't change partition keys
+    return node.getInputs().get(0).visit(this, context);
+  }
+
+  @Override
+  public Set<ColocationKey> visitJoin(JoinNode node, GreedyShuffleRewriteContext context) {
+    List<MailboxReceiveNode> innerLeafNodes = context.getLeafNodes(node.getStageId()).stream()
+        .map(x -> (MailboxReceiveNode) x).collect(Collectors.toList());
+    Preconditions.checkState(innerLeafNodes.size() == 2);
+
+    // Multiple checks need to be made to ensure that shuffle can be skipped for a join.
+    // Step-1: Join can be skipped only for equality joins.
+    boolean canColocate = canJoinBeColocated(node);
+    // Step-2: Only if the servers assigned to both left and right nodes are equal and the servers assigned to the join
+    //         stage are a superset of those servers, can we skip shuffles.
+    canColocate = canColocate && canServerAssignmentAllowShuffleSkip(node.getStageId(),
+        innerLeafNodes.get(0).getSenderStageId(), innerLeafNodes.get(1).getSenderStageId());
+    // Step-3: For both left/right MailboxReceiveNode/MailboxSendNode pairs, check whether the key partitioning can
+    //         allow shuffle skip.
+    canColocate = canColocate && partitionKeyConditionForJoin(innerLeafNodes.get(0),
+        (MailboxSendNode) innerLeafNodes.get(0).getSender(), context);
+    canColocate = canColocate && partitionKeyConditionForJoin(innerLeafNodes.get(1),
+        (MailboxSendNode) innerLeafNodes.get(1).getSender(), context);
+    // Step-4: Finally, ensure that the number of partitions and the hash algorithm is same for partition keys of both
+    //         children.
+    canColocate = canColocate && checkPartitionScheme(innerLeafNodes.get(0), innerLeafNodes.get(1), context);
+    if (canColocate) {
+      // If shuffle can be skipped, reassign servers.
+      _stageMetadataMap.get(node.getStageId()).setServerInstances(
+          _stageMetadataMap.get(innerLeafNodes.get(0).getSenderStageId()).getServerInstances());
+      _canSkipShuffleForJoin = true;
+    }
+
+    Set<ColocationKey> leftPKs = node.getInputs().get(0).visit(this, context);
+    Set<ColocationKey> rightPks = node.getInputs().get(1).visit(this, context);
+
+    int leftDataSchemaSize = node.getInputs().get(0).getDataSchema().size();
+    Set<ColocationKey> colocationKeys = new HashSet<>(leftPKs);
+
+    for (ColocationKey rightColocationKey : rightPks) {
+      ColocationKey newColocationKey = new ColocationKey(rightColocationKey.getNumPartitions(),
+          rightColocationKey.getHashAlgorithm());
+      for (Integer index : rightColocationKey.getIndices()) {
+        newColocationKey.addIndex(leftDataSchemaSize + index);
+      }
+      colocationKeys.add(newColocationKey);
+    }
+
+    return colocationKeys;
+  }
+
+  @Override
+  public Set<ColocationKey> visitMailboxReceive(MailboxReceiveNode node, GreedyShuffleRewriteContext context) {
+    KeySelector<Object[], Object[]> selector = node.getPartitionKeySelector();
+    Set<ColocationKey> oldColocationKeys = context.getColocationKeys(node.getSenderStageId());
+    // If the current stage is not a join-stage, then we already know sender's distribution
+    if (!context.isJoinStage(node.getStageId())) {
+      if (selector == null) {
+        return new HashSet<>();
+      } else if (colocationKeyCondition(oldColocationKeys, selector)
+          && areServersSuperset(node.getStageId(), node.getSenderStageId())) {
+        node.setExchangeType(RelDistribution.Type.SINGLETON);
+        _stageMetadataMap.get(node.getStageId()).setServerInstances(
+            _stageMetadataMap.get(node.getSenderStageId()).getServerInstances());
+        return oldColocationKeys;
+      }
+      // This means we can't skip shuffle and there's a partitioning enforced by receiver.
+      int numPartitions = _stageMetadataMap.get(node.getStageId()).getServerInstances().size();
+      List<ColocationKey> colocationKeys =
+          ((FieldSelectionKeySelector) selector).getColumnIndices().stream()
+              .map(x -> new ColocationKey(x, numPartitions, selector.hashAlgorithm())).collect(Collectors.toList());
+      return new HashSet<>(colocationKeys);
+    }
+    // If the current stage is a join-stage then we already know whether shuffle can be skipped.
+    if (_canSkipShuffleForJoin) {
+      node.setExchangeType(RelDistribution.Type.SINGLETON);
+      // For the join-case, servers are already re-assigned in visitJoin. Moreover, we haven't yet changed sender's
+      // distribution.
+      ((MailboxSendNode) node.getSender()).setExchangeType(RelDistribution.Type.SINGLETON);
+      return oldColocationKeys;
+    } else if (selector == null) {
+      return new HashSet<>();
+    }
+    // This means we can't skip shuffle and there's a partitioning enforced by receiver.
+    int numPartitions = _stageMetadataMap.get(node.getStageId()).getServerInstances().size();
+    List<ColocationKey> colocationKeys =
+        ((FieldSelectionKeySelector) selector).getColumnIndices().stream()
+            .map(x -> new ColocationKey(x, numPartitions, selector.hashAlgorithm())).collect(Collectors.toList());
+    return new HashSet<>(colocationKeys);
+  }
+
+  @Override
+  public Set<ColocationKey> visitMailboxSend(MailboxSendNode node, GreedyShuffleRewriteContext context) {
+    Set<ColocationKey> oldColocationKeys = node.getInputs().get(0).visit(this, context);
+    KeySelector<Object[], Object[]> selector = node.getPartitionKeySelector();
+
+    boolean canSkipShuffleBasic = colocationKeyCondition(oldColocationKeys, selector);
+    // If receiver is not a join-stage, then we can determine distribution type now.
+    if (!context.isJoinStage(node.getReceiverStageId())) {
+      Set<ColocationKey> colocationKeys;
+      if (canSkipShuffleBasic && areServersSuperset(node.getReceiverStageId(), node.getStageId())) {
+        // Servers are not re-assigned on sender-side. If needed, they are re-assigned on the receiver side.
+        node.setExchangeType(RelDistribution.Type.SINGLETON);
+        colocationKeys = oldColocationKeys;
+      } else {
+        colocationKeys = new HashSet<>();
+      }
+      context.setColocationKeys(node.getStageId(), colocationKeys);
+      return colocationKeys;
+    }
+    // If receiver is a join-stage, remember partition-keys of the child node of MailboxSendNode.
+    Set<ColocationKey> mailboxSendColocationKeys = canSkipShuffleBasic ? oldColocationKeys : new HashSet<>();
+    context.setColocationKeys(node.getStageId(), mailboxSendColocationKeys);
+    return mailboxSendColocationKeys;
+  }
+
+  @Override
+  public Set<ColocationKey> visitProject(ProjectNode node, GreedyShuffleRewriteContext context) {
+    // Project reorders or removes keys
+    Set<ColocationKey> oldColocationKeys = node.getInputs().get(0).visit(this, context);
+
+    Map<Integer, Integer> oldToNewIndex = new HashMap<>();
+    for (int i = 0; i < node.getProjects().size(); i++) {
+      RexExpression rex = node.getProjects().get(i);
+      if (rex instanceof RexExpression.InputRef) {
+        int index = ((RexExpression.InputRef) rex).getIndex();
+        oldToNewIndex.put(index, i);
+      }
+    }
+
+    return computeNewColocationKeys(oldColocationKeys, oldToNewIndex);
+  }
+
+  @Override
+  public Set<ColocationKey> visitSort(SortNode node, GreedyShuffleRewriteContext context) {
+    return node.getInputs().get(0).visit(this, context);
+  }
+
+  @Override
+  public Set<ColocationKey> visitTableScan(TableScanNode node, GreedyShuffleRewriteContext context) {
+    TableConfig tableConfig =
+        _tableCache.getTableConfig(node.getTableName());
+    if (tableConfig == null) {
+      LOGGER.warn("Couldn't find tableConfig for {}", node.getTableName());
+      return new HashSet<>();
+    }
+    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+    if (indexingConfig != null && indexingConfig.getSegmentPartitionConfig() != null) {
+      Map<String, ColumnPartitionConfig> columnPartitionMap =
+          indexingConfig.getSegmentPartitionConfig().getColumnPartitionMap();
+      if (columnPartitionMap != null) {
+        Set<String> partitionColumns = columnPartitionMap.keySet();
+        Set<ColocationKey> newColocationKeys = new HashSet<>();
+        for (int i = 0; i < node.getTableScanColumns().size(); i++) {
+          String columnName = node.getTableScanColumns().get(i);
+          if (partitionColumns.contains(node.getTableScanColumns().get(i))) {
+            int numPartitions = columnPartitionMap.get(columnName).getNumPartitions();
+            String hashAlgorithm = columnPartitionMap.get(columnName).getFunctionName();
+            newColocationKeys.add(new ColocationKey(i, numPartitions, hashAlgorithm));
+          }
+        }
+        return newColocationKeys;
+      }
+    }
+    return new HashSet<>();
+  }
+
+  @Override
+  public Set<ColocationKey> visitValue(ValueNode node, GreedyShuffleRewriteContext context) {
+    return new HashSet<>();
+  }
+
+  // TODO: Only equality joins can be colocated. We don't have join clause info available right now.

Review Comment:
   I think we need to pass that information from Calcite's LogicalJoin to our JoinNode implementation: https://github.com/apache/pinot/blob/master/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java#L125



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] yupeng9 commented on a diff in pull request #9873: [multistage] Add Greedy Shuffle Skip Optimizer for Colocated Joins

Posted by GitBox <gi...@apache.org>.
yupeng9 commented on code in PR #9873:
URL: https://github.com/apache/pinot/pull/9873#discussion_r1071448684


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/colocated/ColocationKey.java:
##########
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.colocated;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import org.apache.calcite.rex.RexInputRef;
+
+
+/**
+ * ColocationKey describes how data is distributed in a given stage. It consists of a list of columns which are stored
+ * as a list of {@link RexInputRef#getIndex()}, the number of partitions and the hash-algorithm used. A given stage may
+ * have more than 1 ColocationKey, in which case one may use a {@link java.util.Set< ColocationKey >} to represent this

Review Comment:
   yup, this sounds right



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] yupeng9 commented on pull request #9873: [multistage] Add Greedy Shuffle Skip Optimizer for Colocated Joins

Posted by GitBox <gi...@apache.org>.
yupeng9 commented on PR #9873:
URL: https://github.com/apache/pinot/pull/9873#issuecomment-1384338659

   build failure unrelated to the diff


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] ankitsultana commented on a diff in pull request #9873: [multistage] Add Pluggable Physical Optimizers + Greedy Shuffle Skip Optimizer

Posted by GitBox <gi...@apache.org>.
ankitsultana commented on code in PR #9873:
URL: https://github.com/apache/pinot/pull/9873#discussion_r1064571345


##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java:
##########
@@ -98,6 +98,10 @@ protected List<Object[]> queryRunner(String sql) {
   protected List<Object[]> queryH2(String sql)
       throws Exception {
     Statement h2statement = _h2Connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    // TODO: Hack. Let's discuss how best to support this.
+    if (sql.contains("option")) {
+      sql = sql.replaceAll("option\\(useColocatedJoin\\=true\\)", "");

Review Comment:
   ^ Essentially the issue is that the H2 queries fail because of the option to enable useColocatedJoin.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] ankitsultana commented on a diff in pull request #9873: [multistage] Add Greedy Shuffle Skip Optimizer for Colocated Joins

Posted by GitBox <gi...@apache.org>.
ankitsultana commented on code in PR #9873:
URL: https://github.com/apache/pinot/pull/9873#discussion_r1067779727


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/KeySelector.java:
##########
@@ -37,4 +37,9 @@
   OUT getKey(IN input);
 
   int computeHash(IN input);
+
+  /**
+   * @return the hash-algorithm used for distributing rows
+   */
+  String hashAlgorithm();

Review Comment:
   Yeah this is getting used because to make the colocation decision we need to know which hash algorithm is used to distribute the data in a given stage. In case a leaf stage decides to hash-distribute, this will set the ColocationKey of the receiving stage appropriately by setting the `ColocationKey._hashAlgorithm` to `KeySelector#hashAlgorithm()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org