You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/11/15 16:08:17 UTC

[GitHub] [pinot] walterddr commented on a diff in pull request #9778: [multistage] [testing] Mailbox receive operator test

walterddr commented on code in PR #9778:
URL: https://github.com/apache/pinot/pull/9778#discussion_r1022982326


##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java:
##########
@@ -0,0 +1,491 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.common.datablock.MetadataBlock;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
+
+
+public class MailboxReceiveOperatorTest {
+
+  private AutoCloseable _mocks;
+
+  @Mock
+  private ReceivingMailbox<TransferableBlock> _mailbox;
+
+  @Mock
+  private ReceivingMailbox<TransferableBlock> _mailbox2;
+
+  @Mock
+  private MailboxService<TransferableBlock> _mailboxService;
+  @Mock
+  private ServerInstance _server1;
+  @Mock
+  private ServerInstance _server2;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void testReceiveTimeout()
+      throws InterruptedException {
+    MailboxReceiveOperator receiveOp =
+        new MailboxReceiveOperator(_mailboxService, new ArrayList<>(), RelDistribution.Type.SINGLETON, "test", 123, 456,
+            789, 1L);
+    Thread.sleep(1000);
+    TransferableBlock mailbox = receiveOp.nextBlock();
+    Assert.assertTrue(mailbox.isErrorBlock());
+    MetadataBlock errorBlock = (MetadataBlock) mailbox.getDataBlock();
+    Assert.assertTrue(errorBlock.getExceptions().containsKey(QueryException.EXECUTION_TIMEOUT_ERROR_CODE));
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*multiple instance "
+      + "found.*")
+  public void testReceiveSingletonMultiMatchMailboxServer() {
+
+    Mockito.when(_mailboxService.getHostname()).thenReturn("singleton");
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(123);
+
+    Mockito.when(_server1.getHostname()).thenReturn("singleton");
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(123);
+
+    Mockito.when(_server2.getHostname()).thenReturn("singleton");
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123);
+
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, "test", 123, 456, 789, null);
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
+  public void testRangeDistributionNotSupported() {
+    Mockito.when(_mailboxService.getHostname()).thenReturn("singleton");
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(123);
+
+    Mockito.when(_server1.getHostname()).thenReturn("singleton");
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(123);
+
+    Mockito.when(_server2.getHostname()).thenReturn("singleton");
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123);
+
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.RANGE_DISTRIBUTED, "test", 123, 456, 789, null);
+  }
+
+  @Test
+  public void testReceiveSingletonNoMatchMailboxServer() {
+    String serverHost = "singleton";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    int server2port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2port);
+
+    int mailboxPort = 789;
+    Mockito.when(_mailboxService.getHostname()).thenReturn(serverHost);
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(mailboxPort);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+
+    // Receive end of stream block directly when there is no match.
+    Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
+  }
+
+  @Test
+  public void testReceiveSingletonCloseMailbox() {
+    String serverHost = "singleton";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    int server2port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2port);
+
+    int mailboxPort = server2port;
+    Mockito.when(_mailboxService.getHostname()).thenReturn(serverHost);
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(mailboxPort);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    StringMailboxIdentifier expectedMailboxId =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), serverHost, server2port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(true);
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+    // Receive end of stream block directly when mailbox is close.
+    Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
+  }
+
+  @Test
+  public void testReceiveSingletonNullMailbox()
+      throws Exception {
+    String serverHost = "singleton";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    int server2port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2port);
+
+    int mailboxPort = server2port;
+    Mockito.when(_mailboxService.getHostname()).thenReturn(serverHost);
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(mailboxPort);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    StringMailboxIdentifier expectedMailboxId =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), serverHost, server2port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(false);
+    // Receive null mailbox during timeout.
+    Mockito.when(_mailbox.receive()).thenReturn(null);
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+    // Receive end of stream block directly when mailbox is close.
+    Assert.assertTrue(receiveOp.nextBlock().isNoOpBlock());

Review Comment:
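   The comment and the assertion don't match: the comment says an end-of-stream block is received when the mailbox is closed, but the test asserts a no-op block.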



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -114,60 +130,49 @@ public String toExplainString() {
   protected TransferableBlock getNextBlock() {
     if (_upstreamErrorBlock != null) {
       return _upstreamErrorBlock;
-    } else if (System.nanoTime() >= _timeout) {
-      LOGGER.error("Timed out after polling mailboxes: {}", _sendingStageInstances);
+    } else if (System.nanoTime() >= _deadlineInNanoSeconds) {
+      LOGGER.error("Timed out after polling mailboxes: {}", _sendingMailbox);
       return TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
     }
 
     int startingIdx = _serverIdx;
     int openMailboxCount = 0;
-    int eosCount = 0;
-
-    for (int i = 0; i < _sendingStageInstances.size(); i++) {
-      // this implements a round-robin mailbox iterator so we don't starve any mailboxes
-      _serverIdx = (startingIdx + i) % _sendingStageInstances.size();
 
-      ServerInstance server = _sendingStageInstances.get(_serverIdx);
+    // For all non-singleton distribution, we poll from every instance to check mailbox content.
+    // TODO: Fix wasted CPU cycles on waiting for servers that are not supposed to give content.
+    for (int i = 0; i < _sendingMailbox.size(); i++) {
+      // this implements a round-robin mailbox iterator, so we don't starve any mailboxes
+      _serverIdx = (startingIdx + i) % _sendingMailbox.size();
+      MailboxIdentifier mailboxId = _sendingMailbox.get(_serverIdx);
       try {
-        ReceivingMailbox<TransferableBlock> mailbox = _mailboxService.getReceivingMailbox(toMailboxId(server));
+        ReceivingMailbox<TransferableBlock> mailbox = _mailboxService.getReceivingMailbox(mailboxId);
         if (!mailbox.isClosed()) {
           openMailboxCount++;
-
           // this is blocking for 100ms and may return null
           TransferableBlock block = mailbox.receive();
+          // Get null block when pulling times out from mailbox.
           if (block != null) {
             if (block.isErrorBlock()) {
-              _upstreamErrorBlock = TransferableBlockUtils.getErrorTransferableBlock(
-                  block.getDataBlock().getExceptions());
+              _upstreamErrorBlock =
+                  TransferableBlockUtils.getErrorTransferableBlock(block.getDataBlock().getExceptions());
               return _upstreamErrorBlock;
             }
             if (!block.isEndOfStreamBlock()) {
               return block;
-            } else {
-              eosCount++;
             }
           }
         }
       } catch (Exception e) {
-        LOGGER.error(String.format("Error receiving data from mailbox %s", server), e);
+        // TODO: Handle this exception.
+        LOGGER.error(String.format("Error receiving data from mailbox %s", mailboxId), e);

Review Comment:
   Note: is this related to the TODO in the last test?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -43,30 +44,47 @@
 /**
  * This {@code MailboxReceiveOperator} receives data from a {@link ReceivingMailbox} and serve it out from the
  * {@link BaseOperator#getNextBlock()} API.
+ *
+ *  MailboxReceiveOperator receives mailbox from mailboxService from sendingStageInstances.
+ *  We use sendingStageInstance to deduce mailboxId and fetch the content from mailboxService.
+ *  When exchangeType is Singleton, we find the mapping mailbox for the mailboxService. If not found, use empty list.
+ *  When exchangeType is non-Singleton, we pull from each instance in round-robin way to get matched mailbox content.
  */
 public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> {
   private static final Logger LOGGER = LoggerFactory.getLogger(MailboxReceiveOperator.class);
   private static final String EXPLAIN_NAME = "MAILBOX_RECEIVE";
 
+  // TODO: Unify SUPPORTED_EXCHANGE_TYPES with MailboxSendOperator.
+  private static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPES =
+      ImmutableSet.of(RelDistribution.Type.BROADCAST_DISTRIBUTED, RelDistribution.Type.HASH_DISTRIBUTED,
+          RelDistribution.Type.SINGLETON);
+
   private final MailboxService<TransferableBlock> _mailboxService;
   private final RelDistribution.Type _exchangeType;
-  private final KeySelector<Object[], Object[]> _keySelector;
-  private final List<ServerInstance> _sendingStageInstances;
-  private final DataSchema _dataSchema;
-  private final String _hostName;
-  private final int _port;
-  private final long _jobId;
-  private final int _stageId;
-  private final long _timeout;
-
+  private final List<MailboxIdentifier> _sendingMailbox;
+  private final long _deadlineInNanoSeconds;
   private int _serverIdx;
   private TransferableBlock _upstreamErrorBlock;
 
-  public MailboxReceiveOperator(MailboxService<TransferableBlock> mailboxService, DataSchema dataSchema,
-      List<ServerInstance> sendingStageInstances, RelDistribution.Type exchangeType,
-      KeySelector<Object[], Object[]> keySelector, String hostName, int port, long jobId, int stageId) {
-    _dataSchema = dataSchema;
+  private static MailboxIdentifier toMailboxId(ServerInstance fromInstance, long jobId, long stageId,
+      String receiveHostName, int receivePort) {
+    return new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), fromInstance.getHostname(),
+        fromInstance.getQueryMailboxPort(), receiveHostName, receivePort);
+  }
+
+  // TODO: Move deadlineInNanoSeconds to OperatorContext.

Review Comment:
   What is OperatorContext?



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryTestSet.java:
##########
@@ -21,12 +21,6 @@
 import org.testng.annotations.DataProvider;

Review Comment:
   Please revert this file; I believe this change is a rebase error.



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java:
##########
@@ -0,0 +1,491 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.common.datablock.MetadataBlock;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.mailbox.StringMailboxIdentifier;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
+
+
+public class MailboxReceiveOperatorTest {
+
+  private AutoCloseable _mocks;
+
+  @Mock
+  private ReceivingMailbox<TransferableBlock> _mailbox;
+
+  @Mock
+  private ReceivingMailbox<TransferableBlock> _mailbox2;
+
+  @Mock
+  private MailboxService<TransferableBlock> _mailboxService;
+  @Mock
+  private ServerInstance _server1;
+  @Mock
+  private ServerInstance _server2;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void testReceiveTimeout()
+      throws InterruptedException {
+    MailboxReceiveOperator receiveOp =
+        new MailboxReceiveOperator(_mailboxService, new ArrayList<>(), RelDistribution.Type.SINGLETON, "test", 123, 456,
+            789, 1L);
+    Thread.sleep(1000);
+    TransferableBlock mailbox = receiveOp.nextBlock();
+    Assert.assertTrue(mailbox.isErrorBlock());
+    MetadataBlock errorBlock = (MetadataBlock) mailbox.getDataBlock();
+    Assert.assertTrue(errorBlock.getExceptions().containsKey(QueryException.EXECUTION_TIMEOUT_ERROR_CODE));
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*multiple instance "
+      + "found.*")
+  public void testReceiveSingletonMultiMatchMailboxServer() {
+
+    Mockito.when(_mailboxService.getHostname()).thenReturn("singleton");
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(123);
+
+    Mockito.when(_server1.getHostname()).thenReturn("singleton");
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(123);
+
+    Mockito.when(_server2.getHostname()).thenReturn("singleton");
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123);
+
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, "test", 123, 456, 789, null);
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
+  public void testRangeDistributionNotSupported() {
+    Mockito.when(_mailboxService.getHostname()).thenReturn("singleton");
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(123);
+
+    Mockito.when(_server1.getHostname()).thenReturn("singleton");
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(123);
+
+    Mockito.when(_server2.getHostname()).thenReturn("singleton");
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123);
+
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.RANGE_DISTRIBUTED, "test", 123, 456, 789, null);
+  }
+
+  @Test
+  public void testReceiveSingletonNoMatchMailboxServer() {
+    String serverHost = "singleton";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    int server2port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2port);
+
+    int mailboxPort = 789;
+    Mockito.when(_mailboxService.getHostname()).thenReturn(serverHost);
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(mailboxPort);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+
+    // Receive end of stream block directly when there is no match.
+    Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
+  }
+
+  @Test
+  public void testReceiveSingletonCloseMailbox() {
+    String serverHost = "singleton";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    int server2port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2port);
+
+    int mailboxPort = server2port;
+    Mockito.when(_mailboxService.getHostname()).thenReturn(serverHost);
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(mailboxPort);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    StringMailboxIdentifier expectedMailboxId =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), serverHost, server2port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(true);
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+    // Receive end of stream block directly when mailbox is close.
+    Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
+  }
+
+  @Test
+  public void testReceiveSingletonNullMailbox()
+      throws Exception {
+    String serverHost = "singleton";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    int server2port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2port);
+
+    int mailboxPort = server2port;
+    Mockito.when(_mailboxService.getHostname()).thenReturn(serverHost);
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(mailboxPort);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    StringMailboxIdentifier expectedMailboxId =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), serverHost, server2port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(false);
+    // Receive null mailbox during timeout.
+    Mockito.when(_mailbox.receive()).thenReturn(null);
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+    // Receive end of stream block directly when mailbox is close.
+    Assert.assertTrue(receiveOp.nextBlock().isNoOpBlock());
+  }
+
+  @Test
+  public void testReceiveSingletonMailbox()
+      throws Exception {
+    String serverHost = "singleton";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    int server2port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2port);
+
+    int mailboxPort = server2port;
+    Mockito.when(_mailboxService.getHostname()).thenReturn(serverHost);
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(mailboxPort);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    StringMailboxIdentifier expectedMailboxId =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), serverHost, server2port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(false);
+    Object[] expRow = new Object[]{1, 1};
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+    Mockito.when(_mailbox.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow));
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+    TransferableBlock receivedBlock = receiveOp.nextBlock();
+    List<Object[]> resultRows = receivedBlock.getContainer();
+    Assert.assertEquals(resultRows.size(), 1);
+    Assert.assertEquals(resultRows.get(0), expRow);
+  }
+
+  @Test
+  public void testReceiveSingletonErrorMailbox()
+      throws Exception {
+    String serverHost = "singleton";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    int server2port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(serverHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2port);
+
+    int mailboxPort = server2port;
+    Mockito.when(_mailboxService.getHostname()).thenReturn(serverHost);
+    Mockito.when(_mailboxService.getMailboxPort()).thenReturn(mailboxPort);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    StringMailboxIdentifier expectedMailboxId =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), serverHost, server2port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(false);
+    Exception e = new Exception("errorBlock");
+    Mockito.when(_mailbox.receive()).thenReturn(TransferableBlockUtils.getErrorTransferableBlock(e));
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.SINGLETON, toHost, toPort, jobId, stageId, null);
+    TransferableBlock receivedBlock = receiveOp.nextBlock();
+    Assert.assertTrue(receivedBlock.isErrorBlock());
+    MetadataBlock error = (MetadataBlock) receivedBlock.getDataBlock();
+    Assert.assertEquals(error.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE), "errorBlock");
+  }
+
+  @Test
+  public void testReceiveMailboxFromTwoServersOneClose()
+      throws Exception {
+    String server1Host = "hash1";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(server1Host);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    String server2Host = "hash2";
+    int server2Port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(server2Host);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2Port);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    StringMailboxIdentifier expectedMailboxId1 =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server1Host, server1Port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(true);
+
+    StringMailboxIdentifier expectedMailboxId2 =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server2Host, server2Port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
+    Mockito.when(_mailbox2.isClosed()).thenReturn(false);
+    Object[] expRow = new Object[]{1, 1};
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+    Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow));
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.HASH_DISTRIBUTED, toHost, toPort, jobId, stageId, null);
+    TransferableBlock receivedBlock = receiveOp.nextBlock();
+    List<Object[]> resultRows = receivedBlock.getContainer();
+    Assert.assertEquals(resultRows.size(), 1);
+    Assert.assertEquals(resultRows.get(0), expRow);
+  }
+
+  @Test
+  public void testReceiveMailboxFromTwoServersOneNull()
+      throws Exception {
+    String server1Host = "hash1";
+    int server1Port = 123;
+    Mockito.when(_server1.getHostname()).thenReturn(server1Host);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(server1Port);
+
+    String server2Host = "hash2";
+    int server2Port = 456;
+    Mockito.when(_server2.getHostname()).thenReturn(server2Host);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(server2Port);
+
+    int jobId = 456;
+    int stageId = 0;
+    int toPort = 8888;
+    String toHost = "toHost";
+
+    StringMailboxIdentifier expectedMailboxId1 =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server1Host, server1Port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(false);
+    Mockito.when(_mailbox.receive()).thenReturn(null);
+
+    StringMailboxIdentifier expectedMailboxId2 =
+        new StringMailboxIdentifier(String.format("%s_%s", jobId, stageId), server2Host, server2Port, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2);
+    Mockito.when(_mailbox2.isClosed()).thenReturn(false);
+    Object[] expRow = new Object[]{1, 1};
+    DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+    Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow));
+    MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+        RelDistribution.Type.HASH_DISTRIBUTED, toHost, toPort, jobId, stageId, null);
+    TransferableBlock receivedBlock = receiveOp.nextBlock();
+    List<Object[]> resultRows = receivedBlock.getContainer();
+    Assert.assertEquals(resultRows.size(), 1);
+    Assert.assertEquals(resultRows.get(0), expRow);
+  }
+
+  /**
+   * Stubs two sending servers, each backed by its own mocked mailbox, and checks the order in which
+   * {@code nextBlock()} hands rows back: two data blocks from the first mailbox, then one from the second.
+   */
+  @Test
+  public void testReceiveMailboxFromTwoServers()
+      throws Exception {
+    // Receiving endpoint and query coordinates shared by both expected mailbox ids.
+    String toHost = "toHost";
+    int toPort = 8888;
+    int jobId = 456;
+    int stageId = 0;
+    String jobStagePrefix = String.format("%s_%s", jobId, stageId);
+
+    // Sender endpoints reported by the two mocked server instances.
+    String firstHost = "hash1";
+    int firstPort = 123;
+    String secondHost = "hash2";
+    int secondPort = 456;
+    Mockito.when(_server1.getHostname()).thenReturn(firstHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(firstPort);
+    Mockito.when(_server2.getHostname()).thenReturn(secondHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(secondPort);
+
+    DataSchema schema =
+        new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+    Object[] rowOne = new Object[]{1, 1};
+    Object[] rowTwo = new Object[]{2, 2};
+    Object[] rowThree = new Object[]{3, 3};
+
+    // First mailbox: stays open and yields two data blocks followed by an end-of-stream block.
+    StringMailboxIdentifier firstMailboxId =
+        new StringMailboxIdentifier(jobStagePrefix, firstHost, firstPort, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(firstMailboxId)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(false);
+    Mockito.when(_mailbox.receive())
+        .thenReturn(OperatorTestUtil.block(schema, rowOne))
+        .thenReturn(OperatorTestUtil.block(schema, rowTwo))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    // Second mailbox: stays open and always answers with the single-row block.
+    StringMailboxIdentifier secondMailboxId =
+        new StringMailboxIdentifier(jobStagePrefix, secondHost, secondPort, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(secondMailboxId)).thenReturn(_mailbox2);
+    Mockito.when(_mailbox2.isClosed()).thenReturn(false);
+    Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(schema, rowThree));
+
+    MailboxReceiveOperator operator =
+        new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+            RelDistribution.Type.HASH_DISTRIBUTED, toHost, toPort, jobId, stageId, null);
+
+    // First call: expected to carry the first row stubbed on the first mailbox.
+    TransferableBlock firstBlock = operator.nextBlock();
+    List<Object[]> firstRows = firstBlock.getContainer();
+    Assert.assertEquals(firstRows.size(), 1);
+    Assert.assertEquals(firstRows.get(0), rowOne);
+
+    // Second call: expected to carry the second row stubbed on the first mailbox.
+    TransferableBlock secondBlock = operator.nextBlock();
+    List<Object[]> secondRows = secondBlock.getContainer();
+    Assert.assertEquals(secondRows.size(), 1);
+    Assert.assertEquals(secondRows.get(0), rowTwo);
+
+    // Third call: expected to carry the row stubbed on the second mailbox.
+    TransferableBlock thirdBlock = operator.nextBlock();
+    List<Object[]> thirdRows = thirdBlock.getContainer();
+    Assert.assertEquals(thirdRows.size(), 1);
+    Assert.assertEquals(thirdRows.get(0), rowThree);
+  }
+
+  /**
+   * Stubs two sending servers where the first mailbox answers with an error block and the second with a
+   * data block, then checks that the first {@code nextBlock()} call returns the error block and that it
+   * carries the stubbed message under {@code QueryException.UNKNOWN_ERROR_CODE}.
+   */
+  @Test
+  public void testReceiveMailboxFromTwoServersOneError()
+      throws Exception {
+    // Receiving endpoint and query coordinates shared by both expected mailbox ids.
+    String toHost = "toHost";
+    int toPort = 8888;
+    int jobId = 456;
+    int stageId = 0;
+    String jobStagePrefix = String.format("%s_%s", jobId, stageId);
+
+    // Sender endpoints reported by the two mocked server instances.
+    String failingHost = "hash1";
+    int failingPort = 123;
+    String healthyHost = "hash2";
+    int healthyPort = 456;
+    Mockito.when(_server1.getHostname()).thenReturn(failingHost);
+    Mockito.when(_server1.getQueryMailboxPort()).thenReturn(failingPort);
+    Mockito.when(_server2.getHostname()).thenReturn(healthyHost);
+    Mockito.when(_server2.getQueryMailboxPort()).thenReturn(healthyPort);
+
+    // First mailbox: stays open and answers every receive() with an error block.
+    StringMailboxIdentifier failingMailboxId =
+        new StringMailboxIdentifier(jobStagePrefix, failingHost, failingPort, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(failingMailboxId)).thenReturn(_mailbox);
+    Mockito.when(_mailbox.isClosed()).thenReturn(false);
+    Mockito.when(_mailbox.receive())
+        .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("mailboxError")));
+
+    // Second mailbox: stays open and answers every receive() with a single-row data block.
+    DataSchema schema =
+        new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+    Object[] healthyRow = new Object[]{3, 3};
+    StringMailboxIdentifier healthyMailboxId =
+        new StringMailboxIdentifier(jobStagePrefix, healthyHost, healthyPort, toHost, toPort);
+    Mockito.when(_mailboxService.getReceivingMailbox(healthyMailboxId)).thenReturn(_mailbox2);
+    Mockito.when(_mailbox2.isClosed()).thenReturn(false);
+    Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(schema, healthyRow));
+
+    MailboxReceiveOperator operator =
+        new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2),
+            RelDistribution.Type.HASH_DISTRIBUTED, toHost, toPort, jobId, stageId, null);
+
+    // The very first block handed back must be the error block stubbed on the first mailbox.
+    TransferableBlock firstBlock = operator.nextBlock();
+    Assert.assertTrue(firstBlock.isErrorBlock());
+    MetadataBlock errorMetadata = (MetadataBlock) firstBlock.getDataBlock();
+    Assert.assertEquals(errorMetadata.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE), "mailboxError");
+  }
+
+  // TODO: This should probably be fixed.

Review Comment:
   Can you explain what needs to be fixed?
   Did you mean that the mocked exception should bubble up?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org