You are viewing a plain-text version of this content; the hyperlink to its canonical version was lost in the plain-text conversion.
Posted to commits@pinot.apache.org by xi...@apache.org on 2023/07/29 01:01:16 UTC
[pinot] branch master updated: [multistage] Return empty block if MailboxRecv nodes have no paired MailboxSender (#11201)
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 019f4c5fbf [multistage] Return empty block if MailboxRecv nodes have no paired MailboxSender (#11201)
019f4c5fbf is described below
commit 019f4c5fbf5b6976c61cd9f68247871dcf08163e
Author: Xiang Fu <xi...@gmail.com>
AuthorDate: Fri Jul 28 18:01:10 2023 -0700
[multistage] Return empty block if MailboxRecv nodes have no paired MailboxSender (#11201)
---
.../MultiStageBrokerRequestHandler.java | 8 +++++++
.../org/apache/pinot/query/QueryEnvironment.java | 1 -
.../pinot/query/planner/DispatchableSubPlan.java | 15 ++++++++++++-
.../planner/physical/DispatchablePlanMetadata.java | 25 ++++++++++++++++++----
.../planner/physical/PinotDispatchPlanner.java | 22 ++++++++++++++++++-
.../apache/pinot/query/routing/WorkerManager.java | 12 +++++++++++
.../operator/BaseMailboxReceiveOperator.java | 16 ++++++++------
.../operator/MailboxReceiveOperatorTest.java | 13 -----------
.../operator/SortedMailboxReceiveOperatorTest.java | 14 ------------
.../plan/pipeline/PipelineBreakerExecutorTest.java | 23 +++++++-------------
10 files changed, 93 insertions(+), 56 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 962f820350..8bbf955015 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -20,6 +20,7 @@ package org.apache.pinot.broker.requesthandler;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -46,6 +47,7 @@ import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
import org.apache.pinot.common.response.broker.BrokerResponseStats;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.NamedThreadFactory;
@@ -242,6 +244,12 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
brokerResponse.setTimeUsedMs(totalTimeMs);
brokerResponse.setResultTable(queryResults);
+ dispatchableSubPlan.getTableToUnavailableSegmentsMap().forEach(
+ (table, segmentList) -> brokerResponse.addToExceptions(
+ new QueryProcessingException(QueryException.SERVER_SEGMENT_MISSING_ERROR_CODE,
+ String.format("Some segments are unavailable for table %s, unavailable segments: [%s]", table,
+ Arrays.toString(segmentList.toArray())))));
+
for (Map.Entry<Integer, ExecutionStatsAggregator> entry : stageIdStatsMap.entrySet()) {
if (entry.getKey() == 0) {
// Root stats are aggregated and added separately to broker response for backward compatibility
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 5963ba5f98..96974b9e40 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -351,7 +351,6 @@ public class QueryEnvironment {
PinotDispatchPlanner pinotDispatchPlanner =
new PinotDispatchPlanner(plannerContext, _workerManager, requestId, _tableCache);
- pinotDispatchPlanner.createDispatchableSubPlan(subPlanRoot);
return pinotDispatchPlanner.createDispatchableSubPlan(subPlanRoot);
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchableSubPlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchableSubPlan.java
index 734698c122..4f99820351 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchableSubPlan.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchableSubPlan.java
@@ -18,7 +18,9 @@
*/
package org.apache.pinot.query.planner;
+import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.calcite.util.Pair;
@@ -41,11 +43,14 @@ public class DispatchableSubPlan {
private final List<DispatchablePlanFragment> _queryStageList;
private final Set<String> _tableNames;
+ private final Map<String, Collection<String>> _tableToUnavailableSegmentsMap;
+
public DispatchableSubPlan(List<Pair<Integer, String>> fields, List<DispatchablePlanFragment> queryStageList,
- Set<String> tableNames) {
+ Set<String> tableNames, Map<String, Collection<String>> tableToUnavailableSegmentsMap) {
_queryResultFields = fields;
_queryStageList = queryStageList;
_tableNames = tableNames;
+ _tableToUnavailableSegmentsMap = tableToUnavailableSegmentsMap;
}
/**
@@ -71,4 +76,12 @@ public class DispatchableSubPlan {
public Set<String> getTableNames() {
return _tableNames;
}
+
+ /**
+ * Get the table to unavailable segments map
+ * @return table to unavailable segments map
+ */
+ public Map<String, Collection<String>> getTableToUnavailableSegmentsMap() {
+ return _tableToUnavailableSegmentsMap;
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
index 36d677d362..7e0e1cdab1 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
@@ -20,7 +20,9 @@ package org.apache.pinot.query.planner.physical;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
@@ -39,7 +41,7 @@ import org.apache.pinot.query.routing.QueryServerInstance;
* </ul>
*/
public class DispatchablePlanMetadata implements Serializable {
- private List<String> _scannedTables;
+ private final List<String> _scannedTables;
// used for assigning server/worker nodes.
private Map<QueryServerInstance, List<Integer>> _serverInstanceToWorkerIdMap;
@@ -51,7 +53,10 @@ public class DispatchablePlanMetadata implements Serializable {
// used for build mailboxes between workers.
// workerId -> {planFragmentId -> mailbox list}
- private Map<Integer, Map<Integer, MailboxMetadata>> _workerIdToMailboxesMap;
+ private final Map<Integer, Map<Integer, MailboxMetadata>> _workerIdToMailboxesMap;
+
+ // used for tracking unavailable segments from routing table, then assemble missing segments exception.
+ private final Map<String, Collection<String>> _tableToUnavailableSegmentsMap;
// time boundary info
private TimeBoundaryInfo _timeBoundaryInfo;
@@ -69,6 +74,7 @@ public class DispatchablePlanMetadata implements Serializable {
_workerIdToMailboxesMap = new HashMap<>();
_timeBoundaryInfo = null;
_requiresSingletonInstance = false;
+ _tableToUnavailableSegmentsMap = new HashMap<>();
}
public List<String> getScannedTables() {
@@ -137,11 +143,22 @@ public class DispatchablePlanMetadata implements Serializable {
_totalWorkerCount = totalWorkerCount;
}
+ public void addTableToUnavailableSegmentsMap(String table, Collection<String> unavailableSegments) {
+ if (!_tableToUnavailableSegmentsMap.containsKey(table)) {
+ _tableToUnavailableSegmentsMap.put(table, new HashSet<>());
+ }
+ _tableToUnavailableSegmentsMap.get(table).addAll(unavailableSegments);
+ }
+
+ public Map<String, Collection<String>> getTableToUnavailableSegmentsMap() {
+ return _tableToUnavailableSegmentsMap;
+ }
+
@Override
public String toString() {
return "DispatchablePlanMetadata{" + "_scannedTables=" + _scannedTables + ", _serverInstanceToWorkerIdMap="
+ _serverInstanceToWorkerIdMap + ", _workerIdToSegmentsMap=" + _workerIdToSegmentsMap
- + ", _workerIdToMailboxesMap=" + _workerIdToMailboxesMap
- + ", _timeBoundaryInfo=" + _timeBoundaryInfo + '}';
+ + ", _workerIdToMailboxesMap=" + _workerIdToMailboxesMap + ", _tableToUnavailableSegmentsMap="
+ + _tableToUnavailableSegmentsMap + ", _timeBoundaryInfo=" + _timeBoundaryInfo + '}';
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
index 521a99f39c..35c0a99ef4 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java
@@ -18,6 +18,10 @@
*/
package org.apache.pinot.query.planner.physical;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.query.context.PlannerContext;
import org.apache.pinot.query.planner.DispatchableSubPlan;
@@ -84,6 +88,22 @@ public class PinotDispatchPlanner {
DispatchablePlanContext dispatchablePlanContext) {
return new DispatchableSubPlan(dispatchablePlanContext.getResultFields(),
dispatchablePlanContext.constructDispatchablePlanFragmentList(subPlanRoot),
- dispatchablePlanContext.getTableNames());
+ dispatchablePlanContext.getTableNames(),
+ populateTableUnavailableSegments(dispatchablePlanContext.getDispatchablePlanMetadataMap()));
+ }
+
+ private static Map<String, Collection<String>> populateTableUnavailableSegments(
+ Map<Integer, DispatchablePlanMetadata> dispatchablePlanMetadataMap) {
+ Map<String, Collection<String>> tableToUnavailableSegments = new HashMap<>();
+ dispatchablePlanMetadataMap.values()
+ .forEach(dispatchablePlanMetadata -> dispatchablePlanMetadata.getTableToUnavailableSegmentsMap().forEach(
+ (table, segments) -> {
+ if (!tableToUnavailableSegments.containsKey(table)) {
+ tableToUnavailableSegments.put(table, new HashSet<>());
+ }
+ tableToUnavailableSegments.get(table).addAll(segments);
+ }
+ ));
+ return tableToUnavailableSegments;
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 76e596a110..48506ecfab 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.routing;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -129,6 +130,11 @@ public class WorkerManager {
Preconditions.checkState(tableTypeToSegmentListMap.put(tableType, serverEntry.getValue()) == null,
"Entry for server {} and table type: {} already exist!", serverEntry.getKey(), tableType);
}
+
+ // attach unavailable segments to metadata
+ if (!routingTable.getUnavailableSegments().isEmpty()) {
+ metadata.addTableToUnavailableSegmentsMap(logicalTableName, routingTable.getUnavailableSegments());
+ }
}
int globalIdx = 0;
Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap = new HashMap<>();
@@ -212,6 +218,12 @@ public class WorkerManager {
} else {
serverInstances = fetchServersForIntermediateStage(tableNames);
}
+ if (serverInstances.isEmpty()) {
+ LOGGER.error("[RequestId: {}] No server instance found for intermediate stage for tables: {}",
+ context.getRequestId(), tableNames);
+ throw new IllegalStateException(
+ "No server instance found for intermediate stage for tables: " + Arrays.toString(tableNames.toArray()));
+ }
DispatchablePlanMetadata metadata = context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId());
Map<String, String> options = context.getPlannerContext().getOptions();
int stageParallelism = Integer.parseInt(options.getOrDefault(QueryOptionKey.STAGE_PARALLELISM, "1"));
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
index 97ae1a5e80..6612837c41 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
@@ -59,13 +59,15 @@ public abstract class BaseMailboxReceiveOperator extends MultiStageOperator {
int workerId = context.getServer().workerId();
MailboxMetadata senderMailBoxMetadatas =
context.getStageMetadata().getWorkerMetadataList().get(workerId).getMailBoxInfosMap().get(senderStageId);
- Preconditions.checkState(senderMailBoxMetadatas != null && !senderMailBoxMetadatas.getMailBoxIdList().isEmpty(),
- "Failed to find mailbox for stage: %s",
- senderStageId);
- _mailboxIds = MailboxIdUtils.toMailboxIds(requestId, senderMailBoxMetadatas);
- _mailboxes = _mailboxIds.stream()
- .map(mailboxId -> _mailboxService.getReceivingMailbox(mailboxId))
- .collect(Collectors.toCollection(ArrayDeque::new));
+ if (senderMailBoxMetadatas != null && !senderMailBoxMetadatas.getMailBoxIdList().isEmpty()) {
+ _mailboxIds = MailboxIdUtils.toMailboxIds(requestId, senderMailBoxMetadatas);
+ _mailboxes = _mailboxIds.stream()
+ .map(mailboxId -> _mailboxService.getReceivingMailbox(mailboxId))
+ .collect(Collectors.toCollection(ArrayDeque::new));
+ } else {
+ _mailboxIds = Collections.emptyList();
+ _mailboxes = new ArrayDeque<>();
+ }
}
public List<String> getMailboxIds() {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index 48ace11a3f..4edbd0bd3b 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -102,19 +102,6 @@ public class MailboxReceiveOperatorTest {
_mocks.close();
}
- @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = "Failed to find mailbox.*")
- public void shouldThrowSingletonNoMatchMailboxServer() {
- VirtualServerAddress server1 = new VirtualServerAddress("localhost", 456, 0);
- VirtualServerAddress server2 = new VirtualServerAddress("localhost", 789, 1);
- StageMetadata stageMetadata = new StageMetadata.Builder().setWorkerMetadataList(
- Stream.of(server1, server2).map(s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build())
- .collect(Collectors.toList())).build();
- OpChainExecutionContext context =
- OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, stageMetadata);
- //noinspection resource
- new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1);
- }
-
@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
public void shouldThrowRangeDistributionNotSupported() {
OpChainExecutionContext context =
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
index 80cf7ed1c4..b03e41eac0 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
@@ -112,20 +112,6 @@ public class SortedMailboxReceiveOperatorTest {
_mocks.close();
}
- @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = "Failed to find mailbox.*")
- public void shouldThrowSingletonNoMatchMailboxServer() {
- VirtualServerAddress server1 = new VirtualServerAddress("localhost", 456, 0);
- VirtualServerAddress server2 = new VirtualServerAddress("localhost", 789, 1);
- StageMetadata stageMetadata = new StageMetadata.Builder().setWorkerMetadataList(
- Stream.of(server1, server2).map(s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build())
- .collect(Collectors.toList())).build();
- OpChainExecutionContext context =
- OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, stageMetadata);
- //noinspection resource
- new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS,
- COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1);
- }
-
@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
public void shouldThrowRangeDistributionNotSupported() {
OpChainExecutionContext context =
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
index 6680b56942..a9aeb1ec2a 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
@@ -186,7 +186,7 @@ public class PipelineBreakerExecutorTest {
}
@Test
- public void shouldReturnErrorBlocksFailureWhenPBExecute() {
+ public void shouldReturnEmptyBlockWhenPBExecuteWithIncorrectMailboxNode() {
MailboxReceiveNode incorrectlyConfiguredMailboxNode =
new MailboxReceiveNode(0, DATA_SCHEMA, 3, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
null, null, false, false, null);
@@ -203,12 +203,9 @@ public class PipelineBreakerExecutorTest {
Assert.assertNotNull(pipelineBreakerResult);
Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 1);
List<TransferableBlock> resultBlocks = pipelineBreakerResult.getResultMap().values().iterator().next();
- Assert.assertEquals(resultBlocks.size(), 1);
- Assert.assertTrue(resultBlocks.get(0).isEndOfStreamBlock());
- Assert.assertFalse(resultBlocks.get(0).isSuccessfulEndOfStreamBlock());
+ Assert.assertEquals(resultBlocks.size(), 0);
- // should have null stats from previous stage here
- Assert.assertNull(pipelineBreakerResult.getOpChainStats());
+ Assert.assertNotNull(pipelineBreakerResult.getOpChainStats());
}
@Test
@@ -242,7 +239,7 @@ public class PipelineBreakerExecutorTest {
}
@Test
- public void shouldReturnErrorBlocksWhenAnyPBFailure() {
+ public void shouldReturnWhenAnyPBReturnsEmpty() {
MailboxReceiveNode mailboxReceiveNode1 =
new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
null, null, false, false, null);
@@ -270,17 +267,13 @@ public class PipelineBreakerExecutorTest {
System.currentTimeMillis() + 10_000L, 0, false);
// then
- // should fail even if one of the 2 PB returns correct results.
+ // should pass when one PB returns result, the other returns empty.
Assert.assertNotNull(pipelineBreakerResult);
Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 2);
- for (List<TransferableBlock> resultBlocks : pipelineBreakerResult.getResultMap().values()) {
- Assert.assertEquals(resultBlocks.size(), 1);
- Assert.assertTrue(resultBlocks.get(0).isEndOfStreamBlock());
- Assert.assertFalse(resultBlocks.get(0).isSuccessfulEndOfStreamBlock());
- }
+ Assert.assertEquals(pipelineBreakerResult.getResultMap().get(0).size(), 1);
+ Assert.assertEquals(pipelineBreakerResult.getResultMap().get(1).size(), 0);
- // should have null stats from previous stage here
- Assert.assertNull(pipelineBreakerResult.getOpChainStats());
+ Assert.assertNotNull(pipelineBreakerResult.getOpChainStats());
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org