Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/11/10 15:30:24 UTC

[GitHub] [pinot] 61yao opened a new pull request, #9778: [multistage] Mailbox receive operator test

61yao opened a new pull request, #9778:
URL: https://github.com/apache/pinot/pull/9778

   Add test for multistage MailboxReceiveOperator.
   Throw exception for unsupported mailbox type in constructor.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] 61yao commented on pull request #9778: [multistage] [testing] Mailbox receive operator test

Posted by GitBox <gi...@apache.org>.
61yao commented on PR #9778:
URL: https://github.com/apache/pinot/pull/9778#issuecomment-1322809141

   > test is failing on this PR can you take a look @61yao
   
   Fixed the test. 




[GitHub] [pinot] walterddr merged pull request #9778: [multistage] [testing] Mailbox receive operator test

Posted by GitBox <gi...@apache.org>.
walterddr merged PR #9778:
URL: https://github.com/apache/pinot/pull/9778




[GitHub] [pinot] walterddr commented on pull request #9778: [multistage] [testing] Mailbox receive operator test

Posted by GitBox <gi...@apache.org>.
walterddr commented on PR #9778:
URL: https://github.com/apache/pinot/pull/9778#issuecomment-1320194773

   test is failing on this PR can you take a look @61yao 




[GitHub] [pinot] 61yao commented on a diff in pull request #9778: [multistage] [testing] Mailbox receive operator test

Posted by GitBox <gi...@apache.org>.
61yao commented on code in PR #9778:
URL: https://github.com/apache/pinot/pull/9778#discussion_r1023091276


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -43,30 +44,47 @@
 /**
  * This {@code MailboxReceiveOperator} receives data from a {@link ReceivingMailbox} and serves it out from the
  * {@link BaseOperator#getNextBlock()} API.
+ *
+ *  MailboxReceiveOperator receives mailboxes from the mailboxService for the given sendingStageInstances.
+ *  We use each sendingStageInstance to deduce the mailboxId and fetch the content from the mailboxService.
+ *  When exchangeType is Singleton, we find the mailbox matching the mailboxService. If not found, use an empty list.
+ *  When exchangeType is non-Singleton, we pull from each instance in a round-robin way to get matched mailbox content.
  */
 public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
   private static final Logger LOGGER = LoggerFactory.getLogger(MailboxReceiveOperator.class);
   private static final String EXPLAIN_NAME = "MAILBOX_RECEIVE";
 
+  // TODO: Unify SUPPORTED_EXCHANGE_TYPES with MailboxSendOperator.
+  private static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPES =
+      ImmutableSet.of(RelDistribution.Type.BROADCAST_DISTRIBUTED, RelDistribution.Type.HASH_DISTRIBUTED,
+          RelDistribution.Type.SINGLETON);
+
   private final MailboxService<TransferableBlock> _mailboxService;
   private final RelDistribution.Type _exchangeType;
-  private final KeySelector<Object[], Object[]> _keySelector;
-  private final List<ServerInstance> _sendingStageInstances;
-  private final DataSchema _dataSchema;
-  private final String _hostName;
-  private final int _port;
-  private final long _jobId;
-  private final int _stageId;
-  private final long _timeout;
-
+  private final List<MailboxIdentifier> _sendingMailbox;
+  private final long _deadlineInNanoSeconds;
   private int _serverIdx;
   private TransferableBlock _upstreamErrorBlock;
 
-  public MailboxReceiveOperator(MailboxService<TransferableBlock> mailboxService, DataSchema dataSchema,
-      List<ServerInstance> sendingStageInstances, RelDistribution.Type exchangeType,
-      KeySelector<Object[], Object[]> keySelector, String hostName, int port, long jobId, int stageId) {
-    _dataSchema = dataSchema;
+  private static MailboxIdentifier toMailboxId(ServerInstance fromInstance, long jobId, long stageId,
+      String receiveHostName, int receivePort) {
+    return new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), fromInstance.getHostname(),
+        fromInstance.getQueryMailboxPort(), receiveHostName, receivePort);
+  }
+
+  // TODO: Move deadlineInNanoSeconds to OperatorContext.

Review Comment:
   I was planning to add a struct to hold metadata info for the operator.
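
For illustration only, such a metadata holder might look like the minimal sketch below. `OperatorContextSketch`, its fields, and `withTimeout` are hypothetical names made up for this example; they are not taken from the PR or from the Pinot codebase. The only idea carried over from the diff is that the operator keeps a deadline in nanoseconds.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical holder for per-operator metadata; NOT a class from the Pinot codebase.
final class OperatorContextSketch {
  private final long _jobId;
  private final int _stageId;
  private final long _deadlineInNanoSeconds;

  OperatorContextSketch(long jobId, int stageId, long deadlineInNanoSeconds) {
    _jobId = jobId;
    _stageId = stageId;
    _deadlineInNanoSeconds = deadlineInNanoSeconds;
  }

  // Builds a context whose deadline lies timeoutMs milliseconds after nowNanos.
  static OperatorContextSketch withTimeout(long jobId, int stageId, long nowNanos, long timeoutMs) {
    return new OperatorContextSketch(jobId, stageId, nowNanos + TimeUnit.MILLISECONDS.toNanos(timeoutMs));
  }

  // Compares by subtraction so the check stays correct if System.nanoTime() values wrap around.
  boolean isExpired(long nowNanos) {
    return nowNanos - _deadlineInNanoSeconds >= 0;
  }

  long getJobId() {
    return _jobId;
  }

  int getStageId() {
    return _stageId;
  }
}
```

An operator could then call `context.isExpired(System.nanoTime())` before polling its mailboxes, instead of carrying the deadline as a separate constructor argument.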



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryTestSet.java:
##########
@@ -21,12 +21,6 @@
 import org.testng.annotations.DataProvider;

Review Comment:
   Done



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java:
##########
@@ -0,0 +1,491 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.common.datablock.MetadataBlock;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
+
+
+public class MailboxReceiveOperatorTest {
+
+  private AutoCloseable _mocks;
+
+  @Mock
+  private ReceivingMailbox<TransferableBlock> _mailbox;
+
+  @Mock
+  private ReceivingMailbox<TransferableBlock> _mailbox2;
+
+  @Mock
+  private MailboxService<TransferableBlock> _mailboxService;
+  @Mock
+  private ServerInstance _server1;
+  @Mock
+  private ServerInstance _server2;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void testReceiveTimeout()
+      throws InterruptedException {
+    MailboxReceiveOperator receiveOp =
+        new MailboxReceiveOperator(_mailboxService, new ArrayList<>(), RelDistribution.Type.SINGLETON, "test", 123, 456,
+            789, 1L);
+    Thread.sleep(1000);
+    TransferableBlock mailbox = receiveOp.nextBlock();
+    Assert.assertTrue(mailbox.isErrorBlock());
+    MetadataBlock errorBlock = (MetadataBlock) mailbox.getDataBlock();
+    Assert.assertTrue(errorBlock.getExceptions().containsKey(QueryException.EXECUTION_TIMEOUT_ERROR_CODE));
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*multiple instance "
+      + "found.*")
+  public void testReceiveSingletonMultiMatchMailboxServer() {
+
+    Mockito.when(_mailboxService.getHostname()).thenReturn("singleton");
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(123);
+
+    Mockito.when(_server1.getHostname()).thenReturn("singleton");
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(123);
+
+    Mockito.when(_server2.getHostname()).thenReturn("singleton");
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123);
+
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, "test", 123, 456, 789, null);
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
+  public void testRangeDistributionNotSupported() {
+    Mockito.when(_mailboxService.getHostname()).thenReturn("singleton");
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(123);
+
+    Mockito.when(_server1.getHostname()).thenReturn("singleton");
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(123);
+
+    Mockito.when(_server2.getHostname()).thenReturn("singleton");
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123);
+
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.RANGE_DISTRIBUTED, "test", 123, 456, 789, null);
+  }
+
+  @Test
+  public void testReceiveSingletonNoMatchMailboxServer() {
+    String serverHost = "singleton";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    int server2port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2port);
+
+    int mailboxPort = 789;
+    Mockito.when(_mailboxService.getHostname()).thenReturn(serverHost);
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(mailboxPort);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+
+    // Receive end of stream block directly when there is no match.
+    Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
+  }
+
+  @Test
+  public void testReceiveSingletonCloseMailbox() {
+    String serverHost = "singleton";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    int server2port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2port);
+
+    int mailboxPort = server2port;
+    Mockito.when(_mailboxService.getHostname()).thenReturn(serverHost);
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(mailboxPort);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    StringMailboxIdentifier expectedMailboxId =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), serverHost, server2port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(true);
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+    // Receive end of stream block directly when mailbox is closed.
+    Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
+  }
+
+  @Test
+  public void testReceiveSingletonNullMailbox()
+      throws Exception {
+    String serverHost = "singleton";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    int server2port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2port);
+
+    int mailboxPort = server2port;
+    Mockito.when(_mailboxService.getHostname()).thenReturn(serverHost);
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(mailboxPort);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    StringMailboxIdentifier expectedMailboxId =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), serverHost, server2port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(false);
+    // Receive null mailbox during timeout.
+    Mockito.when(_mailbox.receive()).thenReturn(null);
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+    // Receive a no-op block when the mailbox returns null.
+    Assert.assertTrue(receiveOp.nextBlock().isNoOpBlock());
+  }
+
+  @Test
+  public void testReceiveSingletonMailbox()
+      throws Exception {
+    String serverHost = "singleton";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    int server2port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2port);
+
+    int mailboxPort = server2port;
+    Mockito.when(_mailboxService.getHostname()).thenReturn(serverHost);
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(mailboxPort);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    StringMailboxIdentifier expectedMailboxId =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), serverHost, server2port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(false);
+    Object[] expRow = new Object[]{1, 1};
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+    Mockito.when(_mailbox.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow));
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+    TransferableBlock receivedBlock = receiveOp.nextBlock();
+    List<Object[]> resultRows = receivedBlock.getContainer();
+    Assert.assertEquals(resultRows.size(), 1);
+    Assert.assertEquals(resultRows.get(0), expRow);
+  }
+
+  @Test
+  public void testReceiveSingletonErrorMailbox()
+      throws Exception {
+    String serverHost = "singleton";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    int server2port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2port);
+
+    int mailboxPort = server2port;
+    Mockito.when(_mailboxService.getHostname()).thenReturn(serverHost);
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(mailboxPort);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    StringMailboxIdentifier expectedMailboxId =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), serverHost, server2port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(false);
+    Exception e = new Exception("errorBlock");
+    Mockito.when(_mailbox.receive()).thenReturn(TransferableBlockUtils.getErrorTransferableBlock(e));
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+    TransferableBlock receivedBlock = receiveOp.nextBlock();
+    Assert.assertTrue(receivedBlock.isErrorBlock());
+    MetadataBlock error = (MetadataBlock) receivedBlock.getDataBlock();
+    Assert.assertEquals(error.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE), "errorBlock");
+  }
+
+  @Test
+  public void testReceiveMailboxFromTwoServersOneClose()
+      throws Exception {
+    String server1Host = "hash1";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(server1Host);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    String server2Host = "hash2";
+    int server2Port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(server2Host);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2Port);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    StringMailboxIdentifier expectedMailboxId1 =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server1Host, server1Port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(true);
+
+    StringMailboxIdentifier expectedMailboxId2 =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server2Host, server2Port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
+    Mockito.when(_mailbox2.isClosed()).thenReturn(false);
+    Object[] expRow = new Object[]{1, 1};
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+    Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow));
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.HASH_DISTRIBUTED, toHost, toPort, jobId, stageId, null);
+    TransferableBlock receivedBlock = receiveOp.nextBlock();
+    List<Object[]> resultRows = receivedBlock.getContainer();
+    Assert.assertEquals(resultRows.size(), 1);
+    Assert.assertEquals(resultRows.get(0), expRow);
+  }
+
+  @Test
+  public void testReceiveMailboxFromTwoServersOneNull()
+      throws Exception {
+    String server1Host = "hash1";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(server1Host);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    String server2Host = "hash2";
+    int server2Port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(server2Host);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2Port);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    StringMailboxIdentifier expectedMailboxId1 =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server1Host, server1Port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(false);
+    Mockito.when(_mailbox.receive()).thenReturn(null);
+
+    StringMailboxIdentifier expectedMailboxId2 =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server2Host, server2Port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
+    Mockito.when(_mailbox2.isClosed()).thenReturn(false);
+    Object[] expRow = new Object[]{1, 1};
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+    Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow));
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.HASH_DISTRIBUTED, toHost, toPort, jobId, stageId, null);
+    TransferableBlock receivedBlock = receiveOp.nextBlock();
+    List<Object[]> resultRows = receivedBlock.getContainer();
+    Assert.assertEquals(resultRows.size(), 1);
+    Assert.assertEquals(resultRows.get(0), expRow);
+  }
+
+  @Test
+  public void testReceiveMailboxFromTwoServers()
+      throws Exception {
+    String server1Host = "hash1";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(server1Host);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    String server2Host = "hash2";
+    int server2Port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(server2Host);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2Port);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+    StringMailboxIdentifier expectedMailboxId1 =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server1Host, server1Port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(false);
+    Object[] expRow1 = new Object[]{1, 1};
+    Object[] expRow2 = new Object[]{2, 2};
+    Mockito.when(_mailbox.receive())
+        .thenReturn(OperatorTestUtil.block(inSchema, expRow1), OperatorTestUtil.block(inSchema, expRow2),
+            TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    Object[] expRow3 = new Object[]{3, 3};
+    StringMailboxIdentifier expectedMailboxId2 =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server2Host, server2Port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
+    Mockito.when(_mailbox2.isClosed()).thenReturn(false);
+    Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3));
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.HASH_DISTRIBUTED, toHost, toPort, jobId, stageId, null);
+    // Receive first block from first server.
+    TransferableBlock receivedBlock = receiveOp.nextBlock();
+    List<Object[]> resultRows = receivedBlock.getContainer();
+    Assert.assertEquals(resultRows.size(), 1);
+    Assert.assertEquals(resultRows.get(0), expRow1);
+    // Receive second block from first server.
+    receivedBlock = receiveOp.nextBlock();
+    resultRows = receivedBlock.getContainer();
+    Assert.assertEquals(resultRows.size(), 1);
+    Assert.assertEquals(resultRows.get(0), expRow2);
+
+    // Receive from second server.
+    receivedBlock = receiveOp.nextBlock();
+    resultRows = receivedBlock.getContainer();
+    Assert.assertEquals(resultRows.size(), 1);
+    Assert.assertEquals(resultRows.get(0), expRow3);
+  }
+
+  @Test
+  public void testReceiveMailboxFromTwoServersOneError()
+      throws Exception {
+    String server1Host = "hash1";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(server1Host);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    String server2Host = "hash2";
+    int server2Port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(server2Host);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2Port);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+    StringMailboxIdentifier expectedMailboxId1 =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server1Host, server1Port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(false);
+    Mockito.when(_mailbox.receive())
+        .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("mailboxError")));
+
+    Object[] expRow3 = new Object[]{3, 3};
+    StringMailboxIdentifier expectedMailboxId2 =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server2Host, server2Port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
+    Mockito.when(_mailbox2.isClosed()).thenReturn(false);
+    Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3));
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.HASH_DISTRIBUTED, toHost, toPort, jobId, stageId, null);
+    // Receive error block from first server.
+    TransferableBlock receivedBlock = receiveOp.nextBlock();
+    Assert.assertTrue(receivedBlock.isErrorBlock());
+    MetadataBlock error = (MetadataBlock) receivedBlock.getDataBlock();
+    Assert.assertEquals(error.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE), "mailboxError");
+  }
+
+  // TODO: This should probably be fixed.

Review Comment:
   yes
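
The tests in the hunk above pin down how the receive side behaves when one of several sending mailboxes is closed, returns null, or delivers an error. The sketch below reproduces only that observable behavior, under stated assumptions: `MailboxScanSketch`, its nested `Mailbox` class, and the string markers are hypothetical stand-ins for `ReceivingMailbox` and `TransferableBlock`, and the scan order is inferred from the test assertions rather than copied from `MailboxReceiveOperator`.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-ins for ReceivingMailbox and TransferableBlock; payloads are plain strings.
final class MailboxScanSketch {
  static final String END_OF_STREAM = "EOS";
  static final String NO_OP = "NO_OP";

  // Minimal mailbox: receive() returns null when nothing is buffered.
  static final class Mailbox {
    private final Queue<String> _blocks = new ArrayDeque<>();
    private final boolean _closed;

    Mailbox(boolean closed, String... blocks) {
      _closed = closed;
      _blocks.addAll(Arrays.asList(blocks));
    }

    boolean isClosed() {
      return _closed;
    }

    String receive() {
      return _blocks.poll();
    }
  }

  // Scans the mailboxes in list order and returns the first data or error block found.
  static String nextBlock(List<Mailbox> mailboxes) {
    boolean hasOpenMailbox = false;
    for (Mailbox mailbox : mailboxes) {
      if (mailbox.isClosed()) {
        continue; // closed mailboxes are skipped entirely
      }
      hasOpenMailbox = true;
      String block = mailbox.receive();
      if (block == null || block.equals(END_OF_STREAM)) {
        continue; // nothing usable from this sender; try the next one
      }
      return block;
    }
    // Assumption: no open mailbox means the stream is finished; otherwise signal "no data yet".
    return hasOpenMailbox ? NO_OP : END_OF_STREAM;
  }
}
```

With two senders where the first holds `row1`, `row2`, `EOS` and the second holds `row3`, three successive calls yield `row1`, `row2`, `row3`, which is the order `testReceiveMailboxFromTwoServers` asserts.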



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java:
##########
@@ -0,0 +1,491 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.common.datablock.MetadataBlock;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
+
+
+public class MailboxReceiveOperatorTest {
+
+  private AutoCloseable _mocks;
+
+  @Mock
+  private ReceivingMailbox<TransferableBlock> _mailbox;
+
+  @Mock
+  private ReceivingMailbox<TransferableBlock> _mailbox2;
+
+  @Mock
+  private MailboxService<TransferableBlock> _mailboxService;
+  @Mock
+  private ServerInstance _server1;
+  @Mock
+  private ServerInstance _server2;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void testReceiveTimeout()
+      throws InterruptedException {
+    MailboxReceiveOperator receiveOp =
+        new MailboxReceiveOperator(_mailboxService, new ArrayList<>(), RelDistribution.Type.SINGLETON, "test", 123, 456,
+            789, 1L);
+    Thread.sleep(1000);
+    TransferableBlock mailbox = receiveOp.nextBlock();
+    Assert.assertTrue(mailbox.isErrorBlock());
+    MetadataBlock errorBlock = (MetadataBlock) mailbox.getDataBlock();
+    Assert.assertTrue(errorBlock.getExceptions().containsKey(QueryException.EXECUTION_TIMEOUT_ERROR_CODE));
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*multiple instance "
+      + "found.*")
+  public void testReceiveSingletonMultiMatchMailboxServer() {
+
+    Mockito.when(_mailboxService.getHostname()).thenReturn("singleton");
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(123);
+
+    Mockito.when(_server1.getHostname()).thenReturn("singleton");
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(123);
+
+    Mockito.when(_server2.getHostname()).thenReturn("singleton");
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123);
+
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, "test", 123, 456, 789, null);
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
+  public void testRangeDistributionNotSupported() {
+    Mockito.when(_mailboxService.getHostname()).thenReturn("singleton");
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(123);
+
+    Mockito.when(_server1.getHostname()).thenReturn("singleton");
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(123);
+
+    Mockito.when(_server2.getHostname()).thenReturn("singleton");
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123);
+
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.RANGE_DISTRIBUTED, "test", 123, 456, 789, null);
+  }
+
+  @Test
+  public void testReceiveSingletonNoMatchMailboxServer() {
+    String serverHost = "singleton";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    int server2port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2port);
+
+    int mailboxPort = 789;
+    Mockito.when(_mailboxService.getHostname()).thenReturn(serverHost);
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(mailboxPort);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+
+    // Receive end of stream block directly when there is no match.
+    Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
+  }
+
+  @Test
+  public void testReceiveSingletonCloseMailbox() {
+    String serverHost = "singleton";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    int server2port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2port);
+
+    int mailboxPort = server2port;
+    Mockito.when(_mailboxService.getHostname()).thenReturn(serverHost);
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(mailboxPort);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    StringMailboxIdentifier expectedMailboxId =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), serverHost, server2port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(true);
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+    // Receive end of stream block directly when mailbox is close.
+    Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
+  }
+
+  @Test
+  public void testReceiveSingletonNullMailbox()
+      throws Exception {
+    String serverHost = "singleton";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    int server2port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2port);
+
+    int mailboxPort = server2port;
+    Mockito.when(_mailboxService.getHostname()).thenReturn(serverHost);
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(mailboxPort);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    StringMailboxIdentifier expectedMailboxId =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), serverHost, server2port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(false);
+    // Receive null mailbox during timeout.
+    Mockito.when(_mailbox.receive()).thenReturn(null);
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+    // Receive end of stream block directly when mailbox is close.
+    Assert.assertTrue(receiveOp.nextBlock().isNoOpBlock());

Review Comment:
   done



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -114,60 +130,49 @@ public String toExplainString() {
   protected TransferableBlock getNextBlock() {
     if (_upstreamErrorBlock != null) {
       return _upstreamErrorBlock;
-    } else if (System.nanoTime() >= _timeout) {
-      LOGGER.error("Timed out after polling mailboxes: {}", _sendingStageInstances);
+    } else if (System.nanoTime() >= _deadlineInNanoSeconds) {
+      LOGGER.error("Timed out after polling mailboxes: {}", _sendingMailbox);
       return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
     }
 
     int startingIdx = _serverIdx;
     int openMailboxCount = 0;
-    int eosCount = 0;
-
-    for (int i = 0; i < _sendingStageInstances.size(); i++) {
-      // this implements a round-robin mailbox iterator so we don't starve any mailboxes
-      _serverIdx = (startingIdx + i) % _sendingStageInstances.size();
 
-      ServerInstance server = _sendingStageInstances.get(_serverIdx);
+    // For all non-singleton distribution, we poll from every instance to check mailbox content.
+    // TODO: Fix wasted CPU cycles on waiting for servers that are not supposed to give content.
+    for (int i = 0; i < _sendingMailbox.size(); i++) {
+      // this implements a round-robin mailbox iterator, so we don't starve any mailboxes
+      _serverIdx = (startingIdx + i) % _sendingMailbox.size();
+      MailboxIdentifier mailboxId = _sendingMailbox.get(_serverIdx);
       try {
-        ReceivingMailbox<TransferableBlock> mailbox = _mailboxService.getReceivingMailbox(toMailboxId(server));
+        ReceivingMailbox<TransferableBlock> mailbox = _mailboxService.getReceivingMailbox(mailboxId);
         if (!mailbox.isClosed()) {
           openMailboxCount++;
-
           // this is blocking for 100ms and may return null
           TransferableBlock block = mailbox.receive();
+          // Get null block when pulling times out from mailbox.
           if (block != null) {
             if (block.isErrorBlock()) {
-              _upstreamErrorBlock = TransferableBlockUtils.getErrorTransferableBlock(
-                  block.getDataBlock().getExceptions());
+              _upstreamErrorBlock =
+                  TransferableBlockUtils.getErrorTransferableBlock(block.getDataBlock().getExceptions());
               return _upstreamErrorBlock;
             }
             if (!block.isEndOfStreamBlock()) {
               return block;
-            } else {
-              eosCount++;
             }
           }
         }
       } catch (Exception e) {
-        LOGGER.error(String.format("Error receiving data from mailbox %s", server), e);
+        // TODO: Handle this exception.
+        LOGGER.error(String.format("Error receiving data from mailbox %s", mailboxId), e);

Review Comment:
   yes
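
   For reference, the round-robin arithmetic in the loop above can be checked in isolation. This is only a sketch: `RoundRobinOrder` and `visitOrder` are hypothetical names and not part of Pinot. They reproduce the `(startingIdx + i) % size` expression from the hunk and nothing else.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Pinot: lists the order in which one polling pass visits mailboxes. */
final class RoundRobinOrder {
  private RoundRobinOrder() {
  }

  // Mirrors the index arithmetic in the hunk above: (startingIdx + i) % size.
  static List<Integer> visitOrder(int startingIdx, int size) {
    List<Integer> order = new ArrayList<>(size);
    for (int i = 0; i < size; i++) {
      order.add((startingIdx + i) % size);
    }
    return order;
  }
}
```

   With four mailboxes and a pass that starts at index 2, the visit order is 2, 3, 0, 1, so every mailbox is polled once per pass regardless of where the pass starts.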





[GitHub] [pinot] 61yao commented on a diff in pull request #9778: [multistage] [testing] Mailbox receive operator test

Posted by GitBox <gi...@apache.org>.
61yao commented on code in PR #9778:
URL: https://github.com/apache/pinot/pull/9778#discussion_r1022381315


##########
pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java:
##########
@@ -65,6 +65,7 @@ public void onMatch(RelOptRuleCall call) {
 
     if (joinInfo.leftKeys.isEmpty()) {
       // when there's no JOIN key, use broadcast.
+      // TODO: Double check broadcast distribution won't cause problems

Review Comment:
   I haven't figured it out yet. Removed the comment for now.





[GitHub] [pinot] 61yao commented on a diff in pull request #9778: [multistage] [testing] Mailbox receive operator test

Posted by GitBox <gi...@apache.org>.
61yao commented on code in PR #9778:
URL: https://github.com/apache/pinot/pull/9778#discussion_r1022381083


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -50,23 +48,26 @@ public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
 
   private final MailboxService<TransferableBlock> _mailboxService;
   private final RelDistribution.Type _exchangeType;
-  private final KeySelector<Object[], Object[]> _keySelector;
   private final List<ServerInstance> _sendingStageInstances;
-  private final DataSchema _dataSchema;
   private final String _hostName;
   private final int _port;
   private final long _jobId;
   private final int _stageId;
   private final long _timeout;
-
   private int _serverIdx;
   private TransferableBlock _upstreamErrorBlock;
 
-  public MailboxReceiveOperator(MailboxService<TransferableBlock> mailboxService, DataSchema dataSchema,
-      List<ServerInstance> sendingStageInstances, RelDistribution.Type exchangeType,
-      KeySelector<Object[], Object[]> keySelector, String hostName, int port, long jobId, int stageId) {
-    _dataSchema = dataSchema;
+  public MailboxReceiveOperator(MailboxService<TransferableBlock> mailboxService,
+      List<ServerInstance> sendingStageInstances, RelDistribution.Type exchangeType, String hostName, int port,
+      long jobId, int stageId) {
     _mailboxService = mailboxService;
+    Preconditions.checkState(exchangeType != RelDistribution.Type.RANDOM_DISTRIBUTED,
+        "Random distribution is not supported");
+    Preconditions.checkState(exchangeType != RelDistribution.Type.ANY, "Any distribution is not supported");
+    Preconditions.checkState(exchangeType != RelDistribution.Type.RANGE_DISTRIBUTED,
+        "Range distribution is not supported");
+    Preconditions.checkState(exchangeType != RelDistribution.Type.ROUND_ROBIN_DISTRIBUTED,
+        "Round robin distribution is not supported");

Review Comment:
   Done
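
   The four separate `checkState` calls in the hunk above can be collapsed into a single allow-list check, which is the direction the `SUPPORTED_EXCHANGE_TYPES` set in the later revision of this PR takes. A minimal sketch, assuming a stand-in enum instead of Calcite's `RelDistribution.Type`; the class name, method name, and error message text are hypothetical:

```java
import java.util.EnumSet;
import java.util.Set;

final class ExchangeTypeCheck {
  private ExchangeTypeCheck() {
  }

  // Stand-in for Calcite's RelDistribution.Type so the sketch compiles on its own.
  enum ExchangeType {
    SINGLETON, HASH_DISTRIBUTED, RANGE_DISTRIBUTED, RANDOM_DISTRIBUTED, ROUND_ROBIN_DISTRIBUTED,
    BROADCAST_DISTRIBUTED, ANY
  }

  // Allow-list: any type not named here is rejected, including types added to the enum later.
  static final Set<ExchangeType> SUPPORTED =
      EnumSet.of(ExchangeType.BROADCAST_DISTRIBUTED, ExchangeType.HASH_DISTRIBUTED, ExchangeType.SINGLETON);

  // Hypothetical message format; it includes the type name so a test can match on it.
  static void checkSupported(ExchangeType type) {
    if (!SUPPORTED.contains(type)) {
      throw new IllegalStateException("Exchange/Distribution type: " + type + " is not supported");
    }
  }
}
```

   Putting the type name in the message is what lets a test such as `testRangeDistributionNotSupported` match on `.*RANGE_DISTRIBUTED.*`.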





[GitHub] [pinot] walterddr commented on a diff in pull request #9778: [multistage] [testing] Mailbox receive operator test

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9778:
URL: https://github.com/apache/pinot/pull/9778#discussion_r1022982326


##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java:
##########
@@ -0,0 +1,491 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.common.datablock.MetadataBlock;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
+
+
+public class MailboxReceiveOperatorTest {
+
+  private AutoCloseable _mocks;
+
+  @Mock
+  private ReceivingMailbox<TransferableBlock> _mailbox;
+
+  @Mock
+  private ReceivingMailbox<TransferableBlock> _mailbox2;
+
+  @Mock
+  private MailboxService<TransferableBlock> _mailboxService;
+  @Mock
+  private ServerInstance _server1;
+  @Mock
+  private ServerInstance _server2;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void testReceiveTimeout()
+      throws InterruptedException {
+    MailboxReceiveOperator receiveOp =
+        new MailboxReceiveOperator(_mailboxService, new ArrayList<>(), RelDistribution.Type.SINGLETON, "test", 123, 456,
+            789, 1L);
+    Thread.sleep(1000);
+    TransferableBlock mailbox = receiveOp.nextBlock();
+    Assert.assertTrue(mailbox.isErrorBlock());
+    MetadataBlock errorBlock = (MetadataBlock) mailbox.getDataBlock();
+    Assert.assertTrue(errorBlock.getExceptions().containsKey(QueryException.EXECUTION_TIMEOUT_ERROR_CODE));
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*multiple instance "
+      + "found.*")
+  public void testReceiveSingletonMultiMatchMailboxServer() {
+
+    Mockito.when(_mailboxService.getHostname()).thenReturn("singleton");
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(123);
+
+    Mockito.when(_server1.getHostname()).thenReturn("singleton");
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(123);
+
+    Mockito.when(_server2.getHostname()).thenReturn("singleton");
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123);
+
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, "test", 123, 456, 789, null);
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
+  public void testRangeDistributionNotSupported() {
+    Mockito.when(_mailboxService.getHostname()).thenReturn("singleton");
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(123);
+
+    Mockito.when(_server1.getHostname()).thenReturn("singleton");
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(123);
+
+    Mockito.when(_server2.getHostname()).thenReturn("singleton");
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123);
+
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.RANGE_DISTRIBUTED, "test", 123, 456, 789, null);
+  }
+
+  @Test
+  public void testReceiveSingletonNoMatchMailboxServer() {
+    String serverHost = "singleton";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    int server2port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2port);
+
+    int mailboxPort = 789;
+    Mockito.when(_mailboxService.getHostname()).thenReturn(serverHost);
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(mailboxPort);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+
+    // Receive end of stream block directly when there is no match.
+    Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
+  }
+
+  @Test
+  public void testReceiveSingletonCloseMailbox() {
+    String serverHost = "singleton";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    int server2port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2port);
+
+    int mailboxPort = server2port;
+    Mockito.when(_mailboxService.getHostname()).thenReturn(serverHost);
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(mailboxPort);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    StringMailboxIdentifier expectedMailboxId =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), serverHost, server2port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(true);
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+    // Receive end of stream block directly when mailbox is close.
+    Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
+  }
+
+  @Test
+  public void testReceiveSingletonNullMailbox()
+      throws Exception {
+    String serverHost = "singleton";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    int server2port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2port);
+
+    int mailboxPort = server2port;
+    Mockito.when(_mailboxService.getHostname()).thenReturn(serverHost);
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(mailboxPort);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    StringMailboxIdentifier expectedMailboxId =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), serverHost, server2port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(false);
+    // Receive null mailbox during timeout.
+    Mockito.when(_mailbox.receive()).thenReturn(null);
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+    // Receive end of stream block directly when mailbox is close.
+    Assert.assertTrue(receiveOp.nextBlock().isNoOpBlock());

Review Comment:
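   The comment and the assert don't match: the comment says an end-of-stream block is returned when the mailbox is closed, but the assert checks for a no-op block.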
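
   To make the two cases concrete, here is a rough sketch of the behaviour the tests appear to expect. It is an assumption based on the assertions in `testReceiveSingletonCloseMailbox` and `testReceiveSingletonNullMailbox`, not the operator's actual code: `PollSketch`, `Mailbox`, `fixed`, and `Result` are hypothetical stand-ins, and error and end-of-stream blocks coming from the mailbox are left out.

```java
import java.util.List;

final class PollSketch {
  private PollSketch() {
  }

  enum Result { DATA, NO_OP, END_OF_STREAM }

  /** Hypothetical stand-in for ReceivingMailbox; receive() returns null when the poll times out. */
  interface Mailbox {
    boolean isClosed();

    String receive();
  }

  // Test helper: a mailbox with a fixed closed flag and a fixed payload (null means "nothing received").
  static Mailbox fixed(boolean closed, String payload) {
    return new Mailbox() {
      @Override
      public boolean isClosed() {
        return closed;
      }

      @Override
      public String receive() {
        return payload;
      }
    };
  }

  static Result pollOnce(List<Mailbox> mailboxes) {
    int openMailboxCount = 0;
    for (Mailbox mailbox : mailboxes) {
      if (!mailbox.isClosed()) {
        openMailboxCount++;
        if (mailbox.receive() != null) {
          return Result.DATA;
        }
      }
    }
    // Assumption: an open mailbox with nothing to read yields a no-op; no open mailbox means end of stream.
    return openMailboxCount > 0 ? Result.NO_OP : Result.END_OF_STREAM;
  }
}
```

   Under that assumption, a closed mailbox maps to end of stream and an open mailbox whose `receive()` returns null maps to a no-op, so the comment above the last assert should describe the null-receive case.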



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -114,60 +130,49 @@ public String toExplainString() {
   protected TransferableBlock getNextBlock() {
     if (_upstreamErrorBlock != null) {
       return _upstreamErrorBlock;
-    } else if (System.nanoTime() >= _timeout) {
-      LOGGER.error("Timed out after polling mailboxes: {}", _sendingStageInstances);
+    } else if (System.nanoTime() >= _deadlineInNanoSeconds) {
+      LOGGER.error("Timed out after polling mailboxes: {}", _sendingMailbox);
       return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
     }
 
     int startingIdx = _serverIdx;
     int openMailboxCount = 0;
-    int eosCount = 0;
-
-    for (int i = 0; i < _sendingStageInstances.size(); i++) {
-      // this implements a round-robin mailbox iterator so we don't starve any mailboxes
-      _serverIdx = (startingIdx + i) % _sendingStageInstances.size();
 
-      ServerInstance server = _sendingStageInstances.get(_serverIdx);
+    // For all non-singleton distribution, we poll from every instance to check mailbox content.
+    // TODO: Fix wasted CPU cycles on waiting for servers that are not supposed to give content.
+    for (int i = 0; i < _sendingMailbox.size(); i++) {
+      // this implements a round-robin mailbox iterator, so we don't starve any mailboxes
+      _serverIdx = (startingIdx + i) % _sendingMailbox.size();
+      MailboxIdentifier mailboxId = _sendingMailbox.get(_serverIdx);
       try {
-        ReceivingMailbox<TransferableBlock> mailbox = _mailboxService.getReceivingMailbox(toMailboxId(server));
+        ReceivingMailbox<TransferableBlock> mailbox = _mailboxService.getReceivingMailbox(mailboxId);
         if (!mailbox.isClosed()) {
           openMailboxCount++;
-
           // this is blocking for 100ms and may return null
           TransferableBlock block = mailbox.receive();
+          // Get null block when pulling times out from mailbox.
           if (block != null) {
             if (block.isErrorBlock()) {
-              _upstreamErrorBlock = TransferableBlockUtils.getErrorTransferableBlock(
-                  block.getDataBlock().getExceptions());
+              _upstreamErrorBlock =
+                  TransferableBlockUtils.getErrorTransferableBlock(block.getDataBlock().getExceptions());
               return _upstreamErrorBlock;
             }
             if (!block.isEndOfStreamBlock()) {
               return block;
-            } else {
-              eosCount++;
             }
           }
         }
       } catch (Exception e) {
-        LOGGER.error(String.format("Error receiving data from mailbox %s", server), e);
+        // TODO: Handle this exception.
+        LOGGER.error(String.format("Error receiving data from mailbox %s", mailboxId), e);

Review Comment:
   Note: is this related to the last test TODO?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -43,30 +44,47 @@
 /**
  * This {@code MailboxReceiveOperator} receives data from a {@link ReceivingMailbox} and serve it out from the
  * {@link BaseOperator#getNextBlock()} API.
+ *
+ *  MailboxReceiveOperator receives mailbox from mailboxService from sendingStageInstances.
+ *  We use sendingStageInstance to deduce mailboxId and fetch the content from mailboxService.
+ *  When exchangeType is Singleton, we find the mapping mailbox for the mailboxService. If not found, use empty list.
+ *  When exchangeType is non-Singleton, we pull from each instance in round-robin way to get matched mailbox content.
  */
 public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
   private static final Logger LOGGER = LoggerFactory.getLogger(MailboxReceiveOperator.class);
   private static final String EXPLAIN_NAME = "MAILBOX_RECEIVE";
 
+  // TODO: Unify SUPPORTED_EXCHANGE_TYPES with MailboxSendOperator.
+  private static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPES =
+      ImmutableSet.of(RelDistribution.Type.BROADCAST_DISTRIBUTED, RelDistribution.Type.HASH_DISTRIBUTED,
+          RelDistribution.Type.SINGLETON);
+
   private final MailboxService<TransferableBlock> _mailboxService;
   private final RelDistribution.Type _exchangeType;
-  private final KeySelector<Object[], Object[]> _keySelector;
-  private final List<ServerInstance> _sendingStageInstances;
-  private final DataSchema _dataSchema;
-  private final String _hostName;
-  private final int _port;
-  private final long _jobId;
-  private final int _stageId;
-  private final long _timeout;
-
+  private final List<MailboxIdentifier> _sendingMailbox;
+  private final long _deadlineInNanoSeconds;
   private int _serverIdx;
   private TransferableBlock _upstreamErrorBlock;
 
-  public MailboxReceiveOperator(MailboxService<TransferableBlock> mailboxService, DataSchema dataSchema,
-      List<ServerInstance> sendingStageInstances, RelDistribution.Type exchangeType,
-      KeySelector<Object[], Object[]> keySelector, String hostName, int port, long jobId, int stageId) {
-    _dataSchema = dataSchema;
+  private static MailboxIdentifier toMailboxId(ServerInstance fromInstance, long jobId, long stageId,
+      String receiveHostName, int receivePort) {
+    return new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), fromInstance.getHostname(),
+        fromInstance.getQueryMailboxPort(), receiveHostName, receivePort);
+  }
+
+  // TODO: Move deadlineInNanoSeconds to OperatorContext.

Review Comment:
   what is OperatorContext?



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryTestSet.java:
##########
@@ -21,12 +21,6 @@
 import org.testng.annotations.DataProvider;

Review Comment:
   Revert this file; I believe this is a rebase error.



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java:
##########
@@ -0,0 +1,491 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.common.datablock.MetadataBlock;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
+
+
+public class MailboxReceiveOperatorTest {
+
+  private AutoCloseable _mocks;
+
+  @Mock
+  private ReceivingMailbox<TransferableBlock> _mailbox;
+
+  @Mock
+  private ReceivingMailbox<TransferableBlock> _mailbox2;
+
+  @Mock
+  private MailboxService<TransferableBlock> _mailboxService;
+  @Mock
+  private ServerInstance _server1;
+  @Mock
+  private ServerInstance _server2;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void testReceiveTimeout()
+      throws InterruptedException {
+    MailboxReceiveOperator receiveOp =
+        new MailboxReceiveOperator(_mailboxService, new ArrayList<>(), RelDistribution.Type.SINGLETON, "test", 123, 456,
+            789, 1L);
+    Thread.sleep(1000);
+    TransferableBlock mailbox = receiveOp.nextBlock();
+    Assert.assertTrue(mailbox.isErrorBlock());
+    MetadataBlock errorBlock = (MetadataBlock) mailbox.getDataBlock();
+    Assert.assertTrue(errorBlock.getExceptions().containsKey(QueryException.EXECUTION_TIMEOUT_ERROR_CODE));
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*multiple instance "
+      + "found.*")
+  public void testReceiveSingletonMultiMatchMailboxServer() {
+
+    Mockito.when(_mailboxService.getHostname()).thenReturn("singleton");
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(123);
+
+    Mockito.when(_server1.getHostname()).thenReturn("singleton");
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(123);
+
+    Mockito.when(_server2.getHostname()).thenReturn("singleton");
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123);
+
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, "test", 123, 456, 789, null);
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
+  public void testRangeDistributionNotSupported() {
+    Mockito.when(_mailboxService.getHostname()).thenReturn("singleton");
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(123);
+
+    Mockito.when(_server1.getHostname()).thenReturn("singleton");
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(123);
+
+    Mockito.when(_server2.getHostname()).thenReturn("singleton");
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123);
+
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.RANGE_DISTRIBUTED, "test", 123, 456, 789, null);
+  }
+
+  @Test
+  public void testReceiveSingletonNoMatchMailboxServer() {
+    String serverHost = "singleton";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    int server2port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2port);
+
+    int mailboxPort = 789;
+    Mockito.when(_mailboxService.getHostname()).thenReturn(serverHost);
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(mailboxPort);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+
+    // Receive end of stream block directly when there is no match.
+    Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
+  }
+
+  @Test
+  public void testReceiveSingletonCloseMailbox() {
+    String serverHost = "singleton";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    int server2port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2port);
+
+    int mailboxPort = server2port;
+    Mockito.when(_mailboxService.getHostname()).thenReturn(serverHost);
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(mailboxPort);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    StringMailboxIdentifier expectedMailboxId =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), serverHost, server2port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(true);
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+    // Receive end of stream block directly when the mailbox is closed.
+    Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
+  }
+
+  @Test
+  public void testReceiveSingletonNullMailbox()
+      throws Exception {
+    String serverHost = "singleton";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    int server2port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2port);
+
+    int mailboxPort = server2port;
+    Mockito.when(_mailboxService.getHostname()).thenReturn(serverHost);
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(mailboxPort);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    StringMailboxIdentifier expectedMailboxId =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), serverHost, server2port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(false);
+    // Receive null mailbox during timeout.
+    Mockito.when(_mailbox.receive()).thenReturn(null);
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+    // Receive a no-op block when the mailbox returns null.
+    Assert.assertTrue(receiveOp.nextBlock().isNoOpBlock());
+  }
+
+  @Test
+  public void testReceiveSingletonMailbox()
+      throws Exception {
+    String serverHost = "singleton";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    int server2port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2port);
+
+    int mailboxPort = server2port;
+    Mockito.when(_mailboxService.getHostname()).thenReturn(serverHost);
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(mailboxPort);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    StringMailboxIdentifier expectedMailboxId =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), serverHost, server2port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(false);
+    Object[] expRow = new Object[]{1, 1};
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+    Mockito.when(_mailbox.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow));
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+    TransferableBlock receivedBlock = receiveOp.nextBlock();
+    List<Object[]> resultRows = receivedBlock.getContainer();
+    Assert.assertEquals(resultRows.size(), 1);
+    Assert.assertEquals(resultRows.get(0), expRow);
+  }
+
+  @Test
+  public void testReceiveSingletonErrorMailbox()
+      throws Exception {
+    String serverHost = "singleton";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    int server2port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2port);
+
+    int mailboxPort = server2port;
+    Mockito.when(_mailboxService.getHostname()).thenReturn(serverHost);
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(mailboxPort);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    StringMailboxIdentifier expectedMailboxId =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), serverHost, server2port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(false);
+    Exception e = new Exception("errorBlock");
+    Mockito.when(_mailbox.receive()).thenReturn(TransferableBlockUtils.getErrorTransferableBlock(e));
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+    TransferableBlock receivedBlock = receiveOp.nextBlock();
+    Assert.assertTrue(receivedBlock.isErrorBlock());
+    MetadataBlock error = (MetadataBlock) receivedBlock.getDataBlock();
+    Assert.assertEquals(error.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE), "errorBlock");
+  }
+
+  @Test
+  public void testReceiveMailboxFromTwoServersOneClose()
+      throws Exception {
+    String server1Host = "hash1";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(server1Host);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    String server2Host = "hash2";
+    int server2Port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(server2Host);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2Port);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    StringMailboxIdentifier expectedMailboxId1 =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server1Host, server1Port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(true);
+
+    StringMailboxIdentifier expectedMailboxId2 =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server2Host, server2Port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
+    Mockito.when(_mailbox2.isClosed()).thenReturn(false);
+    Object[] expRow = new Object[]{1, 1};
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+    Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow));
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.HASH_DISTRIBUTED, toHost, toPort, jobId, stageId, null);
+    TransferableBlock receivedBlock = receiveOp.nextBlock();
+    List<Object[]> resultRows = receivedBlock.getContainer();
+    Assert.assertEquals(resultRows.size(), 1);
+    Assert.assertEquals(resultRows.get(0), expRow);
+  }
+
+  @Test
+  public void testReceiveMailboxFromTwoServersOneNull()
+      throws Exception {
+    String server1Host = "hash1";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(server1Host);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    String server2Host = "hash2";
+    int server2Port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(server2Host);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2Port);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    StringMailboxIdentifier expectedMailboxId1 =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server1Host, server1Port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(false);
+    Mockito.when(_mailbox.receive()).thenReturn(null);
+
+    StringMailboxIdentifier expectedMailboxId2 =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server2Host, server2Port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
+    Mockito.when(_mailbox2.isClosed()).thenReturn(false);
+    Object[] expRow = new Object[]{1, 1};
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+    Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow));
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.HASH_DISTRIBUTED, toHost, toPort, jobId, stageId, null);
+    TransferableBlock receivedBlock = receiveOp.nextBlock();
+    List<Object[]> resultRows = receivedBlock.getContainer();
+    Assert.assertEquals(resultRows.size(), 1);
+    Assert.assertEquals(resultRows.get(0), expRow);
+  }
+
+  @Test
+  public void testReceiveMailboxFromTwoServers()
+      throws Exception {
+    String server1Host = "hash1";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(server1Host);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    String server2Host = "hash2";
+    int server2Port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(server2Host);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2Port);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+    StringMailboxIdentifier expectedMailboxId1 =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server1Host, server1Port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(false);
+    Object[] expRow1 = new Object[]{1, 1};
+    Object[] expRow2 = new Object[]{2, 2};
+    Mockito.when(_mailbox.receive())
+        .thenReturn(OperatorTestUtil.block(inSchema, expRow1), OperatorTestUtil.block(inSchema, expRow2),
+            TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    Object[] expRow3 = new Object[]{3, 3};
+    StringMailboxIdentifier expectedMailboxId2 =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server2Host, server2Port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
+    Mockito.when(_mailbox2.isClosed()).thenReturn(false);
+    Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3));
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.HASH_DISTRIBUTED, toHost, toPort, jobId, stageId, null);
+    // Receive first block from first server.
+    TransferableBlock receivedBlock = receiveOp.nextBlock();
+    List<Object[]> resultRows = receivedBlock.getContainer();
+    Assert.assertEquals(resultRows.size(), 1);
+    Assert.assertEquals(resultRows.get(0), expRow1);
+    // Receive second block from first server.
+    receivedBlock = receiveOp.nextBlock();
+    resultRows = receivedBlock.getContainer();
+    Assert.assertEquals(resultRows.size(), 1);
+    Assert.assertEquals(resultRows.get(0), expRow2);
+
+    // Receive from second server.
+    receivedBlock = receiveOp.nextBlock();
+    resultRows = receivedBlock.getContainer();
+    Assert.assertEquals(resultRows.size(), 1);
+    Assert.assertEquals(resultRows.get(0), expRow3);
+  }
+
+  @Test
+  public void testReceiveMailboxFromTwoServersOneError()
+      throws Exception {
+    String server1Host = "hash1";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(server1Host);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    String server2Host = "hash2";
+    int server2Port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(server2Host);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2Port);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+    StringMailboxIdentifier expectedMailboxId1 =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server1Host, server1Port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(false);
+    Mockito.when(_mailbox.receive())
+        .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("mailboxError")));
+
+    Object[] expRow3 = new Object[]{3, 3};
+    StringMailboxIdentifier expectedMailboxId2 =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server2Host, server2Port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
+    Mockito.when(_mailbox2.isClosed()).thenReturn(false);
+    Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3));
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.HASH_DISTRIBUTED, toHost, toPort, jobId, stageId, null);
+    // Receive error block from first server.
+    TransferableBlock receivedBlock = receiveOp.nextBlock();
+    Assert.assertTrue(receivedBlock.isErrorBlock());
+    MetadataBlock error = (MetadataBlock) receivedBlock.getDataBlock();
+    Assert.assertEquals(error.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE), "mailboxError");
+  }
+
+  // TODO: This should probably be fixed.

Review Comment:
   Can you explain what needs to be fixed?
   Did you mean that the mocked exception should bubble up?





[GitHub] [pinot] walterddr commented on a diff in pull request #9778: [multistage] Mailbox receive operator test

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9778:
URL: https://github.com/apache/pinot/pull/9778#discussion_r1019449248


##########
pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java:
##########
@@ -65,6 +65,7 @@ public void onMatch(RelOptRuleCall call) {
 
     if (joinInfo.leftKeys.isEmpty()) {
       // when there's no JOIN key, use broadcast.
+      // TODO: Double check broadcast distribution won't cause problems

Review Comment:
   Could you explain what kind of problem you expect?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -50,23 +48,26 @@ public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
 
   private final MailboxService<TransferableBlock> _mailboxService;
   private final RelDistribution.Type _exchangeType;
-  private final KeySelector<Object[], Object[]> _keySelector;
   private final List<ServerInstance> _sendingStageInstances;
-  private final DataSchema _dataSchema;
   private final String _hostName;
   private final int _port;
   private final long _jobId;
   private final int _stageId;
   private final long _timeout;
-
   private int _serverIdx;
   private TransferableBlock _upstreamErrorBlock;
 
-  public MailboxReceiveOperator(MailboxService<TransferableBlock> mailboxService, DataSchema dataSchema,
-      List<ServerInstance> sendingStageInstances, RelDistribution.Type exchangeType,
-      KeySelector<Object[], Object[]> keySelector, String hostName, int port, long jobId, int stageId) {
-    _dataSchema = dataSchema;
+  public MailboxReceiveOperator(MailboxService<TransferableBlock> mailboxService,
+      List<ServerInstance> sendingStageInstances, RelDistribution.Type exchangeType, String hostName, int port,
+      long jobId, int stageId) {
     _mailboxService = mailboxService;
+    Preconditions.checkState(exchangeType != RelDistribution.Type.RANDOM_DISTRIBUTED,
+        "Random distribution is not supported");
+    Preconditions.checkState(exchangeType != RelDistribution.Type.ANY, "Any distribution is not supported");
+    Preconditions.checkState(exchangeType != RelDistribution.Type.RANGE_DISTRIBUTED,
+        "Range distribution is not supported");
+    Preconditions.checkState(exchangeType != RelDistribution.Type.ROUND_ROBIN_DISTRIBUTED,
+        "Round robin distribution is not supported");

Review Comment:
   I would suggest adding a static `SUPPORTED_EXCHANGE_TYPES` as a `private static final Set` and asserting that the exchange type is contained in the set:
   ```suggestion
   private static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPES =
       ImmutableSet.of(RelDistribution.Type.BROADCAST_DISTRIBUTED, RelDistribution.Type.HASH_DISTRIBUTED,
           RelDistribution.Type.SINGLETON);
   // ...
       Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(exchangeType),
           "Exchange/Distribution type: " + exchangeType + " is not supported!");
   ```
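
For illustration only, here is a minimal standalone sketch of the allow-list check suggested above. It is not the code that was merged. `DistributionType` is a hypothetical local enum that mirrors the constants of Calcite's `RelDistribution.Type`, and `ExchangeTypeCheck` and `checkSupported` are made-up names, so that the example compiles with only the JDK. A plain `IllegalStateException` stands in for Guava's `Preconditions.checkState`, which throws the same exception type.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for Calcite's RelDistribution.Type (assumption: declared
// locally so the sketch has no external dependencies).
enum DistributionType {
  SINGLETON, HASH_DISTRIBUTED, RANGE_DISTRIBUTED, RANDOM_DISTRIBUTED, ROUND_ROBIN_DISTRIBUTED,
  BROADCAST_DISTRIBUTED, ANY
}

// Hypothetical helper class; in the PR the check would live in the operator's constructor.
final class ExchangeTypeCheck {
  // Allow-list of the exchange types named in the review comment.
  static final Set<DistributionType> SUPPORTED_EXCHANGE_TYPES = Collections.unmodifiableSet(
      EnumSet.of(DistributionType.BROADCAST_DISTRIBUTED, DistributionType.HASH_DISTRIBUTED,
          DistributionType.SINGLETON));

  private ExchangeTypeCheck() {
  }

  // Throws IllegalStateException for any type outside the allow-list.
  static void checkSupported(DistributionType exchangeType) {
    if (!SUPPORTED_EXCHANGE_TYPES.contains(exchangeType)) {
      throw new IllegalStateException("Exchange/Distribution type: " + exchangeType + " is not supported!");
    }
  }

  public static void main(String[] args) {
    checkSupported(DistributionType.HASH_DISTRIBUTED); // accepted, no exception
    try {
      checkSupported(DistributionType.RANGE_DISTRIBUTED);
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Because the message embeds the enum constant's name, a test that expects the regular expression `.*RANGE_DISTRIBUTED.*` would match it. A single allow-list also rejects any distribution type added later by default, where a chain of per-type checks would have to be extended by hand.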





[GitHub] [pinot] codecov-commenter commented on pull request #9778: [multistage] [testing] Mailbox receive operator test

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #9778:
URL: https://github.com/apache/pinot/pull/9778#issuecomment-1322804800

   # [Codecov](https://codecov.io/gh/apache/pinot/pull/9778?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#9778](https://codecov.io/gh/apache/pinot/pull/9778?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (320221c) into [master](https://codecov.io/gh/apache/pinot/commit/20720366c4bff1de142db2b9a042748ebe2fc378?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (2072036) will **decrease** coverage by `48.45%`.
   > The diff coverage is `100.00%`.
   
   > :exclamation: Current head 320221c differs from pull request most recent head 1c275f0. Consider uploading reports for the commit 1c275f0 to get more accurate results
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #9778       +/-   ##
   =============================================
   - Coverage     64.29%   15.84%   -48.46%     
   + Complexity     4977      175     -4802     
   =============================================
     Files          1912     1912               
     Lines        102699   102696        -3     
     Branches      15620    15622        +2     
   =============================================
   - Hits          66035    16268    -49767     
   - Misses        31882    85236    +53354     
   + Partials       4782     1192     -3590     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests1 | `?` | |
   | unittests2 | `15.84% <100.00%> (+<0.01%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/9778?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../apache/pinot/common/datablock/DataBlockUtils.java](https://codecov.io/gh/apache/pinot/pull/9778/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZGF0YWJsb2NrL0RhdGFCbG9ja1V0aWxzLmphdmE=) | `0.00% <ø> (-87.62%)` | :arrow_down: |
   | [...rg/apache/pinot/query/service/QueryDispatcher.java](https://codecov.io/gh/apache/pinot/pull/9778/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvc2VydmljZS9RdWVyeURpc3BhdGNoZXIuamF2YQ==) | `84.69% <ø> (ø)` | |
   | [...query/runtime/operator/MailboxReceiveOperator.java](https://codecov.io/gh/apache/pinot/pull/9778/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9NYWlsYm94UmVjZWl2ZU9wZXJhdG9yLmphdmE=) | `91.66% <100.00%> (+12.30%)` | :arrow_up: |
   | [.../pinot/query/runtime/plan/PhysicalPlanVisitor.java](https://codecov.io/gh/apache/pinot/pull/9778/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9wbGFuL1BoeXNpY2FsUGxhblZpc2l0b3IuamF2YQ==) | `96.87% <100.00%> (ø)` | |
   | [...src/main/java/org/apache/pinot/sql/FilterKind.java](https://codecov.io/gh/apache/pinot/pull/9778/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcWwvRmlsdGVyS2luZC5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ain/java/org/apache/pinot/core/data/table/Key.java](https://codecov.io/gh/apache/pinot/pull/9778/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL3RhYmxlL0tleS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...in/java/org/apache/pinot/spi/utils/BytesUtils.java](https://codecov.io/gh/apache/pinot/pull/9778/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvQnl0ZXNVdGlscy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...n/java/org/apache/pinot/core/data/table/Table.java](https://codecov.io/gh/apache/pinot/pull/9778/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL3RhYmxlL1RhYmxlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../java/org/apache/pinot/core/data/table/Record.java](https://codecov.io/gh/apache/pinot/pull/9778/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL3RhYmxlL1JlY29yZC5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../java/org/apache/pinot/core/util/GroupByUtils.java](https://codecov.io/gh/apache/pinot/pull/9778/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS91dGlsL0dyb3VwQnlVdGlscy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [1209 more](https://codecov.io/gh/apache/pinot/pull/9778/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   

