Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/10/13 22:40:37 UTC

[GitHub] [pinot] walterddr commented on a diff in pull request #9484: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

walterddr commented on code in PR #9484:
URL: https://github.com/apache/pinot/pull/9484#discussion_r995176549


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java:
##########
@@ -85,4 +97,28 @@ private boolean waitForInitialize()
   public String getMailboxId() {
     return _mailboxId;
   }
+
+  /**
+   * Converts the data sent by a {@link GrpcSendingMailbox} to a {@link TransferableBlock}.
+   *
+   * @param mailboxContent data sent by a GrpcSendingMailbox.
+   * @return null if the mailboxContent passed is null or empty. Will return an error block if the returned DataBlock
+   *         contains exceptions.
+   * @throws IOException if the MailboxContent cannot be converted to a TransferableBlock.
+   */
+  private TransferableBlock fromMailboxContent(@Nullable MailboxContent mailboxContent)
+      throws IOException {
+    if (mailboxContent == null) {
+      return null;
+    }
+    ByteBuffer byteBuffer = mailboxContent.getPayload().asReadOnlyByteBuffer();
+    if (byteBuffer.hasRemaining()) {
+      BaseDataBlock dataBlock = DataBlockUtils.getDataBlock(byteBuffer);
+      if (dataBlock instanceof MetadataBlock && !dataBlock.getExceptions().isEmpty()) {
+        return TransferableBlockUtils.getErrorTransferableBlock(dataBlock.getExceptions());
+      }
+      return new TransferableBlock(dataBlock);
+    }
+    return null;
+  }

Review Comment:
   Tag the return value as `@Nullable`.
   On second thought, should we ever return a null block?
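
One way to avoid a null return, as a minimal sketch only: make the "nothing to convert" case explicit with `Optional`. `BlockSketch` and `MailboxContentConverter` are hypothetical stand-ins, not classes from the PR, and the real method would still need the exception handling shown in the diff.

```java
import java.nio.ByteBuffer;
import java.util.Optional;

// Hypothetical stand-in for TransferableBlock; it only records the payload size.
final class BlockSketch {
  final int _numBytes;

  BlockSketch(int numBytes) {
    _numBytes = numBytes;
  }
}

// Hypothetical helper, not part of the PR.
final class MailboxContentConverter {
  private MailboxContentConverter() {
  }

  // Returns Optional.empty() instead of null when the payload is missing or empty,
  // so the "no data" case is visible in the method signature.
  static Optional<BlockSketch> fromPayload(ByteBuffer payload) {
    if (payload == null || !payload.hasRemaining()) {
      return Optional.empty();
    }
    return Optional.of(new BlockSketch(payload.remaining()));
  }
}
```

Callers would then branch on `isPresent()` rather than on a null check; whether that is preferable to a `@Nullable` annotation here is a judgment call.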



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/InMemoryChannel.java:
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox.channel;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+
+/**
+ * Used by {@link org.apache.pinot.query.mailbox.InMemoryMailboxService} for passing data between stages when sender
+ * and receiver are in the same process.
+ */
+public class InMemoryChannel<T> {
+  private final ArrayBlockingQueue<T> _channel;
+  private final String _hostname;
+  private final int _port;
+  private boolean _completed = false;
+
+  public InMemoryChannel(ArrayBlockingQueue<T> channel, String hostname, int port) {
+    _channel = channel;
+    _hostname = hostname;
+    _port = port;
+  }
+
+  public ArrayBlockingQueue<T> getChannel() {
+    return _channel;
+  }

Review Comment:
   IMO, technically speaking, we don't really need to keep this as a wrapper around the `ArrayBlockingQueue`, because the queue can be held directly inside `InMemoryMailboxService`.
   But I'll let others take a look and see whether this abstraction makes the code easier to understand.
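
To illustrate what holding the queue directly could look like, here is a rough sketch. `InMemoryQueueRegistry`, `getQueue`, and the capacity of 16 are all assumptions made for this example; the PR's `InMemoryMailboxService` may be organized differently.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified service: one bounded queue per mailbox ID, with no wrapper class.
final class InMemoryQueueRegistry<T> {
  // Assumed capacity, chosen only for this sketch.
  private static final int DEFAULT_CAPACITY = 16;

  private final ConcurrentHashMap<String, ArrayBlockingQueue<T>> _queues = new ConcurrentHashMap<>();

  // Sender and receiver call this with the same mailbox ID and get the same queue back.
  ArrayBlockingQueue<T> getQueue(String mailboxId) {
    return _queues.computeIfAbsent(mailboxId, id -> new ArrayBlockingQueue<>(DEFAULT_CAPACITY));
  }
}
```

The trade-off is that the hostname and port carried by `InMemoryChannel` would need another home if anything still reads them.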



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -116,26 +112,16 @@ protected TransferableBlock getNextBlock() {
       hasOpenedMailbox = false;
       for (ServerInstance sendingInstance : _sendingStageInstances) {
         try {
-          ReceivingMailbox<Mailbox.MailboxContent> receivingMailbox =
+          ReceivingMailbox<TransferableBlock> receivingMailbox =
               _mailboxService.getReceivingMailbox(toMailboxId(sendingInstance));
           // TODO this is not threadsafe.
           // make sure only one thread is checking receiving mailbox and calling receive() then close()
           if (!receivingMailbox.isClosed()) {
             hasOpenedMailbox = true;
-            Mailbox.MailboxContent mailboxContent = receivingMailbox.receive();
-            if (mailboxContent != null) {
-              ByteBuffer byteBuffer = mailboxContent.getPayload().asReadOnlyByteBuffer();
-              if (byteBuffer.hasRemaining()) {
-                BaseDataBlock dataBlock = DataBlockUtils.getDataBlock(byteBuffer);
-                if (dataBlock instanceof MetadataBlock && !dataBlock.getExceptions().isEmpty()) {
-                  _upstreamErrorBlock = TransferableBlockUtils.getErrorTransferableBlock(dataBlock.getExceptions());
-                  return _upstreamErrorBlock;
-                }
-                if (dataBlock.getNumberOfRows() > 0) {
-                  // here we only return data table block when it is not empty.
-                  return new TransferableBlock(dataBlock);
-                }

Review Comment:
   This was not copied over correctly to the mailbox side: the `getNumberOfRows() > 0` check is missing there.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java:
##########
@@ -85,4 +97,28 @@ private boolean waitForInitialize()
   public String getMailboxId() {
     return _mailboxId;
   }
+
+  /**
+   * Converts the data sent by a {@link GrpcSendingMailbox} to a {@link TransferableBlock}.
+   *
+   * @param mailboxContent data sent by a GrpcSendingMailbox.
+   * @return null if the mailboxContent passed is null or empty. Will return an error block if the returned DataBlock
+   *         contains exceptions.
+   * @throws IOException if the MailboxContent cannot be converted to a TransferableBlock.
+   */
+  private TransferableBlock fromMailboxContent(@Nullable MailboxContent mailboxContent)
+      throws IOException {
+    if (mailboxContent == null) {
+      return null;
+    }
+    ByteBuffer byteBuffer = mailboxContent.getPayload().asReadOnlyByteBuffer();
+    if (byteBuffer.hasRemaining()) {
+      BaseDataBlock dataBlock = DataBlockUtils.getDataBlock(byteBuffer);
+      if (dataBlock instanceof MetadataBlock && !dataBlock.getExceptions().isEmpty()) {
+        return TransferableBlockUtils.getErrorTransferableBlock(dataBlock.getExceptions());
+      }
+      return new TransferableBlock(dataBlock);

Review Comment:
   The row count check (`dataBlock.getNumberOfRows() > 0`) is missing here.
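
A simplified sketch of the ordering the removed operator code used: errors first, then the row count check. `DataBlockSketch` and `BlockFilter` are hypothetical stand-ins; the real code also checks `instanceof MetadataBlock`, and what the caller does with a skipped block is outside this sketch.

```java
import java.util.Map;

// Hypothetical stand-in for BaseDataBlock with only the fields this check needs.
final class DataBlockSketch {
  final int _numRows;
  final Map<Integer, String> _exceptions;

  DataBlockSketch(int numRows, Map<Integer, String> exceptions) {
    _numRows = numRows;
    _exceptions = exceptions;
  }
}

// Hypothetical helper, not part of the PR.
final class BlockFilter {
  enum Result { ERROR, DATA, SKIP }

  private BlockFilter() {
  }

  static Result classify(DataBlockSketch block) {
    // Blocks carrying exceptions are surfaced as errors before anything else.
    if (!block._exceptions.isEmpty()) {
      return Result.ERROR;
    }
    // Only non-empty data blocks are returned to the caller.
    if (block._numRows > 0) {
      return Result.DATA;
    }
    return Result.SKIP;
  }
}
```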



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java:
##########
@@ -91,7 +93,8 @@ public MultiStageBrokerRequestHandler(PinotConfiguration config, BrokerRoutingMa
         CalciteSchemaBuilder.asRootSchema(new PinotCatalog(tableCache)),
         new WorkerManager(_reducerHostname, _reducerPort, routingManager));
     _queryDispatcher = new QueryDispatcher();
-    _mailboxService = new GrpcMailboxService(_reducerHostname, _reducerPort, config);
+    _mailboxService = new MultiplexingMailboxService(new GrpcMailboxService(_reducerHostname, _reducerPort, config),
+        new InMemoryMailboxService(_reducerHostname, _reducerPort));

Review Comment:
   `MultiStageBrokerRequestHandler` shouldn't know about this. Construct the `MultiplexingMailboxService` through a builder pattern instead,
   e.g.
   ```
   _mailboxService = MultiplexingMailboxService.newInstance(pinotConfig);
   
   // in MultiplexingMailboxService (sketch; constructor arguments omitted)
   public static MultiplexingMailboxService newInstance(PinotConfiguration pinotConfig) {
     MultiplexingMailboxService.Builder builder = new MultiplexingMailboxService.Builder();
     if (pinotConfig.contains(ENABLE_IN_MEMORY_MAILBOX)) {
       builder.add(new InMemoryMailboxService());
     }
     if (pinotConfig.contains(ENABLE_GRPC_MAILBOX)) {
       builder.add(new GrpcMailboxService());
     }
     return builder.build();
   }
   ```
   



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/InMemoryChannel.java:
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox.channel;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+
+/**
+ * Used by {@link org.apache.pinot.query.mailbox.InMemoryMailboxService} for passing data between stages when sender
+ * and receiver are in the same process.
+ */
+public class InMemoryChannel<T> {
+  private final ArrayBlockingQueue<T> _channel;
+  private final String _hostname;
+  private final int _port;
+  private boolean _completed = false;
+
+  public InMemoryChannel(ArrayBlockingQueue<T> channel, String hostname, int port) {
+    _channel = channel;
+    _hostname = hostname;
+    _port = port;
+  }
+
+  public ArrayBlockingQueue<T> getChannel() {
+    return _channel;
+  }
+
+  public void complete() {
+    _completed = true;
+  }
+
+  public boolean isCompleted() {
+    return _completed;
+  }

Review Comment:
   Based on the concept of a channel, this should never be completed (see why `ChannelManager` always keeps the `ManagedChannel` alive on the gRPC side).
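
One possible reading of this, sketched under assumptions: the shared queue carries no completed flag, and the receiving side tracks its own closed state once it sees an end-of-stream marker. `InMemoryReceiverSketch` and the `"<EOS>"` sentinel are hypothetical; a real implementation would more likely use an end-of-stream block than a string, and this sketch is not thread-safe.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical receiving side: it owns the "closed" state, the shared queue does not.
final class InMemoryReceiverSketch {
  // Assumed sentinel; a payload equal to this string would be mistaken for end of stream.
  static final String END_OF_STREAM = "<EOS>";

  private final ArrayBlockingQueue<String> _queue;
  private boolean _closed = false;

  InMemoryReceiverSketch(ArrayBlockingQueue<String> queue) {
    _queue = queue;
  }

  // Returns the next message, or null when nothing is available or the stream has ended.
  String receive() {
    if (_closed) {
      return null;
    }
    String next = _queue.poll();
    if (END_OF_STREAM.equals(next)) {
      _closed = true;
      return null;
    }
    return next;
  }

  boolean isClosed() {
    return _closed;
  }
}
```

With this shape the queue itself stays usable after one stream ends, which mirrors keeping a channel alive across calls.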



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

