Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/11/10 00:42:53 UTC

[GitHub] [pinot] agavra opened a new pull request, #9773: [multistage] refactor MailboxSendOperator for testability and tests

agavra opened a new pull request, #9773:
URL: https://github.com/apache/pinot/pull/9773

   This PR does two things: it refactors `MailboxSendOperator` into testable components, and it adds tests for each of those components.
   
   The refactor factors out each kind of exchange logic (hash, singleton, random, etc.) into its own class, plus a top-level class for all the common exchange logic. Now `MailboxSendOperator` can be really dumb and just pipe blocks to the underlying sender.
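A minimal sketch of the shape described above (an abstract base class holding shared exchange logic, with one subclass per distribution strategy). Every name here (`FakeMailbox`, `SketchExchange`, `SketchBroadcastExchange`, `SketchRandomExchange`) is hypothetical, and "a block" is simplified to a list of rows; this is not the PR's actual `BlockExchange` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical stand-in for a sending mailbox: it only records the blocks it receives.
class FakeMailbox {
  final List<List<Object[]>> received = new ArrayList<>();
}

// Common exchange logic lives in the base class; subclasses only decide routing.
abstract class SketchExchange {
  protected final List<FakeMailbox> _destinations;

  SketchExchange(List<FakeMailbox> destinations) {
    _destinations = destinations;
  }

  final void send(List<Object[]> block) {
    // Placeholder for shared logic (hypothetical policy): skip empty blocks.
    if (block.isEmpty()) {
      return;
    }
    route(block);
  }

  protected abstract void route(List<Object[]> block);
}

// Broadcast: every destination receives the whole block.
class SketchBroadcastExchange extends SketchExchange {
  SketchBroadcastExchange(List<FakeMailbox> destinations) {
    super(destinations);
  }

  @Override
  protected void route(List<Object[]> block) {
    for (FakeMailbox mailbox : _destinations) {
      mailbox.received.add(block);
    }
  }
}

// Random: exactly one randomly chosen destination receives the whole block.
class SketchRandomExchange extends SketchExchange {
  private final Random _random;

  SketchRandomExchange(List<FakeMailbox> destinations, Random random) {
    super(destinations);
    _random = random;
  }

  @Override
  protected void route(List<Object[]> block) {
    _destinations.get(_random.nextInt(_destinations.size())).received.add(block);
  }
}
```

With routing isolated like this, the operator only has to hand each block to `send`, and each strategy can be unit-tested against fake mailboxes.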


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] 61yao commented on pull request #9773: [multistage] refactor MailboxSendOperator for testability and tests

Posted by GitBox <gi...@apache.org>.
61yao commented on PR #9773:
URL: https://github.com/apache/pinot/pull/9773#issuecomment-1311366672

   Sorry for the ask, but do you mind breaking this PR down into smaller ones? It is kind of hard to review.




[GitHub] [pinot] agavra commented on pull request #9773: [multistage] refactor MailboxSendOperator for testability and tests

Posted by GitBox <gi...@apache.org>.
agavra commented on PR #9773:
URL: https://github.com/apache/pinot/pull/9773#issuecomment-1312922599

   @walterddr I had to force-push because of rebasing and some refs I messed up locally, but here are the changes since your last review:
   
   - I figured out the timeout issues: I was using `LinkedList` instead of `ArrayList` in the `HashExchange` code. It turns out that is dramatically slower (think 100x) when you have lots of rows
   - I switched to the iterator pattern, as discussed
   - I made a few changes to account for the new `isLeafStageSender` flag that was introduced in your other PR (merge conflict)
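The comment does not say which `LinkedList` operation was slow. One common cause (an assumption here) is index-based access such as `get(i)`, which walks the list node by node on a `LinkedList` (O(n) per call, O(n^2) for a full loop) but is O(1) on an `ArrayList`. A minimal hash-partitioning sketch using `ArrayList` buckets; `HashPartitionSketch` and `partition` are hypothetical names, not the PR's `HashExchange`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical sketch: hash-partition rows into one bucket per destination.
final class HashPartitionSketch {
  private HashPartitionSketch() {
  }

  static List<List<Object[]>> partition(List<Object[]> rows, int keyIndex, int numDestinations) {
    // ArrayList buckets: appends are amortized O(1) and get(i) is O(1).
    List<List<Object[]>> buckets = new ArrayList<>(numDestinations);
    for (int i = 0; i < numDestinations; i++) {
      buckets.add(new ArrayList<>());
    }
    // Index-based loop: linear overall when rows is an ArrayList, but quadratic
    // if rows were a LinkedList, since each rows.get(i) traverses the list.
    for (int i = 0; i < rows.size(); i++) {
      Object[] row = rows.get(i);
      // floorMod keeps the bucket index non-negative for negative hash codes.
      int bucket = Math.floorMod(Objects.hashCode(row[keyIndex]), numDestinations);
      buckets.get(bucket).add(row);
    }
    return buckets;
  }
}
```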




[GitHub] [pinot] agavra commented on pull request #9773: [multistage] refactor MailboxSendOperator for testability and tests

Posted by GitBox <gi...@apache.org>.
agavra commented on PR #9773:
URL: https://github.com/apache/pinot/pull/9773#issuecomment-1312902006

   @61yao this PR cannot meaningfully be broken into smaller PRs: it splits a single file into five and adds tests for those additional files. Any further split would require adding unnecessary abstractions.
   
   As an aside, I don't think it's that big at all: about 200 lines are just file headers for newly created files and 500 lines are test-only code (so it's really only about 200 to 300 lines of production code).




[GitHub] [pinot] walterddr commented on a diff in pull request #9773: [multistage] refactor MailboxSendOperator for testability and tests

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9773:
URL: https://github.com/apache/pinot/pull/9773#discussion_r1019327467


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BlockSplitter.java:
##########
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import java.util.List;
+import org.apache.pinot.common.datablock.BaseDataBlock;
+
+
+/**
+ * Interface for splitting transferable blocks. This is used for ensuring
+ * that the blocks that are sent along the wire play nicely with the
+ * underlying transport.
+ */
+public interface BlockSplitter {
+
+  /**
+   * @return a list of blocks that was split from the original {@code block}
+   */
+  List<TransferableBlock> split(TransferableBlock block, BaseDataBlock.Type type, int maxBlockSize);

Review Comment:
   (major) This design is cleaner, but I also wonder if it introduces unnecessary passes over the dataset. For example, hash distribution followed by block splitting loops over the entire dataset twice instead of once.
   
   Although we are not doing it now, we could potentially (1) partition by key, (2) put the rows into split blocks, and (3) even send them over the wire, all in a single loop. But I am not sure how to achieve that with these interfaces.
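A minimal sketch of the single-loop idea, assuming a row-count limit in place of the byte-size `maxBlockSize` the real splitter uses, and a plain `BiConsumer` standing in for "send to mailbox". `SinglePassPartitioner` and its members are hypothetical names, not existing Pinot APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;

// Hypothetical sketch: partition by key, cut bounded blocks, and send them in one pass.
final class SinglePassPartitioner {
  private final int _numDestinations;
  private final int _maxRowsPerBlock; // assumption: row count instead of bytes
  private final BiConsumer<Integer, List<Object[]>> _sender; // (destination, block)

  SinglePassPartitioner(int numDestinations, int maxRowsPerBlock, BiConsumer<Integer, List<Object[]>> sender) {
    if (maxRowsPerBlock <= 0) {
      throw new IllegalArgumentException("maxRowsPerBlock must be positive");
    }
    _numDestinations = numDestinations;
    _maxRowsPerBlock = maxRowsPerBlock;
    _sender = sender;
  }

  void process(Iterable<Object[]> rows, int keyIndex) {
    List<List<Object[]>> buckets = new ArrayList<>(_numDestinations);
    for (int i = 0; i < _numDestinations; i++) {
      buckets.add(new ArrayList<>());
    }
    for (Object[] row : rows) {
      int dest = Math.floorMod(Objects.hashCode(row[keyIndex]), _numDestinations);
      List<Object[]> bucket = buckets.get(dest);
      bucket.add(row);
      // Send a full block immediately instead of splitting in a second pass.
      if (bucket.size() == _maxRowsPerBlock) {
        _sender.accept(dest, bucket);
        buckets.set(dest, new ArrayList<>());
      }
    }
    // Flush whatever is left in each partially filled bucket.
    for (int dest = 0; dest < _numDestinations; dest++) {
      if (!buckets.get(dest).isEmpty()) {
        _sender.accept(dest, buckets.get(dest));
      }
    }
  }
}
```

The trade-off is that partitioning, splitting and sending become coupled in one class, which is harder to test in isolation than separate `BlockSplitter` and exchange components.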





[GitHub] [pinot] agavra commented on a diff in pull request #9773: [multistage] refactor MailboxSendOperator for testability and tests

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9773:
URL: https://github.com/apache/pinot/pull/9773#discussion_r1019488907


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.exchange;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.SendingMailbox;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.runtime.blocks.BlockSplitter;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+
+
+/**
+ * A class that contains the logic for properly distributing
+ */

Review Comment:
   😬 I didn't complete my sentence! Thanks for noticing





[GitHub] [pinot] walterddr commented on a diff in pull request #9773: [multistage] refactor MailboxSendOperator for testability and tests

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9773:
URL: https://github.com/apache/pinot/pull/9773#discussion_r1022129096


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -49,57 +47,67 @@
  */
 public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
   private static final Logger LOGGER = LoggerFactory.getLogger(MailboxSendOperator.class);
+
   private static final String EXPLAIN_NAME = "MAILBOX_SEND";
   private static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPE =
       ImmutableSet.of(RelDistribution.Type.SINGLETON, RelDistribution.Type.RANDOM_DISTRIBUTED,
           RelDistribution.Type.BROADCAST_DISTRIBUTED, RelDistribution.Type.HASH_DISTRIBUTED);
-  private static final Random RANDOM = new Random();
-  // TODO: Deduct this value via grpc config maximum byte size; and make it configurable with override.
-  // TODO: Max block size is a soft limit. only counts fixedSize datatable byte buffer
-  private static final int MAX_MAILBOX_CONTENT_SIZE_BYTES = 4 * 1024 * 1024;
-
-  private final List<ServerInstance> _receivingStageInstances;
-  private final RelDistribution.Type _exchangeType;
-  private final KeySelector<Object[], Object[]> _keySelector;
-  private final String _serverHostName;
-  private final int _serverPort;
-  private final long _jobId;
-  private final int _stageId;
-  private final MailboxService<TransferableBlock> _mailboxService;
-  private final DataSchema _dataSchema;
-  private final boolean _isLeafStageSender;
-  private Operator<TransferableBlock> _dataTableBlockBaseOperator;
-
-  public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService, DataSchema dataSchema,
+
+  private final Operator<TransferableBlock> _dataTableBlockBaseOperator;
+  private final BlockExchange _exchange;
+
+  @VisibleForTesting
+  interface BlockExchangeFactory {
+    BlockExchange build(MailboxService<TransferableBlock> mailboxService, List<MailboxIdentifier> destinations,
+        RelDistribution.Type exchange, KeySelector<Object[], Object[]> selector, BlockSplitter splitter);
+  }
+
+  @VisibleForTesting
+  interface MailboxIdGenerator {
+    MailboxIdentifier generate(ServerInstance server);
+  }

Review Comment:
   great idea!



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -49,57 +47,67 @@
  */
 public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
   private static final Logger LOGGER = LoggerFactory.getLogger(MailboxSendOperator.class);
+
   private static final String EXPLAIN_NAME = "MAILBOX_SEND";
   private static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPE =
       ImmutableSet.of(RelDistribution.Type.SINGLETON, RelDistribution.Type.RANDOM_DISTRIBUTED,
           RelDistribution.Type.BROADCAST_DISTRIBUTED, RelDistribution.Type.HASH_DISTRIBUTED);
-  private static final Random RANDOM = new Random();
-  // TODO: Deduct this value via grpc config maximum byte size; and make it configurable with override.
-  // TODO: Max block size is a soft limit. only counts fixedSize datatable byte buffer
-  private static final int MAX_MAILBOX_CONTENT_SIZE_BYTES = 4 * 1024 * 1024;
-
-  private final List<ServerInstance> _receivingStageInstances;
-  private final RelDistribution.Type _exchangeType;
-  private final KeySelector<Object[], Object[]> _keySelector;
-  private final String _serverHostName;
-  private final int _serverPort;
-  private final long _jobId;
-  private final int _stageId;
-  private final MailboxService<TransferableBlock> _mailboxService;
-  private final DataSchema _dataSchema;
-  private final boolean _isLeafStageSender;
-  private Operator<TransferableBlock> _dataTableBlockBaseOperator;
-
-  public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService, DataSchema dataSchema,
+
+  private final Operator<TransferableBlock> _dataTableBlockBaseOperator;
+  private final BlockExchange _exchange;
+
+  @VisibleForTesting
+  interface BlockExchangeFactory {
+    BlockExchange build(MailboxService<TransferableBlock> mailboxService, List<MailboxIdentifier> destinations,
+        RelDistribution.Type exchange, KeySelector<Object[], Object[]> selector, BlockSplitter splitter);
+  }
+
+  @VisibleForTesting
+  interface MailboxIdGenerator {
+    MailboxIdentifier generate(ServerInstance server);
+  }
+
+  public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService,
       Operator<TransferableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
       RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, String hostName, int port,
       long jobId, int stageId, boolean isLeafStageSender) {
-    _dataSchema = dataSchema;
-    _mailboxService = mailboxService;
+    this(mailboxService, dataTableBlockBaseOperator, receivingStageInstances, exchangeType, keySelector,
+        isLeafStageSender, server -> toMailboxId(server, jobId, stageId, hostName, port), BlockExchange::getExchange);
+  }
+
+  @VisibleForTesting
+  MailboxSendOperator(MailboxService<TransferableBlock> mailboxService,
+      Operator<TransferableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
+      RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector,
+      boolean isLeafStageSender, MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory blockExchangeFactory) {
     _dataTableBlockBaseOperator = dataTableBlockBaseOperator;
-    _exchangeType = exchangeType;
-    if (_exchangeType == RelDistribution.Type.SINGLETON) {
+
+    List<MailboxIdentifier> receivingMailboxes;
+    if (exchangeType == RelDistribution.Type.SINGLETON) {
+      // TODO: this logic should be moved into SingletonExchange
       ServerInstance singletonInstance = null;
       for (ServerInstance serverInstance : receivingStageInstances) {
-        if (serverInstance.getHostname().equals(_mailboxService.getHostname())
-            && serverInstance.getQueryMailboxPort() == _mailboxService.getMailboxPort()) {
+        if (serverInstance.getHostname().equals(mailboxService.getHostname())
+            && serverInstance.getQueryMailboxPort() == mailboxService.getMailboxPort()) {
           Preconditions.checkState(singletonInstance == null, "multiple instance found for singleton exchange type!");
           singletonInstance = serverInstance;
         }
       }
       Preconditions.checkNotNull(singletonInstance, "Unable to find receiving instance for singleton exchange");
-      _receivingStageInstances = Collections.singletonList(singletonInstance);
+      receivingMailboxes = Collections.singletonList(mailboxIdGenerator.generate(singletonInstance));
     } else {
-      _receivingStageInstances = receivingStageInstances;
+      receivingMailboxes = receivingStageInstances
+          .stream()
+          .map(mailboxIdGenerator::generate)
+          .collect(Collectors.toList());
     }
-    _keySelector = keySelector;
-    _serverHostName = hostName;
-    _serverPort = port;
-    _jobId = jobId;
-    _stageId = stageId;
-    Preconditions.checkState(SUPPORTED_EXCHANGE_TYPE.contains(_exchangeType),
-        String.format("Exchange type '%s' is not supported yet", _exchangeType));
-    _isLeafStageSender = isLeafStageSender;
+
+    BlockSplitter splitter = (block, type, size)
+        -> TransferableBlockUtils.splitBlock(block, type, size, isLeafStageSender);
+    _exchange = blockExchangeFactory.build(mailboxService, receivingMailboxes, exchangeType, keySelector, splitter);
+
+    Preconditions.checkState(SUPPORTED_EXCHANGE_TYPE.contains(exchangeType),
+        String.format("Exchange type '%s' is not supported yet", exchangeType));

Review Comment:
   Why are we checking this here instead of at the beginning?
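A minimal fail-fast sketch of what the reviewer is suggesting: validate the exchange type before building anything, so an unsupported type never reaches the factory. `SketchDistribution` is a hypothetical stand-in mirroring a few `RelDistribution.Type` constants, and the `Function` stands in for `BlockExchangeFactory`. A plain `IllegalStateException` is thrown, which is what Guava's `Preconditions.checkState` throws.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.function.Function;

// Hypothetical stand-in for Calcite's RelDistribution.Type (subset only).
enum SketchDistribution {
  SINGLETON, RANDOM_DISTRIBUTED, BROADCAST_DISTRIBUTED, HASH_DISTRIBUTED, RANGE_DISTRIBUTED
}

final class FailFastSendOperator {
  private static final Set<SketchDistribution> SUPPORTED_EXCHANGE_TYPE = EnumSet.of(
      SketchDistribution.SINGLETON, SketchDistribution.RANDOM_DISTRIBUTED,
      SketchDistribution.BROADCAST_DISTRIBUTED, SketchDistribution.HASH_DISTRIBUTED);

  private final Object _exchange;

  FailFastSendOperator(SketchDistribution exchangeType, Function<SketchDistribution, Object> exchangeFactory) {
    // Check first: fail with a clear message before doing any other work.
    if (!SUPPORTED_EXCHANGE_TYPE.contains(exchangeType)) {
      throw new IllegalStateException(String.format("Exchange type '%s' is not supported yet", exchangeType));
    }
    // Only reached for supported types, so the factory never sees bad input.
    _exchange = exchangeFactory.apply(exchangeType);
  }

  Object exchange() {
    return _exchange;
  }
}
```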



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BlockSplitter.java:
##########
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import java.util.Iterator;
+import org.apache.pinot.common.datablock.BaseDataBlock;
+
+
+/**
+ * Interface for splitting transferable blocks. This is used for ensuring
+ * that the blocks that are sent along the wire play nicely with the
+ * underlying transport.
+ */
+public interface BlockSplitter {

Review Comment:
   It seems like we could also make this an inner interface in BlockExchange.java?
   
   Any chance we will use this for anything other than testing?
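A minimal sketch of the nested-interface option, assuming a row-count split instead of the real byte-size split. `SketchBlockExchange`, `Splitter` and `ROW_COUNT_SPLITTER` are hypothetical names. A member interface is implicitly `static`, so it stays scoped to the exchange while tests can still pass a lambda.

```java
import java.util.ArrayList;
import java.util.List;

final class SketchBlockExchange {
  // Nested (implicitly static) functional interface scoped to the exchange.
  @FunctionalInterface
  interface Splitter {
    List<List<Object[]>> split(List<Object[]> block, int maxRows);
  }

  // Hypothetical production splitter: chunks of at most maxRows rows.
  static final Splitter ROW_COUNT_SPLITTER = (block, maxRows) -> {
    List<List<Object[]>> chunks = new ArrayList<>();
    for (int start = 0; start < block.size(); start += maxRows) {
      chunks.add(new ArrayList<>(block.subList(start, Math.min(start + maxRows, block.size()))));
    }
    return chunks;
  };

  private final Splitter _splitter;
  private final int _maxRows;
  private final List<List<Object[]>> _sent = new ArrayList<>();

  SketchBlockExchange(Splitter splitter, int maxRows) {
    _splitter = splitter;
    _maxRows = maxRows;
  }

  // Records what would be sent; a real exchange would write to mailboxes.
  void send(List<Object[]> block) {
    _sent.addAll(_splitter.split(block, _maxRows));
  }

  List<List<Object[]>> sent() {
    return _sent;
  }
}
```

A test can inject a no-op splitter such as `(b, max) -> Collections.singletonList(b)` to check routing without exercising split behavior.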



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -49,57 +47,67 @@
  */
 public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
   private static final Logger LOGGER = LoggerFactory.getLogger(MailboxSendOperator.class);
+
   private static final String EXPLAIN_NAME = "MAILBOX_SEND";
   private static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPE =
       ImmutableSet.of(RelDistribution.Type.SINGLETON, RelDistribution.Type.RANDOM_DISTRIBUTED,
           RelDistribution.Type.BROADCAST_DISTRIBUTED, RelDistribution.Type.HASH_DISTRIBUTED);
-  private static final Random RANDOM = new Random();
-  // TODO: Deduct this value via grpc config maximum byte size; and make it configurable with override.
-  // TODO: Max block size is a soft limit. only counts fixedSize datatable byte buffer
-  private static final int MAX_MAILBOX_CONTENT_SIZE_BYTES = 4 * 1024 * 1024;
-
-  private final List<ServerInstance> _receivingStageInstances;
-  private final RelDistribution.Type _exchangeType;
-  private final KeySelector<Object[], Object[]> _keySelector;
-  private final String _serverHostName;
-  private final int _serverPort;
-  private final long _jobId;
-  private final int _stageId;
-  private final MailboxService<TransferableBlock> _mailboxService;
-  private final DataSchema _dataSchema;
-  private final boolean _isLeafStageSender;
-  private Operator<TransferableBlock> _dataTableBlockBaseOperator;
-
-  public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService, DataSchema dataSchema,
+
+  private final Operator<TransferableBlock> _dataTableBlockBaseOperator;
+  private final BlockExchange _exchange;
+
+  @VisibleForTesting
+  interface BlockExchangeFactory {
+    BlockExchange build(MailboxService<TransferableBlock> mailboxService, List<MailboxIdentifier> destinations,
+        RelDistribution.Type exchange, KeySelector<Object[], Object[]> selector, BlockSplitter splitter);
+  }
+
+  @VisibleForTesting
+  interface MailboxIdGenerator {
+    MailboxIdentifier generate(ServerInstance server);
+  }
+
+  public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService,
       Operator<TransferableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
       RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, String hostName, int port,
       long jobId, int stageId, boolean isLeafStageSender) {
-    _dataSchema = dataSchema;
-    _mailboxService = mailboxService;
+    this(mailboxService, dataTableBlockBaseOperator, receivingStageInstances, exchangeType, keySelector,
+        isLeafStageSender, server -> toMailboxId(server, jobId, stageId, hostName, port), BlockExchange::getExchange);
+  }
+
+  @VisibleForTesting
+  MailboxSendOperator(MailboxService<TransferableBlock> mailboxService,
+      Operator<TransferableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
+      RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector,
+      boolean isLeafStageSender, MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory blockExchangeFactory) {
     _dataTableBlockBaseOperator = dataTableBlockBaseOperator;
-    _exchangeType = exchangeType;
-    if (_exchangeType == RelDistribution.Type.SINGLETON) {
+
+    List<MailboxIdentifier> receivingMailboxes;
+    if (exchangeType == RelDistribution.Type.SINGLETON) {
+      // TODO: this logic should be moved into SingletonExchange

Review Comment:
   Any reason we can't create a SingletonExchange in this PR? I wouldn't imagine it needs too much code change, right?
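A minimal sketch of moving the singleton lookup out of the operator, mirroring the constructor logic quoted above: exactly one receiving instance must match the local host and mailbox port. `SketchServer` and `SketchSingletonExchange` are hypothetical stand-ins, not Pinot's `ServerInstance` or an existing `SingletonExchange`. `Objects.requireNonNull` throws `NullPointerException`, as Guava's `Preconditions.checkNotNull` does.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for ServerInstance with only the fields the lookup needs.
final class SketchServer {
  final String host;
  final int mailboxPort;

  SketchServer(String host, int mailboxPort) {
    this.host = host;
    this.mailboxPort = mailboxPort;
  }
}

final class SketchSingletonExchange {
  private final SketchServer _destination;

  SketchSingletonExchange(List<SketchServer> receivingInstances, String localHost, int localPort) {
    SketchServer match = null;
    for (SketchServer server : receivingInstances) {
      if (server.host.equals(localHost) && server.mailboxPort == localPort) {
        // Same invariant as the operator: at most one local receiver.
        if (match != null) {
          throw new IllegalStateException("multiple instance found for singleton exchange type!");
        }
        match = server;
      }
    }
    _destination = Objects.requireNonNull(match, "Unable to find receiving instance for singleton exchange");
  }

  SketchServer destination() {
    return _destination;
  }
}
```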





[GitHub] [pinot] walterddr commented on a diff in pull request #9773: [multistage] refactor MailboxSendOperator for testability and tests

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9773:
URL: https://github.com/apache/pinot/pull/9773#discussion_r1022137874


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -49,57 +47,67 @@
  */
 public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
   private static final Logger LOGGER = LoggerFactory.getLogger(MailboxSendOperator.class);
+
   private static final String EXPLAIN_NAME = "MAILBOX_SEND";
   private static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPE =
       ImmutableSet.of(RelDistribution.Type.SINGLETON, RelDistribution.Type.RANDOM_DISTRIBUTED,
           RelDistribution.Type.BROADCAST_DISTRIBUTED, RelDistribution.Type.HASH_DISTRIBUTED);
-  private static final Random RANDOM = new Random();
-  // TODO: Deduct this value via grpc config maximum byte size; and make it configurable with override.
-  // TODO: Max block size is a soft limit. only counts fixedSize datatable byte buffer
-  private static final int MAX_MAILBOX_CONTENT_SIZE_BYTES = 4 * 1024 * 1024;
-
-  private final List<ServerInstance> _receivingStageInstances;
-  private final RelDistribution.Type _exchangeType;
-  private final KeySelector<Object[], Object[]> _keySelector;
-  private final String _serverHostName;
-  private final int _serverPort;
-  private final long _jobId;
-  private final int _stageId;
-  private final MailboxService<TransferableBlock> _mailboxService;
-  private final DataSchema _dataSchema;
-  private final boolean _isLeafStageSender;
-  private Operator<TransferableBlock> _dataTableBlockBaseOperator;
-
-  public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService, DataSchema dataSchema,
+
+  private final Operator<TransferableBlock> _dataTableBlockBaseOperator;
+  private final BlockExchange _exchange;
+
+  @VisibleForTesting
+  interface BlockExchangeFactory {
+    BlockExchange build(MailboxService<TransferableBlock> mailboxService, List<MailboxIdentifier> destinations,
+        RelDistribution.Type exchange, KeySelector<Object[], Object[]> selector, BlockSplitter splitter);
+  }
+
+  @VisibleForTesting
+  interface MailboxIdGenerator {
+    MailboxIdentifier generate(ServerInstance server);
+  }
+
+  public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService,
       Operator<TransferableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
       RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, String hostName, int port,
       long jobId, int stageId, boolean isLeafStageSender) {
-    _dataSchema = dataSchema;
-    _mailboxService = mailboxService;
+    this(mailboxService, dataTableBlockBaseOperator, receivingStageInstances, exchangeType, keySelector,
+        isLeafStageSender, server -> toMailboxId(server, jobId, stageId, hostName, port), BlockExchange::getExchange);
+  }
+
+  @VisibleForTesting
+  MailboxSendOperator(MailboxService<TransferableBlock> mailboxService,
+      Operator<TransferableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
+      RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector,
+      boolean isLeafStageSender, MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory blockExchangeFactory) {
     _dataTableBlockBaseOperator = dataTableBlockBaseOperator;
-    _exchangeType = exchangeType;
-    if (_exchangeType == RelDistribution.Type.SINGLETON) {
+
+    List<MailboxIdentifier> receivingMailboxes;
+    if (exchangeType == RelDistribution.Type.SINGLETON) {
+      // TODO: this logic should be moved into SingletonExchange

Review Comment:
   Good question. I guess it requires a separate cleanup.





[GitHub] [pinot] walterddr commented on a diff in pull request #9773: [multistage] refactor MailboxSendOperator for testability and tests

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9773:
URL: https://github.com/apache/pinot/pull/9773#discussion_r1019322644


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -18,86 +18,86 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Random;
 import java.util.Set;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
-import org.apache.pinot.common.datablock.BaseDataBlock;
-import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.mailbox.MailboxIdentifier;
 import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.mailbox.SendingMailbox;
 import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.pinot.query.runtime.operator.exchange.BlockExchange;
 
 
 /**
  * This {@code MailboxSendOperator} is created to send {@link TransferableBlock}s to the receiving end.
  */
 public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
-  private static final Logger LOGGER = LoggerFactory.getLogger(MailboxSendOperator.class);
   private static final String EXPLAIN_NAME = "MAILBOX_SEND";
   private static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPE =
       ImmutableSet.of(RelDistribution.Type.SINGLETON, RelDistribution.Type.RANDOM_DISTRIBUTED,
           RelDistribution.Type.BROADCAST_DISTRIBUTED, RelDistribution.Type.HASH_DISTRIBUTED);
-  private static final Random RANDOM = new Random();
-  // TODO: Deduct this value via grpc config maximum byte size; and make it configurable with override.
-  // TODO: Max block size is a soft limit. only counts fixedSize datatable byte buffer
-  private static final int MAX_MAILBOX_CONTENT_SIZE_BYTES = 4 * 1024 * 1024;
-
-  private final List<ServerInstance> _receivingStageInstances;
-  private final RelDistribution.Type _exchangeType;
-  private final KeySelector<Object[], Object[]> _keySelector;
-  private final String _serverHostName;
-  private final int _serverPort;
-  private final long _jobId;
-  private final int _stageId;
-  private final MailboxService<TransferableBlock> _mailboxService;
-  private final DataSchema _dataSchema;
-  private Operator<TransferableBlock> _dataTableBlockBaseOperator;
-
-  public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService, DataSchema dataSchema,
+
+  private final Operator<TransferableBlock> _dataTableBlockBaseOperator;
+  private final BlockExchange _exchange;
+
+  interface BlockExchangeFactory {
+    BlockExchange build(MailboxService<TransferableBlock> mailboxService, List<MailboxIdentifier> destinations,
+        RelDistribution.Type exchange, KeySelector<Object[], Object[]> selector);
+  }
+
+  interface GenerateMailboxId {

Review Comment:
   `mailboxIDGenerator` ? 



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java:
##########
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.exchange;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.query.mailbox.MailboxIdentifier;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.SendingMailbox;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.runtime.blocks.BlockSplitter;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+
+
+/**
+ * A class that contains the logic for properly distributing
+ */

Review Comment:
   Unclear javadoc?
   Properly distributing blocks? What's the definition of "properly"?
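One way to read "properly distributing" follows the PR description. A top-level class holds the logic shared by every exchange, and each subclass (hash, singleton, random, broadcast) only decides which rows go to which destination. The sketch below shows that shape under simplifying assumptions. The types (`SketchExchange`, `BroadcastSketch`, `HashSketch`) are hypothetical, rows are `Object[]`, destinations are `String`, and the size limit counts rows instead of bytes. This is not the actual `BlockExchange` API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified model: a block is a List<Object[]> of rows, a destination is a String.
abstract class SketchExchange {
  protected final List<String> _destinations;
  private final int _maxRowsPerChunk;

  SketchExchange(List<String> destinations, int maxRowsPerChunk) {
    _destinations = destinations;
    _maxRowsPerChunk = maxRowsPerChunk;
  }

  // Strategy-specific part: which rows go to which destination.
  protected abstract Map<String, List<Object[]>> route(List<Object[]> rows);

  // Shared part: route the block, then cut each destination's slice into bounded chunks.
  final Map<String, List<List<Object[]>>> distribute(List<Object[]> rows) {
    Map<String, List<List<Object[]>>> chunked = new LinkedHashMap<>();
    for (Map.Entry<String, List<Object[]>> e : route(rows).entrySet()) {
      List<Object[]> slice = e.getValue();
      List<List<Object[]>> chunks = new ArrayList<>();
      for (int i = 0; i < slice.size(); i += _maxRowsPerChunk) {
        chunks.add(slice.subList(i, Math.min(i + _maxRowsPerChunk, slice.size())));
      }
      chunked.put(e.getKey(), chunks);
    }
    return chunked;
  }
}

final class BroadcastSketch extends SketchExchange {
  BroadcastSketch(List<String> destinations, int maxRowsPerChunk) {
    super(destinations, maxRowsPerChunk);
  }

  @Override
  protected Map<String, List<Object[]>> route(List<Object[]> rows) {
    Map<String, List<Object[]>> out = new LinkedHashMap<>();
    for (String dest : _destinations) {
      out.put(dest, rows); // every destination receives every row
    }
    return out;
  }
}

final class HashSketch extends SketchExchange {
  private final int _keyColumn;

  HashSketch(List<String> destinations, int maxRowsPerChunk, int keyColumn) {
    super(destinations, maxRowsPerChunk);
    _keyColumn = keyColumn;
  }

  @Override
  protected Map<String, List<Object[]>> route(List<Object[]> rows) {
    Map<String, List<Object[]>> out = new LinkedHashMap<>();
    for (String dest : _destinations) {
      out.put(dest, new ArrayList<>());
    }
    for (Object[] row : rows) { // rows with equal keys always land on the same destination
      int idx = Math.floorMod(Objects.hashCode(row[_keyColumn]), _destinations.size());
      out.get(_destinations.get(idx)).add(row);
    }
    return out;
  }
}
```

A javadoc that says, for example, "routes each block's rows to the destination mailboxes according to the exchange type, splitting them so each sent block stays under the transport's size limit" would make "properly" concrete. That wording is only a suggestion, not the PR's.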



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BlockSplitter.java:
##########
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import java.util.List;
+import org.apache.pinot.common.datablock.BaseDataBlock;
+
+
+/**
+ * Interface for splitting transferable blocks. This is used for ensuring
+ * that the blocks that are sent along the wire play nicely with the
+ * underlying transport.
+ */
+public interface BlockSplitter {
+
+  /**
+   * @return a list of blocks that was split from the original {@code block}
+   */
+  List<TransferableBlock> split(TransferableBlock block, BaseDataBlock.Type type, int maxBlockSize);

Review Comment:
   (major) This design is cleaner, but I also wonder if it introduces unnecessary looping over the dataset.
   
   For example, hash distribution + split block loops over the entire dataset twice instead of once: we could actually partition by key and put rows into split blocks at the same time, but I am not sure how to achieve that with these interfaces.
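The sketch below shows the single-pass alternative the comment describes: rows are hash-partitioned and cut into bounded chunks inside the same loop. `FusedHashSplitter` is a hypothetical name. The limit here is a row count; the removed `MAX_MAILBOX_CONTENT_SIZE_BYTES` constant suggests the real limit is measured in bytes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical sketch: partition rows by key and chunk them in one pass over the input.
final class FusedHashSplitter {
  private FusedHashSplitter() { }

  /** For each partition, returns its chunks; each chunk holds at most maxRowsPerChunk rows. */
  static List<List<List<Object[]>>> partitionAndSplit(List<Object[]> rows, int keyColumn,
      int numPartitions, int maxRowsPerChunk) {
    List<List<List<Object[]>>> chunksPerPartition = new ArrayList<>(numPartitions);
    List<List<Object[]>> openChunk = new ArrayList<>(numPartitions);
    for (int i = 0; i < numPartitions; i++) {
      chunksPerPartition.add(new ArrayList<>());
      openChunk.add(new ArrayList<>());
    }
    for (Object[] row : rows) { // the only pass over the input rows
      int p = Math.floorMod(Objects.hashCode(row[keyColumn]), numPartitions);
      List<Object[]> chunk = openChunk.get(p);
      chunk.add(row);
      if (chunk.size() == maxRowsPerChunk) { // chunk is full: seal it (a sender could ship it here)
        chunksPerPartition.get(p).add(chunk);
        openChunk.set(p, new ArrayList<>());
      }
    }
    for (int p = 0; p < numPartitions; p++) { // flush partially filled tail chunks
      if (!openChunk.get(p).isEmpty()) {
        chunksPerPartition.get(p).add(openChunk.get(p));
      }
    }
    return chunksPerPartition;
  }
}
```

The cost is coupling. Partitioning and chunking now live in one method, so a different splitter (for example, a columnar one) can no longer be swapped in independently. A separate `BlockSplitter` interface is what makes that swap possible.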



[GitHub] [pinot] 61yao commented on pull request #9773: [multistage] refactor MailboxSendOperator for testability and tests

Posted by GitBox <gi...@apache.org>.
61yao commented on PR #9773:
URL: https://github.com/apache/pinot/pull/9773#issuecomment-1312930559

   > @61yao this PR cannot meaningfully be broken into smaller PRs - it splits a single file into five and adds tests for those additional files. any additional split would require adding unnecessary abstractions.
   > 
   > As an aside, I don't think it's that big at all - about 200 lines are just file headers for newly created files and 500 lines are test-only code (so it's really only about 200/300 lines of production code)
   > 
   > I'm also happy to sync up offline and run you through the PR if you think that'll help.
   
   I am OK with not breaking this up if it is hard to do. One of the reasons I asked is that when a large change breaks something, it is hard to identify where it broke (like the linked list issue you mentioned). It is also hard to notice an array changing to a linked list during review.
   Anyway, I am happy the issues got resolved. I'll do my best to review the current PR as it is.


[GitHub] [pinot] agavra commented on a diff in pull request #9773: [multistage] refactor MailboxSendOperator for testability and tests

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9773:
URL: https://github.com/apache/pinot/pull/9773#discussion_r1022135489


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -49,57 +47,67 @@
  */
 public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
   private static final Logger LOGGER = LoggerFactory.getLogger(MailboxSendOperator.class);
+
   private static final String EXPLAIN_NAME = "MAILBOX_SEND";
   private static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPE =
       ImmutableSet.of(RelDistribution.Type.SINGLETON, RelDistribution.Type.RANDOM_DISTRIBUTED,
           RelDistribution.Type.BROADCAST_DISTRIBUTED, RelDistribution.Type.HASH_DISTRIBUTED);
-  private static final Random RANDOM = new Random();
-  // TODO: Deduct this value via grpc config maximum byte size; and make it configurable with override.
-  // TODO: Max block size is a soft limit. only counts fixedSize datatable byte buffer
-  private static final int MAX_MAILBOX_CONTENT_SIZE_BYTES = 4 * 1024 * 1024;
-
-  private final List<ServerInstance> _receivingStageInstances;
-  private final RelDistribution.Type _exchangeType;
-  private final KeySelector<Object[], Object[]> _keySelector;
-  private final String _serverHostName;
-  private final int _serverPort;
-  private final long _jobId;
-  private final int _stageId;
-  private final MailboxService<TransferableBlock> _mailboxService;
-  private final DataSchema _dataSchema;
-  private final boolean _isLeafStageSender;
-  private Operator<TransferableBlock> _dataTableBlockBaseOperator;
-
-  public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService, DataSchema dataSchema,
+
+  private final Operator<TransferableBlock> _dataTableBlockBaseOperator;
+  private final BlockExchange _exchange;
+
+  @VisibleForTesting
+  interface BlockExchangeFactory {
+    BlockExchange build(MailboxService<TransferableBlock> mailboxService, List<MailboxIdentifier> destinations,
+        RelDistribution.Type exchange, KeySelector<Object[], Object[]> selector, BlockSplitter splitter);
+  }
+
+  @VisibleForTesting
+  interface MailboxIdGenerator {
+    MailboxIdentifier generate(ServerInstance server);
+  }
+
+  public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService,
       Operator<TransferableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
       RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, String hostName, int port,
       long jobId, int stageId, boolean isLeafStageSender) {
-    _dataSchema = dataSchema;
-    _mailboxService = mailboxService;
+    this(mailboxService, dataTableBlockBaseOperator, receivingStageInstances, exchangeType, keySelector,
+        isLeafStageSender, server -> toMailboxId(server, jobId, stageId, hostName, port), BlockExchange::getExchange);
+  }
+
+  @VisibleForTesting
+  MailboxSendOperator(MailboxService<TransferableBlock> mailboxService,
+      Operator<TransferableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
+      RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector,
+      boolean isLeafStageSender, MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory blockExchangeFactory) {
     _dataTableBlockBaseOperator = dataTableBlockBaseOperator;
-    _exchangeType = exchangeType;
-    if (_exchangeType == RelDistribution.Type.SINGLETON) {
+
+    List<MailboxIdentifier> receivingMailboxes;
+    if (exchangeType == RelDistribution.Type.SINGLETON) {
+      // TODO: this logic should be moved into SingletonExchange

Review Comment:
   I did create a `SingletonExchange` - this piece of code was confusing to me though... I wasn't really sure what it was doing, and I didn't want to mess with it since there's no existing code coverage. Basically, I was trying to understand why we iterate over the server instances at all instead of just throwing an exception if there's more than one.
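Read literally, the loop does not assume there is only one receiver. It scans all receiving instances and keeps the one whose hostname and query mailbox port match the local `MailboxService`. It fails only if none or more than one matches. A plain `size() > 1` check would therefore behave differently whenever the list also contains non-local instances. The sketch below reproduces that logic, using a hypothetical `Instance` record in place of `ServerInstance`.

```java
import java.util.List;

// Hypothetical stand-in for ServerInstance: only the fields the singleton check reads.
record Instance(String hostname, int queryMailboxPort) { }

final class SingletonSelection {
  private SingletonSelection() { }

  // Mirrors the loop in the diff: pick the receiver co-located with this sender
  // (same hostname and mailbox port); non-matching receivers are skipped, not rejected.
  static Instance selectLocal(List<Instance> receivers, String localHost, int localMailboxPort) {
    Instance match = null;
    for (Instance candidate : receivers) {
      if (candidate.hostname().equals(localHost) && candidate.queryMailboxPort() == localMailboxPort) {
        if (match != null) { // like Preconditions.checkState: IllegalStateException
          throw new IllegalStateException("multiple instance found for singleton exchange type!");
        }
        match = candidate;
      }
    }
    if (match == null) { // like Preconditions.checkNotNull: NullPointerException
      throw new NullPointerException("Unable to find receiving instance for singleton exchange");
    }
    return match;
  }
}
```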



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java:
##########
@@ -49,57 +47,67 @@
  */
 public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
   private static final Logger LOGGER = LoggerFactory.getLogger(MailboxSendOperator.class);
+
   private static final String EXPLAIN_NAME = "MAILBOX_SEND";
   private static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPE =
       ImmutableSet.of(RelDistribution.Type.SINGLETON, RelDistribution.Type.RANDOM_DISTRIBUTED,
           RelDistribution.Type.BROADCAST_DISTRIBUTED, RelDistribution.Type.HASH_DISTRIBUTED);
-  private static final Random RANDOM = new Random();
-  // TODO: Deduct this value via grpc config maximum byte size; and make it configurable with override.
-  // TODO: Max block size is a soft limit. only counts fixedSize datatable byte buffer
-  private static final int MAX_MAILBOX_CONTENT_SIZE_BYTES = 4 * 1024 * 1024;
-
-  private final List<ServerInstance> _receivingStageInstances;
-  private final RelDistribution.Type _exchangeType;
-  private final KeySelector<Object[], Object[]> _keySelector;
-  private final String _serverHostName;
-  private final int _serverPort;
-  private final long _jobId;
-  private final int _stageId;
-  private final MailboxService<TransferableBlock> _mailboxService;
-  private final DataSchema _dataSchema;
-  private final boolean _isLeafStageSender;
-  private Operator<TransferableBlock> _dataTableBlockBaseOperator;
-
-  public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService, DataSchema dataSchema,
+
+  private final Operator<TransferableBlock> _dataTableBlockBaseOperator;
+  private final BlockExchange _exchange;
+
+  @VisibleForTesting
+  interface BlockExchangeFactory {
+    BlockExchange build(MailboxService<TransferableBlock> mailboxService, List<MailboxIdentifier> destinations,
+        RelDistribution.Type exchange, KeySelector<Object[], Object[]> selector, BlockSplitter splitter);
+  }
+
+  @VisibleForTesting
+  interface MailboxIdGenerator {
+    MailboxIdentifier generate(ServerInstance server);
+  }
+
+  public MailboxSendOperator(MailboxService<TransferableBlock> mailboxService,
       Operator<TransferableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
       RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, String hostName, int port,
       long jobId, int stageId, boolean isLeafStageSender) {
-    _dataSchema = dataSchema;
-    _mailboxService = mailboxService;
+    this(mailboxService, dataTableBlockBaseOperator, receivingStageInstances, exchangeType, keySelector,
+        isLeafStageSender, server -> toMailboxId(server, jobId, stageId, hostName, port), BlockExchange::getExchange);
+  }
+
+  @VisibleForTesting
+  MailboxSendOperator(MailboxService<TransferableBlock> mailboxService,
+      Operator<TransferableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
+      RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector,
+      boolean isLeafStageSender, MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory blockExchangeFactory) {
     _dataTableBlockBaseOperator = dataTableBlockBaseOperator;
-    _exchangeType = exchangeType;
-    if (_exchangeType == RelDistribution.Type.SINGLETON) {
+
+    List<MailboxIdentifier> receivingMailboxes;
+    if (exchangeType == RelDistribution.Type.SINGLETON) {
+      // TODO: this logic should be moved into SingletonExchange
       ServerInstance singletonInstance = null;
       for (ServerInstance serverInstance : receivingStageInstances) {
-        if (serverInstance.getHostname().equals(_mailboxService.getHostname())
-            && serverInstance.getQueryMailboxPort() == _mailboxService.getMailboxPort()) {
+        if (serverInstance.getHostname().equals(mailboxService.getHostname())
+            && serverInstance.getQueryMailboxPort() == mailboxService.getMailboxPort()) {
           Preconditions.checkState(singletonInstance == null, "multiple instance found for singleton exchange type!");
           singletonInstance = serverInstance;
         }
       }
       Preconditions.checkNotNull(singletonInstance, "Unable to find receiving instance for singleton exchange");
-      _receivingStageInstances = Collections.singletonList(singletonInstance);
+      receivingMailboxes = Collections.singletonList(mailboxIdGenerator.generate(singletonInstance));
     } else {
-      _receivingStageInstances = receivingStageInstances;
+      receivingMailboxes = receivingStageInstances
+          .stream()
+          .map(mailboxIdGenerator::generate)
+          .collect(Collectors.toList());
     }
-    _keySelector = keySelector;
-    _serverHostName = hostName;
-    _serverPort = port;
-    _jobId = jobId;
-    _stageId = stageId;
-    Preconditions.checkState(SUPPORTED_EXCHANGE_TYPE.contains(_exchangeType),
-        String.format("Exchange type '%s' is not supported yet", _exchangeType));
-    _isLeafStageSender = isLeafStageSender;
+
+    BlockSplitter splitter = (block, type, size)
+        -> TransferableBlockUtils.splitBlock(block, type, size, isLeafStageSender);
+    _exchange = blockExchangeFactory.build(mailboxService, receivingMailboxes, exchangeType, keySelector, splitter);
+
+    Preconditions.checkState(SUPPORTED_EXCHANGE_TYPE.contains(exchangeType),
+        String.format("Exchange type '%s' is not supported yet", exchangeType));

Review Comment:
   No good reason; this is just how it was before my refactor. I'll change that.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BlockSplitter.java:
##########
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.blocks;
+
+import java.util.Iterator;
+import org.apache.pinot.common.datablock.BaseDataBlock;
+
+
+/**
+ * Interface for splitting transferable blocks. This is used for ensuring
+ * that the blocks that are sent along the wire play nicely with the
+ * underlying transport.
+ */
+public interface BlockSplitter {

Review Comment:
   > seems like we can also make this an inner interface in BlockExchange.java?
   
   Yeah, we can do that. I left it outside because I thought there might be some pretty good reasons to have multiple types of block splitters (e.g. for columnar blocks once we support them).
   
   > any chance we will use this other than test purpose?
   
   It already is: `MailboxSendOperator` uses it to abstract away the difference between leaf-stage and non-leaf-stage senders (`isLeafStageSender`). See:
   ```java
   BlockSplitter splitter = (block, type, size)
       -> TransferableBlockUtils.splitBlock(block, type, size, isLeafStageSender);
   _exchange = blockExchangeFactory.build(mailboxService, receivingMailboxes, exchangeType, keySelector, splitter);
   ```
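The sketch below shows the same pattern with hypothetical simplified types (`RowSplitter`, `SplitterDemo`), treating blocks as row lists. The leaf vs. non-leaf policy in it is invented purely for illustration. The real behavior lives in `TransferableBlockUtils.splitBlock`, which this thread does not show.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified splitter contract: a block is a list of rows.
@FunctionalInterface
interface RowSplitter {
  List<List<Object[]>> split(List<Object[]> block, int maxRows);
}

final class SplitterDemo {
  private SplitterDemo() { }

  // Cuts a block into pieces of at most maxRows rows each.
  static List<List<Object[]>> chunk(List<Object[]> block, int maxRows) {
    List<List<Object[]>> out = new ArrayList<>();
    for (int i = 0; i < block.size(); i += maxRows) {
      out.add(new ArrayList<>(block.subList(i, Math.min(i + maxRows, block.size()))));
    }
    return out;
  }

  // The flag is consumed once, here; whoever receives the RowSplitter never sees it.
  static RowSplitter forSender(boolean isLeafStageSender) {
    // Made-up policy for illustration only: leaf senders chunk, others pass the block through whole.
    return isLeafStageSender ? SplitterDemo::chunk : (block, maxRows) -> List.of(block);
  }
}
```

The flag is captured in the lambda when the operator is constructed. As a result, the exchange classes depend only on the splitter interface, and tests can supply any splitter they like.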



[GitHub] [pinot] walterddr merged pull request #9773: [multistage] refactor MailboxSendOperator for testability and tests

Posted by GitBox <gi...@apache.org>.
walterddr merged PR #9773:
URL: https://github.com/apache/pinot/pull/9773


[GitHub] [pinot] codecov-commenter commented on pull request #9773: [multistage] refactor MailboxSendOperator for testability and tests

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #9773:
URL: https://github.com/apache/pinot/pull/9773#issuecomment-1309639663

   # [Codecov](https://codecov.io/gh/apache/pinot/pull/9773?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#9773](https://codecov.io/gh/apache/pinot/pull/9773?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (4a03632) into [master](https://codecov.io/gh/apache/pinot/commit/8c9af20c3feaf9b54f129b7923728fc08fa205a5?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8c9af20) will **increase** coverage by `3.62%`.
   > The diff coverage is `96.29%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #9773      +/-   ##
   ============================================
   + Coverage     64.02%   67.65%   +3.62%     
   - Complexity     4953     5192     +239     
   ============================================
     Files          1902     1457     -445     
     Lines        102463    76196   -26267     
     Branches      15587    12109    -3478     
   ============================================
   - Hits          65607    51550   -14057     
   + Misses        32088    20986   -11102     
   + Partials       4768     3660    -1108     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests1 | `67.65% <96.29%> (+0.01%)` | :arrow_up: |
   | unittests2 | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/9773?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...va/org/apache/pinot/query/runtime/QueryRunner.java](https://codecov.io/gh/apache/pinot/pull/9773/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9RdWVyeVJ1bm5lci5qYXZh) | `81.37% <ø> (-0.19%)` | :arrow_down: |
   | [...query/runtime/operator/exchange/BlockExchange.java](https://codecov.io/gh/apache/pinot/pull/9773/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9leGNoYW5nZS9CbG9ja0V4Y2hhbmdlLmphdmE=) | `94.28% <94.28%> (ø)` | |
   | [...ot/query/runtime/operator/MailboxSendOperator.java](https://codecov.io/gh/apache/pinot/pull/9773/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9NYWlsYm94U2VuZE9wZXJhdG9yLmphdmE=) | `89.18% <94.44%> (+0.76%)` | :arrow_up: |
   | [...y/runtime/operator/exchange/BroadcastExchange.java](https://codecov.io/gh/apache/pinot/pull/9773/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9leGNoYW5nZS9Ccm9hZGNhc3RFeGNoYW5nZS5qYXZh) | `100.00% <100.00%> (ø)` | |
   | [.../query/runtime/operator/exchange/HashExchange.java](https://codecov.io/gh/apache/pinot/pull/9773/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9leGNoYW5nZS9IYXNoRXhjaGFuZ2UuamF2YQ==) | `100.00% <100.00%> (ø)` | |
   | [...uery/runtime/operator/exchange/RandomExchange.java](https://codecov.io/gh/apache/pinot/pull/9773/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9leGNoYW5nZS9SYW5kb21FeGNoYW5nZS5qYXZh) | `100.00% <100.00%> (ø)` | |
   | [...y/runtime/operator/exchange/SingletonExchange.java](https://codecov.io/gh/apache/pinot/pull/9773/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9leGNoYW5nZS9TaW5nbGV0b25FeGNoYW5nZS5qYXZh) | `100.00% <100.00%> (ø)` | |
   | [.../pinot/query/runtime/plan/PhysicalPlanVisitor.java](https://codecov.io/gh/apache/pinot/pull/9773/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9wbGFuL1BoeXNpY2FsUGxhblZpc2l0b3IuamF2YQ==) | `96.77% <100.00%> (ø)` | |
   | [...ore/query/scheduler/resources/ResourceManager.java](https://codecov.io/gh/apache/pinot/pull/9773/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9zY2hlZHVsZXIvcmVzb3VyY2VzL1Jlc291cmNlTWFuYWdlci5qYXZh) | `84.00% <0.00%> (-12.00%)` | :arrow_down: |
   | [.../pinot/core/query/scheduler/PriorityScheduler.java](https://codecov.io/gh/apache/pinot/pull/9773/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9zY2hlZHVsZXIvUHJpb3JpdHlTY2hlZHVsZXIuamF2YQ==) | `77.77% <0.00%> (-5.56%)` | :arrow_down: |
   | ... and [457 more](https://codecov.io/gh/apache/pinot/pull/9773/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   

