You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by "walterddr (via GitHub)" <gi...@apache.org> on 2023/05/04 03:43:37 UTC

[GitHub] [pinot] walterddr commented on a diff in pull request #10708: [multistage]Refactor Mailbox assignment logic to planner

walterddr commented on code in PR #10708:
URL: https://github.com/apache/pinot/pull/10708#discussion_r1184514389


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java:
##########
@@ -60,10 +60,17 @@ public QueryPlan constructDispatchablePlan(StageNode globalReceiverNode,
     dispatchablePlanContext.getDispatchablePlanStageRootMap().put(0, globalReceiverNode);
     // 3. add worker assignment after the dispatchable plan context is fulfilled after the visit.
     computeWorkerAssignment(globalReceiverNode, dispatchablePlanContext);
-    // 4. convert it into query plan.
+    // 4. compute the mailbox assignment for each stage.
+    computeMailboxAssignment(dispatchablePlanContext);
+    // 5. convert it into query plan.

Review Comment:
   Let's add a TODO here for the 2 methods. later we want to refactor these into pluggable algorithms. (e.g. worker assignment visitor and mailbox assignment visitor)
   
   but so far the entrypoint looks super clean and i love it



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java:
##########
@@ -122,7 +134,37 @@ private static Worker.StageMetadata toProtoStageMetadata(StageMetadata stageMeta
   private static Worker.WorkerMetadata toProtoWorkerMetadata(WorkerMetadata workerMetadata) {
     Worker.WorkerMetadata.Builder builder = Worker.WorkerMetadata.newBuilder();
     builder.setVirtualAddress(addressToProto(workerMetadata.getVirtualServerAddress()));
+    builder.putAllMailboxMetadata(toProtoMailboxMap(workerMetadata.getMailBoxInfosMap()));
     builder.putAllCustomProperty(workerMetadata.getCustomProperties());
     return builder.build();
   }
+
+  private static Map<Integer, Worker.MailboxMetadata> toProtoMailboxMap(
+      Map<Integer, List<MailboxMetadata>> mailBoxInfosMap) {
+    Map<Integer, Worker.MailboxMetadata> mailboxMetadataMap = new HashMap<>();
+    for (Map.Entry<Integer, List<MailboxMetadata>> entry : mailBoxInfosMap.entrySet()) {
+      mailboxMetadataMap.put(entry.getKey(), toProtoMailbox(entry.getValue()));
+    }
+    return mailboxMetadataMap;
+  }
+
+  private static Worker.MailboxMetadata toProtoMailbox(List<MailboxMetadata> mailboxMetadataList) {
+    Worker.MailboxMetadata.Builder builder = Worker.MailboxMetadata.newBuilder();
+    for (MailboxMetadata mailboxMetadata : mailboxMetadataList) {
+      builder.addMailboxId(mailboxMetadata.getMailBoxId());
+      builder.addVirtualAddress(mailboxMetadata.getVirtualAddress().toString());
+      mailboxMetadata.getCustomProperties().entrySet().stream().forEach(
+          entry -> builder.putCustomProperty(
+              toMailboxCustomPropertiesKeyPrefix(mailboxMetadata.getMailBoxId(), entry.getKey()), entry.getValue()));
+    }
+    return builder.build();
+  }
+
+  private static String toMailboxCustomPropertiesKeyPrefix(String mailBoxId, String key) {
+    return String.format("%s.%s", mailBoxId, key);
+  }
+
+  private static String fromMailboxCustomPropertiesKeyPrefix(String mailBoxId, String key) {
+    return key.split(mailBoxId + "\\.")[1];
+  }

Review Comment:
   for these mailbox specific customProperty utils we can put them in MailboxMetadata class IMO (to make it consistent with Stage and Worker level metadata class)



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java:
##########
@@ -98,16 +93,33 @@ private static StageMetadata fromProtoStageMetadata(Worker.StageMetadata protoSt
   private static WorkerMetadata fromProtoWorkerMetadata(Worker.WorkerMetadata protoWorkerMetadata) {
     WorkerMetadata.Builder builder = new WorkerMetadata.Builder();
     builder.setVirtualServerAddress(protoToAddress(protoWorkerMetadata.getVirtualAddress()));
+    builder.putAllMailBoxInfosMap(protoToMailbox(protoWorkerMetadata.getMailboxMetadataMap()));
     builder.putAllCustomProperties(protoWorkerMetadata.getCustomPropertyMap());
     return builder.build();
   }
 
-  public static List<Worker.StageMetadata> stageMetadataListToProtoList(List<StageMetadata> stageMetadataList) {
-    List<Worker.StageMetadata> protoList = new ArrayList<>();
-    for (StageMetadata stageMetadata : stageMetadataList) {
-      protoList.add(toProtoStageMetadata(stageMetadata));
+  private static Map<Integer, List<MailboxMetadata>> protoToMailbox(

Review Comment:
   ```suggestion
     private static Map<Integer, List<MailboxMetadata>> fromProtoMailboxMap(
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org