You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink in the original page) is not preserved in this version.
Posted to commits@pinot.apache.org by xi...@apache.org on 2023/07/29 01:01:16 UTC

[pinot] branch master updated: [multistage] Return empty block if MailboxRecv nodes have no paired MailboxSender (#11201)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 019f4c5fbf [multistage] Return empty block if MailboxRecv nodes have no paired MailboxSender (#11201)
019f4c5fbf is described below

commit 019f4c5fbf5b6976c61cd9f68247871dcf08163e
Author: Xiang Fu <xi...@gmail.com>
AuthorDate: Fri Jul 28 18:01:10 2023 -0700

    [multistage] Return empty block if MailboxRecv nodes have no paired MailboxSender (#11201)
---
 .../MultiStageBrokerRequestHandler.java            |  8 +++++++
 .../org/apache/pinot/query/QueryEnvironment.java   |  1 -
 .../pinot/query/planner/DispatchableSubPlan.java   | 15 ++++++++++++-
 .../planner/physical/DispatchablePlanMetadata.java | 25 ++++++++++++++++++----
 .../planner/physical/PinotDispatchPlanner.java     | 22 ++++++++++++++++++-
 .../apache/pinot/query/routing/WorkerManager.java  | 12 +++++++++++
 .../operator/BaseMailboxReceiveOperator.java       | 16 ++++++++------
 .../operator/MailboxReceiveOperatorTest.java       | 13 -----------
 .../operator/SortedMailboxReceiveOperatorTest.java | 14 ------------
 .../plan/pipeline/PipelineBreakerExecutorTest.java | 23 +++++++-------------
 10 files changed, 93 insertions(+), 56 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 962f820350..8bbf955015 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -20,6 +20,7 @@ package org.apache.pinot.broker.requesthandler;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -46,6 +47,7 @@ import org.apache.pinot.common.response.BrokerResponse;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
 import org.apache.pinot.common.response.broker.BrokerResponseStats;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.NamedThreadFactory;
@@ -242,6 +244,12 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
     brokerResponse.setTimeUsedMs(totalTimeMs);
     brokerResponse.setResultTable(queryResults);
 
+    dispatchableSubPlan.getTableToUnavailableSegmentsMap().forEach(
+        (table, segmentList) -> brokerResponse.addToExceptions(
+            new QueryProcessingException(QueryException.SERVER_SEGMENT_MISSING_ERROR_CODE,
+                String.format("Some segments are unavailable for table %s, unavailable segments: [%s]", table,
+                    Arrays.toString(segmentList.toArray())))));
+
     for (Map.Entry<Integer, ExecutionStatsAggregator> entry : stageIdStatsMap.entrySet()) {
       if (entry.getKey() == 0) {
         // Root stats are aggregated and added separately to broker response for backward compatibility
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 5963ba5f98..96974b9e40 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -351,7 +351,6 @@ public class QueryEnvironment {
 
     PinotDispatchPlanner pinotDispatchPlanner =
         new PinotDispatchPlanner(plannerContext, _workerManager, requestId, _tableCache);
-    pinotDispatchPlanner.createDispatchableSubPlan(subPlanRoot);
     return pinotDispatchPlanner.createDispatchableSubPlan(subPlanRoot);
   }
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchableSubPlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchableSubPlan.java
index 734698c122..4f99820351 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchableSubPlan.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchableSubPlan.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pinot.query.planner;
 
+import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.calcite.util.Pair;
 
@@ -41,11 +43,14 @@ public class DispatchableSubPlan {
   private final List<DispatchablePlanFragment> _queryStageList;
   private final Set<String> _tableNames;
 
+  private final Map<String, Collection<String>> _tableToUnavailableSegmentsMap;
+
   public DispatchableSubPlan(List<Pair<Integer, String>> fields, List<DispatchablePlanFragment> queryStageList,
-      Set<String> tableNames) {
+      Set<String> tableNames, Map<String, Collection<String>> tableToUnavailableSegmentsMap) {
     _queryResultFields = fields;
     _queryStageList = queryStageList;
     _tableNames = tableNames;
+    _tableToUnavailableSegmentsMap = tableToUnavailableSegmentsMap;
   }
 
   /**
@@ -71,4 +76,12 @@ public class DispatchableSubPlan {
   public Set<String> getTableNames() {
     return _tableNames;
   }
+
+  /**
+   * Get the table to unavailable segments map
+   * @return table to unavailable segments map
+   */
+  public Map<String, Collection<String>> getTableToUnavailableSegmentsMap() {
+    return _tableToUnavailableSegmentsMap;
+  }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
index 36d677d362..7e0e1cdab1 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
@@ -20,7 +20,9 @@ package org.apache.pinot.query.planner.physical;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
@@ -39,7 +41,7 @@ import org.apache.pinot.query.routing.QueryServerInstance;
  * </ul>
  */
 public class DispatchablePlanMetadata implements Serializable {
-  private List<String> _scannedTables;
+  private final List<String> _scannedTables;
 
   // used for assigning server/worker nodes.
   private Map<QueryServerInstance, List<Integer>> _serverInstanceToWorkerIdMap;
@@ -51,7 +53,10 @@ public class DispatchablePlanMetadata implements Serializable {
 
   // used for build mailboxes between workers.
   // workerId -> {planFragmentId -> mailbox list}
-  private Map<Integer, Map<Integer, MailboxMetadata>> _workerIdToMailboxesMap;
+  private final Map<Integer, Map<Integer, MailboxMetadata>> _workerIdToMailboxesMap;
+
+  // used for tracking unavailable segments from routing table, then assemble missing segments exception.
+  private final Map<String, Collection<String>> _tableToUnavailableSegmentsMap;
 
   // time boundary info
   private TimeBoundaryInfo _timeBoundaryInfo;
@@ -69,6 +74,7 @@ public class DispatchablePlanMetadata implements Serializable {
     _workerIdToMailboxesMap = new HashMap<>();
     _timeBoundaryInfo = null;
     _requiresSingletonInstance = false;
+    _tableToUnavailableSegmentsMap = new HashMap<>();
   }
 
   public List<String> getScannedTables() {
@@ -137,11 +143,22 @@ public class DispatchablePlanMetadata implements Serializable {
     _totalWorkerCount = totalWorkerCount;
   }
 
+  public void addTableToUnavailableSegmentsMap(String table, Collection<String> unavailableSegments) {
+    if (!_tableToUnavailableSegmentsMap.containsKey(table)) {
+      _tableToUnavailableSegmentsMap.put(table, new HashSet<>());
+    }
+    _tableToUnavailableSegmentsMap.get(table).addAll(unavailableSegments);
+  }
+
+  public Map<String, Collection<String>> getTableToUnavailableSegmentsMap() {
+    return _tableToUnavailableSegmentsMap;
+  }
+
   @Override
   public String toString() {
     return "DispatchablePlanMetadata{" + "_scannedTables=" + _scannedTables + ", _serverInstanceToWorkerIdMap="
         + _serverInstanceToWorkerIdMap + ", _workerIdToSegmentsMap=" + _workerIdToSegmentsMap
-        + ", _workerIdToMailboxesMap=" + _workerIdToMailboxesMap
-        + ", _timeBoundaryInfo=" + _timeBoundaryInfo + '}';
+        + ", _workerIdToMailboxesMap=" + _workerIdToMailboxesMap + ", _tableToUnavailableSegmentsMap="
+        + _tableToUnavailableSegmentsMap + ", _timeBoundaryInfo=" + _timeBoundaryInfo + '}';
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
index 521a99f39c..35c0a99ef4 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pinot.query.planner.physical;
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
 import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.query.context.PlannerContext;
 import org.apache.pinot.query.planner.DispatchableSubPlan;
@@ -84,6 +88,22 @@ public class PinotDispatchPlanner {
       DispatchablePlanContext dispatchablePlanContext) {
     return new DispatchableSubPlan(dispatchablePlanContext.getResultFields(),
         dispatchablePlanContext.constructDispatchablePlanFragmentList(subPlanRoot),
-        dispatchablePlanContext.getTableNames());
+        dispatchablePlanContext.getTableNames(),
+        populateTableUnavailableSegments(dispatchablePlanContext.getDispatchablePlanMetadataMap()));
+  }
+
+  private static Map<String, Collection<String>> populateTableUnavailableSegments(
+      Map<Integer, DispatchablePlanMetadata> dispatchablePlanMetadataMap) {
+    Map<String, Collection<String>> tableToUnavailableSegments = new HashMap<>();
+    dispatchablePlanMetadataMap.values()
+        .forEach(dispatchablePlanMetadata -> dispatchablePlanMetadata.getTableToUnavailableSegmentsMap().forEach(
+            (table, segments) -> {
+              if (!tableToUnavailableSegments.containsKey(table)) {
+                tableToUnavailableSegments.put(table, new HashSet<>());
+              }
+              tableToUnavailableSegments.get(table).addAll(segments);
+            }
+        ));
+    return tableToUnavailableSegments;
   }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 76e596a110..48506ecfab 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.routing;
 
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -129,6 +130,11 @@ public class WorkerManager {
         Preconditions.checkState(tableTypeToSegmentListMap.put(tableType, serverEntry.getValue()) == null,
             "Entry for server {} and table type: {} already exist!", serverEntry.getKey(), tableType);
       }
+
+      // attach unavailable segments to metadata
+      if (!routingTable.getUnavailableSegments().isEmpty()) {
+        metadata.addTableToUnavailableSegmentsMap(logicalTableName, routingTable.getUnavailableSegments());
+      }
     }
     int globalIdx = 0;
     Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap = new HashMap<>();
@@ -212,6 +218,12 @@ public class WorkerManager {
     } else {
       serverInstances = fetchServersForIntermediateStage(tableNames);
     }
+    if (serverInstances.isEmpty()) {
+      LOGGER.error("[RequestId: {}] No server instance found for intermediate stage for tables: {}",
+          context.getRequestId(), tableNames);
+      throw new IllegalStateException(
+          "No server instance found for intermediate stage for tables: " + Arrays.toString(tableNames.toArray()));
+    }
     DispatchablePlanMetadata metadata = context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId());
     Map<String, String> options = context.getPlannerContext().getOptions();
     int stageParallelism = Integer.parseInt(options.getOrDefault(QueryOptionKey.STAGE_PARALLELISM, "1"));
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
index 97ae1a5e80..6612837c41 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
@@ -59,13 +59,15 @@ public abstract class BaseMailboxReceiveOperator extends MultiStageOperator {
     int workerId = context.getServer().workerId();
     MailboxMetadata senderMailBoxMetadatas =
         context.getStageMetadata().getWorkerMetadataList().get(workerId).getMailBoxInfosMap().get(senderStageId);
-    Preconditions.checkState(senderMailBoxMetadatas != null && !senderMailBoxMetadatas.getMailBoxIdList().isEmpty(),
-        "Failed to find mailbox for stage: %s",
-        senderStageId);
-    _mailboxIds = MailboxIdUtils.toMailboxIds(requestId, senderMailBoxMetadatas);
-    _mailboxes = _mailboxIds.stream()
-        .map(mailboxId -> _mailboxService.getReceivingMailbox(mailboxId))
-        .collect(Collectors.toCollection(ArrayDeque::new));
+    if (senderMailBoxMetadatas != null && !senderMailBoxMetadatas.getMailBoxIdList().isEmpty()) {
+      _mailboxIds = MailboxIdUtils.toMailboxIds(requestId, senderMailBoxMetadatas);
+      _mailboxes = _mailboxIds.stream()
+          .map(mailboxId -> _mailboxService.getReceivingMailbox(mailboxId))
+          .collect(Collectors.toCollection(ArrayDeque::new));
+    } else {
+      _mailboxIds = Collections.emptyList();
+      _mailboxes = new ArrayDeque<>();
+    }
   }
 
   public List<String> getMailboxIds() {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index 48ace11a3f..4edbd0bd3b 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -102,19 +102,6 @@ public class MailboxReceiveOperatorTest {
     _mocks.close();
   }
 
-  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = "Failed to find mailbox.*")
-  public void shouldThrowSingletonNoMatchMailboxServer() {
-    VirtualServerAddress server1 = new VirtualServerAddress("localhost", 456, 0);
-    VirtualServerAddress server2 = new VirtualServerAddress("localhost", 789, 1);
-    StageMetadata stageMetadata = new StageMetadata.Builder().setWorkerMetadataList(
-        Stream.of(server1, server2).map(s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build())
-            .collect(Collectors.toList())).build();
-    OpChainExecutionContext context =
-        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, stageMetadata);
-    //noinspection resource
-    new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1);
-  }
-
   @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
   public void shouldThrowRangeDistributionNotSupported() {
     OpChainExecutionContext context =
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
index 80cf7ed1c4..b03e41eac0 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
@@ -112,20 +112,6 @@ public class SortedMailboxReceiveOperatorTest {
     _mocks.close();
   }
 
-  @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = "Failed to find mailbox.*")
-  public void shouldThrowSingletonNoMatchMailboxServer() {
-    VirtualServerAddress server1 = new VirtualServerAddress("localhost", 456, 0);
-    VirtualServerAddress server2 = new VirtualServerAddress("localhost", 789, 1);
-    StageMetadata stageMetadata = new StageMetadata.Builder().setWorkerMetadataList(
-        Stream.of(server1, server2).map(s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build())
-            .collect(Collectors.toList())).build();
-    OpChainExecutionContext context =
-        OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, stageMetadata);
-    //noinspection resource
-    new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS,
-        COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1);
-  }
-
   @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
   public void shouldThrowRangeDistributionNotSupported() {
     OpChainExecutionContext context =
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
index 6680b56942..a9aeb1ec2a 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
@@ -186,7 +186,7 @@ public class PipelineBreakerExecutorTest {
   }
 
   @Test
-  public void shouldReturnErrorBlocksFailureWhenPBExecute() {
+  public void shouldReturnEmptyBlockWhenPBExecuteWithIncorrectMailboxNode() {
     MailboxReceiveNode incorrectlyConfiguredMailboxNode =
         new MailboxReceiveNode(0, DATA_SCHEMA, 3, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
             null, null, false, false, null);
@@ -203,12 +203,9 @@ public class PipelineBreakerExecutorTest {
     Assert.assertNotNull(pipelineBreakerResult);
     Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 1);
     List<TransferableBlock> resultBlocks = pipelineBreakerResult.getResultMap().values().iterator().next();
-    Assert.assertEquals(resultBlocks.size(), 1);
-    Assert.assertTrue(resultBlocks.get(0).isEndOfStreamBlock());
-    Assert.assertFalse(resultBlocks.get(0).isSuccessfulEndOfStreamBlock());
+    Assert.assertEquals(resultBlocks.size(), 0);
 
-    // should have null stats from previous stage here
-    Assert.assertNull(pipelineBreakerResult.getOpChainStats());
+    Assert.assertNotNull(pipelineBreakerResult.getOpChainStats());
   }
 
   @Test
@@ -242,7 +239,7 @@ public class PipelineBreakerExecutorTest {
   }
 
   @Test
-  public void shouldReturnErrorBlocksWhenAnyPBFailure() {
+  public void shouldReturnWhenAnyPBReturnsEmpty() {
     MailboxReceiveNode mailboxReceiveNode1 =
         new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
             null, null, false, false, null);
@@ -270,17 +267,13 @@ public class PipelineBreakerExecutorTest {
             System.currentTimeMillis() + 10_000L, 0, false);
 
     // then
-    // should fail even if one of the 2 PB returns correct results.
+    // should pass when one PB returns result, the other returns empty.
     Assert.assertNotNull(pipelineBreakerResult);
     Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 2);
-    for (List<TransferableBlock> resultBlocks : pipelineBreakerResult.getResultMap().values()) {
-      Assert.assertEquals(resultBlocks.size(), 1);
-      Assert.assertTrue(resultBlocks.get(0).isEndOfStreamBlock());
-      Assert.assertFalse(resultBlocks.get(0).isSuccessfulEndOfStreamBlock());
-    }
+    Assert.assertEquals(pipelineBreakerResult.getResultMap().get(0).size(), 1);
+    Assert.assertEquals(pipelineBreakerResult.getResultMap().get(1).size(), 0);
 
-    // should have null stats from previous stage here
-    Assert.assertNull(pipelineBreakerResult.getOpChainStats());
+    Assert.assertNotNull(pipelineBreakerResult.getOpChainStats());
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org