You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/06/22 22:45:25 UTC
[pinot] branch master updated: [multistage][bugfix] fix pipeline breaker error block populate (#10957)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 24c5d8fe85 [multistage][bugfix] fix pipeline breaker error block populate (#10957)
24c5d8fe85 is described below
commit 24c5d8fe8521b91e68dd07294a3f57b067c0d525
Author: Rong Rong <ro...@apache.org>
AuthorDate: Thu Jun 22 15:45:17 2023 -0700
[multistage][bugfix] fix pipeline breaker error block populate (#10957)
* [clean up] make pipeline breaker executor always return without exception
* [clean up] refactor server plan request util entry point
* [test] add QueryRunner test as well as PB unit-tests
---------
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../apache/pinot/query/runtime/QueryRunner.java | 87 ++----
.../plan/pipeline/PipelineBreakerContext.java | 16 +-
.../plan/pipeline/PipelineBreakerExecutor.java | 36 ++-
.../plan/pipeline/PipelineBreakerOperator.java | 2 +-
.../plan/pipeline/PipelineBreakerResult.java | 3 +
.../plan/pipeline/PipelineBreakerVisitor.java | 2 +-
.../plan/server/ServerPlanRequestUtils.java | 51 +++-
.../pinot/query/runtime/QueryRunnerTest.java | 6 +
.../plan/pipeline/PipelineBreakerExecutorTest.java | 311 +++++++++++++++++++++
9 files changed, 427 insertions(+), 87 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index aa16f0b220..196df534d8 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -31,7 +31,6 @@ import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.NamedThreadFactory;
import org.apache.pinot.core.data.manager.InstanceDataManager;
@@ -42,7 +41,6 @@ import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.plannode.MailboxSendNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
import org.apache.pinot.query.runtime.executor.RoundRobinScheduler;
import org.apache.pinot.query.runtime.operator.LeafStageTransferableBlockOperator;
@@ -53,19 +51,14 @@ import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils;
import org.apache.pinot.query.service.QueryConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -139,8 +132,7 @@ public class QueryRunner {
* <p>This execution entry point should be asynchronously called by the request handler and caller should not wait
* for results/exceptions.</p>
*/
- public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap)
- throws Exception {
+ public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) {
long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
long timeoutMs = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
boolean isTraceEnabled =
@@ -148,22 +140,14 @@ public class QueryRunner {
long deadlineMs = System.currentTimeMillis() + timeoutMs;
// run pre-stage execution for all pipeline breakers
- PipelineBreakerResult pipelineBreakerResult;
- try {
- pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService,
- distributedStagePlan, deadlineMs, requestId, isTraceEnabled);
- } catch (Exception e) {
- LOGGER.error("Error executing pre-stage pipeline breaker for: {}:{}", requestId,
- distributedStagePlan.getStageId(), e);
- _scheduler.cancel(requestId);
- throw e;
- }
+ PipelineBreakerResult pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(_scheduler,
+ _mailboxService, distributedStagePlan, deadlineMs, requestId, isTraceEnabled);
// run OpChain
if (DistributedStagePlan.isLeafStage(distributedStagePlan)) {
try {
- OpChain rootOperator = compileLeafStage(distributedStagePlan, requestMetadataMap, pipelineBreakerResult,
- timeoutMs, deadlineMs, requestId);
+ OpChain rootOperator = compileLeafStage(requestId, distributedStagePlan, requestMetadataMap,
+ pipelineBreakerResult, deadlineMs, isTraceEnabled);
_scheduler.register(rootOperator);
} catch (Exception e) {
LOGGER.error("Error executing leaf stage for: {}:{}", requestId, distributedStagePlan.getStageId(), e);
@@ -172,10 +156,8 @@ public class QueryRunner {
}
} else {
try {
- PlanNode stageRoot = distributedStagePlan.getStageRoot();
- OpChain rootOperator = PhysicalPlanVisitor.walkPlanNode(stageRoot,
- new PhysicalPlanContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), deadlineMs,
- distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled));
+ OpChain rootOperator = compileIntermediateStage(requestId, distributedStagePlan, requestMetadataMap,
+ pipelineBreakerResult, deadlineMs, isTraceEnabled);
_scheduler.register(rootOperator);
} catch (Exception e) {
LOGGER.error("Error executing intermediate stage for: {}:{}", requestId, distributedStagePlan.getStageId(), e);
@@ -199,15 +181,24 @@ public class QueryRunner {
return _queryWorkerIntermExecutorService;
}
- private OpChain compileLeafStage(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap,
- PipelineBreakerResult pipelineBreakerResult, long timeoutMs, long deadlineMs, long requestId) {
- boolean isTraceEnabled =
- Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
+ private OpChain compileIntermediateStage(long requestId, DistributedStagePlan distributedStagePlan,
+ Map<String, String> requestMetadataMap, PipelineBreakerResult pipelineBreakerResult, long deadlineMs,
+ boolean isTraceEnabled) {
+ PlanNode stageRoot = distributedStagePlan.getStageRoot();
+ return PhysicalPlanVisitor.walkPlanNode(stageRoot,
+ new PhysicalPlanContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), deadlineMs,
+ distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), pipelineBreakerResult,
+ isTraceEnabled));
+ }
+
+ private OpChain compileLeafStage(long requestId, DistributedStagePlan distributedStagePlan,
+ Map<String, String> requestMetadataMap, PipelineBreakerResult pipelineBreakerResult, long deadlineMs,
+ boolean isTraceEnabled) {
PhysicalPlanContext planContext = new PhysicalPlanContext(_mailboxService, requestId,
distributedStagePlan.getStageId(), deadlineMs, distributedStagePlan.getServer(),
distributedStagePlan.getStageMetadata(), pipelineBreakerResult, isTraceEnabled);
- List<ServerPlanRequestContext> serverPlanRequestContexts =
- constructServerQueryRequests(planContext, distributedStagePlan, requestMetadataMap, _helixPropertyStore);
+ List<ServerPlanRequestContext> serverPlanRequestContexts = ServerPlanRequestUtils.constructServerQueryRequests(
+ planContext, distributedStagePlan, requestMetadataMap, _helixPropertyStore);
List<ServerQueryRequest> serverQueryRequests = new ArrayList<>(serverPlanRequestContexts.size());
for (ServerPlanRequestContext requestContext : serverPlanRequestContexts) {
serverQueryRequests.add(new ServerQueryRequest(requestContext.getInstanceRequest(),
@@ -225,40 +216,6 @@ public class QueryRunner {
return new OpChain(opChainExecutionContext, mailboxSendOperator, Collections.emptyList());
}
- private static List<ServerPlanRequestContext> constructServerQueryRequests(PhysicalPlanContext planContext,
- DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap,
- ZkHelixPropertyStore<ZNRecord> helixPropertyStore) {
- StageMetadata stageMetadata = distributedStagePlan.getStageMetadata();
- WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata();
- String rawTableName = StageMetadata.getTableName(stageMetadata);
- Map<String, List<String>> tableToSegmentListMap = WorkerMetadata.getTableSegmentsMap(workerMetadata);
- List<ServerPlanRequestContext> requests = new ArrayList<>();
- for (Map.Entry<String, List<String>> tableEntry : tableToSegmentListMap.entrySet()) {
- String tableType = tableEntry.getKey();
- // ZkHelixPropertyStore extends from ZkCacheBaseDataAccessor so it should not cause too much out-of-the-box
- // network traffic. but there's chance to improve this:
- // TODO: use TableDataManager: it is already getting tableConfig and Schema when processing segments.
- if (TableType.OFFLINE.name().equals(tableType)) {
- TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
- TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
- Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
- TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
- requests.add(ServerPlanRequestUtils.build(planContext, distributedStagePlan, requestMetadataMap, tableConfig,
- schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE, tableEntry.getValue()));
- } else if (TableType.REALTIME.name().equals(tableType)) {
- TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
- TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
- Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
- TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
- requests.add(ServerPlanRequestUtils.build(planContext, distributedStagePlan, requestMetadataMap, tableConfig,
- schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME, tableEntry.getValue()));
- } else {
- throw new IllegalArgumentException("Unsupported table type key: " + tableType);
- }
- }
- return requests;
- }
-
private InstanceResponseBlock processServerQueryRequest(ServerQueryRequest request) {
InstanceResponseBlock result;
try {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerContext.java
index 110c8e22e5..b8b50746aa 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerContext.java
@@ -25,19 +25,15 @@ import org.apache.pinot.query.planner.plannode.PlanNode;
/**
- * This class used to record the pipeline breaker operator that needs to be run before the main opChain.
+ * This class is used by {@link PipelineBreakerVisitor} as context to detect the {@link PlanNode} that needs to be run
+ * before the main opChain starts.
*/
-public class PipelineBreakerContext {
- private final Map<Integer, PlanNode> _pipelineBreakerMap = new HashMap<>();
+class PipelineBreakerContext {
private final Map<PlanNode, Integer> _planNodeObjectToIdMap = new HashMap<>();
+ private final Map<Integer, PlanNode> _pipelineBreakerMap = new HashMap<>();
- private final boolean _isLeafStage;
private int _currentNodeId = 0;
- public PipelineBreakerContext(boolean isLeafStage) {
- _isLeafStage = isLeafStage;
- }
-
public void addPipelineBreaker(MailboxReceiveNode mailboxReceiveNode) {
int nodeId = _planNodeObjectToIdMap.get(mailboxReceiveNode);
_pipelineBreakerMap.put(nodeId, mailboxReceiveNode);
@@ -55,8 +51,4 @@ public class PipelineBreakerContext {
public Map<Integer, PlanNode> getPipelineBreakerMap() {
return _pipelineBreakerMap;
}
-
- public boolean isLeafStage() {
- return _isLeafStage;
- }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
index 0202127314..fa33192a64 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
@@ -19,6 +19,7 @@
package org.apache.pinot.query.runtime.plan.pipeline;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -29,6 +30,7 @@ import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
import org.apache.pinot.query.runtime.operator.OpChain;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
@@ -48,18 +50,26 @@ public class PipelineBreakerExecutor {
}
/**
- * Execute a pipeline breaker and collect the results (synchronously)
+ * Execute a pipeline breaker and collect the results (synchronously). Currently, pipeline breaker executor can only
+ * execute mailbox receive pipeline breaker.
*
- * Currently, pipeline breaker executor can only execute mailbox receive pipeline breaker.
+ * @param scheduler scheduler service to run the pipeline breaker main thread.
+ * @param mailboxService mailbox service to attach the {@link MailboxReceiveNode} against.
+ * @param distributedStagePlan the distributed stage plan to run pipeline breaker on.
+ * @param deadlineMs execution deadline
+ * @param requestId request ID
+ * @param isTraceEnabled whether to enable trace.
+ * @return pipeline breaker result;
+ * - If an exception occurs, an error {@link TransferableBlock} built from it is assigned to each PB node.
+ * - Normal stats will be attached to each PB node and downstream execution should return with stats attached.
*/
public static PipelineBreakerResult executePipelineBreakers(OpChainSchedulerService scheduler,
MailboxService mailboxService, DistributedStagePlan distributedStagePlan, long deadlineMs,
- long requestId, boolean isTraceEnabled)
- throws Exception {
- PipelineBreakerContext pipelineBreakerContext = new PipelineBreakerContext(
- DistributedStagePlan.isLeafStage(distributedStagePlan));
+ long requestId, boolean isTraceEnabled) {
+ PipelineBreakerContext pipelineBreakerContext = new PipelineBreakerContext();
PipelineBreakerVisitor.visitPlanRoot(distributedStagePlan.getStageRoot(), pipelineBreakerContext);
if (pipelineBreakerContext.getPipelineBreakerMap().size() > 0) {
+ try {
PlanNode stageRoot = distributedStagePlan.getStageRoot();
// TODO: This PlanRequestContext needs to indicate it is a pre-stage opChain and only listens to pre-stage OpChain
// receive-mail callbacks.
@@ -70,6 +80,19 @@ public class PipelineBreakerExecutor {
Map<Integer, List<TransferableBlock>> resultMap =
PipelineBreakerExecutor.execute(scheduler, pipelineBreakerContext, physicalPlanContext);
return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), resultMap);
+ } catch (Exception e) {
+ LOGGER.error("Unable to create pipeline breaker results for Req: " + requestId + ", Stage: "
+ + distributedStagePlan.getStageId(), e);
+ // Create all error blocks for all pipeline breaker nodes.
+ TransferableBlock errorBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
+ Map<Integer, List<TransferableBlock>> resultMap = new HashMap<>();
+ for (int key : pipelineBreakerContext.getNodeIdMap().values()) {
+ if (pipelineBreakerContext.getPipelineBreakerMap().containsKey(key)) {
+ resultMap.put(key, Collections.singletonList(errorBlock));
+ }
+ }
+ return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), resultMap);
+ }
} else {
return null;
}
@@ -82,7 +105,6 @@ public class PipelineBreakerExecutor {
for (Map.Entry<Integer, PlanNode> e : context.getPipelineBreakerMap().entrySet()) {
int key = e.getKey();
PlanNode planNode = e.getValue();
- // TODO: supprot other pipeline breaker node type as well.
if (!(planNode instanceof MailboxReceiveNode)) {
throw new UnsupportedOperationException("Only MailboxReceiveNode is supported to run as pipeline breaker now");
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
index 3c338ac1a8..438ce15a37 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
@@ -35,7 +35,7 @@ import org.apache.pinot.query.runtime.operator.MultiStageOperator;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-public class PipelineBreakerOperator extends MultiStageOperator {
+class PipelineBreakerOperator extends MultiStageOperator {
private static final String EXPLAIN_NAME = "PIPELINE_BREAKER";
private final Deque<Map.Entry<Integer, Operator<TransferableBlock>>> _workerEntries;
private final Map<Integer, List<TransferableBlock>> _resultMap;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java
index 7ebd1d0c8d..8388337ff1 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java
@@ -24,6 +24,9 @@ import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+/**
+ * Encapsulates the execution result of {@link PipelineBreakerExecutor}.
+ */
public class PipelineBreakerResult {
private final Map<PlanNode, Integer> _nodeIdMap;
private final Map<Integer, List<TransferableBlock>> _resultMap;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java
index 49add00a11..efa97d6638 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java
@@ -25,7 +25,7 @@ import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
-public class PipelineBreakerVisitor extends DefaultPostOrderTraversalVisitor<Void, PipelineBreakerContext> {
+class PipelineBreakerVisitor extends DefaultPostOrderTraversalVisitor<Void, PipelineBreakerContext> {
private static final PlanNodeVisitor<Void, PipelineBreakerContext> INSTANCE = new PipelineBreakerVisitor();
public static void visitPlanRoot(PlanNode root, PipelineBreakerContext context) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
index 13659ae810..616413f3ad 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
@@ -25,6 +25,9 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.DataSource;
import org.apache.pinot.common.request.Expression;
@@ -38,14 +41,17 @@ import org.apache.pinot.core.query.optimizer.QueryOptimizer;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.FilterKind;
import org.apache.pinot.sql.parsers.rewriter.NonAggregationGroupByToDistinctQueryRewriter;
import org.apache.pinot.sql.parsers.rewriter.PredicateComparisonRewriter;
@@ -69,7 +75,50 @@ public class ServerPlanRequestUtils {
// do not instantiate.
}
- public static ServerPlanRequestContext build(PhysicalPlanContext planContext, DistributedStagePlan stagePlan,
+ /**
+ * Entry point to construct the list of {@link ServerPlanRequestContext}s used for leaf-stage execution.
+ *
+ * @param planContext physical plan context of the stage.
+ * @param distributedStagePlan distributed stage plan of the stage.
+ * @param requestMetadataMap metadata map
+ * @param helixPropertyStore helix property store used to fetch table config and schema for leaf-stage execution.
+ * @return a list of server plan request context to be run
+ */
+ public static List<ServerPlanRequestContext> constructServerQueryRequests(PhysicalPlanContext planContext,
+ DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap,
+ ZkHelixPropertyStore<ZNRecord> helixPropertyStore) {
+ StageMetadata stageMetadata = distributedStagePlan.getStageMetadata();
+ WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata();
+ String rawTableName = StageMetadata.getTableName(stageMetadata);
+ Map<String, List<String>> tableToSegmentListMap = WorkerMetadata.getTableSegmentsMap(workerMetadata);
+ List<ServerPlanRequestContext> requests = new ArrayList<>();
+ for (Map.Entry<String, List<String>> tableEntry : tableToSegmentListMap.entrySet()) {
+ String tableType = tableEntry.getKey();
+ // ZkHelixPropertyStore extends from ZkCacheBaseDataAccessor so it should not cause too much out-of-the-box
+ // network traffic, but there's a chance to improve this:
+ // TODO: use TableDataManager: it is already getting tableConfig and Schema when processing segments.
+ if (TableType.OFFLINE.name().equals(tableType)) {
+ TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
+ TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
+ Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
+ TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
+ requests.add(ServerPlanRequestUtils.build(planContext, distributedStagePlan, requestMetadataMap, tableConfig,
+ schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE, tableEntry.getValue()));
+ } else if (TableType.REALTIME.name().equals(tableType)) {
+ TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
+ TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
+ Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
+ TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
+ requests.add(ServerPlanRequestUtils.build(planContext, distributedStagePlan, requestMetadataMap, tableConfig,
+ schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME, tableEntry.getValue()));
+ } else {
+ throw new IllegalArgumentException("Unsupported table type key: " + tableType);
+ }
+ }
+ return requests;
+ }
+
+ private static ServerPlanRequestContext build(PhysicalPlanContext planContext, DistributedStagePlan stagePlan,
Map<String, String> requestMetadataMap, TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo,
TableType tableType, List<String> segmentList) {
// Before-visit: construct the ServerPlanRequestContext baseline
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 88081cb342..20bbf0f1ee 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -257,6 +257,12 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
@DataProvider(name = "testDataWithSqlExecutionExceptions")
private Object[][] provideTestSqlWithExecutionException() {
return new Object[][]{
+ // query hint with dynamic broadcast pipeline breaker should return error upstream
+ new Object[]{
+ "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ col1 FROM a "
+ + " WHERE a.col1 IN (SELECT b.col2 FROM b WHERE textMatch(col1, 'f')) AND a.col3 > 0",
+ "without text index"
+ },
// Timeout exception should occur with this option:
new Object[]{
"SET timeoutMs = 1; SELECT * FROM a JOIN b ON a.col1 = b.col1 JOIN c ON a.col1 = c.col1",
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
new file mode 100644
index 0000000000..56d78effe4
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.pipeline;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.logical.PinotRelExchangeType;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.query.mailbox.MailboxIdUtils;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.routing.MailboxMetadata;
+import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.routing.WorkerMetadata;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
+import org.apache.pinot.query.runtime.executor.RoundRobinScheduler;
+import org.apache.pinot.query.runtime.operator.OperatorTestUtil;
+import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
+import static org.mockito.Mockito.when;
+
+
+public class PipelineBreakerExecutorTest {
+ private static final VirtualServerAddress RECEIVER_ADDRESS = new VirtualServerAddress("localhost", 123, 0);
+ private static final DataSchema DATA_SCHEMA =
+ new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT});
+ private static final String MAILBOX_ID_1 = MailboxIdUtils.toMailboxId(0, 1, 0, 0, 0);
+ private static final String MAILBOX_ID_2 = MailboxIdUtils.toMailboxId(0, 2, 0, 0, 0);
+
+ private AutoCloseable _mocks;
+ @Mock
+ private MailboxService _mailboxService;
+ @Mock
+ private ReceivingMailbox _mailbox1;
+ @Mock
+ private ReceivingMailbox _mailbox2;
+
+ private VirtualServerAddress _server = new VirtualServerAddress("localhost", 123, 0);
+ private OpChainSchedulerService _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(10_000L),
+ Executors.newCachedThreadPool());
+ private StageMetadata _stageMetadata1 = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(_server).map(
+ s -> new WorkerMetadata.Builder().setVirtualServerAddress(s)
+ .addMailBoxInfoMap(0, new MailboxMetadata(
+ ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
+ org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(2, 0, 0, 0)),
+ ImmutableList.of(_server), ImmutableMap.of()))
+ .addMailBoxInfoMap(1, new MailboxMetadata(
+ ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
+ ImmutableList.of(_server), ImmutableMap.of()))
+ .addMailBoxInfoMap(2, new MailboxMetadata(
+ ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(2, 0, 0, 0)),
+ ImmutableList.of(_server), ImmutableMap.of()))
+ .build())
+ .collect(Collectors.toList())).build();
+
+ @BeforeClass
+ public void setUpClass() {
+ _scheduler.startAsync(); // started once per class: every test method shares this scheduler instance
+ }
+
+ @AfterClass
+ public void tearDownClass() {
+ _scheduler.stopAsync(); // NOTE(review): cached thread pool is not shut down here - confirm stopAsync() covers it
+ }
+
+ @BeforeMethod
+ public void setUp() {
+ _mocks = MockitoAnnotations.openMocks(this); // re-initializes the @Mock fields before each test
+ when(_mailboxService.getHostname()).thenReturn("localhost"); // same host/port as RECEIVER_ADDRESS
+ when(_mailboxService.getPort()).thenReturn(123);
+ }
+
+ @AfterMethod
+ public void tearDown()
+ throws Exception {
+ _mocks.close(); // closes the handle returned by openMocks() in setUp()
+ }
+
+ @Test
+ public void shouldReturnBlocksUponNormalOperation() {
+ MailboxReceiveNode mailboxReceiveNode =
+ new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
+ null, null, false, false, null);
+ DistributedStagePlan distributedStagePlan =
+ new DistributedStagePlan(0, RECEIVER_ADDRESS, mailboxReceiveNode, _stageMetadata1);
+
+ // when: the mailbox behind MAILBOX_ID_1 yields two data blocks followed by an end-of-stream block
+ when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
+ Object[] row1 = new Object[]{1, 1};
+ Object[] row2 = new Object[]{2, 3};
+ when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row1),
+ OperatorTestUtil.block(DATA_SCHEMA, row2),
+ TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ PipelineBreakerResult pipelineBreakerResult =
+ PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
+ System.currentTimeMillis() + 10_000L, 0, false);
+
+ // then
+ // expect a single pipeline-breaker result holding the 2 data blocks; the EOS block must not be included
+ Assert.assertNotNull(pipelineBreakerResult);
+ Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 1);
+ Assert.assertEquals(pipelineBreakerResult.getResultMap().values().iterator().next().size(), 2);
+ }
+
+ @Test
+ public void shouldWorkWithMultiplePBNodeUponNormalOperation() {
+ MailboxReceiveNode mailboxReceiveNode1 =
+ new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
+ null, null, false, false, null);
+ MailboxReceiveNode mailboxReceiveNode2 =
+ new MailboxReceiveNode(0, DATA_SCHEMA, 2, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
+ null, null, false, false, null);
+ JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, false);
+ joinNode.addInput(mailboxReceiveNode1);
+ joinNode.addInput(mailboxReceiveNode2);
+ DistributedStagePlan distributedStagePlan =
+ new DistributedStagePlan(0, RECEIVER_ADDRESS, joinNode, _stageMetadata1);
+
+ // when: each of the two mailboxes yields one data block followed by an end-of-stream block
+ when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
+ when(_mailboxService.getReceivingMailbox(MAILBOX_ID_2)).thenReturn(_mailbox2);
+ Object[] row1 = new Object[]{1, 1};
+ Object[] row2 = new Object[]{2, 3};
+ when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row1),
+ TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row2),
+ TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ PipelineBreakerResult pipelineBreakerResult =
+ PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
+ System.currentTimeMillis() + 10_000L, 0, false);
+
+ // then
+ // expect two pipeline-breaker results with one data block each; the EOS blocks must not be included
+ Assert.assertNotNull(pipelineBreakerResult);
+ Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 2);
+ Iterator<List<TransferableBlock>> it = pipelineBreakerResult.getResultMap().values().iterator();
+ Assert.assertEquals(it.next().size(), 1);
+ Assert.assertEquals(it.next().size(), 1);
+ Assert.assertFalse(it.hasNext());
+ }
+
+ @Test
+ public void shouldReturnErrorBlocksFailureWhenPBExecute() {
+ MailboxReceiveNode incorrectlyConfiguredMailboxNode =
+ new MailboxReceiveNode(0, DATA_SCHEMA, 3, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
+ null, null, false, false, null);
+ DistributedStagePlan distributedStagePlan =
+ new DistributedStagePlan(0, RECEIVER_ADDRESS, incorrectlyConfiguredMailboxNode, _stageMetadata1);
+
+ // when: the node is built with 3, but _stageMetadata1 only registers mailbox info for keys 0, 1 and 2
+ PipelineBreakerResult pipelineBreakerResult =
+ PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
+ System.currentTimeMillis() + 10_000L, 0, false);
+
+ // then
+ // expect exactly one result entry holding a single error block (end-of-stream, but not a successful one)
+ Assert.assertNotNull(pipelineBreakerResult);
+ Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 1);
+ List<TransferableBlock> resultBlocks = pipelineBreakerResult.getResultMap().values().iterator().next();
+ Assert.assertEquals(resultBlocks.size(), 1);
+ Assert.assertTrue(resultBlocks.get(0).isEndOfStreamBlock());
+ Assert.assertFalse(resultBlocks.get(0).isSuccessfulEndOfStreamBlock());
+ }
+
+ @Test
+ public void shouldReturnErrorBlocksFailureWhenPBTimeout() {
+ MailboxReceiveNode mailboxReceiveNode =
+ new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
+ null, null, false, false, null);
+ DistributedStagePlan distributedStagePlan =
+ new DistributedStagePlan(0, RECEIVER_ADDRESS, mailboxReceiveNode, _stageMetadata1);
+
+ // when: node and mailbox are set up as in the happy-path test; only the timestamp handed to the executor differs
+ when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
+ Object[] row1 = new Object[]{1, 1};
+ Object[] row2 = new Object[]{2, 3};
+ when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row1),
+ OperatorTestUtil.block(DATA_SCHEMA, row2),
+ TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ PipelineBreakerResult pipelineBreakerResult =
+ PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
+ System.currentTimeMillis() - 10_000L, 0, false); // 10s in the past, so the run is expected to time out
+
+ // then
+ // expect exactly one result entry holding a single error block (end-of-stream, but not a successful one)
+ Assert.assertNotNull(pipelineBreakerResult);
+ Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 1);
+ List<TransferableBlock> resultBlocks = pipelineBreakerResult.getResultMap().values().iterator().next();
+ Assert.assertEquals(resultBlocks.size(), 1);
+ Assert.assertTrue(resultBlocks.get(0).isEndOfStreamBlock());
+ Assert.assertFalse(resultBlocks.get(0).isSuccessfulEndOfStreamBlock());
+ }
+
+ @Test
+ public void shouldReturnErrorBlocksWhenAnyPBFailure() {
+ MailboxReceiveNode mailboxReceiveNode1 =
+ new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
+ null, null, false, false, null);
+ MailboxReceiveNode incorrectlyConfiguredMailboxNode =
+ new MailboxReceiveNode(0, DATA_SCHEMA, 3, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
+ null, null, false, false, null);
+ JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, false);
+ joinNode.addInput(mailboxReceiveNode1);
+ joinNode.addInput(incorrectlyConfiguredMailboxNode);
+ DistributedStagePlan distributedStagePlan =
+ new DistributedStagePlan(0, RECEIVER_ADDRESS, joinNode, _stageMetadata1);
+
+ // when: MAILBOX_ID_2 is stubbed too, but the second node is built with 3 - NOTE(review): stub looks unused
+ when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
+ when(_mailboxService.getReceivingMailbox(MAILBOX_ID_2)).thenReturn(_mailbox2);
+ Object[] row1 = new Object[]{1, 1};
+ Object[] row2 = new Object[]{2, 3};
+ when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row1),
+ TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row2),
+ TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ PipelineBreakerResult pipelineBreakerResult =
+ PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
+ System.currentTimeMillis() + 10_000L, 0, false);
+
+ // then
+ // expect both result entries to hold a single error block, even though one of the 2 PBs is set up correctly
+ Assert.assertNotNull(pipelineBreakerResult);
+ Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 2);
+ for (List<TransferableBlock> resultBlocks : pipelineBreakerResult.getResultMap().values()) {
+ Assert.assertEquals(resultBlocks.size(), 1);
+ Assert.assertTrue(resultBlocks.get(0).isEndOfStreamBlock());
+ Assert.assertFalse(resultBlocks.get(0).isSuccessfulEndOfStreamBlock());
+ }
+ }
+
+ @Test
+ public void shouldReturnErrorBlocksWhenReceivedErrorFromSender() {
+ MailboxReceiveNode mailboxReceiveNode1 =
+ new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
+ null, null, false, false, null);
+ MailboxReceiveNode mailboxReceiveNode2 =
+ new MailboxReceiveNode(0, DATA_SCHEMA, 2, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
+ null, null, false, false, null);
+ JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, JoinRelType.INNER, null, null, false);
+ joinNode.addInput(mailboxReceiveNode1);
+ joinNode.addInput(mailboxReceiveNode2);
+ DistributedStagePlan distributedStagePlan =
+ new DistributedStagePlan(0, RECEIVER_ADDRESS, joinNode, _stageMetadata1);
+
+ // when: both mailboxes are stubbed; _mailbox1 delivers an error block after its first data block
+ when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
+ when(_mailboxService.getReceivingMailbox(MAILBOX_ID_2)).thenReturn(_mailbox2);
+ Object[] row1 = new Object[]{1, 1};
+ Object[] row2 = new Object[]{2, 3};
+ when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row1),
+ TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException("ERROR ON 1")));
+ when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row2),
+ TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ PipelineBreakerResult pipelineBreakerResult =
+ PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan,
+ System.currentTimeMillis() + 10_000L, 0, false);
+
+ // then
+ // expect both result entries to hold a single error block, even though _mailbox2 never delivered an error
+ Assert.assertNotNull(pipelineBreakerResult);
+ Assert.assertEquals(pipelineBreakerResult.getResultMap().size(), 2);
+ for (List<TransferableBlock> resultBlocks : pipelineBreakerResult.getResultMap().values()) {
+ Assert.assertEquals(resultBlocks.size(), 1);
+ Assert.assertTrue(resultBlocks.get(0).isEndOfStreamBlock());
+ Assert.assertFalse(resultBlocks.get(0).isSuccessfulEndOfStreamBlock());
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org