You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by "walterddr (via GitHub)" <gi...@apache.org> on 2023/05/12 22:03:01 UTC

[GitHub] [pinot] walterddr commented on a diff in pull request #10761: [multistage] sending exchange service

walterddr commented on code in PR #10761:
URL: https://github.com/apache/pinot/pull/10761#discussion_r1192826109


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/ExchangeService.java:
##########
@@ -0,0 +1,60 @@
+package org.apache.pinot.query.runtime.operator.exchange;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.OpChainId;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ExchangeService {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ExchangeService.class);
+  private static final int DANGLING_EXCHANGE_EXPIRY_SECONDS = 300;
+
+  private final Cache<OpChainId, Future<?>> _submittedExchangeCache =
+      CacheBuilder.newBuilder().expireAfterAccess(DANGLING_EXCHANGE_EXPIRY_SECONDS, TimeUnit.SECONDS)
+          .removalListener((RemovalListener<OpChainId, Future<?>>) notification -> {
+            if (notification.wasEvicted()) {
+              Future<?> future = notification.getValue();
+              if (!future.isDone()) {
+                LOGGER.warn("Evicting dangling exchange request for {}}", notification.getKey());
+                future.cancel(true);
+              }
+            }
+          }).build();
+  private final ExecutorService _exchangeExecutor;
+
+  public ExchangeService(String hostname, int port, PinotConfiguration config) {
+    _exchangeExecutor = Executors.newCachedThreadPool();
+    LOGGER.info("Initialized ExchangeService with hostname: {}, port: {}", hostname, port);
+  }
+
+  /**
+   * submit a block exchange to sending service for a single OpChain.
+   *
+   * Notice that the logic inside the {@link BlockExchange#send()} should guarantee the submitted Runnable object
+   *     to be terminated successfully or after opChain timeout.
+   *
+   * @param blockExchange the exchange object of the OpChain with all the pending data to be sent.
+   */
+  public void submitExchangeRequest(OpChainId opChainId, BlockExchange blockExchange) {
+    _submittedExchangeCache.put(opChainId, _exchangeExecutor.submit(() -> {
+      TransferableBlock block = blockExchange.send();
+      while (!TransferableBlockUtils.isEndOfStream(block)) {
+        block = blockExchange.send();
+      }
+    }));

Review Comment:
   this will submit the send exchange to another threadpool for execution. 
   - if any of the GRPC/In-mem channel is blocked it should back propagate the condition to BlockExchange class and cause it to slow down producing new TransferrableBlocks
   - anytime a block is sent successful it will trigger the opChain to produce more block (if not already in running state)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org