You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/11/10 17:59:02 UTC

[GitHub] [pinot] walterddr commented on a diff in pull request #9778: [multistage] Mailbox receive operator test

walterddr commented on code in PR #9778:
URL: https://github.com/apache/pinot/pull/9778#discussion_r1019449248


##########
pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java:
##########
@@ -65,6 +65,7 @@ public void onMatch(RelOptRuleCall call) {
 
     if (joinInfo.leftKeys.isEmpty()) {
       // when there's no JOIN key, use broadcast.
+      // TODO: Double check broadcast distribution won't cause problems

Review Comment:
   could you explain what kind of problem?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -50,23 +48,26 @@ public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
 
   private final MailboxService<TransferableBlock> _mailboxService;
   private final RelDistribution.Type _exchangeType;
-  private final KeySelector<Object[], Object[]> _keySelector;
   private final List<ServerInstance> _sendingStageInstances;
-  private final DataSchema _dataSchema;
   private final String _hostName;
   private final int _port;
   private final long _jobId;
   private final int _stageId;
   private final long _timeout;
-
   private int _serverIdx;
   private TransferableBlock _upstreamErrorBlock;
 
-  public MailboxReceiveOperator(MailboxService<TransferableBlock> mailboxService, DataSchema dataSchema,
-      List<ServerInstance> sendingStageInstances, RelDistribution.Type exchangeType,
-      KeySelector<Object[], Object[]> keySelector, String hostName, int port, long jobId, int stageId) {
-    _dataSchema = dataSchema;
+  public MailboxReceiveOperator(MailboxService<TransferableBlock> mailboxService,
+      List<ServerInstance> sendingStageInstances, RelDistribution.Type exchangeType, String hostName, int port,
+      long jobId, int stageId) {
     _mailboxService = mailboxService;
+    Preconditions.checkState(exchangeType != RelDistribution.Type.RANDOM_DISTRIBUTED,
+        "Random distribution is not supported");
+    Preconditions.checkState(exchangeType != RelDistribution.Type.ANY, "Any distribution is not supported");
+    Preconditions.checkState(exchangeType != RelDistribution.Type.RANGE_DISTRIBUTED,
+        "Range distribution is not supported");
+    Preconditions.checkState(exchangeType != RelDistribution.Type.ROUND_ROBIN_DISTRIBUTED,
+        "Round robin distribution is not supported");

Review Comment:
   i would say add a static SUPPORTED_EXCHANGE as `private static final Set` and assert exchange type is contained in the set
   ```suggestion
   private static final Set<ExchangeType> SUPPORTED_EXCHANGE_TYPES = ImmutableSet.of(BROADCAST, HASH, SINGELTON);
   // ...
       Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(exchangeType), "Exchange/Distribution type: " + exchangeType + " is not supported!");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org