Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/09/28 22:57:25 UTC

[GitHub] [pinot] ankitsultana opened a new pull request, #9484: [Draft] [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

ankitsultana opened a new pull request, #9484:
URL: https://github.com/apache/pinot/pull/9484

   WIP. This does the following:
   
   1. `MailboxService` now uses `TransferableBlock`. Serde to/from `MailboxContent` is moved into GrpcMailbox.
   2. Adds a new `InMemoryMailboxService`. If the sender and receiver stages are local, the `TransferableBlock` can be passed from sender to receiver through a simple queue.
   3. Adds a `MultiplexingMailboxService`, which chooses between the gRPC-based mailbox service and the in-memory mailbox service.
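The queue-based handoff in item 2 can be sketched as below. This is a minimal illustration, not the PR's code: `SketchBlock` and `LocalChannel` are hypothetical stand-ins for `TransferableBlock` and an in-memory sending/receiving mailbox pair, and the bounded `ArrayBlockingQueue` with timed `offer`/`poll` mirrors the approach visible in the diff further down.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for TransferableBlock: a payload plus an end-of-stream marker.
final class SketchBlock {
  final Object _payload;
  final boolean _endOfStream;

  SketchBlock(Object payload, boolean endOfStream) {
    _payload = payload;
    _endOfStream = endOfStream;
  }

  static SketchBlock endOfStream() {
    return new SketchBlock(null, true);
  }
}

// Local channel: the sender enqueues the block object itself and the receiver dequeues
// the same reference, so nothing is serialized or copied.
final class LocalChannel {
  private final BlockingQueue<SketchBlock> _queue;
  private final long _timeoutMs;

  LocalChannel(int capacity, long timeoutMs) {
    _queue = new ArrayBlockingQueue<>(capacity);
    _timeoutMs = timeoutMs;
  }

  void send(SketchBlock block) throws InterruptedException, TimeoutException {
    // A bounded queue plus a timed offer() gives back-pressure when the receiver falls behind.
    if (!_queue.offer(block, _timeoutMs, TimeUnit.MILLISECONDS)) {
      throw new TimeoutException("Timed out sending block");
    }
  }

  SketchBlock receive() throws InterruptedException, TimeoutException {
    SketchBlock block = _queue.poll(_timeoutMs, TimeUnit.MILLISECONDS);
    if (block == null) {
      throw new TimeoutException("Timed out waiting for block");
    }
    return block;
  }
}
```

The receiver gets back the exact object the sender enqueued, which is where the serde and buffer-copy savings listed below come from.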
   
   Optimizations:
   
   1. Avoids `MailboxContent` serde when the sender and receiver stages are local. This can save ~10% of latency, depending on the query.
   2. Saves memory: with the gRPC mailbox, the buffer is copied to create the `MailboxContent`.
   3. [Not part of this PR] This will help avoid the `List<Object[]> _rows` ==> `DataTable` ==> `List<Object[]> _rows` serde round trip.
   
   These optimizations help most when the stages process large amounts of data (basically Amdahl's law).
   
   cc: @walterddr @siddharthteotia 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #9484: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9484:
URL: https://github.com/apache/pinot/pull/9484#discussion_r998761956


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock> {
+  private final BlockingQueue<TransferableBlock> _queue;
+  private final String _mailboxId;
+
+  public InMemorySendingMailbox(String mailboxId, BlockingQueue<TransferableBlock> queue) {
+    _mailboxId = mailboxId;
+    _queue = queue;
+  }
+
+  @Override
+  public String getMailboxId() {
+    return _mailboxId;
+  }
+
+  @Override
+  public void send(TransferableBlock data)
+      throws UnsupportedOperationException {
+    try {
+      if (!_queue.offer(
+          data, InMemoryMailboxService.DEFAULT_CHANNEL_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+        throw new RuntimeException(String.format("Timed out when sending block in mailbox=%s", _mailboxId));
+      }
+    } catch (InterruptedException e) {
+      throw new RuntimeException("Interrupted trying to send data through the channel", e);
+    }
+  }
+
+  @Override
+  public void complete() {

Review Comment:
   It is guaranteed that the last message is the EOS message, which serves to mark the mailbox as "closed/completed".
   
   This is probably also why the closed flag isn't `volatile`: the thread that processes the EOS block is also the one that sets the flag (not the other way around).
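A minimal sketch of the single-thread pattern described here, assuming one thread both calls `receive()` and checks `isClosed()`. `EosTrackingReceiver` and its `EOS` sentinel object are hypothetical; the point is only that the flag's writer and reader are the same thread, so no cross-thread visibility guarantee is needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded receiver: _closed is written and read only by the thread
// that calls receive()/drain(), so it needs no volatile in this usage pattern.
final class EosTrackingReceiver {
  static final Object EOS = new Object(); // hypothetical end-of-stream sentinel

  private final Queue<Object> _queue;
  private boolean _closed;

  EosTrackingReceiver(Queue<Object> queue) {
    _queue = queue;
  }

  Object receive() {
    Object block = _queue.poll();
    if (block == null) {
      throw new IllegalStateException("No block available before end-of-stream");
    }
    if (block == EOS) {
      _closed = true; // EOS is the last message, so seeing it means the stream is done
    }
    return block;
  }

  boolean isClosed() {
    return _closed;
  }

  // Operator-style loop: keep receiving until the EOS block has been seen.
  List<Object> drain() {
    List<Object> data = new ArrayList<>();
    while (!isClosed()) {
      Object block = receive();
      if (block != EOS) {
        data.add(block);
      }
    }
    return data;
  }
}
```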





[GitHub] [pinot] walterddr commented on a diff in pull request #9484: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9484:
URL: https://github.com/apache/pinot/pull/9484#discussion_r998763054


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -0,0 +1,56 @@
+package org.apache.pinot.query.mailbox;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock> {
+  private final BlockingQueue<TransferableBlock> _queue;
+  private final String _mailboxId;
+
+  public InMemorySendingMailbox(String mailboxId, BlockingQueue<TransferableBlock> queue) {
+    _mailboxId = mailboxId;
+    _queue = queue;
+  }
+
+  @Override
+  public String getMailboxId() {
+    return _mailboxId;
+  }
+
+  @Override
+  public void send(TransferableBlock data)
+      throws UnsupportedOperationException {

Review Comment:
   Yeah, the interface added this exception because it is possible to send a data block format that we don't support yet. We can keep this.
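To illustrate why the clause is harmless either way: `UnsupportedOperationException` is unchecked, so declaring it on the interface is documentation only. The interface and class names below are hypothetical, and rejecting non-`String` payloads is an arbitrary stand-in for "a data block format we don't support yet".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface with the same shape as the one discussed: the throws clause only
// documents that an implementation may reject a block format it doesn't support yet.
interface SketchSendingMailbox<T> {
  void send(T data) throws UnsupportedOperationException;
}

// UnsupportedOperationException extends RuntimeException (it is unchecked), so an
// implementation may drop the clause and callers are never forced to catch it.
final class StringOnlyMailbox implements SketchSendingMailbox<Object> {
  private final List<String> _sent = new ArrayList<>();

  @Override
  public void send(Object data) {
    if (!(data instanceof String)) {
      throw new UnsupportedOperationException("Unsupported block format: " + data);
    }
    _sent.add((String) data);
  }

  int sentCount() {
    return _sent.size();
  }
}
```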





[GitHub] [pinot] walterddr commented on a diff in pull request #9484: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9484:
URL: https://github.com/apache/pinot/pull/9484#discussion_r998757505


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java:
##########
@@ -0,0 +1,85 @@
+package org.apache.pinot.query.mailbox;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemoryMailboxService implements MailboxService<TransferableBlock> {
+  // channel manager
+  private final String _hostname;
+  private final int _mailboxPort;
+  static final int DEFAULT_CHANNEL_CAPACITY = 5;
+  // TODO: This should come from a config and should be consistent with the timeout for GrpcMailboxService
+  static final int DEFAULT_CHANNEL_TIMEOUT_SECONDS = 120;
+
+  // maintaining a list of registered mailboxes.
+  private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>> _receivingMailboxMap =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, SendingMailbox<TransferableBlock>> _sendingMailboxMap =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, ArrayBlockingQueue<TransferableBlock>> _channelMap =
+      new ConcurrentHashMap<>();
+
+  public InMemoryMailboxService(String hostname, int mailboxPort) {
+    _hostname = hostname;
+    _mailboxPort = mailboxPort;
+  }
+
+  @Override
+  public void start() {
+  }
+
+  @Override
+  public void shutdown() {

Review Comment:
   Yeah, we had a discussion and I created a tracker but never got to it.
   
   But on reflection, although the impact is small (just a simple object reference), it would become a big problem when things error out; see my comment in the new issue: https://github.com/apache/pinot/issues/9626





[GitHub] [pinot] walterddr merged pull request #9484: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

Posted by GitBox <gi...@apache.org>.
walterddr merged PR #9484:
URL: https://github.com/apache/pinot/pull/9484




[GitHub] [pinot] walterddr commented on a diff in pull request #9484: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9484:
URL: https://github.com/apache/pinot/pull/9484#discussion_r998761673


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java:
##########
@@ -0,0 +1,65 @@
+package org.apache.pinot.query.mailbox;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemoryReceivingMailbox implements ReceivingMailbox<TransferableBlock> {
+  private final String _mailboxId;
+  private final BlockingQueue<TransferableBlock> _queue;
+  private boolean _closed;
+
+  public InMemoryReceivingMailbox(String mailboxId, BlockingQueue<TransferableBlock> queue) {
+    _mailboxId = mailboxId;
+    _queue = queue;
+    _closed = false;
+  }
+
+  @Override
+  public String getMailboxId() {
+    return _mailboxId;
+  }
+
+  @Override
+  public TransferableBlock receive()
+      throws Exception {
+    TransferableBlock block = _queue.poll(
+        InMemoryMailboxService.DEFAULT_CHANNEL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    if (block == null) {
+      throw new RuntimeException(String.format("Timed out waiting for data block on mailbox=%s", _mailboxId));
+    }
+    if (block.isEndOfStreamBlock()) {
+      _closed = true;
+    }
+    return block;
+  }
+
+  @Override
+  public boolean isInitialized() {
+    return true;
+  }
+
+  @Override
+  public boolean isClosed() {
+    return _closed && _queue.size() == 0;

Review Comment:
   In gRPC, `isClosed()` <=> `isCompleted()`; I think the wording might be misleading in this case.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -0,0 +1,56 @@
+package org.apache.pinot.query.mailbox;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock> {
+  private final BlockingQueue<TransferableBlock> _queue;
+  private final String _mailboxId;
+
+  public InMemorySendingMailbox(String mailboxId, BlockingQueue<TransferableBlock> queue) {
+    _mailboxId = mailboxId;
+    _queue = queue;
+  }
+
+  @Override
+  public String getMailboxId() {
+    return _mailboxId;
+  }
+
+  @Override
+  public void send(TransferableBlock data)
+      throws UnsupportedOperationException {
+    try {
+      if (!_queue.offer(
+          data, InMemoryMailboxService.DEFAULT_CHANNEL_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+        throw new RuntimeException(String.format("Timed out when sending block in mailbox=%s", _mailboxId));
+      }
+    } catch (InterruptedException e) {
+      throw new RuntimeException("Interrupted trying to send data through the channel", e);
+    }
+  }
+
+  @Override
+  public void complete() {

Review Comment:
   It is guaranteed that the last message is the EOS message.





[GitHub] [pinot] walterddr commented on a diff in pull request #9484: [Draft] [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9484:
URL: https://github.com/apache/pinot/pull/9484#discussion_r984863776


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java:
##########
@@ -0,0 +1,71 @@
+package org.apache.pinot.query.mailbox;
+
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class MultiplexingMailboxService implements MailboxService<TransferableBlock> {

Review Comment:
   this is a GOOD IDEA!
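A rough sketch of the routing idea, assuming (as the `hostname`/`mailboxPort` constructor arguments in the diff suggest, though the thread does not confirm it) that a receiver counts as local when its host and mailbox port match this server's. `SketchMailboxService` and `SketchMultiplexingMailboxService` are hypothetical and far simpler than Pinot's `MailboxService` interface.

```java
import java.util.Objects;

// Hypothetical minimal service type; it only exposes a name so routing can be observed.
interface SketchMailboxService {
  String name();
}

// Routes a mailbox to the in-memory service when the receiver is this same process,
// identified here (an assumption) by hostname + mailbox port, and to gRPC otherwise.
final class SketchMultiplexingMailboxService {
  private final String _hostname;
  private final int _mailboxPort;
  private final SketchMailboxService _inMemoryService;
  private final SketchMailboxService _grpcService;

  SketchMultiplexingMailboxService(String hostname, int mailboxPort,
      SketchMailboxService inMemoryService, SketchMailboxService grpcService) {
    _hostname = Objects.requireNonNull(hostname);
    _mailboxPort = mailboxPort;
    _inMemoryService = Objects.requireNonNull(inMemoryService);
    _grpcService = Objects.requireNonNull(grpcService);
  }

  SketchMailboxService route(String receiverHostname, int receiverPort) {
    boolean isLocal = _hostname.equals(receiverHostname) && _mailboxPort == receiverPort;
    return isLocal ? _inMemoryService : _grpcService;
  }
}
```

Callers keep talking to one service object; only the routing decision knows that two transports exist.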





[GitHub] [pinot] agavra commented on a diff in pull request #9484: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9484:
URL: https://github.com/apache/pinot/pull/9484#discussion_r998723428


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java:
##########
@@ -0,0 +1,65 @@
+package org.apache.pinot.query.mailbox;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemoryReceivingMailbox implements ReceivingMailbox<TransferableBlock> {
+  private final String _mailboxId;
+  private final BlockingQueue<TransferableBlock> _queue;
+  private boolean _closed;

Review Comment:
   nit: should this be `volatile`? Are there any other concurrency concerns here?
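Whether `volatile` is needed depends on which threads read the flag. If a thread other than the one consuming EOS calls `isClosed()` (for example, a hypothetical scheduler checking whether an operator is done), the field needs `volatile` or other synchronization for the write to be visible. A minimal sketch with hypothetical names:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical receiver whose closed flag may be read by a thread other than the one
// that processes the EOS block.
final class VolatileFlagReceiver {
  // volatile: the write in onEndOfStream() becomes visible to readers on other threads
  private volatile boolean _closed;

  void onEndOfStream() { // called by the receiving thread when it sees EOS
    _closed = true;
  }

  boolean isClosed() { // may be called from any thread
    return _closed;
  }

  // Demonstration: a watcher thread spins on isClosed() until it observes the write.
  static boolean watcherObservesClose(long timeoutMs) throws InterruptedException {
    VolatileFlagReceiver receiver = new VolatileFlagReceiver();
    CountDownLatch observed = new CountDownLatch(1);
    Thread watcher = new Thread(() -> {
      while (!receiver.isClosed()) {
        Thread.yield();
      }
      observed.countDown();
    });
    watcher.setDaemon(true);
    watcher.start();
    receiver.onEndOfStream();
    return observed.await(timeoutMs, TimeUnit.MILLISECONDS);
  }
}
```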



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java:
##########
@@ -0,0 +1,85 @@
+package org.apache.pinot.query.mailbox;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemoryMailboxService implements MailboxService<TransferableBlock> {
+  // channel manager
+  private final String _hostname;
+  private final int _mailboxPort;
+  static final int DEFAULT_CHANNEL_CAPACITY = 5;
+  // TODO: This should come from a config and should be consistent with the timeout for GrpcMailboxService
+  static final int DEFAULT_CHANNEL_TIMEOUT_SECONDS = 120;
+
+  // maintaining a list of registered mailboxes.
+  private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>> _receivingMailboxMap =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, SendingMailbox<TransferableBlock>> _sendingMailboxMap =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, ArrayBlockingQueue<TransferableBlock>> _channelMap =
+      new ConcurrentHashMap<>();
+
+  public InMemoryMailboxService(String hostname, int mailboxPort) {
+    _hostname = hostname;
+    _mailboxPort = mailboxPort;
+  }
+
+  @Override
+  public void start() {
+  }
+
+  @Override
+  public void shutdown() {

Review Comment:
   When do the maps get cleaned up (especially for a query that is cancelled or times out)?
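One possible cleanup approach, sketched under assumptions: channels are released per query when it finishes, is cancelled, or times out. The `ChannelRegistry` class and the `"<jobId>|<rest>"` mailbox-id format are hypothetical; Pinot's actual mailbox identifiers and lifecycle hooks may differ.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry of in-memory channels keyed by mailbox id.
final class ChannelRegistry {
  private final ConcurrentHashMap<String, ArrayBlockingQueue<Object>> _channels = new ConcurrentHashMap<>();

  ArrayBlockingQueue<Object> getOrCreate(String mailboxId) {
    return _channels.computeIfAbsent(mailboxId, id -> new ArrayBlockingQueue<>(5));
  }

  // Intended to be called on completion, cancellation, or timeout so entries don't leak.
  int releaseJob(String jobId) {
    String prefix = jobId + "|"; // hypothetical mailbox-id format "<jobId>|<rest>"
    int removed = 0;
    // ConcurrentHashMap iterators are weakly consistent, so removing while iterating is safe.
    for (String mailboxId : _channels.keySet()) {
      if (mailboxId.startsWith(prefix)) {
        ArrayBlockingQueue<Object> queue = _channels.remove(mailboxId);
        if (queue != null) {
          queue.clear(); // drop buffered blocks so they can be garbage-collected
          removed++;
        }
      }
    }
    return removed;
  }

  int size() {
    return _channels.size();
  }
}
```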



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java:
##########
@@ -0,0 +1,85 @@
+package org.apache.pinot.query.mailbox;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemoryMailboxService implements MailboxService<TransferableBlock> {
+  // channel manager
+  private final String _hostname;
+  private final int _mailboxPort;
+  static final int DEFAULT_CHANNEL_CAPACITY = 5;
+  // TODO: This should come from a config and should be consistent with the timeout for GrpcMailboxService
+  static final int DEFAULT_CHANNEL_TIMEOUT_SECONDS = 120;
+
+  // maintaining a list of registered mailboxes.
+  private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>> _receivingMailboxMap =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, SendingMailbox<TransferableBlock>> _sendingMailboxMap =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, ArrayBlockingQueue<TransferableBlock>> _channelMap =
+      new ConcurrentHashMap<>();

Review Comment:
   it's usually safer to wrap `ReceivingMailbox`, `SendingMailbox` and `ArrayBlockingQueue` all in a single `MailboxState` class, and then maintain just a single `Map<String, MailboxState>` that sets all three of these at once in a single `computeIfAbsent` call.
   
   I know it's unlikely that there will be a race, but it's easier to reason about when it's guaranteed there won't be!
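The suggested `MailboxState` approach might look like the sketch below. Apart from `MailboxState` (the reviewer's name), all class names are hypothetical, and the capacity of 5 just mirrors `DEFAULT_CHANNEL_CAPACITY` in the diff. `ConcurrentHashMap.computeIfAbsent` applies the factory at most once per key, so the sender and receiver for an id always share one queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sending/receiving halves: each only holds a reference to the shared queue.
final class SketchSender {
  final BlockingQueue<Object> _queue;
  SketchSender(BlockingQueue<Object> queue) { _queue = queue; }
}

final class SketchReceiver {
  final BlockingQueue<Object> _queue;
  SketchReceiver(BlockingQueue<Object> queue) { _queue = queue; }
}

// All per-mailbox state is created together, so a sender and a receiver for the same id
// can never end up holding different queues.
final class MailboxState {
  final BlockingQueue<Object> _queue;
  final SketchSender _sender;
  final SketchReceiver _receiver;

  MailboxState(int capacity) {
    _queue = new ArrayBlockingQueue<>(capacity);
    _sender = new SketchSender(_queue);
    _receiver = new SketchReceiver(_queue);
  }
}

final class SketchInMemoryMailboxService {
  private static final int CHANNEL_CAPACITY = 5; // mirrors DEFAULT_CHANNEL_CAPACITY in the diff
  private final ConcurrentHashMap<String, MailboxState> _states = new ConcurrentHashMap<>();

  private MailboxState state(String mailboxId) {
    // computeIfAbsent is atomic per key: the factory runs at most once for a given id
    return _states.computeIfAbsent(mailboxId, id -> new MailboxState(CHANNEL_CAPACITY));
  }

  SketchSender getSendingMailbox(String mailboxId) {
    return state(mailboxId)._sender;
  }

  SketchReceiver getReceivingMailbox(String mailboxId) {
    return state(mailboxId)._receiver;
  }
}
```

With one map there is also a single entry to remove when cleaning up a mailbox.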



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -0,0 +1,56 @@
+package org.apache.pinot.query.mailbox;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock> {
+  private final BlockingQueue<TransferableBlock> _queue;
+  private final String _mailboxId;
+
+  public InMemorySendingMailbox(String mailboxId, BlockingQueue<TransferableBlock> queue) {
+    _mailboxId = mailboxId;
+    _queue = queue;
+  }
+
+  @Override
+  public String getMailboxId() {
+    return _mailboxId;
+  }
+
+  @Override
+  public void send(TransferableBlock data)
+      throws UnsupportedOperationException {

Review Comment:
   nit: can `throws UnsupportedOperationException` be removed?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java:
##########
@@ -0,0 +1,65 @@
+package org.apache.pinot.query.mailbox;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemoryReceivingMailbox implements ReceivingMailbox<TransferableBlock> {
+  private final String _mailboxId;
+  private final BlockingQueue<TransferableBlock> _queue;
+  private boolean _closed;
+
+  public InMemoryReceivingMailbox(String mailboxId, BlockingQueue<TransferableBlock> queue) {
+    _mailboxId = mailboxId;
+    _queue = queue;
+    _closed = false;
+  }
+
+  @Override
+  public String getMailboxId() {
+    return _mailboxId;
+  }
+
+  @Override
+  public TransferableBlock receive()
+      throws Exception {
+    TransferableBlock block = _queue.poll(
+        InMemoryMailboxService.DEFAULT_CHANNEL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    if (block == null) {
+      throw new RuntimeException(String.format("Timed out waiting for data block on mailbox=%s", _mailboxId));
+    }
+    if (block.isEndOfStreamBlock()) {
+      _closed = true;
+    }
+    return block;
+  }
+
+  @Override
+  public boolean isInitialized() {
+    return true;
+  }
+
+  @Override
+  public boolean isClosed() {
+    return _closed && _queue.size() == 0;

Review Comment:
   Why do we check that `_queue.size() == 0`? If I cancel the query, or it times out, and I close the mailbox, I shouldn't be forced to process the queue.
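A sketch of separating normal completion from cancellation, so that closing a cancelled or timed-out mailbox does not require draining it. `CancellableReceiver`, its `EOS` sentinel, and `cancel()` are hypothetical, and the sketch assumes a single sender whose EOS block is always last.

```java
import java.util.concurrent.BlockingQueue;

// Hypothetical receiver that separates normal completion from cancellation/timeout.
final class CancellableReceiver {
  static final Object EOS = new Object(); // hypothetical end-of-stream sentinel

  private final BlockingQueue<Object> _queue;
  private volatile boolean _endOfStreamReceived;
  private volatile boolean _cancelled;

  CancellableReceiver(BlockingQueue<Object> queue) {
    _queue = queue;
  }

  // Non-blocking for brevity; returns null when nothing is buffered.
  Object receive() {
    Object block = _queue.poll();
    if (block == EOS) {
      _endOfStreamReceived = true;
    }
    return block;
  }

  // Normal path: EOS is the last message, so once it is consumed nothing is left to drain.
  boolean isCompleted() {
    return _endOfStreamReceived;
  }

  // Cancel/timeout path: drop buffered blocks instead of forcing the caller to process them.
  void cancel() {
    _cancelled = true;
    _queue.clear();
  }

  boolean isClosed() {
    return _cancelled || _endOfStreamReceived;
  }
}
```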



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -0,0 +1,56 @@
+package org.apache.pinot.query.mailbox;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock> {
+  private final BlockingQueue<TransferableBlock> _queue;
+  private final String _mailboxId;
+
+  public InMemorySendingMailbox(String mailboxId, BlockingQueue<TransferableBlock> queue) {
+    _mailboxId = mailboxId;
+    _queue = queue;
+  }
+
+  @Override
+  public String getMailboxId() {
+    return _mailboxId;
+  }
+
+  @Override
+  public void send(TransferableBlock data)
+      throws UnsupportedOperationException {
+    try {
+      if (!_queue.offer(
+          data, InMemoryMailboxService.DEFAULT_CHANNEL_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+        throw new RuntimeException(String.format("Timed out when sending block in mailbox=%s", _mailboxId));
+      }
+    } catch (InterruptedException e) {
+      throw new RuntimeException("Interrupted trying to send data through the channel", e);
+    }
+  }
+
+  @Override
+  public void complete() {

Review Comment:
   Do we want to send a sentinel block here to make sure that the receiving side terminates (instead of waiting for `DEFAULT_CHANNEL_TIMEOUT_SECONDS`)? Or is there some other mechanism to ensure that, on the non-happy path, the receiver won't have to wait out the whole timeout?
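A minimal sketch of the sentinel idea from the comment above, assuming a hypothetical `Block` type with an end-of-stream flag in place of `TransferableBlock`: `complete()` enqueues a shared EOS sentinel, so the receiver can stop as soon as it dequeues it instead of waiting for its poll timeout. All class and method names here are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical block type: a payload plus an end-of-stream flag (stands in for TransferableBlock).
final class Block {
  static final Block EOS = new Block(null, true);

  final String payload;
  final boolean isEndOfStream;

  Block(String payload, boolean isEndOfStream) {
    this.payload = payload;
    this.isEndOfStream = isEndOfStream;
  }
}

// Hypothetical sending mailbox whose complete() enqueues the EOS sentinel.
class SentinelSendingMailbox {
  private final BlockingQueue<Block> _queue;
  private final long _timeoutMs;

  SentinelSendingMailbox(BlockingQueue<Block> queue, long timeoutMs) {
    _queue = queue;
    _timeoutMs = timeoutMs;
  }

  void send(String payload) {
    enqueue(new Block(payload, false));
  }

  /** Lets the receiver stop as soon as it dequeues EOS instead of waiting out its poll timeout. */
  void complete() {
    enqueue(Block.EOS);
  }

  private void enqueue(Block block) {
    try {
      if (!_queue.offer(block, _timeoutMs, TimeUnit.MILLISECONDS)) {
        throw new IllegalStateException("Timed out sending block");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while sending block", e);
    }
  }
}

// Hypothetical receiver side.
class SentinelReceiver {
  /** Collects payloads until EOS; fails only if nothing at all arrives within timeoutMs. */
  static List<String> drain(BlockingQueue<Block> queue, long timeoutMs) {
    List<String> payloads = new ArrayList<>();
    try {
      while (true) {
        Block block = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (block == null) {
          throw new IllegalStateException("Timed out waiting for data or EOS");
        }
        if (block.isEndOfStream) {
          return payloads;
        }
        payloads.add(block.payload);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while receiving", e);
    }
  }
}
```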



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java:
##########
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemoryReceivingMailbox implements ReceivingMailbox<TransferableBlock> {
+  private final String _mailboxId;
+  private final BlockingQueue<TransferableBlock> _queue;
+  private boolean _closed;
+
+  public InMemoryReceivingMailbox(String mailboxId, BlockingQueue<TransferableBlock> queue) {
+    _mailboxId = mailboxId;
+    _queue = queue;
+    _closed = false;
+  }
+
+  @Override
+  public String getMailboxId() {
+    return _mailboxId;
+  }
+
+  @Override
+  public TransferableBlock receive()
+      throws Exception {
+    TransferableBlock block = _queue.poll(
+        InMemoryMailboxService.DEFAULT_CHANNEL_TIMEOUT_SECONDS, TimeUnit.SECONDS);

Review Comment:
   We should consider matching the behavior of `GrpcReceivingMailbox`: time out after a very short interval (e.g. 100ms) and then return `null`. In that case, the `MailboxReceiveOperator` will simply cycle back through as long as this mailbox isn't closed.
   
   We'll probably end up changing the threading model, but it would be nice not to block here essentially indefinitely when there may be data in a different mailbox for us to read.
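A rough sketch of the suggested pattern: each mailbox is polled with a short timeout, a `null` result just moves the loop on to the next mailbox, and the loop ends once every mailbox has delivered its end-of-stream marker. Modeling mailboxes as `BlockingQueue<String>` with a literal `"EOS"` marker is an assumption made for illustration; this is not the actual `MailboxReceiveOperator`.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical receiver over several mailboxes; String blocks, with "EOS" marking the end of one mailbox's stream.
class RoundRobinReceiver {
  static final String EOS = "EOS";

  private final List<BlockingQueue<String>> _mailboxes;
  private final boolean[] _closed;
  private final long _pollMs;

  RoundRobinReceiver(List<BlockingQueue<String>> mailboxes, long pollMs) {
    _mailboxes = mailboxes;
    _closed = new boolean[mailboxes.size()];
    _pollMs = pollMs;
  }

  /** Returns the next data block from any open mailbox, or null once every mailbox has delivered EOS. */
  String next() {
    while (true) {
      boolean hasOpenMailbox = false;
      for (int i = 0; i < _mailboxes.size(); i++) {
        if (_closed[i]) {
          continue;
        }
        hasOpenMailbox = true;
        // short poll: an idle mailbox costs at most _pollMs before we move on to the next one
        String block = poll(_mailboxes.get(i));
        if (block == null) {
          continue;
        }
        if (EOS.equals(block)) {
          _closed[i] = true;
          continue;
        }
        return block;
      }
      if (!hasOpenMailbox) {
        return null;
      }
      // a real operator would also enforce an overall query deadline here
    }
  }

  private String poll(BlockingQueue<String> mailbox) {
    try {
      return mailbox.poll(_pollMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while polling mailbox", e);
    }
  }
}
```

With a 120s poll, an idle first mailbox would stall the loop even while the second one already holds data; with a short poll, the cost is bounded by `_pollMs` per pass.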



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java:
##########
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class MultiplexingMailboxService implements MailboxService<TransferableBlock> {

Review Comment:
   +1 very clean!





[GitHub] [pinot] agavra commented on a diff in pull request #9484: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

Posted by GitBox <gi...@apache.org>.
agavra commented on code in PR #9484:
URL: https://github.com/apache/pinot/pull/9484#discussion_r998783767


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock> {
+  private final BlockingQueue<TransferableBlock> _queue;
+  private final String _mailboxId;
+
+  public InMemorySendingMailbox(String mailboxId, BlockingQueue<TransferableBlock> queue) {
+    _mailboxId = mailboxId;
+    _queue = queue;
+  }
+
+  @Override
+  public String getMailboxId() {
+    return _mailboxId;
+  }
+
+  @Override
+  public void send(TransferableBlock data)
+      throws UnsupportedOperationException {
+    try {
+      if (!_queue.offer(
+          data, InMemoryMailboxService.DEFAULT_CHANNEL_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+        throw new RuntimeException(String.format("Timed out when sending block in mailbox=%s", _mailboxId));
+      }
+    } catch (InterruptedException e) {
+      throw new RuntimeException("Interrupted trying to send data through the channel", e);
+    }
+  }
+
+  @Override
+  public void complete() {

Review Comment:
   > it is guarantee that the last message is the EOS
   
   Is that true even in error scenarios? I was worried about cases like timeouts, where we'd want to time out quickly instead of blocking for 120s.
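One way to make "the last message is always the EOS" hold on error paths as well, sketched under assumptions: a hypothetical sender catches any upstream failure (including its own send timeout) and ends the stream with an ERROR block instead of falling silent, so the receiver never has to wait for the full timeout just because the sender died. The `Block`/`Kind` types and the `TerminatingSender` class are illustrative, not Pinot's API.

```java
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical block type: DATA carries a payload, EOS ends the stream normally, ERROR ends it with a message.
final class Block {
  enum Kind { DATA, EOS, ERROR }

  final Kind kind;
  final String payload;

  Block(Kind kind, String payload) {
    this.kind = kind;
    this.payload = payload;
  }
}

// Hypothetical sender: whatever happens upstream, the last block it enqueues is EOS or ERROR.
class TerminatingSender {
  private final BlockingQueue<Block> _queue;
  private final long _timeoutMs;

  TerminatingSender(BlockingQueue<Block> queue, long timeoutMs) {
    _queue = queue;
    _timeoutMs = timeoutMs;
  }

  void run(Iterator<String> upstream) {
    Block terminal = new Block(Block.Kind.EOS, null);
    try {
      while (upstream.hasNext()) {
        offer(new Block(Block.Kind.DATA, upstream.next()));
      }
    } catch (RuntimeException e) {
      // upstream failures and send timeouts both end the stream with an error block instead of silence
      terminal = new Block(Block.Kind.ERROR, String.valueOf(e.getMessage()));
    }
    // if even this cannot be enqueued, the receiver has to fall back on its own (short) poll timeout
    offer(terminal);
  }

  private void offer(Block block) {
    try {
      if (!_queue.offer(block, _timeoutMs, TimeUnit.MILLISECONDS)) {
        throw new IllegalStateException("Timed out sending block");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while sending block", e);
    }
  }
}
```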





[GitHub] [pinot] ankitsultana commented on a diff in pull request #9484: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

Posted by GitBox <gi...@apache.org>.
ankitsultana commented on code in PR #9484:
URL: https://github.com/apache/pinot/pull/9484#discussion_r995377631


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -116,26 +112,16 @@ protected TransferableBlock getNextBlock() {
       hasOpenedMailbox = false;
       for (ServerInstance sendingInstance : _sendingStageInstances) {
         try {
-          ReceivingMailbox<Mailbox.MailboxContent> receivingMailbox =
+          ReceivingMailbox<TransferableBlock> receivingMailbox =
               _mailboxService.getReceivingMailbox(toMailboxId(sendingInstance));
           // TODO this is not threadsafe.
           // make sure only one thread is checking receiving mailbox and calling receive() then close()
           if (!receivingMailbox.isClosed()) {
             hasOpenedMailbox = true;
-            Mailbox.MailboxContent mailboxContent = receivingMailbox.receive();
-            if (mailboxContent != null) {
-              ByteBuffer byteBuffer = mailboxContent.getPayload().asReadOnlyByteBuffer();
-              if (byteBuffer.hasRemaining()) {
-                BaseDataBlock dataBlock = DataBlockUtils.getDataBlock(byteBuffer);
-                if (dataBlock instanceof MetadataBlock && !dataBlock.getExceptions().isEmpty()) {
-                  _upstreamErrorBlock = TransferableBlockUtils.getErrorTransferableBlock(dataBlock.getExceptions());
-                  return _upstreamErrorBlock;
-                }
-                if (dataBlock.getNumberOfRows() > 0) {
-                  // here we only return data table block when it is not empty.
-                  return new TransferableBlock(dataBlock);
-                }

Review Comment:
   Other than the row-count check, is there anything else that is not equivalent?





[GitHub] [pinot] ankitsultana commented on a diff in pull request #9484: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

Posted by GitBox <gi...@apache.org>.
ankitsultana commented on code in PR #9484:
URL: https://github.com/apache/pinot/pull/9484#discussion_r998735015


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java:
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemoryMailboxService implements MailboxService<TransferableBlock> {
+  // channel manager
+  private final String _hostname;
+  private final int _mailboxPort;
+  static final int DEFAULT_CHANNEL_CAPACITY = 5;
+  // TODO: This should come from a config and should be consistent with the timeout for GrpcMailboxService
+  static final int DEFAULT_CHANNEL_TIMEOUT_SECONDS = 120;
+
+  // maintaining a list of registered mailboxes.
+  private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>> _receivingMailboxMap =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, SendingMailbox<TransferableBlock>> _sendingMailboxMap =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, ArrayBlockingQueue<TransferableBlock>> _channelMap =
+      new ConcurrentHashMap<>();
+
+  public InMemoryMailboxService(String hostname, int mailboxPort) {
+    _hostname = hostname;
+    _mailboxPort = mailboxPort;
+  }
+
+  @Override
+  public void start() {
+  }
+
+  @Override
+  public void shutdown() {

Review Comment:
   This is a known leak. @walterddr is planning to fix it as part of query cancellation, AFAIK.
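For context on the leak being discussed, here is a hypothetical sketch of explicit cleanup: channels are removed (and their buffered blocks cleared) when a mailbox is released, for example on query completion or cancellation, and `shutdown()` releases whatever remains. The `ChannelRegistry` class and its methods are assumptions for illustration; the comment only says the fix is planned alongside query cancellation.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry of in-memory channels with explicit release, so entries don't accumulate across queries.
class ChannelRegistry<T> {
  private final ConcurrentHashMap<String, ArrayBlockingQueue<T>> _channels = new ConcurrentHashMap<>();
  private final int _capacity;

  ChannelRegistry(int capacity) {
    _capacity = capacity;
  }

  ArrayBlockingQueue<T> getOrCreate(String mailboxId) {
    return _channels.computeIfAbsent(mailboxId, id -> new ArrayBlockingQueue<>(_capacity));
  }

  /** Called when a query finishes or is cancelled: drops the channel and any blocks still buffered in it. */
  void release(String mailboxId) {
    ArrayBlockingQueue<T> channel = _channels.remove(mailboxId);
    if (channel != null) {
      channel.clear();
    }
  }

  /** Releases every remaining channel. */
  void shutdown() {
    _channels.values().forEach(ArrayBlockingQueue::clear);
    _channels.clear();
  }

  int size() {
    return _channels.size();
  }
}
```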





[GitHub] [pinot] ankitsultana commented on a diff in pull request #9484: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

Posted by GitBox <gi...@apache.org>.
ankitsultana commented on code in PR #9484:
URL: https://github.com/apache/pinot/pull/9484#discussion_r998837096


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java:
##########
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemoryReceivingMailbox implements ReceivingMailbox<TransferableBlock> {
+  private final String _mailboxId;
+  private final BlockingQueue<TransferableBlock> _queue;
+  private boolean _closed;
+
+  public InMemoryReceivingMailbox(String mailboxId, BlockingQueue<TransferableBlock> queue) {
+    _mailboxId = mailboxId;
+    _queue = queue;
+    _closed = false;
+  }
+
+  @Override
+  public String getMailboxId() {
+    return _mailboxId;
+  }
+
+  @Override
+  public TransferableBlock receive()
+      throws Exception {
+    TransferableBlock block = _queue.poll(
+        InMemoryMailboxService.DEFAULT_CHANNEL_TIMEOUT_SECONDS, TimeUnit.SECONDS);

Review Comment:
   Good catch. The default timeout is reduced to 1s (same as `MailboxContentStreamObserver`).





[GitHub] [pinot] walterddr commented on a diff in pull request #9484: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9484:
URL: https://github.com/apache/pinot/pull/9484#discussion_r997102381


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java:
##########
@@ -85,4 +104,27 @@ private boolean waitForInitialize()
   public String getMailboxId() {
     return _mailboxId;
   }
+
+  /**
+   * Converts the data sent by a {@link GrpcSendingMailbox} to a {@link TransferableBlock}.
+   *
+   * @param mailboxContent data sent by a GrpcSendingMailbox.
+   * @return null if the received MailboxContent didn't have any rows.
+   * @throws IOException if the MailboxContent cannot be converted to a TransferableBlock.

Review Comment:
   Same as above: please change the wording in the Javadoc.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/InMemoryChannel.java:
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox.channel;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+
+/**
+ * Used by {@link org.apache.pinot.query.mailbox.InMemoryMailboxService} for passing data between stages when sender
+ * and receiver are in the same process.
+ */
+public class InMemoryChannel<T> {

Review Comment:
   I think the part that confuses me is this `InMemoryChannel`:
   
   1. It is not the equivalent of the `ManagedChannel` in `ChannelManager`; it is actually a wrapper around the blocking queue.
   2. The closest counterpart is the `MailboxContentStreamObserver`.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/InMemoryChannel.java:
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox.channel;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+
+/**
+ * Used by {@link org.apache.pinot.query.mailbox.InMemoryMailboxService} for passing data between stages when sender
+ * and receiver are in the same process.
+ */
+public class InMemoryChannel<T> {
+  private final ArrayBlockingQueue<T> _channel;
+  private final String _hostname;
+  private final int _port;
+  private boolean _completed = false;
+
+  public InMemoryChannel(ArrayBlockingQueue<T> channel, String hostname, int port) {
+    _channel = channel;
+    _hostname = hostname;
+    _port = port;
+  }
+
+  public ArrayBlockingQueue<T> getChannel() {
+    return _channel;
+  }
+
+  public void complete() {
+    _completed = true;
+  }
+
+  public boolean isCompleted() {
+    return _completed;
+  }

Review Comment:
   Ignore this comment.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/InMemoryChannel.java:
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox.channel;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+
+/**
+ * Used by {@link org.apache.pinot.query.mailbox.InMemoryMailboxService} for passing data between stages when sender
+ * and receiver are in the same process.
+ */
+public class InMemoryChannel<T> {
+  private final ArrayBlockingQueue<T> _channel;
+  private final String _hostname;
+  private final int _port;
+  private boolean _completed = false;
+
+  public InMemoryChannel(ArrayBlockingQueue<T> channel, String hostname, int port) {
+    _channel = channel;
+    _hostname = hostname;
+    _port = port;
+  }
+
+  public ArrayBlockingQueue<T> getChannel() {
+    return _channel;
+  }

Review Comment:
   So in the gRPC mailbox, the mailbox itself doesn't keep track of the completion signal, which makes sense because the sender is on the other side of an RPC:
   - for example, the last metadata block might get lost in transport;
   - gRPC makes sure the `onCompleted()` method is called.
   
   For the in-memory mailbox, one can simply check whether a metadata block has been received to determine whether it is complete (a block can never be lost on the `ArrayBlockingQueue`).
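A minimal sketch of the approach described above: the receiver derives completion from the stream itself, marking itself complete when it dequeues the metadata block, with no separate `completed` flag on a channel object. The `Block` type with an `isMetadata` flag and the `SelfCompletingReceiver` class are hypothetical stand-ins for `TransferableBlock`/`MetadataBlock`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical block: isMetadata marks the final block of a stream (stands in for a MetadataBlock).
final class Block {
  final String payload;
  final boolean isMetadata;

  Block(String payload, boolean isMetadata) {
    this.payload = payload;
    this.isMetadata = isMetadata;
  }
}

// Hypothetical receiver: completion is derived from the blocks received, not from a channel-level flag.
class SelfCompletingReceiver {
  private final BlockingQueue<Block> _queue;
  private boolean _completed = false; // only touched by the receiving thread

  SelfCompletingReceiver(BlockingQueue<Block> queue) {
    _queue = queue;
  }

  /** Returns the next block, or null if the stream is complete or nothing arrived within pollMs. */
  Block receive(long pollMs) {
    if (_completed) {
      return null;
    }
    try {
      Block block = _queue.poll(pollMs, TimeUnit.MILLISECONDS);
      if (block != null && block.isMetadata) {
        // an in-memory queue never drops blocks, so seeing the metadata block means the sender is done
        _completed = true;
      }
      return block;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }

  boolean isCompleted() {
    return _completed;
  }
}
```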



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java:
##########
@@ -64,6 +65,12 @@ public MailboxContentStreamObserver(GrpcMailboxService mailboxService,
     _isEnabledFeedback = isEnabledFeedback;
   }
 
+  /**
+   * This method may return null. It can happen if there's a large enough gap between the time the receiver received
+   * the last block and the StreamObserver was detected as complete. In that case, the MailboxReceiveOperator would
+   * try to call this method again but since there's no new content to be received, we'll exit the loop. Returning
+   * null here means that MailboxReceiveOperator won't consider this as an error.
+   */

Review Comment:
   1. Yes, this is the correct behavior.
   2. We don't rely on this, because we rely on `onCompleted()`.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java:
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.query.mailbox.channel.InMemoryChannel;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemoryMailboxService implements MailboxService<TransferableBlock> {
+  // channel manager
+  private final String _hostname;
+  private final int _mailboxPort;
+  static final int DEFAULT_CHANNEL_CAPACITY = 5;
+  // TODO: This should come from a config and should be consistent with the timeout for GrpcMailboxService
+  static final int DEFAULT_CHANNEL_TIMEOUT_SECONDS = 120;
+
+  // maintaining a list of registered mailboxes.
+  private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>> _receivingMailboxMap =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, SendingMailbox<TransferableBlock>> _sendingMailboxMap =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, InMemoryChannel<TransferableBlock>> _channelMap = new ConcurrentHashMap<>();

Review Comment:
   Let's just use an `ArrayBlockingQueue` here; it is simpler.
   Also see my other comment in the `InMemoryChannel` class.
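A hypothetical sketch of the simplification suggested above: the service keeps a `ConcurrentHashMap<String, ArrayBlockingQueue<T>>` directly, and both the sending and receiving sides resolve the same queue by mailbox id, with no `InMemoryChannel` wrapper. `DEFAULT_CHANNEL_CAPACITY = 5` comes from the diff. The non-blocking `send`/`receive` methods are illustrative simplifications; the PR's mailboxes use timed `offer`/`poll`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical simplified service: sending and receiving sides share one ArrayBlockingQueue per mailbox id.
class QueueBackedMailboxService<T> {
  static final int DEFAULT_CHANNEL_CAPACITY = 5;

  private final ConcurrentHashMap<String, ArrayBlockingQueue<T>> _channelMap = new ConcurrentHashMap<>();

  private ArrayBlockingQueue<T> getChannel(String mailboxId) {
    // computeIfAbsent is atomic on ConcurrentHashMap, so both sides get the same queue whichever asks first
    return _channelMap.computeIfAbsent(mailboxId, id -> new ArrayBlockingQueue<>(DEFAULT_CHANNEL_CAPACITY));
  }

  /** Non-blocking send; returns false when the channel is full (back-pressure). */
  boolean send(String mailboxId, T block) {
    return getChannel(mailboxId).offer(block);
  }

  /** Non-blocking receive; returns null when nothing is buffered. */
  T receive(String mailboxId) {
    return getChannel(mailboxId).poll();
  }
}
```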



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java:
##########
@@ -50,15 +58,26 @@ public void init(MailboxContentStreamObserver streamObserver) {
     }
   }
 
+  /**
+   * Polls the underlying channel and converts the received data into a TransferableBlock. This may return null in the
+   * following cases:
+   *
+   * <p>
+   *  1. If the mailbox hasn't initialized yet. This means we haven't received any data yet.
+   *  2. If the received block from the sender didn't have any rows.

Review Comment:
   If the received block is a metadata block, it will still return a metadata block with 0 rows, right? Shouldn't this be:
   
   ```suggestion
      *  2. If the received block from the sender is a data block but has 0 rows.
   ```





[GitHub] [pinot] walterddr commented on a diff in pull request #9484: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9484:
URL: https://github.com/apache/pinot/pull/9484#discussion_r995176549


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java:
##########
@@ -85,4 +97,28 @@ private boolean waitForInitialize()
   public String getMailboxId() {
     return _mailboxId;
   }
+
+  /**
+   * Converts the data sent by a {@link GrpcSendingMailbox} to a {@link TransferableBlock}.
+   *
+   * @param mailboxContent data sent by a GrpcSendingMailbox.
+   * @return null if the mailboxContent passed is null or empty. Will return an error block if the returned DataBlock
+   *         contains exceptions.
+   * @throws IOException if the MailboxContent cannot be converted to a TransferableBlock.
+   */
+  private TransferableBlock fromMailboxContent(@Nullable MailboxContent mailboxContent)
+      throws IOException {
+    if (mailboxContent == null) {
+      return null;
+    }
+    ByteBuffer byteBuffer = mailboxContent.getPayload().asReadOnlyByteBuffer();
+    if (byteBuffer.hasRemaining()) {
+      BaseDataBlock dataBlock = DataBlockUtils.getDataBlock(byteBuffer);
+      if (dataBlock instanceof MetadataBlock && !dataBlock.getExceptions().isEmpty()) {
+        return TransferableBlockUtils.getErrorTransferableBlock(dataBlock.getExceptions());
+      }
+      return new TransferableBlock(dataBlock);
+    }
+    return null;
+  }

Review Comment:
   Tag this as a `@Nullable` return.
   On second thought, should we ever return a null block?



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/InMemoryChannel.java:
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox.channel;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+
+/**
+ * Used by {@link org.apache.pinot.query.mailbox.InMemoryMailboxService} for passing data between stages when sender
+ * and receiver are in the same process.
+ */
+public class InMemoryChannel<T> {
+  private final ArrayBlockingQueue<T> _channel;
+  private final String _hostname;
+  private final int _port;
+  private boolean _completed = false;
+
+  public InMemoryChannel(ArrayBlockingQueue<T> channel, String hostname, int port) {
+    _channel = channel;
+    _hostname = hostname;
+    _port = port;
+  }
+
+  public ArrayBlockingQueue<T> getChannel() {
+    return _channel;
+  }

Review Comment:
   IMO, technically speaking, we don't really need to keep this as a wrapper of the array blocking queue, because it can be coded directly inside the `InMemoryMailboxService`.
   But I would let others also take a look and see whether this abstraction makes it easier to understand.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java:
##########
@@ -116,26 +112,16 @@ protected TransferableBlock getNextBlock() {
       hasOpenedMailbox = false;
       for (ServerInstance sendingInstance : _sendingStageInstances) {
         try {
-          ReceivingMailbox<Mailbox.MailboxContent> receivingMailbox =
+          ReceivingMailbox<TransferableBlock> receivingMailbox =
               _mailboxService.getReceivingMailbox(toMailboxId(sendingInstance));
           // TODO this is not threadsafe.
           // make sure only one thread is checking receiving mailbox and calling receive() then close()
           if (!receivingMailbox.isClosed()) {
             hasOpenedMailbox = true;
-            Mailbox.MailboxContent mailboxContent = receivingMailbox.receive();
-            if (mailboxContent != null) {
-              ByteBuffer byteBuffer = mailboxContent.getPayload().asReadOnlyByteBuffer();
-              if (byteBuffer.hasRemaining()) {
-                BaseDataBlock dataBlock = DataBlockUtils.getDataBlock(byteBuffer);
-                if (dataBlock instanceof MetadataBlock && !dataBlock.getExceptions().isEmpty()) {
-                  _upstreamErrorBlock = TransferableBlockUtils.getErrorTransferableBlock(dataBlock.getExceptions());
-                  return _upstreamErrorBlock;
-                }
-                if (dataBlock.getNumberOfRows() > 0) {
-                  // here we only return data table block when it is not empty.
-                  return new TransferableBlock(dataBlock);
-                }

Review Comment:
   This logic was not copied over correctly to the mailbox side.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java:
##########
@@ -85,4 +97,28 @@ private boolean waitForInitialize()
   public String getMailboxId() {
     return _mailboxId;
   }
+
+  /**
+   * Converts the data sent by a {@link GrpcSendingMailbox} to a {@link TransferableBlock}.
+   *
+   * @param mailboxContent data sent by a GrpcSendingMailbox.
+   * @return null if the mailboxContent passed is null or empty. Will return an error block if the returned DataBlock
+   *         contains exceptions.
+   * @throws IOException if the MailboxContent cannot be converted to a TransferableBlock.
+   */
+  private TransferableBlock fromMailboxContent(@Nullable MailboxContent mailboxContent)
+      throws IOException {
+    if (mailboxContent == null) {
+      return null;
+    }
+    ByteBuffer byteBuffer = mailboxContent.getPayload().asReadOnlyByteBuffer();
+    if (byteBuffer.hasRemaining()) {
+      BaseDataBlock dataBlock = DataBlockUtils.getDataBlock(byteBuffer);
+      if (dataBlock instanceof MetadataBlock && !dataBlock.getExceptions().isEmpty()) {
+        return TransferableBlockUtils.getErrorTransferableBlock(dataBlock.getExceptions());
+      }
+      return new TransferableBlock(dataBlock);

Review Comment:
   The row-count check is missing here.
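   
   For reference, the checks that the old `MailboxReceiveOperator` loop performed can be sketched as a small classifier. This is a simplified sketch under stated assumptions: `DecodedBlock` and `Outcome` are hypothetical stand-ins for `BaseDataBlock`/`TransferableBlock`, and exceptions are modeled as a plain `List<String>`. Whether end-of-stream metadata blocks need separate handling is outside this sketch.

   ```java
   import java.util.List;

   // Hypothetical stand-in for a deserialized data block.
   final class DecodedBlock {
     final boolean isMetadata;
     final List<String> exceptions;
     final int numRows;

     DecodedBlock(boolean isMetadata, List<String> exceptions, int numRows) {
       this.isMetadata = isMetadata;
       this.exceptions = exceptions;
       this.numRows = numRows;
     }
   }

   enum Outcome { ERROR, DATA, SKIP }

   final class BlockFilter {
     // 1. A metadata block carrying exceptions becomes an error block.
     // 2. Only non-empty data blocks are passed downstream.
     // 3. Everything else (null or empty payloads) is skipped.
     static Outcome classify(DecodedBlock block) {
       if (block == null) {
         return Outcome.SKIP;
       }
       if (block.isMetadata && !block.exceptions.isEmpty()) {
         return Outcome.ERROR;
       }
       if (block.numRows > 0) {
         return Outcome.DATA;
       }
       return Outcome.SKIP;
     }
   }
   ```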



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java:
##########
@@ -91,7 +93,8 @@ public MultiStageBrokerRequestHandler(PinotConfiguration config, BrokerRoutingMa
         CalciteSchemaBuilder.asRootSchema(new PinotCatalog(tableCache)),
         new WorkerManager(_reducerHostname, _reducerPort, routingManager));
     _queryDispatcher = new QueryDispatcher();
-    _mailboxService = new GrpcMailboxService(_reducerHostname, _reducerPort, config);
+    _mailboxService = new MultiplexingMailboxService(new GrpcMailboxService(_reducerHostname, _reducerPort, config),
+        new InMemoryMailboxService(_reducerHostname, _reducerPort));

Review Comment:
   The multistage broker request handler shouldn't know about this. Give `MultiplexingMailboxService` a builder pattern instead,
   e.g.:
   ```
   _mailboxService = MultiplexingMailboxService.newInstance(pinotConfig);
   
   // in MultiplexingMailboxService
   public static MultiplexingMailboxService newInstance(PinotConfiguration pinotConfig) {
     MultiplexingMailboxService.Builder builder = MultiplexingMailboxService.builder();
     if (pinotConfig.contains(ENABLE_IN_MEMORY_MAILBOX)) {
       builder.add(new InMemoryMailboxService());
     }
     if (pinotConfig.contains(ENABLE_GRPC_MAILBOX)) {
       builder.add(new GrpcMailboxService());
     }
     return builder.build();
   }
   ```
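   
   A self-contained sketch of the same idea, runnable outside Pinot. Everything here is hypothetical: `MailboxTransport` stands in for `MailboxService`, a `Map<String, String>` stands in for `PinotConfiguration`, the config keys are made up, and gRPC is assumed to be enabled by default.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.Map;

   // Hypothetical minimal interface standing in for MailboxService.
   interface MailboxTransport {
     String name();
   }

   final class MultiplexingTransport {
     // Hypothetical config keys, not real Pinot settings.
     static final String ENABLE_IN_MEMORY_MAILBOX = "enable.in.memory.mailbox";
     static final String ENABLE_GRPC_MAILBOX = "enable.grpc.mailbox";

     private final List<MailboxTransport> _delegates;

     private MultiplexingTransport(List<MailboxTransport> delegates) {
       _delegates = Collections.unmodifiableList(delegates);
     }

     List<MailboxTransport> delegates() {
       return _delegates;
     }

     // The caller only passes config; it never names a concrete transport.
     static MultiplexingTransport newInstance(Map<String, String> config) {
       Builder builder = new Builder();
       if (Boolean.parseBoolean(config.get(ENABLE_IN_MEMORY_MAILBOX))) {
         builder.add(() -> "in-memory");
       }
       if (Boolean.parseBoolean(config.getOrDefault(ENABLE_GRPC_MAILBOX, "true"))) {
         builder.add(() -> "grpc");
       }
       return builder.build();
     }

     static final class Builder {
       private final List<MailboxTransport> _delegates = new ArrayList<>();

       Builder add(MailboxTransport transport) {
         _delegates.add(transport);
         return this;
       }

       MultiplexingTransport build() {
         if (_delegates.isEmpty()) {
           throw new IllegalStateException("At least one mailbox transport must be enabled");
         }
         return new MultiplexingTransport(new ArrayList<>(_delegates));
       }
     }
   }
   ```

   Failing in `build()` when nothing is enabled surfaces a misconfiguration at startup rather than at query time.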
   



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/InMemoryChannel.java:
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox.channel;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+
+/**
+ * Used by {@link org.apache.pinot.query.mailbox.InMemoryMailboxService} for passing data between stages when sender
+ * and receiver are in the same process.
+ */
+public class InMemoryChannel<T> {
+  private final ArrayBlockingQueue<T> _channel;
+  private final String _hostname;
+  private final int _port;
+  private boolean _completed = false;
+
+  public InMemoryChannel(ArrayBlockingQueue<T> channel, String hostname, int port) {
+    _channel = channel;
+    _hostname = hostname;
+    _port = port;
+  }
+
+  public ArrayBlockingQueue<T> getChannel() {
+    return _channel;
+  }
+
+  public void complete() {
+    _completed = true;
+  }
+
+  public boolean isCompleted() {
+    return _completed;
+  }

Review Comment:
   Based on the concept of a channel, this should never be completed (see why `ChannelManager` always keeps the `ManagedChannel` alive in gRPC).
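   
   A minimal sketch of the "channels live forever" idea, assuming a hypothetical `LocalChannelManager` that caches one `LocalChannel` per `host:port` target and never closes it. Completion would then be tracked per stream (per mailbox), not on the channel. Neither class exists in Pinot.

   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical stand-in for a long-lived connection such as a gRPC ManagedChannel.
   final class LocalChannel {
     final String target;

     LocalChannel(String target) {
       this.target = target;
     }
   }

   final class LocalChannelManager {
     private final ConcurrentMap<String, LocalChannel> _channels = new ConcurrentHashMap<>();
     // Counts how many channels were actually created (for illustration only).
     final AtomicInteger created = new AtomicInteger();

     // One channel per host:port, created lazily and reused by every mailbox;
     // the channel is never "completed", only the individual streams on it are.
     LocalChannel getChannel(String hostname, int port) {
       return _channels.computeIfAbsent(hostname + ":" + port, target -> {
         created.incrementAndGet();
         return new LocalChannel(target);
       });
     }
   }
   ```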



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #9484: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9484:
URL: https://github.com/apache/pinot/pull/9484#discussion_r998892491


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java:
##########
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemoryReceivingMailbox implements ReceivingMailbox<TransferableBlock> {
+  private final String _mailboxId;
+  private final BlockingQueue<TransferableBlock> _queue;
+  private boolean _closed;
+
+  public InMemoryReceivingMailbox(String mailboxId, BlockingQueue<TransferableBlock> queue) {
+    _mailboxId = mailboxId;
+    _queue = queue;
+    _closed = false;
+  }
+
+  @Override
+  public String getMailboxId() {
+    return _mailboxId;
+  }
+
+  @Override
+  public TransferableBlock receive()
+      throws Exception {
+    TransferableBlock block = _queue.poll(
+        InMemoryMailboxService.DEFAULT_CHANNEL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    if (block == null) {
+      throw new RuntimeException(String.format("Timed out waiting for data block on mailbox=%s", _mailboxId));
+    }
+    if (block.isEndOfStreamBlock()) {
+      _closed = true;
+    }
+    return block;
+  }
+
+  @Override
+  public boolean isInitialized() {
+    return true;
+  }
+
+  @Override
+  public boolean isClosed() {
+    return _closed && _queue.size() == 0;

Review Comment:
   Upon checking the usage of the interface method `isClosed()`: it is actually called in the non-thread-safe `MailboxReceiveOperator`.
   
   - I think we can keep this for now, because the gRPC mailbox does basically the same thing; it should really be called `shouldAnotherBlockBeExpected`.
   - Later, with #9615, we should redefine this API (or create a new one).




[GitHub] [pinot] ankitsultana commented on a diff in pull request #9484: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

Posted by GitBox <gi...@apache.org>.
ankitsultana commented on code in PR #9484:
URL: https://github.com/apache/pinot/pull/9484#discussion_r998737649


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock> {
+  private final BlockingQueue<TransferableBlock> _queue;
+  private final String _mailboxId;
+
+  public InMemorySendingMailbox(String mailboxId, BlockingQueue<TransferableBlock> queue) {
+    _mailboxId = mailboxId;
+    _queue = queue;
+  }
+
+  @Override
+  public String getMailboxId() {
+    return _mailboxId;
+  }
+
+  @Override
+  public void send(TransferableBlock data)
+      throws UnsupportedOperationException {

Review Comment:
   The interface declares it; I'm not sure why. @walterddr, can it be removed now?
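   
   Worth noting for this question: `UnsupportedOperationException` is unchecked, so a `throws` clause for it is documentation only, and an implementation may drop it without any compile error. A minimal sketch with hypothetical `Sender`/`ListSender` types mirroring the shape of `SendingMailbox#send`:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical interface mirroring the signature discussed above.
   interface Sender<T> {
     void send(T data) throws UnsupportedOperationException;
   }

   // The override omits the throws clause; callers are never forced to catch an
   // unchecked exception, with or without the declaration.
   final class ListSender implements Sender<String> {
     final List<String> sent = new ArrayList<>();

     @Override
     public void send(String data) {
       sent.add(data);
     }
   }
   ```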




[GitHub] [pinot] walterddr commented on a diff in pull request #9484: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9484:
URL: https://github.com/apache/pinot/pull/9484#discussion_r998890415


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock> {
+  private final BlockingQueue<TransferableBlock> _queue;
+  private final String _mailboxId;
+
+  public InMemorySendingMailbox(String mailboxId, BlockingQueue<TransferableBlock> queue) {
+    _mailboxId = mailboxId;
+    _queue = queue;
+  }
+
+  @Override
+  public String getMailboxId() {
+    return _mailboxId;
+  }
+
+  @Override
+  public void send(TransferableBlock data)
+      throws UnsupportedOperationException {
+    try {
+      if (!_queue.offer(
+          data, InMemoryMailboxService.DEFAULT_CHANNEL_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+        throw new RuntimeException(String.format("Timed out when sending block in mailbox=%s", _mailboxId));
+      }
+    } catch (InterruptedException e) {
+      throw new RuntimeException("Interrupted trying to send data through the channel", e);
+    }
+  }
+
+  @Override
+  public void complete() {

Review Comment:
   > is that true even in the error scenarios? I was worried about cases like timeout, where we'd want to timeout quickly instead of blocking for 120s
   
   - If the error is sent from the sender, then yes, that will be the last message.
   - If the error occurs after a normal message was delivered, then the error originates from where it was thrown; from that point on, the originating operator stops returning any new data blocks.
   
   So yes, it is guaranteed at the operator level, but it is not guaranteed at the mailbox level. That means the mailbox could still be receiving data afterwards; it's just that no thread will dequeue from that receiving buffer. This is exactly the scenario I described in https://github.com/apache/pinot/issues/9626
   
   > The 120s timeout was definitely a bug. Reduced it to 1s. I have mentioned some timeout scenario handling in the other comment regarding _queue.size() == 0.
   
   Bear in mind that the 100ms timeout for `GrpcReceivingMailbox` is only the timeout upon initialization; from that point on there's no pull-with-timeout. It is a push model managed by gRPC.
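   
   To illustrate why the timeout matters on the in-memory (pull) path: once the receiving operator stops dequeuing, a bounded queue fills up and every further `offer` waits for the full timeout before failing. A minimal sketch, assuming a hypothetical `BoundedSender` helper that mirrors `InMemorySendingMailbox#send` but takes the timeout in milliseconds:

   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.TimeUnit;

   final class BoundedSender {
     // Hypothetical helper mirroring InMemorySendingMailbox#send with a configurable timeout.
     static <T> void send(BlockingQueue<T> queue, T block, long timeoutMs, String mailboxId) {
       try {
         // offer() blocks while the queue is full; if nobody dequeues any more
         // (e.g. after an upstream error), this waits for the whole timeout.
         if (!queue.offer(block, timeoutMs, TimeUnit.MILLISECONDS)) {
           throw new IllegalStateException("Timed out when sending block in mailbox=" + mailboxId);
         }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt(); // preserve the interrupt status for callers
         throw new RuntimeException("Interrupted trying to send data through the channel", e);
       }
     }
   }
   ```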
   
   
   




[GitHub] [pinot] ankitsultana commented on a diff in pull request #9484: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

Posted by GitBox <gi...@apache.org>.
ankitsultana commented on code in PR #9484:
URL: https://github.com/apache/pinot/pull/9484#discussion_r995376873


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/InMemoryChannel.java:
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox.channel;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+
+/**
+ * Used by {@link org.apache.pinot.query.mailbox.InMemoryMailboxService} for passing data between stages when sender
+ * and receiver are in the same process.
+ */
+public class InMemoryChannel<T> {
+  private final ArrayBlockingQueue<T> _channel;
+  private final String _hostname;
+  private final int _port;
+  private boolean _completed = false;
+
+  public InMemoryChannel(ArrayBlockingQueue<T> channel, String hostname, int port) {
+    _channel = channel;
+    _hostname = hostname;
+    _port = port;
+  }
+
+  public ArrayBlockingQueue<T> getChannel() {
+    return _channel;
+  }

Review Comment:
   In `MailboxReceiveOperator` we check if the mailbox is closed. For `InMemoryReceivingMailbox`, we need to confirm that:
   
   1. The sender is done sending. When the sender is done, it will call complete here.
   2. The ArrayBlockingQueue is empty.
   
   If both of these are true, then we say that the mailbox is closed. I think this is actually equivalent to `MailboxContentStreamObserver` and not the channel itself.
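   
   The two conditions above can be sketched as a small per-stream state object (the analogue of `MailboxContentStreamObserver`, not of the channel). `InMemoryStreamState` is a hypothetical name; the `volatile` flag is an assumption added so the sender's `complete()` is visible to the receiver thread.

   ```java
   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.BlockingQueue;

   // Hypothetical per-stream state shared by one in-memory sender and one receiver.
   final class InMemoryStreamState<T> {
     private final BlockingQueue<T> _queue;
     // Written by the sender thread, read by the receiver thread.
     private volatile boolean _senderCompleted = false;

     InMemoryStreamState(int capacity) {
       _queue = new ArrayBlockingQueue<>(capacity);
     }

     BlockingQueue<T> queue() {
       return _queue;
     }

     // Called by the sender after its last offer().
     void complete() {
       _senderCompleted = true;
     }

     // Reads the flag first: if it is true, every block offered before complete()
     // is already visible, so an empty queue really means nothing is left.
     boolean isClosed() {
       return _senderCompleted && _queue.isEmpty();
     }
   }
   ```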




[GitHub] [pinot] walterddr commented on a diff in pull request #9484: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9484:
URL: https://github.com/apache/pinot/pull/9484#discussion_r997098521


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcReceivingMailbox.java:
##########
@@ -50,15 +58,26 @@ public void init(MailboxContentStreamObserver streamObserver) {
     }
   }
 
+  /**
+   * Polls the underlying channel and converts the received data into a TransferableBlock. This may return null in the
+   * following cases:
+   *
+   * <p>
+   *  1. If the mailbox hasn't initialized yet. This means we haven't received any data yet.
+   *  2. If the received block from the sender didn't have any rows.
+   * </p>
+   */
   @Override
-  public MailboxContent receive()
+  public TransferableBlock receive()
       throws Exception {
     MailboxContent mailboxContent = null;
     if (waitForInitialize()) {
       mailboxContent = _contentStreamObserver.poll();
       _totalMsgReceived.incrementAndGet();
+    } else {
+      return null;
     }
-    return mailboxContent;
+    return fromMailboxContent(mailboxContent);

Review Comment:
   ```
       if (!waitForInitialize()) {
         return null;
       }
       MailboxContent mailboxContent = _contentStreamObserver.poll();
       _totalMsgReceived.incrementAndGet();
       return fromMailboxContent(mailboxContent);
   ```




[GitHub] [pinot] codecov-commenter commented on pull request #9484: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #9484:
URL: https://github.com/apache/pinot/pull/9484#issuecomment-1281377735

   # [Codecov](https://codecov.io/gh/apache/pinot/pull/9484?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#9484](https://codecov.io/gh/apache/pinot/pull/9484?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ed23523) into [master](https://codecov.io/gh/apache/pinot/commit/a8d95f234a85e2aab10cb9090bb766c08eb7edb7?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (a8d95f2) will **decrease** coverage by `44.07%`.
   > The diff coverage is `0.92%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #9484       +/-   ##
   =============================================
   - Coverage     69.95%   25.87%   -44.08%     
   + Complexity     4946       44     -4902     
   =============================================
     Files          1936     1929        -7     
     Lines        103474   103220      -254     
     Branches      15714    15679       -35     
   =============================================
   - Hits          72383    26706    -45677     
   - Misses        25984    73818    +47834     
   + Partials       5107     2696     -2411     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `25.87% <0.92%> (-0.06%)` | :arrow_down: |
   | integration2 | `?` | |
   | unittests1 | `?` | |
   | unittests2 | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/9484?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...apache/pinot/query/mailbox/GrpcMailboxService.java](https://codecov.io/gh/apache/pinot/pull/9484/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvbWFpbGJveC9HcnBjTWFpbGJveFNlcnZpY2UuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ache/pinot/query/mailbox/GrpcReceivingMailbox.java](https://codecov.io/gh/apache/pinot/pull/9484/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvbWFpbGJveC9HcnBjUmVjZWl2aW5nTWFpbGJveC5qYXZh) | `0.00% <0.00%> (-95.24%)` | :arrow_down: |
   | [...apache/pinot/query/mailbox/GrpcSendingMailbox.java](https://codecov.io/gh/apache/pinot/pull/9484/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvbWFpbGJveC9HcnBjU2VuZGluZ01haWxib3guamF2YQ==) | `0.00% <0.00%> (-96.00%)` | :arrow_down: |
   | [...he/pinot/query/mailbox/InMemoryMailboxService.java](https://codecov.io/gh/apache/pinot/pull/9484/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvbWFpbGJveC9Jbk1lbW9yeU1haWxib3hTZXJ2aWNlLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [.../pinot/query/mailbox/InMemoryReceivingMailbox.java](https://codecov.io/gh/apache/pinot/pull/9484/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvbWFpbGJveC9Jbk1lbW9yeVJlY2VpdmluZ01haWxib3guamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...he/pinot/query/mailbox/InMemorySendingMailbox.java](https://codecov.io/gh/apache/pinot/pull/9484/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvbWFpbGJveC9Jbk1lbW9yeVNlbmRpbmdNYWlsYm94LmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...inot/query/mailbox/MultiplexingMailboxService.java](https://codecov.io/gh/apache/pinot/pull/9484/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvbWFpbGJveC9NdWx0aXBsZXhpbmdNYWlsYm94U2VydmljZS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...e/pinot/query/mailbox/StringMailboxIdentifier.java](https://codecov.io/gh/apache/pinot/pull/9484/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvbWFpbGJveC9TdHJpbmdNYWlsYm94SWRlbnRpZmllci5qYXZh) | `0.00% <0.00%> (-79.32%)` | :arrow_down: |
   | [.../mailbox/channel/MailboxContentStreamObserver.java](https://codecov.io/gh/apache/pinot/pull/9484/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvbWFpbGJveC9jaGFubmVsL01haWxib3hDb250ZW50U3RyZWFtT2JzZXJ2ZXIuamF2YQ==) | `0.00% <0.00%> (-72.10%)` | :arrow_down: |
   | [...va/org/apache/pinot/query/runtime/QueryRunner.java](https://codecov.io/gh/apache/pinot/pull/9484/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9RdWVyeVJ1bm5lci5qYXZh) | `0.00% <0.00%> (-78.95%)` | :arrow_down: |
   | ... and [1414 more](https://codecov.io/gh/apache/pinot/pull/9484/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   



[GitHub] [pinot] ankitsultana commented on a diff in pull request #9484: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

Posted by GitBox <gi...@apache.org>.
ankitsultana commented on code in PR #9484:
URL: https://github.com/apache/pinot/pull/9484#discussion_r996227218


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentStreamObserver.java:
##########
@@ -64,6 +65,12 @@ public MailboxContentStreamObserver(GrpcMailboxService mailboxService,
     _isEnabledFeedback = isEnabledFeedback;
   }
 
+  /**
+   * This method may return null. It can happen if there's a large enough gap between the time the receiver received
+   * the last block and the StreamObserver was detected as complete. In that case, the MailboxReceiveOperator would
+   * try to call this method again but since there's no new content to be received, we'll exit the loop. Returning
+   * null here means that MailboxReceiveOperator won't consider this as an error.
+   */

Review Comment:
   @walterddr: Highlighting this because we wanted to discuss when poll here would return null. I think there's only one case, which I've highlighted above. Let me know your thoughts.
   
   Also, one interesting thing I noticed is that we don't process `MAILBOX_METADATA_END_OF_STREAM_KEY` on the receiver side, even though the sender sets it.
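For illustration, here is a minimal sketch of the null-poll case described in the Javadoc above. It assumes a receiver that buffers payloads and separately records stream completion. `SketchStreamObserver` and `NullPollDemo` are hypothetical names, not Pinot classes, and the spin loop stands in for however the real operator reschedules itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for a stream observer (not Pinot's class): it buffers
// payloads as they arrive and records when the sender signalled completion.
class SketchStreamObserver {
  private final ConcurrentLinkedQueue<String> _buffer = new ConcurrentLinkedQueue<>();
  private volatile boolean _completed = false;

  void onNext(String payload) {
    _buffer.add(payload);
  }

  void onCompleted() {
    _completed = true;
  }

  // May return null: either the last payload was already consumed and completion
  // has not been observed yet, or the stream is complete and fully drained.
  String poll() {
    return _buffer.poll();
  }

  boolean isDone() {
    return _completed && _buffer.isEmpty();
  }
}

class NullPollDemo {
  // Hypothetical receive loop: a null poll is not an error; the loop re-checks
  // isDone() and exits once the stream is both complete and drained.
  static List<String> drain(SketchStreamObserver observer) {
    List<String> received = new ArrayList<>();
    while (!observer.isDone()) {
      String payload = observer.poll();
      if (payload != null) {
        received.add(payload);
      }
      // A real operator would yield or back off here instead of spinning.
    }
    return received;
  }

  public static void main(String[] args) {
    SketchStreamObserver observer = new SketchStreamObserver();
    observer.onNext("block-0");
    System.out.println(observer.poll()); // block-0
    // Last block consumed, completion not yet observed: poll() returns null.
    System.out.println(observer.poll()); // null
    observer.onNext("block-1");
    observer.onCompleted();
    System.out.println(drain(observer)); // [block-1]
  }
}
```

The point of the sketch is that "poll returned null" and "stream is finished" are separate signals: only `isDone()` ends the loop, so a null in the gap before completion is observed is harmless.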





[GitHub] [pinot] ankitsultana commented on pull request #9484: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

Posted by GitBox <gi...@apache.org>.
ankitsultana commented on PR #9484:
URL: https://github.com/apache/pinot/pull/9484#issuecomment-1281321414

   @walterddr: Removed `InMemoryChannel`. I agree that using a Queue directly is better. I now use `isEndOfStreamBlock` to determine when the receiving mailbox has finished receiving. Can you take another look?
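A minimal sketch of the queue-based handoff described above. It assumes a bounded `ArrayBlockingQueue` shared by one sender and one receiver, with an end-of-stream marker telling the receiver when to stop. `SketchBlock` is a hypothetical stand-in for `TransferableBlock`; the class names and the 1000 ms receive timeout are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for TransferableBlock: a data payload or the EOS marker.
final class SketchBlock {
  static final SketchBlock EOS = new SketchBlock(null);
  final String _payload;

  SketchBlock(String payload) {
    _payload = payload;
  }

  boolean isEndOfStreamBlock() {
    return this == EOS;
  }
}

class QueueChannelDemo {
  // Receives until the EOS block arrives; the timed poll guards against a
  // sender that never finishes (the timeout value is an assumption).
  static List<String> receiveAll(BlockingQueue<SketchBlock> queue, long timeoutMs) {
    List<String> payloads = new ArrayList<>();
    try {
      while (true) {
        SketchBlock block = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (block == null) {
          throw new IllegalStateException("Timed out waiting for a block");
        }
        if (block.isEndOfStreamBlock()) {
          return payloads;
        }
        payloads.add(block._payload);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while receiving", e);
    }
  }

  // Runs a sender thread and a receiver over one bounded queue. With capacity 5
  // (mirroring DEFAULT_CHANNEL_CAPACITY), put() blocks until the receiver drains.
  static List<String> runExchange(int numBlocks) {
    BlockingQueue<SketchBlock> queue = new ArrayBlockingQueue<>(5);
    Thread sender = new Thread(() -> {
      try {
        for (int i = 0; i < numBlocks; i++) {
          queue.put(new SketchBlock("block-" + i));
        }
        queue.put(SketchBlock.EOS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    sender.setDaemon(true); // don't keep the JVM alive if the receiver gives up
    sender.start();
    List<String> received = receiveAll(queue, 1000);
    try {
      sender.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return received;
  }

  public static void main(String[] args) {
    System.out.println(runExchange(12).size()); // 12
  }
}
```

Because the queue is bounded, `put()` blocks whenever the receiver falls behind. This gives backpressure between co-located stages without any serialization.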




[GitHub] [pinot] walterddr commented on a diff in pull request #9484: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #9484:
URL: https://github.com/apache/pinot/pull/9484#discussion_r998760609


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryMailboxService.java:
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemoryMailboxService implements MailboxService<TransferableBlock> {
+  // channel manager
+  private final String _hostname;
+  private final int _mailboxPort;
+  static final int DEFAULT_CHANNEL_CAPACITY = 5;
+  // TODO: This should come from a config and should be consistent with the timeout for GrpcMailboxService
+  static final int DEFAULT_CHANNEL_TIMEOUT_SECONDS = 120;
+
+  // maintaining a list of registered mailboxes.
+  private final ConcurrentHashMap<String, ReceivingMailbox<TransferableBlock>> _receivingMailboxMap =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, SendingMailbox<TransferableBlock>> _sendingMailboxMap =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, ArrayBlockingQueue<TransferableBlock>> _channelMap =
+      new ConcurrentHashMap<>();

Review Comment:
   I think this follows my original split in the gRPC mailbox (that split was necessary there because channels and mailboxes are managed separately),
   but in this case all three maps are guaranteed to have a 1:1 mapping because they all live in the same JVM.
   
   I am OK with either approach:
   - Grouping them makes the code clearer: we can manage them together precisely because this 1:1 mapping is a special property of the in-memory mailbox.
   - Separating them makes logical sense as separation of concerns (e.g. the sender normally won't have access to receiving-side info; that just happens to be the case for the in-memory mailbox).
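To make the grouping option concrete, here is a hypothetical sketch in which a single `ConcurrentHashMap` entry holds the queue together with both endpoints, so the three objects cannot drift out of sync. None of these class names come from the PR; they are placeholders.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical grouped entry: one object per mailbox id holds the queue and both ends.
final class SketchMailboxEntry {
  final String _mailboxId;
  final BlockingQueue<String> _queue;
  final SketchSender _sender;
  final SketchReceiver _receiver;

  SketchMailboxEntry(String mailboxId, int capacity) {
    _mailboxId = mailboxId;
    _queue = new ArrayBlockingQueue<>(capacity);
    _sender = new SketchSender(_queue);
    _receiver = new SketchReceiver(_queue);
  }
}

final class SketchSender {
  private final BlockingQueue<String> _queue;

  SketchSender(BlockingQueue<String> queue) {
    _queue = queue;
  }

  // Non-blocking for brevity; returns false if the queue is full.
  boolean send(String payload) {
    return _queue.offer(payload);
  }
}

final class SketchReceiver {
  private final BlockingQueue<String> _queue;

  SketchReceiver(BlockingQueue<String> queue) {
    _queue = queue;
  }

  // Non-blocking for brevity; returns null if nothing is queued.
  String receive() {
    return _queue.poll();
  }
}

class GroupedMailboxRegistry {
  private final ConcurrentHashMap<String, SketchMailboxEntry> _entries = new ConcurrentHashMap<>();
  private final int _capacity;

  GroupedMailboxRegistry(int capacity) {
    _capacity = capacity;
  }

  SketchSender getSender(String mailboxId) {
    return entry(mailboxId)._sender;
  }

  SketchReceiver getReceiver(String mailboxId) {
    return entry(mailboxId)._receiver;
  }

  // computeIfAbsent creates the queue and both endpoints atomically, so whichever
  // side asks first, both sides end up sharing the same queue.
  private SketchMailboxEntry entry(String mailboxId) {
    return _entries.computeIfAbsent(mailboxId, id -> new SketchMailboxEntry(id, _capacity));
  }

  // Releasing one key drops the queue and both endpoints together.
  void release(String mailboxId) {
    _entries.remove(mailboxId);
  }

  int size() {
    return _entries.size();
  }

  public static void main(String[] args) {
    GroupedMailboxRegistry registry = new GroupedMailboxRegistry(5);
    SketchReceiver receiver = registry.getReceiver("mailbox-1"); // receiver registers first
    SketchSender sender = registry.getSender("mailbox-1");
    sender.send("hello");
    System.out.println(receiver.receive()); // hello
  }
}
```

With `computeIfAbsent`, either side can register first and both still share one queue. The cost is that the sending and receiving objects are no longer managed independently, which is the separation-of-concerns trade-off mentioned above.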
   
   





[GitHub] [pinot] ankitsultana commented on a diff in pull request #9484: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

Posted by GitBox <gi...@apache.org>.
ankitsultana commented on code in PR #9484:
URL: https://github.com/apache/pinot/pull/9484#discussion_r998844286


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemoryReceivingMailbox.java:
##########
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemoryReceivingMailbox implements ReceivingMailbox<TransferableBlock> {
+  private final String _mailboxId;
+  private final BlockingQueue<TransferableBlock> _queue;
+  private boolean _closed;
+
+  public InMemoryReceivingMailbox(String mailboxId, BlockingQueue<TransferableBlock> queue) {
+    _mailboxId = mailboxId;
+    _queue = queue;
+    _closed = false;
+  }
+
+  @Override
+  public String getMailboxId() {
+    return _mailboxId;
+  }
+
+  @Override
+  public TransferableBlock receive()
+      throws Exception {
+    TransferableBlock block = _queue.poll(
+        InMemoryMailboxService.DEFAULT_CHANNEL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    if (block == null) {
+      throw new RuntimeException(String.format("Timed out waiting for data block on mailbox=%s", _mailboxId));
+    }
+    if (block.isEndOfStreamBlock()) {
+      _closed = true;
+    }
+    return block;
+  }
+
+  @Override
+  public boolean isInitialized() {
+    return true;
+  }
+
+  @Override
+  public boolean isClosed() {
+    return _closed && _queue.size() == 0;

Review Comment:
   Ideally there shouldn't be any case where `_closed` is true but `_queue.size()` is not 0. The only case where it may happen is when the sender sends something immediately after sending the EOS block. We could also throw an exception here in that case.
   
   To clarify, here is how I believe the main scenarios will be handled:
   
   1. If the sender has died, it would have sent an end-of-stream block and then terminated on its end. The receiver would then get the EOS block and pass it upstream to MailboxReceiveOperator. The queue should be empty once `_closed` is marked true.
   2. If, for some reason, the sender was unable to send the EOS block, MailboxReceiveOperator will time out once the query timeout is reached.
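Here is a sketch of the stricter alternative mentioned above, where `isClosed()` throws if anything is still queued after the EOS block. It assumes a plain `Object` sentinel for EOS. Unlike the PR, which relies on the query timeout in MailboxReceiveOperator for scenario 2, it puts a timed poll in the mailbox itself. `StrictReceivingMailbox` is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical receiving mailbox (not Pinot's class): data arriving after EOS is
// reported as an error instead of silently keeping the mailbox "open".
class StrictReceivingMailbox {
  static final Object EOS = new Object(); // hypothetical end-of-stream marker
  private final BlockingQueue<Object> _queue;
  private volatile boolean _closed = false;

  StrictReceivingMailbox(BlockingQueue<Object> queue) {
    _queue = queue;
  }

  // Scenario 2: if EOS never arrives, the timed poll fails instead of hanging.
  Object receive(long timeoutMs) {
    try {
      Object block = _queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
      if (block == null) {
        throw new IllegalStateException("Timed out waiting for a block");
      }
      if (block == EOS) {
        _closed = true; // Scenario 1: EOS observed; nothing should follow it.
      }
      return block;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while receiving", e);
    }
  }

  boolean isClosed() {
    if (_closed && !_queue.isEmpty()) {
      throw new IllegalStateException("Received data after the end-of-stream block");
    }
    return _closed;
  }

  public static void main(String[] args) {
    BlockingQueue<Object> queue = new ArrayBlockingQueue<>(5);
    StrictReceivingMailbox mailbox = new StrictReceivingMailbox(queue);
    queue.add("block-0");
    queue.add(EOS);
    mailbox.receive(100);
    System.out.println(mailbox.isClosed()); // false
    mailbox.receive(100);
    System.out.println(mailbox.isClosed()); // true
  }
}
```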





[GitHub] [pinot] ankitsultana commented on a diff in pull request #9484: [multistage] Introduce InMemoryMailboxService for Optimizing Join Performance

Posted by GitBox <gi...@apache.org>.
ankitsultana commented on code in PR #9484:
URL: https://github.com/apache/pinot/pull/9484#discussion_r998845356


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/InMemorySendingMailbox.java:
##########
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class InMemorySendingMailbox implements SendingMailbox<TransferableBlock> {
+  private final BlockingQueue<TransferableBlock> _queue;
+  private final String _mailboxId;
+
+  public InMemorySendingMailbox(String mailboxId, BlockingQueue<TransferableBlock> queue) {
+    _mailboxId = mailboxId;
+    _queue = queue;
+  }
+
+  @Override
+  public String getMailboxId() {
+    return _mailboxId;
+  }
+
+  @Override
+  public void send(TransferableBlock data)
+      throws UnsupportedOperationException {
+    try {
+      if (!_queue.offer(
+          data, InMemoryMailboxService.DEFAULT_CHANNEL_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+        throw new RuntimeException(String.format("Timed out when sending block in mailbox=%s", _mailboxId));
+      }
+    } catch (InterruptedException e) {
+      throw new RuntimeException("Interrupted trying to send data through the channel", e);
+    }
+  }
+
+  @Override
+  public void complete() {

Review Comment:
   The 120s timeout was definitely a bug; I've reduced it to 1s. I described how some timeout scenarios are handled in my other comment regarding `_queue.size() == 0`.
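Here is a minimal sketch of why the send timeout matters. It assumes the sender uses `BlockingQueue.offer(e, timeout, unit)` as in the diff. When the queue is full and no one drains it, `send` blocks for the full timeout before failing, so the timeout bounds how long a stalled receiver can hold up the sender. `TimedSender` is a hypothetical class. Restoring the interrupt flag is a common Java idiom that the quoted diff does not do.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sending side (not Pinot's class): offer() waits at most
// timeoutMs for free capacity, so a stalled receiver surfaces as a bounded failure.
class TimedSender {
  private final BlockingQueue<Object> _queue;
  private final long _timeoutMs;

  TimedSender(BlockingQueue<Object> queue, long timeoutMs) {
    _queue = queue;
    _timeoutMs = timeoutMs;
  }

  void send(Object block) {
    try {
      if (!_queue.offer(block, _timeoutMs, TimeUnit.MILLISECONDS)) {
        throw new IllegalStateException(
            "Timed out after " + _timeoutMs + " ms: the receiver is not draining the queue");
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers up the stack can still observe it.
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while sending", e);
    }
  }

  public static void main(String[] args) {
    BlockingQueue<Object> queue = new ArrayBlockingQueue<>(2);
    TimedSender sender = new TimedSender(queue, 1000); // 1s, as in the reply above
    sender.send("block-0");
    sender.send("block-1");
    long start = System.nanoTime();
    try {
      sender.send("block-2"); // queue is full and nobody drains it
    } catch (IllegalStateException e) {
      long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
      System.out.println("failed after ~" + waitedMs + " ms: " + e.getMessage());
    }
  }
}
```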


