Posted to commits@pinot.apache.org by "walterddr (via GitHub)" <gi...@apache.org> on 2023/12/01 00:50:19 UTC

[PR] [multistage][draft] partition assignment refactor [pinot]

walterddr opened a new pull request, #12079:
URL: https://github.com/apache/pinot/pull/12079

   Step 2 of #12015.
   
   This PR:
   1. Loosens the criteria for "co-assigning workers to the same partition to avoid shuffling" so that they also apply to non-leaf children.
   2. Reshapes the rules for partition assignment and mailbox assignment.
       - The exchange governs whether a stage is "pre-partitioned", i.e. the input partitioning is already the one desired by the exchange (an exchange might still be needed if the keys are the same but the partition functions are different).
       - For worker assignment
           - A "pre-partitioned" exchange is required for assigning the same set of workers as the 1st child.
           - For a leaf stage, worker assignment is controlled only by `tableOptions`.
       - For mailbox assignment
           - A "pre-partitioned" exchange is necessary and sufficient to assign a direct exchange for the 1st child; it is NOT sufficient for the other children (additional check logic needs to be applied).
           - `partition_parallelism` is only used between a leaf child and its intermediate parent.
               - If both are leaf stages (semi-join), `partition_parallelism` shouldn't apply; num_worker == num_partition of the table.
               - If both are intermediate stages, `partition_parallelism` shouldn't apply; num_worker == num_partition * partition_parallelism from the leaf table already.
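
The worker-count rules above can be sketched as follows. This is only an illustration under stated assumptions: the class and method names are hypothetical and are not Pinot's API, and the "counts must match as-is" branch for leaf-to-leaf and intermediate-to-intermediate edges is inferred from the description rather than taken from the PR's code.

```java
/** Hypothetical illustration of the worker-count rules in the PR description; not Pinot's actual API. */
final class PartitionAssignmentSketch {
  private PartitionAssignmentSketch() {
  }

  /** Leaf stage: num_worker == num_partition of the table. */
  static int leafWorkers(int numPartitions) {
    return numPartitions;
  }

  /** Intermediate stage fed by a leaf stage: num_partition * partition_parallelism. */
  static int intermediateWorkersFromLeaf(int numPartitions, int partitionParallelism) {
    return numPartitions * partitionParallelism;
  }

  /**
   * Whether sender and receiver worker counts line up for a direct exchange.
   * partition_parallelism is applied only on a leaf-to-intermediate edge.
   */
  static boolean workerCountsMatch(boolean senderIsLeaf, boolean receiverIsLeaf, int numSenders,
      int numReceivers, int partitionParallelism) {
    if (senderIsLeaf && !receiverIsLeaf) {
      return numSenders * partitionParallelism == numReceivers;
    }
    // Assumption: for leaf-to-leaf (e.g. semi-join) and intermediate-to-intermediate edges
    // the parallelism factor is not applied again, so the counts must already be equal.
    return numSenders == numReceivers;
  }
}
```

For example, a table with 4 partitions and `partition_parallelism` of 2 gives 4 leaf workers and 8 workers in the intermediate parent; between two intermediate stages those 8 workers are matched one-to-one.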
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] [multistage]partition assignment refactor [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #12079:
URL: https://github.com/apache/pinot/pull/12079#discussion_r1419357282


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java:
##########
@@ -208,10 +346,31 @@ private RoutingTable getRoutingTable(String tableName, TableType tableType, long
         CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + tableNameWithType), requestId);
   }
 
+  // --------------------------------------------------------------------------
+  // Partitioned leaf stage assignment
+  // --------------------------------------------------------------------------
   private void assignWorkersToPartitionedLeafFragment(DispatchablePlanMetadata metadata,
-      DispatchablePlanContext context, String partitionKey, int numPartitions, int partitionParallelism) {
+      DispatchablePlanContext context, String partitionKey, Map<String, String> tableOptions) {
+    // when partition key exist, we assign workers for leaf-stage in partitioned fashion.
+
+    String numPartitionsStr = tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_SIZE);
+    Preconditions.checkState(numPartitionsStr != null, "'%s' must be provided for partition key: %s",
+        PinotHintOptions.TableHintOptions.PARTITION_SIZE, partitionKey);
+    int numPartitions = Integer.parseInt(numPartitionsStr);
+    Preconditions.checkState(numPartitions > 0, "'%s' must be positive, got: %s",
+        PinotHintOptions.TableHintOptions.PARTITION_SIZE, numPartitions);
+
+    String partitionFunction = tableOptions.getOrDefault(PinotHintOptions.TableHintOptions.PARTITION_FUNCTION,

Review Comment:
   Don't add a default. In most cases this is `murmur`, but we should not assume it.



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java:
##########
@@ -35,18 +35,28 @@
 /**
  * The {@code DispatchablePlanMetadata} info contains the information for dispatching a particular plan fragment.
  *
- * <p>It contains information aboute:
+ * <p>It contains information
  * <ul>
- *   <li>the tables it is suppose to scan for</li>
- *   <li>the underlying segments a stage requires to execute upon.</li>
- *   <li>the server instances to which this stage should be execute on</li>
+ *   <li>extracted from {@link org.apache.pinot.query.planner.physical.DispatchablePlanVisitor}</li>
+ *   <li>extracted from {@link org.apache.pinot.query.planner.physical.PinotDispatchPlanner}</li>
  * </ul>
  */
 public class DispatchablePlanMetadata implements Serializable {
-  // These 2 fields are extracted from TableScanNode
+
+  // --------------------------------------------------------------------------
+  // Fields extracted with {@link DispatchablePlanVisitor}
+  // --------------------------------------------------------------------------
+  // info from TableNode
   private final List<String> _scannedTables;
   private Map<String, String> _tableOptions;
+  // info from MailboxSendNode - whether a stage is pre-partitioned by the same way the sending exchange desires
+  private boolean _isPrePartitioned;

Review Comment:
   (minor) I prefer `_isPartitioned` over `_isPrePartitioned`. What does `pre` stand for?



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java:
##########
@@ -333,26 +371,11 @@ private void assignWorkersToIntermediateFragment(PlanFragment fragment, Dispatch
       throw new IllegalStateException(
           "No server instance found for intermediate stage for tables: " + Arrays.toString(tableNames.toArray()));
     }
-    if (metadata.isRequiresSingletonInstance()) {
-      // require singleton should return a single global worker ID with 0;
-      metadata.setWorkerIdToServerInstanceMap(Collections.singletonMap(0,
-          new QueryServerInstance(serverInstances.get(RANDOM.nextInt(serverInstances.size())))));
-    } else {
-      Map<String, String> options = context.getPlannerContext().getOptions();
-      int stageParallelism = Integer.parseInt(options.getOrDefault(QueryOptionKey.STAGE_PARALLELISM, "1"));
-      Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = new HashMap<>();
-      int workerId = 0;
-      for (ServerInstance serverInstance : serverInstances) {
-        QueryServerInstance queryServerInstance = new QueryServerInstance(serverInstance);
-        for (int i = 0; i < stageParallelism; i++) {
-          workerIdToServerInstanceMap.put(workerId++, queryServerInstance);
-        }
-      }
-      metadata.setWorkerIdToServerInstanceMap(workerIdToServerInstanceMap);
-    }
+    return serverInstances;
   }
 
-  private ColocatedTableInfo getColocatedTableInfo(String tableName, String partitionKey, int numPartitions) {
+  private ColocatedTableInfo getColocatedTableInfo(String tableName, String partitionKey, int numPartitions,

Review Comment:
   Either change it or add a TODO



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java:
##########
@@ -144,4 +145,23 @@ public Void process(PlanNode node, DispatchablePlanContext context) {
     }
     return null;
   }
+
+  private boolean isDirectExchangeCompatible(DispatchablePlanMetadata sender, DispatchablePlanMetadata receiver) {
+    Map<Integer, QueryServerInstance> senderServerMap = sender.getWorkerIdToServerInstanceMap();
+    Map<Integer, QueryServerInstance> receiverServerMap = receiver.getWorkerIdToServerInstanceMap();
+
+    int numSenders = senderServerMap.size();
+    int numReceivers = receiverServerMap.size();
+    if (sender.getScannedTables().size() > 0 && receiver.getScannedTables().size() == 0) {
+      // leaf-to-intermediate condition
+      return numSenders * sender.getPartitionParallelism() == numReceivers
+          && sender.getPartitionFunction() != null
+          && sender.getPartitionFunction().equals(receiver.getPartitionFunction());

Review Comment:
   We should compare ignoring case
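
A minimal sketch of what a case-insensitive, null-safe comparison of the two partition function names could look like. The helper name is hypothetical; only `String.equalsIgnoreCase` is a real JDK call, and it returns `false` for a `null` argument, so a missing receiver function is handled without an extra check.

```java
/** Hypothetical helper; shows only the comparison, not the surrounding Pinot code. */
final class PartitionFunctionCompare {
  private PartitionFunctionCompare() {
  }

  /** True only when both names are present and equal ignoring case. */
  static boolean samePartitionFunction(String sender, String receiver) {
    return sender != null && sender.equalsIgnoreCase(receiver);
  }
}
```

With this shape, `"Murmur"` and `"murmur"` are treated as the same function, while a missing function on either side never qualifies for a direct exchange.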





Re: [PR] [multistage]partition assignment refactor [pinot]

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #12079:
URL: https://github.com/apache/pinot/pull/12079#discussion_r1419618384


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java:
##########
@@ -35,18 +35,28 @@
 /**
  * The {@code DispatchablePlanMetadata} info contains the information for dispatching a particular plan fragment.
  *
- * <p>It contains information aboute:
+ * <p>It contains information
  * <ul>
- *   <li>the tables it is suppose to scan for</li>
- *   <li>the underlying segments a stage requires to execute upon.</li>
- *   <li>the server instances to which this stage should be execute on</li>
+ *   <li>extracted from {@link org.apache.pinot.query.planner.physical.DispatchablePlanVisitor}</li>
+ *   <li>extracted from {@link org.apache.pinot.query.planner.physical.PinotDispatchPlanner}</li>
  * </ul>
  */
 public class DispatchablePlanMetadata implements Serializable {
-  // These 2 fields are extracted from TableScanNode
+
+  // --------------------------------------------------------------------------
+  // Fields extracted with {@link DispatchablePlanVisitor}
+  // --------------------------------------------------------------------------
+  // info from TableNode
   private final List<String> _scannedTables;
   private Map<String, String> _tableOptions;
+  // info from MailboxSendNode - whether a stage is pre-partitioned by the same way the sending exchange desires
+  private boolean _isPrePartitioned;

Review Comment:
   Yeah, `prePartitioned` is not the same as `partitioned`. It means the data in this stage is `partitioned` and the root of this stage (the sender) is sending data with the same partitioning.





Re: [PR] [multistage]partition assignment refactor [pinot]

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #12079:
URL: https://github.com/apache/pinot/pull/12079#discussion_r1419388803


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java:
##########
@@ -208,10 +346,31 @@ private RoutingTable getRoutingTable(String tableName, TableType tableType, long
         CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + tableNameWithType), requestId);
   }
 
+  // --------------------------------------------------------------------------
+  // Partitioned leaf stage assignment
+  // --------------------------------------------------------------------------
   private void assignWorkersToPartitionedLeafFragment(DispatchablePlanMetadata metadata,
-      DispatchablePlanContext context, String partitionKey, int numPartitions, int partitionParallelism) {
+      DispatchablePlanContext context, String partitionKey, Map<String, String> tableOptions) {
+    // when partition key exist, we assign workers for leaf-stage in partitioned fashion.
+
+    String numPartitionsStr = tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_SIZE);
+    Preconditions.checkState(numPartitionsStr != null, "'%s' must be provided for partition key: %s",
+        PinotHintOptions.TableHintOptions.PARTITION_SIZE, partitionKey);
+    int numPartitions = Integer.parseInt(numPartitionsStr);
+    Preconditions.checkState(numPartitions > 0, "'%s' must be positive, got: %s",
+        PinotHintOptions.TableHintOptions.PARTITION_SIZE, numPartitions);
+
+    String partitionFunction = tableOptions.getOrDefault(PinotHintOptions.TableHintOptions.PARTITION_FUNCTION,

Review Comment:
   This would be backward-incompatible: all existing queries with table hints that lack a hash function would fail.
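
The trade-off in this thread can be sketched as two lookup styles. Everything here is illustrative: the class, the key string `partition_function`, and the `murmur` fallback are assumptions made for the example and are not taken from the PR's code.

```java
import java.util.Map;

/** Hypothetical sketch contrasting a strict lookup with a defaulting lookup. */
final class PartitionFunctionLookup {
  // Assumed option key and fallback value, for illustration only.
  static final String KEY = "partition_function";
  static final String ASSUMED_DEFAULT = "murmur";

  private PartitionFunctionLookup() {
  }

  /** Strict: a hint without a partition function is rejected. */
  static String strict(Map<String, String> tableOptions) {
    String function = tableOptions.get(KEY);
    if (function == null) {
      throw new IllegalStateException("'" + KEY + "' must be provided");
    }
    return function;
  }

  /** Lenient: a hint without a partition function silently gets the assumed default. */
  static String lenient(Map<String, String> tableOptions) {
    return tableOptions.getOrDefault(KEY, ASSUMED_DEFAULT);
  }
}
```

A hint that sets only the partition key and size passes the lenient lookup but fails the strict one, which is the backward-compatibility concern; the lenient lookup in turn bakes in the assumption the reviewer wants to avoid.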





Re: [PR] [multistage]partition assignment refactor [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #12079:
URL: https://github.com/apache/pinot/pull/12079#discussion_r1419658517


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java:
##########
@@ -35,18 +35,28 @@
 /**
  * The {@code DispatchablePlanMetadata} info contains the information for dispatching a particular plan fragment.
  *
- * <p>It contains information aboute:
+ * <p>It contains information
  * <ul>
- *   <li>the tables it is suppose to scan for</li>
- *   <li>the underlying segments a stage requires to execute upon.</li>
- *   <li>the server instances to which this stage should be execute on</li>
+ *   <li>extracted from {@link org.apache.pinot.query.planner.physical.DispatchablePlanVisitor}</li>
+ *   <li>extracted from {@link org.apache.pinot.query.planner.physical.PinotDispatchPlanner}</li>
  * </ul>
  */
 public class DispatchablePlanMetadata implements Serializable {
-  // These 2 fields are extracted from TableScanNode
+
+  // --------------------------------------------------------------------------
+  // Fields extracted with {@link DispatchablePlanVisitor}
+  // --------------------------------------------------------------------------
+  // info from TableNode
   private final List<String> _scannedTables;
   private Map<String, String> _tableOptions;
+  // info from MailboxSendNode - whether a stage is pre-partitioned by the same way the sending exchange desires
+  private boolean _isPrePartitioned;

Review Comment:
   I see. I missed the javadoc that this is just storing the property of `MailboxSendNode`





Re: [PR] [multistage]partition assignment refactor [pinot]

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #12079:
URL: https://github.com/apache/pinot/pull/12079#discussion_r1419614507


##########
pinot-query-runtime/src/test/resources/queries/QueryHints.json:
##########
@@ -83,6 +83,8 @@
       },
       {
         "description": "Colocated JOIN with partition column with partition parallelism in first table",
+        "ignored": true,
+        "comment": "partition parallelism mismatched in hint, this query shouldn't work at all",

Review Comment:
   Note: allowed here, but it will fall back to generic assignment.





Re: [PR] [multistage]partition assignment refactor [pinot]

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr merged PR #12079:
URL: https://github.com/apache/pinot/pull/12079




Re: [PR] [multistage]partition assignment refactor [pinot]

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #12079:
URL: https://github.com/apache/pinot/pull/12079#discussion_r1419386491


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java:
##########
@@ -35,18 +35,28 @@
 /**
  * The {@code DispatchablePlanMetadata} info contains the information for dispatching a particular plan fragment.
  *
- * <p>It contains information aboute:
+ * <p>It contains information
  * <ul>
- *   <li>the tables it is suppose to scan for</li>
- *   <li>the underlying segments a stage requires to execute upon.</li>
- *   <li>the server instances to which this stage should be execute on</li>
+ *   <li>extracted from {@link org.apache.pinot.query.planner.physical.DispatchablePlanVisitor}</li>
+ *   <li>extracted from {@link org.apache.pinot.query.planner.physical.PinotDispatchPlanner}</li>
  * </ul>
  */
 public class DispatchablePlanMetadata implements Serializable {
-  // These 2 fields are extracted from TableScanNode
+
+  // --------------------------------------------------------------------------
+  // Fields extracted with {@link DispatchablePlanVisitor}
+  // --------------------------------------------------------------------------
+  // info from TableNode
   private final List<String> _scannedTables;
   private Map<String, String> _tableOptions;
+  // info from MailboxSendNode - whether a stage is pre-partitioned by the same way the sending exchange desires
+  private boolean _isPrePartitioned;

Review Comment:
   "pre" means the data input to the mailbox is "already" partitioned --> `isPartitioned` is confusing b/c it can also mean the result of the mailbox is partitioned





Re: [PR] [multistage]partition assignment refactor [pinot]

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #12079:
URL: https://github.com/apache/pinot/pull/12079#discussion_r1419611728


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java:
##########
@@ -208,10 +346,31 @@ private RoutingTable getRoutingTable(String tableName, TableType tableType, long
         CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + tableNameWithType), requestId);
   }
 
+  // --------------------------------------------------------------------------
+  // Partitioned leaf stage assignment
+  // --------------------------------------------------------------------------
   private void assignWorkersToPartitionedLeafFragment(DispatchablePlanMetadata metadata,
-      DispatchablePlanContext context, String partitionKey, int numPartitions, int partitionParallelism) {
+      DispatchablePlanContext context, String partitionKey, Map<String, String> tableOptions) {
+    // when partition key exist, we assign workers for leaf-stage in partitioned fashion.
+
+    String numPartitionsStr = tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_SIZE);
+    Preconditions.checkState(numPartitionsStr != null, "'%s' must be provided for partition key: %s",
+        PinotHintOptions.TableHintOptions.PARTITION_SIZE, partitionKey);
+    int numPartitions = Integer.parseInt(numPartitionsStr);
+    Preconditions.checkState(numPartitions > 0, "'%s' must be positive, got: %s",
+        PinotHintOptions.TableHintOptions.PARTITION_SIZE, numPartitions);
+
+    String partitionFunction = tableOptions.getOrDefault(PinotHintOptions.TableHintOptions.PARTITION_FUNCTION,

Review Comment:
   done





Re: [PR] [multistage][draft] partition assignment refactor [pinot]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #12079:
URL: https://github.com/apache/pinot/pull/12079#issuecomment-1835254368

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/12079?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   Attention: `49 lines` in your changes are missing coverage. Please review.
   > Comparison is base [(`b9ed378`)](https://app.codecov.io/gh/apache/pinot/commit/b9ed378355acc5313ebc1031510671f7045f29a9?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 61.65% compared to head [(`a65352f`)](https://app.codecov.io/gh/apache/pinot/pull/12079?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) 0.00%.
   > Report is 4 commits behind head on master.
   
   | [Files](https://app.codecov.io/gh/apache/pinot/pull/12079?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
   |---|---|---|
   | [.../org/apache/pinot/query/routing/WorkerManager.java](https://app.codecov.io/gh/apache/pinot/pull/12079?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcm91dGluZy9Xb3JrZXJNYW5hZ2VyLmphdmE=) | 0.00% | [34 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12079?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...ery/planner/physical/MailboxAssignmentVisitor.java](https://app.codecov.io/gh/apache/pinot/pull/12079?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcGxhbm5lci9waHlzaWNhbC9NYWlsYm94QXNzaWdubWVudFZpc2l0b3IuamF2YQ==) | 0.00% | [3 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12079?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [.../pinot/query/planner/plannode/MailboxSendNode.java](https://app.codecov.io/gh/apache/pinot/pull/12079?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcGxhbm5lci9wbGFubm9kZS9NYWlsYm94U2VuZE5vZGUuamF2YQ==) | 0.00% | [3 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12079?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [.../query/planner/logical/RelToPlanNodeConverter.java](https://app.codecov.io/gh/apache/pinot/pull/12079?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcGxhbm5lci9sb2dpY2FsL1JlbFRvUGxhbk5vZGVDb252ZXJ0ZXIuamF2YQ==) | 0.00% | [2 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12079?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...ery/planner/physical/DispatchablePlanMetadata.java](https://app.codecov.io/gh/apache/pinot/pull/12079?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcGxhbm5lci9waHlzaWNhbC9EaXNwYXRjaGFibGVQbGFuTWV0YWRhdGEuamF2YQ==) | 0.00% | [2 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12079?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...che/pinot/query/planner/plannode/ExchangeNode.java](https://app.codecov.io/gh/apache/pinot/pull/12079?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcGxhbm5lci9wbGFubm9kZS9FeGNoYW5nZU5vZGUuamF2YQ==) | 0.00% | [2 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12079?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...uery/planner/logical/PinotLogicalQueryPlanner.java](https://app.codecov.io/gh/apache/pinot/pull/12079?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcGxhbm5lci9sb2dpY2FsL1Bpbm90TG9naWNhbFF1ZXJ5UGxhbm5lci5qYXZh) | 0.00% | [1 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12079?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...he/pinot/query/planner/logical/PlanFragmenter.java](https://app.codecov.io/gh/apache/pinot/pull/12079?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcGxhbm5lci9sb2dpY2FsL1BsYW5GcmFnbWVudGVyLmphdmE=) | 0.00% | [1 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12079?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   | [...uery/planner/physical/DispatchablePlanVisitor.java](https://app.codecov.io/gh/apache/pinot/pull/12079?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcGxhbm5lci9waHlzaWNhbC9EaXNwYXRjaGFibGVQbGFuVmlzaXRvci5qYXZh) | 0.00% | [1 Missing :warning: ](https://app.codecov.io/gh/apache/pinot/pull/12079?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |
   
   <details><summary>Additional details and impacted files</summary>
   
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #12079       +/-   ##
   =============================================
   - Coverage     61.65%    0.00%   -61.66%     
   =============================================
     Files          2389     2313       -76     
     Lines        129819   126082     -3737     
     Branches      20082    19525      -557     
   =============================================
   - Hits          80036        0    -80036     
   - Misses        43958   126082    +82124     
   + Partials       5825        0     -5825     
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/pinot/pull/12079/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/12079/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration](https://app.codecov.io/gh/apache/pinot/pull/12079/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (-0.01%)` | :arrow_down: |
   | [integration1](https://app.codecov.io/gh/apache/pinot/pull/12079/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration2](https://app.codecov.io/gh/apache/pinot/pull/12079/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (ø)` | |
   | [java-11](https://app.codecov.io/gh/apache/pinot/pull/12079/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [java-21](https://app.codecov.io/gh/apache/pinot/pull/12079/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (-61.54%)` | :arrow_down: |
   | [skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/12079/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (-61.62%)` | :arrow_down: |
   | [skip-bytebuffers-true](https://app.codecov.io/gh/apache/pinot/pull/12079/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [temurin](https://app.codecov.io/gh/apache/pinot/pull/12079/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `0.00% <0.00%> (-61.66%)` | :arrow_down: |
   | [unittests](https://app.codecov.io/gh/apache/pinot/pull/12079/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [unittests1](https://app.codecov.io/gh/apache/pinot/pull/12079/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [unittests2](https://app.codecov.io/gh/apache/pinot/pull/12079/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   
   </details>
   
   [:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/pinot/pull/12079?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] [multistage]partition assignment refactor [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #12079:
URL: https://github.com/apache/pinot/pull/12079#discussion_r1419411451


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java:
##########
@@ -208,10 +346,31 @@ private RoutingTable getRoutingTable(String tableName, TableType tableType, long
         CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + tableNameWithType), requestId);
   }
 
+  // --------------------------------------------------------------------------
+  // Partitioned leaf stage assignment
+  // --------------------------------------------------------------------------
   private void assignWorkersToPartitionedLeafFragment(DispatchablePlanMetadata metadata,
-      DispatchablePlanContext context, String partitionKey, int numPartitions, int partitionParallelism) {
+      DispatchablePlanContext context, String partitionKey, Map<String, String> tableOptions) {
+    // when partition key exist, we assign workers for leaf-stage in partitioned fashion.
+
+    String numPartitionsStr = tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_SIZE);
+    Preconditions.checkState(numPartitionsStr != null, "'%s' must be provided for partition key: %s",
+        PinotHintOptions.TableHintOptions.PARTITION_SIZE, partitionKey);
+    int numPartitions = Integer.parseInt(numPartitionsStr);
+    Preconditions.checkState(numPartitions > 0, "'%s' must be positive, got: %s",
+        PinotHintOptions.TableHintOptions.PARTITION_SIZE, numPartitions);
+
+    String partitionFunction = tableOptions.getOrDefault(PinotHintOptions.TableHintOptions.PARTITION_FUNCTION,

Review Comment:
   Understood. Currently we assume that all table options use the same partition function. Making `Murmur` the default might be safer.
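
As an illustration of the suggestion above, here is a minimal sketch of the option parsing with `Murmur` as the fallback. It is not the PR's code: the class `PartitionHintSketch`, its method names, and the literal key `partition_function` are assumptions made for this example (`partition_size` mirrors the hint used in the PR's test SQL). The PR itself reads the keys from `PinotHintOptions.TableHintOptions` and validates with Guava `Preconditions`; plain exceptions are used here to keep the sketch dependency-free.

```java
import java.util.Map;

// Hypothetical stand-in for the option handling in WorkerManager; not the PR's code.
final class PartitionHintSketch {
  // Assumed literal keys; the PR reads them from PinotHintOptions.TableHintOptions.
  static final String PARTITION_SIZE = "partition_size";
  static final String PARTITION_FUNCTION = "partition_function";
  // Default proposed in the review comment (the draft constant was "Hashcode").
  static final String DEFAULT_PARTITION_FUNCTION = "Murmur";

  private PartitionHintSketch() {
  }

  // Parses and validates the partition count, failing fast when it is missing or non-positive.
  static int parseNumPartitions(Map<String, String> tableOptions, String partitionKey) {
    String numPartitionsStr = tableOptions.get(PARTITION_SIZE);
    if (numPartitionsStr == null) {
      throw new IllegalStateException(
          String.format("'%s' must be provided for partition key: %s", PARTITION_SIZE, partitionKey));
    }
    int numPartitions = Integer.parseInt(numPartitionsStr);
    if (numPartitions <= 0) {
      throw new IllegalStateException(
          String.format("'%s' must be positive, got: %s", PARTITION_SIZE, numPartitions));
    }
    return numPartitions;
  }

  // Falls back to the default when the hint does not name a partition function.
  static String resolvePartitionFunction(Map<String, String> tableOptions) {
    return tableOptions.getOrDefault(PARTITION_FUNCTION, DEFAULT_PARTITION_FUNCTION);
  }
}
```

With only `partition_size='4'` in the hint, `parseNumPartitions` yields 4 and `resolvePartitionFunction` yields the default. The alternative raised elsewhere in this review, making the function mandatory, would replace `getOrDefault` with the same null check used for the partition size.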





Re: [PR] [multistage]partition assignment refactor [pinot]

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #12079:
URL: https://github.com/apache/pinot/pull/12079#discussion_r1414670200


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java:
##########
@@ -58,15 +58,22 @@
 public class WorkerManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(WorkerManager.class);
   private static final Random RANDOM = new Random();
+  private static final String DEFAULT_PARTITION_FUNCTION = "Hashcode";

Review Comment:
   Not sure if this is the best way to go. I am also OK with requiring callers to pass this in as a mandatory option.



##########
pinot-query-runtime/src/test/resources/queries/QueryHints.json:
##########
@@ -83,6 +83,8 @@
       },
       {
         "description": "Colocated JOIN with partition column with partition parallelism in first table",
+        "ignored": true,
+        "comment": "partition parallelism mismatched in hint, this query shouldn't work at all",

Review Comment:
   Previously we decided to allow this, but I guess we should not. I am putting an ignore here for now, but I think we should disallow this and throw an exception.



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java:
##########
@@ -333,26 +371,11 @@ private void assignWorkersToIntermediateFragment(PlanFragment fragment, Dispatch
       throw new IllegalStateException(
           "No server instance found for intermediate stage for tables: " + Arrays.toString(tableNames.toArray()));
     }
-    if (metadata.isRequiresSingletonInstance()) {
-      // require singleton should return a single global worker ID with 0;
-      metadata.setWorkerIdToServerInstanceMap(Collections.singletonMap(0,
-          new QueryServerInstance(serverInstances.get(RANDOM.nextInt(serverInstances.size())))));
-    } else {
-      Map<String, String> options = context.getPlannerContext().getOptions();
-      int stageParallelism = Integer.parseInt(options.getOrDefault(QueryOptionKey.STAGE_PARALLELISM, "1"));
-      Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = new HashMap<>();
-      int workerId = 0;
-      for (ServerInstance serverInstance : serverInstances) {
-        QueryServerInstance queryServerInstance = new QueryServerInstance(serverInstance);
-        for (int i = 0; i < stageParallelism; i++) {
-          workerIdToServerInstanceMap.put(workerId++, queryServerInstance);
-        }
-      }
-      metadata.setWorkerIdToServerInstanceMap(workerIdToServerInstanceMap);
-    }
+    return serverInstances;
   }
 
-  private ColocatedTableInfo getColocatedTableInfo(String tableName, String partitionKey, int numPartitions) {
+  private ColocatedTableInfo getColocatedTableInfo(String tableName, String partitionKey, int numPartitions,

Review Comment:
   Technically the class should be named `PartitionTableInfo`; it doesn't indicate "co-location" at all.



##########
pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json:
##########
@@ -1,6 +1,26 @@
 {
   "pinot_hint_option_tests": {
     "queries": [
+      {
+        "description": "hint table without partitioning should throw exception",
+        "sql": "EXPLAIN PLAN FOR SELECT * FROM d /*+ tableOptions(partition_key='col1', partition_size='4') */ LIMIT 10",
+        "expectedException": "Error composing query plan for:.*"

Review Comment:
   TODO: fix this test. It should always check the nested cause, because the real error is always wrapped in an exception thrown by the parser or the planner. Checking only the top-level message doesn't make sense.
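
One way the TODO above could be approached is to match the expected pattern against the wrapped causes instead of the top-level message. The helper below is a hedged sketch only: `NestedCauseMatcher` and `nestedCauseMatches` are hypothetical names, not part of Pinot's test harness, and the depth limit is an arbitrary guard against cyclic cause chains.

```java
import java.util.regex.Pattern;

// Hypothetical test helper; not part of the Pinot test harness.
final class NestedCauseMatcher {
  // Arbitrary bound so a malformed (cyclic) cause chain cannot loop forever.
  private static final int MAX_DEPTH = 32;

  private NestedCauseMatcher() {
  }

  // Returns true if any wrapped cause (excluding the top-level throwable itself)
  // has a message that fully matches the given regex.
  static boolean nestedCauseMatches(Throwable top, String messageRegex) {
    Pattern pattern = Pattern.compile(messageRegex, Pattern.DOTALL);
    int depth = 0;
    for (Throwable t = top.getCause(); t != null && depth < MAX_DEPTH; t = t.getCause(), depth++) {
      String message = t.getMessage();
      if (message != null && pattern.matcher(message).matches()) {
        return true;
      }
    }
    return false;
  }
}
```

Under this approach, the `expectedException` pattern in the JSON test would describe the underlying planner or parser error, and the generic `Error composing query plan for:.*` wrapper would no longer satisfy the check by itself.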





Re: [PR] [multistage]partition assignment refactor [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #12079:
URL: https://github.com/apache/pinot/pull/12079#discussion_r1419410789


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java:
##########
@@ -35,18 +35,28 @@
 /**
  * The {@code DispatchablePlanMetadata} info contains the information for dispatching a particular plan fragment.
  *
- * <p>It contains information aboute:
+ * <p>It contains information
  * <ul>
- *   <li>the tables it is suppose to scan for</li>
- *   <li>the underlying segments a stage requires to execute upon.</li>
- *   <li>the server instances to which this stage should be execute on</li>
+ *   <li>extracted from {@link org.apache.pinot.query.planner.physical.DispatchablePlanVisitor}</li>
+ *   <li>extracted from {@link org.apache.pinot.query.planner.physical.PinotDispatchPlanner}</li>
  * </ul>
  */
 public class DispatchablePlanMetadata implements Serializable {
-  // These 2 fields are extracted from TableScanNode
+
+  // --------------------------------------------------------------------------
+  // Fields extracted with {@link DispatchablePlanVisitor}
+  // --------------------------------------------------------------------------
+  // info from TableNode
   private final List<String> _scannedTables;
   private Map<String, String> _tableOptions;
+  // info from MailboxSendNode - whether a stage is pre-partitioned by the same way the sending exchange desires
+  private boolean _isPrePartitioned;

Review Comment:
   `prePartitioned` makes sense in the mailbox, but it is strange in stage metadata. Based on my understanding, it indicates that the data within a stage is partitioned.





Re: [PR] [multistage]partition assignment refactor [pinot]

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #12079:
URL: https://github.com/apache/pinot/pull/12079#discussion_r1419612768


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java:
##########
@@ -333,26 +371,11 @@ private void assignWorkersToIntermediateFragment(PlanFragment fragment, Dispatch
       throw new IllegalStateException(
           "No server instance found for intermediate stage for tables: " + Arrays.toString(tableNames.toArray()));
     }
-    if (metadata.isRequiresSingletonInstance()) {
-      // require singleton should return a single global worker ID with 0;
-      metadata.setWorkerIdToServerInstanceMap(Collections.singletonMap(0,
-          new QueryServerInstance(serverInstances.get(RANDOM.nextInt(serverInstances.size())))));
-    } else {
-      Map<String, String> options = context.getPlannerContext().getOptions();
-      int stageParallelism = Integer.parseInt(options.getOrDefault(QueryOptionKey.STAGE_PARALLELISM, "1"));
-      Map<Integer, QueryServerInstance> workerIdToServerInstanceMap = new HashMap<>();
-      int workerId = 0;
-      for (ServerInstance serverInstance : serverInstances) {
-        QueryServerInstance queryServerInstance = new QueryServerInstance(serverInstance);
-        for (int i = 0; i < stageParallelism; i++) {
-          workerIdToServerInstanceMap.put(workerId++, queryServerInstance);
-        }
-      }
-      metadata.setWorkerIdToServerInstanceMap(workerIdToServerInstanceMap);
-    }
+    return serverInstances;
   }
 
-  private ColocatedTableInfo getColocatedTableInfo(String tableName, String partitionKey, int numPartitions) {
+  private ColocatedTableInfo getColocatedTableInfo(String tableName, String partitionKey, int numPartitions,

Review Comment:
   fixed





Re: [PR] [multistage]partition assignment refactor [pinot]

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #12079:
URL: https://github.com/apache/pinot/pull/12079#discussion_r1419625532


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java:
##########
@@ -144,4 +145,23 @@ public Void process(PlanNode node, DispatchablePlanContext context) {
     }
     return null;
   }
+
+  private boolean isDirectExchangeCompatible(DispatchablePlanMetadata sender, DispatchablePlanMetadata receiver) {
+    Map<Integer, QueryServerInstance> senderServerMap = sender.getWorkerIdToServerInstanceMap();
+    Map<Integer, QueryServerInstance> receiverServerMap = receiver.getWorkerIdToServerInstanceMap();
+
+    int numSenders = senderServerMap.size();
+    int numReceivers = receiverServerMap.size();
+    if (sender.getScannedTables().size() > 0 && receiver.getScannedTables().size() == 0) {
+      // leaf-to-intermediate condition
+      return numSenders * sender.getPartitionParallelism() == numReceivers
+          && sender.getPartitionFunction() != null
+          && sender.getPartitionFunction().equals(receiver.getPartitionFunction());

Review Comment:
   done
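
To make the leaf-to-intermediate condition in the hunk above concrete, here is a small sketch that restates only that branch over plain values. `DirectExchangeSketch` and its parameter names are hypothetical; the remaining branches of `isDirectExchangeCompatible` are not shown in the quoted diff and are deliberately not reproduced here.

```java
// Hypothetical restatement of the leaf-to-intermediate branch only; not the PR's code.
final class DirectExchangeSketch {
  private DirectExchangeSketch() {
  }

  // A direct (non-shuffling) exchange is considered compatible when each leaf sender fans out
  // to exactly partitionParallelism receivers and both sides use the same partition function.
  static boolean leafToIntermediateCompatible(int numSenders, int partitionParallelism, int numReceivers,
      String senderPartitionFunction, String receiverPartitionFunction) {
    return numSenders * partitionParallelism == numReceivers
        && senderPartitionFunction != null
        && senderPartitionFunction.equals(receiverPartitionFunction);
  }
}
```

For example, 4 leaf workers with a partition parallelism of 2 are compatible with 8 intermediate workers when both sides use the same function, but not with 4 receivers, and not when the sender's partition function is unset.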


