You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/05/25 01:19:58 UTC

[pinot] branch master updated: fix exception during exchange routing causing a stuck pipeline (#10802)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 00d3133ccc fix exception during exchange routing causing a stuck pipeline (#10802)
00d3133ccc is described below

commit 00d3133ccc4c878f4371a213735e3558510a0d0a
Author: Rong Rong <ro...@apache.org>
AuthorDate: Wed May 24 18:19:52 2023 -0700

    fix exception during exchange routing causing a stuck pipeline (#10802)
    
    - make the mailbox service thread send an error block to all receiving mailboxes when the exchange route method throws.
    - improve exception messaging by adding more LOGGER calls where exceptions are caught.
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../runtime/operator/exchange/BlockExchange.java   | 27 ++++++++++++--
 .../apache/pinot/query/service/QueryServer.java    | 13 +++++--
 .../operator/exchange/BlockExchangeTest.java       | 41 ++++++++++++++++++++++
 3 files changed, 77 insertions(+), 4 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
index a7da7b93c8..e3f4a48ddf 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchange.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import org.apache.calcite.rel.RelDistribution;
 import org.apache.pinot.common.datablock.DataBlock;
@@ -33,6 +34,8 @@ import org.apache.pinot.query.runtime.blocks.BlockSplitter;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.OpChainId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -45,6 +48,7 @@ import org.apache.pinot.query.runtime.operator.OpChainId;
  */
 public abstract class BlockExchange {
   public static final int DEFAULT_MAX_PENDING_BLOCKS = 5;
+  private static final Logger LOGGER = LoggerFactory.getLogger(BlockExchange.class);
   // TODO: Deduct this value via grpc config maximum byte size; and make it configurable with override.
   // TODO: Max block size is a soft limit. only counts fixedSize datatable byte buffer
   private static final int MAX_MAILBOX_CONTENT_SIZE_BYTES = 4 * 1024 * 1024;
@@ -56,7 +60,7 @@ public abstract class BlockExchange {
   private final long _deadlineMs;
 
   private final BlockingQueue<TransferableBlock> _queue = new ArrayBlockingQueue<>(DEFAULT_MAX_PENDING_BLOCKS);
-
+  private final AtomicReference<TransferableBlock> _errorBlock = new AtomicReference<>();
 
   public static BlockExchange getExchange(OpChainId opChainId, List<SendingMailbox> sendingMailboxes,
       RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> selector, BlockSplitter splitter,
@@ -100,6 +104,10 @@ public abstract class BlockExchange {
     try {
       TransferableBlock block;
       long timeoutMs = _deadlineMs - System.currentTimeMillis();
+      if (_errorBlock.get() != null) {
+        LOGGER.debug("Exchange: {} is already cancelled or errored out internally, ignore the late block", _opChainId);
+        return _errorBlock.get();
+      }
       block = _queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
       if (block == null) {
         block = TransferableBlockUtils.getErrorTransferableBlock(
@@ -117,8 +125,10 @@ public abstract class BlockExchange {
       }
       return block;
     } catch (Exception e) {
-      return TransferableBlockUtils.getErrorTransferableBlock(
+      TransferableBlock errorBlock = TransferableBlockUtils.getErrorTransferableBlock(
           new RuntimeException("Exception while sending data via exchange for opChain: " + _opChainId));
+      setErrorBlock(errorBlock);
+      return errorBlock;
     }
   }
 
@@ -137,6 +147,19 @@ public abstract class BlockExchange {
     }
   }
 
+  private void setErrorBlock(TransferableBlock errorBlock) {
+    if (_errorBlock.compareAndSet(null, errorBlock)) {
+      try {
+        for (SendingMailbox sendingMailbox : _sendingMailboxes) {
+          sendBlock(sendingMailbox, errorBlock);
+        }
+      } catch (Exception e) {
+        LOGGER.error("error while sending exception block via exchange for opChain: " + _opChainId, e);
+      }
+      _queue.clear();
+    }
+  }
+
   protected abstract void route(List<SendingMailbox> destinations, TransferableBlock block)
       throws Exception;
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
index 4cd92ab32e..d88b87220e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java
@@ -85,11 +85,13 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
     // Deserialize the request
     DistributedStagePlan distributedStagePlan;
     Map<String, String> requestMetadataMap;
+    long requestId = -1;
     try {
       distributedStagePlan = QueryPlanSerDeUtils.deserialize(request.getStagePlan());
       requestMetadataMap = request.getMetadataMap();
+      requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
     } catch (Exception e) {
-      LOGGER.error("Caught exception while deserializing the request: {}", request, e);
+      LOGGER.error("Caught exception while deserializing the request: {}, payload: {}", requestId, request, e);
       responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad request").withCause(e).asException());
       return;
     }
@@ -100,6 +102,8 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
           .putMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_OK, "").build());
       responseObserver.onCompleted();
     } catch (Throwable t) {
+      LOGGER.error("Caught exception while compiling opChain for request: {}, stage: {}", requestId,
+          distributedStagePlan.getStageId(), t);
       responseObserver.onNext(Worker.QueryResponse.newBuilder()
           .putMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR, QueryException.getTruncatedStackTrace(t))
           .build());
@@ -109,7 +113,12 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
 
   @Override
   public void cancel(Worker.CancelRequest request, StreamObserver<Worker.CancelResponse> responseObserver) {
-    _queryRunner.cancel(request.getRequestId());
+    try {
+      _queryRunner.cancel(request.getRequestId());
+    } catch (Throwable t) {
+      LOGGER.error("Caught exception while cancelling opChain for request: {}", request.getRequestId(), t);
+    }
+    // we always return completed even if cancel attempt fails, server will self clean up in this case.
     responseObserver.onCompleted();
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
index dc874e5a32..ab056c024b 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/exchange/BlockExchangeTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.operator.exchange;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
+import java.io.IOException;
 import java.util.List;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
@@ -101,6 +102,30 @@ public class BlockExchangeTest {
     Mockito.verify(_mailbox2, Mockito.never()).send(Mockito.any());
   }
 
+  @Test
+  public void shouldSendErrorBlockIfExchangeInternalThrowException()
+      throws Exception {
+    // Given:
+    List<SendingMailbox> destinations = ImmutableList.of(_mailbox1, _mailbox2);
+    BlockExchange exchange = new ThrowingBlockExchange(destinations);
+    TransferableBlock block = new TransferableBlock(ImmutableList.of(new Object[]{"val"}),
+        new DataSchema(new String[]{"foo"}, new ColumnDataType[]{ColumnDataType.STRING}), DataBlock.Type.ROW);
+
+    // When:
+    exchange.offerBlock(block, Long.MAX_VALUE);
+    exchange.send();
+
+    // Then:
+    ArgumentCaptor<TransferableBlock> captor = ArgumentCaptor.forClass(TransferableBlock.class);
+    Mockito.verify(_mailbox1).complete();
+    Mockito.verify(_mailbox1, Mockito.times(1)).send(captor.capture());
+    Assert.assertTrue(captor.getValue().isErrorBlock());
+
+    Mockito.verify(_mailbox2).complete();
+    Mockito.verify(_mailbox2, Mockito.times(1)).send(captor.capture());
+    Assert.assertTrue(captor.getValue().isErrorBlock());
+  }
+
   @Test
   public void shouldSplitBlocks()
       throws Exception {
@@ -152,4 +177,20 @@ public class BlockExchangeTest {
       }
     }
   }
+
+  private static class ThrowingBlockExchange extends BlockExchange {
+    protected ThrowingBlockExchange(List<SendingMailbox> destinations) {
+      this(destinations, (block, type, size) -> Iterators.singletonIterator(block));
+    }
+
+    protected ThrowingBlockExchange(List<SendingMailbox> destinations, BlockSplitter splitter) {
+      super(new OpChainId(1, 2, 3), destinations, splitter, (opChainId) -> { }, Long.MAX_VALUE);
+    }
+
+    @Override
+    protected void route(List<SendingMailbox> destinations, TransferableBlock block)
+        throws Exception {
+      throw new IOException("Deliberate I/O Exception routing");
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org