You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/06/01 05:16:44 UTC
[pinot] branch master updated: [multistage] pipeline breaker framework and dynamic-broadcast pipeline breaker (#10779)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 714ea4ed64 [multistage] pipeline breaker framework and dynamic-broadcast pipeline breaker (#10779)
714ea4ed64 is described below
commit 714ea4ed645407d0e3b9509bc702fa5967b0046a
Author: Rong Rong <ro...@apache.org>
AuthorDate: Wed May 31 22:16:35 2023 -0700
[multistage] pipeline breaker framework and dynamic-broadcast pipeline breaker (#10779)
Pipeline breaker
---
1. Introduce `PipelineBreaker` package
- `PipelineBreaker` runs before the actual `OpChain` of a stage (PlanFragment + Stage/Worker metadata); it computes and stores all pending "sub-plan" results before the actual `OpChain` of the stage is **CONSTRUCTED**
- `PipelineBreaker` is not another `OpChain`; the actual computation happens elsewhere.
- currently `PipelineBreaker` only supports MailboxReceive
2. Introduce compilation support for accepting a data-result placeholder in `PhysicalPlanVisitor` (and `ServerPlanRequestVisitor`)
- this way `OpChain` will be generated with the results of the PipelineBreaker as contextual input.
3. Also cleaned up the scheduler
- no need for 2 identical schedulers as they are all cached (leaf vs. intermediate)
- pipeline executor is now a static method, so it is easily plugged into different executor services
Dynamic broadcast SEMI JOIN
---
1. Created dynamic broadcast runtime based on #10772
2. Modified physical plan visitor to support semi join using dynamic broadcast
---------
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../apache/pinot/query/mailbox/MailboxIdUtils.java | 8 +-
.../apache/pinot/query/runtime/QueryRunner.java | 114 +++++----
.../pinot/query/runtime/operator/OpChainId.java | 4 +
.../query/runtime/plan/DistributedStagePlan.java | 8 +
.../runtime/plan/OpChainExecutionContext.java | 8 +-
...equestContext.java => PhysicalPlanContext.java} | 14 +-
.../query/runtime/plan/PhysicalPlanVisitor.java | 30 +--
.../plan/pipeline/PipelineBreakerContext.java | 62 +++++
.../plan/pipeline/PipelineBreakerExecutor.java | 101 ++++++++
.../plan/pipeline/PipelineBreakerOperator.java | 118 +++++++++
.../plan/pipeline/PipelineBreakerResult.java | 43 ++++
.../plan/pipeline/PipelineBreakerVisitor.java | 50 ++++
.../plan/server/ServerPlanRequestContext.java | 11 +-
.../ServerPlanRequestUtils.java} | 264 ++++++++++-----------
.../plan/server/ServerPlanRequestVisitor.java | 191 +++++++++++++++
.../apache/pinot/query/QueryServerEnclosure.java | 15 +-
.../pinot/query/runtime/QueryRunnerTest.java | 3 +
.../runtime/operator/LiteralValueOperatorTest.java | 4 +-
.../pinot/query/service/QueryServerTest.java | 4 -
.../service/dispatch/QueryDispatcherTest.java | 3 -
.../src/test/resources/queries/QueryHints.json | 20 ++
21 files changed, 846 insertions(+), 229 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdUtils.java
index f65de98118..02944c665a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdUtils.java
@@ -45,8 +45,12 @@ public class MailboxIdUtils {
return new OpChainId(Long.parseLong(parts[0]), Integer.parseInt(parts[4]), Integer.parseInt(parts[3]));
}
- public static List<String> toMailboxIds(long requestId, MailboxMetadata senderMailBoxMetadatas) {
- return senderMailBoxMetadatas.getMailBoxIdList().stream()
+ public static List<String> toMailboxIds(long requestId, MailboxMetadata senderMailBoxMetadata) {
+ return toMailboxIds(requestId, senderMailBoxMetadata.getMailBoxIdList());
+ }
+
+ public static List<String> toMailboxIds(long requestId, List<String> mailboxMetadataIdList) {
+ return mailboxMetadataIdList.stream()
.map(mailboxIdFromBroker -> Long.toString(requestId) + SEPARATOR + mailboxIdFromBroker)
.collect(Collectors.toList());
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 94640ffae7..6fa02a216a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -42,7 +42,6 @@ import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.plannode.MailboxSendNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
import org.apache.pinot.query.runtime.executor.RoundRobinScheduler;
@@ -52,11 +51,13 @@ import org.apache.pinot.query.runtime.operator.MultiStageOperator;
import org.apache.pinot.query.runtime.operator.OpChain;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
-import org.apache.pinot.query.runtime.plan.PlanRequestContext;
-import org.apache.pinot.query.runtime.plan.ServerRequestPlanVisitor;
import org.apache.pinot.query.runtime.plan.StageMetadata;
+import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor;
+import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
+import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils;
import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -83,14 +84,11 @@ public class QueryRunner {
private MailboxService _mailboxService;
private String _hostname;
private int _port;
- private VirtualServerAddress _rootServer;
private ExecutorService _queryWorkerIntermExecutorService;
private ExecutorService _queryWorkerLeafExecutorService;
- private ExecutorService _queryRunnerExecutorService;
- private OpChainSchedulerService _intermScheduler;
- private OpChainSchedulerService _leafScheduler;
+ private OpChainSchedulerService _scheduler;
/**
* Initializes the query executor.
@@ -102,8 +100,6 @@ public class QueryRunner {
_hostname = instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE) ? instanceName.substring(
CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceName;
_port = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, QueryConfig.DEFAULT_QUERY_RUNNER_PORT);
- // always use 0 for root server ID as all data is processed by one node at the global root
- _rootServer = new VirtualServerAddress(_hostname, _port, 0);
_helixManager = helixManager;
try {
long releaseMs = config.getProperty(QueryConfig.KEY_OF_SCHEDULER_RELEASE_TIMEOUT_MS,
@@ -112,12 +108,9 @@ public class QueryRunner {
new NamedThreadFactory("query_intermediate_worker_on_" + _port + "_port"));
_queryWorkerLeafExecutorService = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
new NamedThreadFactory("query_leaf_worker_on_" + _port + "_port"));
- _queryRunnerExecutorService = Executors.newCachedThreadPool(
- new NamedThreadFactory("query_runner_on_" + _port + "_port"));
- _intermScheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs),
+ _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs),
getQueryWorkerIntermExecutorService());
- _leafScheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs), getQueryRunnerExecutorService());
- _mailboxService = new MailboxService(_hostname, _port, config, _intermScheduler::onDataAvailable);
+ _mailboxService = new MailboxService(_hostname, _port, config, _scheduler::onDataAvailable);
_serverExecutor = new ServerQueryExecutorV1Impl();
_serverExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), instanceDataManager, serverMetrics);
} catch (Exception e) {
@@ -130,39 +123,70 @@ public class QueryRunner {
_helixPropertyStore = _helixManager.getHelixPropertyStore();
_mailboxService.start();
_serverExecutor.start();
- _leafScheduler.startAsync().awaitRunning(30, TimeUnit.SECONDS);
- _intermScheduler.startAsync().awaitRunning(30, TimeUnit.SECONDS);
+ _scheduler.startAsync().awaitRunning(30, TimeUnit.SECONDS);
}
public void shutDown()
throws TimeoutException {
_serverExecutor.shutDown();
_mailboxService.shutdown();
- _leafScheduler.stopAsync().awaitTerminated(30, TimeUnit.SECONDS);
- _intermScheduler.stopAsync().awaitTerminated(30, TimeUnit.SECONDS);
+ _scheduler.stopAsync().awaitTerminated(30, TimeUnit.SECONDS);
}
- public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) {
+ /**
+ * Execute a {@link DistributedStagePlan}.
+ *
+ * <p>This execution entry point should be asynchronously called by the request handler and caller should not wait
+ * for results/exceptions.</p>
+ */
+ public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap)
+ throws Exception {
long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
long timeoutMs = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
boolean isTraceEnabled =
Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
long deadlineMs = System.currentTimeMillis() + timeoutMs;
- if (isLeafStage(distributedStagePlan)) {
- OpChain rootOperator = compileLeafStage(distributedStagePlan, requestMetadataMap, timeoutMs, deadlineMs,
- requestId);
- _leafScheduler.register(rootOperator);
+
+ // run pre-stage execution for all pipeline breakers
+ PipelineBreakerResult pipelineBreakerResult;
+ try {
+ pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService,
+ distributedStagePlan, timeoutMs, deadlineMs, requestId, isTraceEnabled);
+ } catch (Exception e) {
+ LOGGER.error("Error executing pre-stage pipeline breaker for: {}:{}", requestId,
+ distributedStagePlan.getStageId(), e);
+ _scheduler.cancel(requestId);
+ throw e;
+ }
+
+ // run OpChain
+ if (DistributedStagePlan.isLeafStage(distributedStagePlan)) {
+ try {
+ OpChain rootOperator = compileLeafStage(distributedStagePlan, requestMetadataMap, pipelineBreakerResult,
+ timeoutMs, deadlineMs, requestId);
+ _scheduler.register(rootOperator);
+ } catch (Exception e) {
+ LOGGER.error("Error executing leaf stage for: {}:{}", requestId, distributedStagePlan.getStageId(), e);
+ _scheduler.cancel(requestId);
+ throw e;
+ }
} else {
- PlanNode stageRoot = distributedStagePlan.getStageRoot();
- OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot,
- new PlanRequestContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs,
- distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), isTraceEnabled));
- _intermScheduler.register(rootOperator);
+ try {
+ PlanNode stageRoot = distributedStagePlan.getStageRoot();
+ OpChain rootOperator = PhysicalPlanVisitor.walkPlanNode(stageRoot,
+ new PhysicalPlanContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs,
+ distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled));
+ _scheduler.register(rootOperator);
+ } catch (Exception e) {
+ LOGGER.error("Error executing intermediate stage for: {}:{}", requestId, distributedStagePlan.getStageId(), e);
+ _scheduler.cancel(requestId);
+ throw e;
+ }
}
}
public void cancel(long requestId) {
- _intermScheduler.cancel(requestId);
+ _scheduler.cancel(requestId);
}
@VisibleForTesting
@@ -175,17 +199,13 @@ public class QueryRunner {
return _queryWorkerIntermExecutorService;
}
- public ExecutorService getQueryRunnerExecutorService() {
- return _queryRunnerExecutorService;
- }
-
private OpChain compileLeafStage(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap,
- long timeoutMs, long deadlineMs, long requestId) {
+ PipelineBreakerResult pipelineBreakerResult, long timeoutMs, long deadlineMs, long requestId) {
boolean isTraceEnabled =
Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
List<ServerPlanRequestContext> serverPlanRequestContexts =
- constructServerQueryRequests(distributedStagePlan, requestMetadataMap, _helixPropertyStore, _mailboxService,
- deadlineMs);
+ constructServerQueryRequests(distributedStagePlan, requestMetadataMap, pipelineBreakerResult,
+ _helixPropertyStore, _mailboxService, deadlineMs);
List<ServerQueryRequest> serverQueryRequests = new ArrayList<>(serverPlanRequestContexts.size());
for (ServerPlanRequestContext requestContext : serverPlanRequestContexts) {
serverQueryRequests.add(new ServerQueryRequest(requestContext.getInstanceRequest(),
@@ -207,8 +227,8 @@ public class QueryRunner {
}
private static List<ServerPlanRequestContext> constructServerQueryRequests(DistributedStagePlan distributedStagePlan,
- Map<String, String> requestMetadataMap, ZkHelixPropertyStore<ZNRecord> helixPropertyStore,
- MailboxService mailboxService, long deadlineMs) {
+ Map<String, String> requestMetadataMap, PipelineBreakerResult pipelineBreakerResult,
+ ZkHelixPropertyStore<ZNRecord> helixPropertyStore, MailboxService mailboxService, long deadlineMs) {
StageMetadata stageMetadata = distributedStagePlan.getStageMetadata();
WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata();
String rawTableName = StageMetadata.getTableName(stageMetadata);
@@ -224,17 +244,17 @@ public class QueryRunner {
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
- requests.add(ServerRequestPlanVisitor.build(mailboxService, distributedStagePlan, requestMetadataMap,
- tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE,
- tableEntry.getValue(), deadlineMs));
+ requests.add(ServerPlanRequestUtils.build(mailboxService, distributedStagePlan, requestMetadataMap,
+ pipelineBreakerResult, tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata),
+ TableType.OFFLINE, tableEntry.getValue(), deadlineMs));
} else if (TableType.REALTIME.name().equals(tableType)) {
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
- requests.add(ServerRequestPlanVisitor.build(mailboxService, distributedStagePlan, requestMetadataMap,
- tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME,
- tableEntry.getValue(), deadlineMs));
+ requests.add(ServerPlanRequestUtils.build(mailboxService, distributedStagePlan, requestMetadataMap,
+ pipelineBreakerResult, tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata),
+ TableType.REALTIME, tableEntry.getValue(), deadlineMs));
} else {
throw new IllegalArgumentException("Unsupported table type key: " + tableType);
}
@@ -254,10 +274,4 @@ public class QueryRunner {
}
return result;
}
-
- private boolean isLeafStage(DistributedStagePlan distributedStagePlan) {
- WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata();
- Map<String, List<String>> segments = WorkerMetadata.getTableSegmentsMap(workerMetadata);
- return segments != null && segments.size() > 0;
- }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainId.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainId.java
index 50589cfca5..dae9c2f755 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainId.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainId.java
@@ -36,6 +36,10 @@ public class OpChainId {
return _requestId;
}
+ public int getVirtualServerId() {
+ return _virtualServerId;
+ }
+
@Override
public String toString() {
return String.format("%s_%s_%s", _requestId, _virtualServerId, _stageId);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
index f8a5f5118b..2aa269e6aa 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.query.runtime.plan;
+import java.util.List;
+import java.util.Map;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.routing.WorkerMetadata;
@@ -78,4 +80,10 @@ public class DistributedStagePlan {
public WorkerMetadata getCurrentWorkerMetadata() {
return _stageMetadata.getWorkerMetadataList().get(_server.workerId());
}
+
+ public static boolean isLeafStage(DistributedStagePlan distributedStagePlan) {
+ WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata();
+ Map<String, List<String>> segments = WorkerMetadata.getTableSegmentsMap(workerMetadata);
+ return segments != null && segments.size() > 0;
+ }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index e16795891b..5da20c4e70 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -57,10 +57,10 @@ public class OpChainExecutionContext {
_traceEnabled = traceEnabled;
}
- public OpChainExecutionContext(PlanRequestContext planRequestContext) {
- this(planRequestContext.getMailboxService(), planRequestContext.getRequestId(), planRequestContext.getStageId(),
- planRequestContext.getServer(), planRequestContext.getTimeoutMs(), planRequestContext.getDeadlineMs(),
- planRequestContext.getStageMetadata(), planRequestContext.isTraceEnabled());
+ public OpChainExecutionContext(PhysicalPlanContext physicalPlanContext) {
+ this(physicalPlanContext.getMailboxService(), physicalPlanContext.getRequestId(), physicalPlanContext.getStageId(),
+ physicalPlanContext.getServer(), physicalPlanContext.getTimeoutMs(), physicalPlanContext.getDeadlineMs(),
+ physicalPlanContext.getStageMetadata(), physicalPlanContext.isTraceEnabled());
}
public MailboxService getMailboxService() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java
similarity index 82%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
rename to pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java
index d0233c1578..2da5772930 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java
@@ -22,9 +22,10 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
-public class PlanRequestContext {
+public class PhysicalPlanContext {
protected final MailboxService _mailboxService;
protected final long _requestId;
protected final int _stageId;
@@ -33,12 +34,14 @@ public class PlanRequestContext {
private final long _deadlineMs;
protected final VirtualServerAddress _server;
protected final StageMetadata _stageMetadata;
+ protected final PipelineBreakerResult _pipelineBreakerResult;
protected final List<String> _receivingMailboxIds = new ArrayList<>();
private final OpChainExecutionContext _opChainExecutionContext;
private final boolean _traceEnabled;
- public PlanRequestContext(MailboxService mailboxService, long requestId, int stageId, long timeoutMs, long deadlineMs,
- VirtualServerAddress server, StageMetadata stageMetadata, boolean traceEnabled) {
+ public PhysicalPlanContext(MailboxService mailboxService, long requestId, int stageId, long timeoutMs,
+ long deadlineMs, VirtualServerAddress server, StageMetadata stageMetadata,
+ PipelineBreakerResult pipelineBreakerResult, boolean traceEnabled) {
_mailboxService = mailboxService;
_requestId = requestId;
_stageId = stageId;
@@ -46,6 +49,7 @@ public class PlanRequestContext {
_deadlineMs = deadlineMs;
_server = server;
_stageMetadata = stageMetadata;
+ _pipelineBreakerResult = pipelineBreakerResult;
_traceEnabled = traceEnabled;
_opChainExecutionContext = new OpChainExecutionContext(this);
}
@@ -74,6 +78,10 @@ public class PlanRequestContext {
return _stageMetadata;
}
+ public PipelineBreakerResult getPipelineBreakerResult() {
+ return _pipelineBreakerResult;
+ }
+
public MailboxService getMailboxService() {
return _mailboxService;
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 8b98fa517a..68fcd82f67 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -56,19 +56,19 @@ import org.apache.pinot.query.runtime.operator.WindowAggregateOperator;
* this works only for the intermediate stage nodes, leaf stage nodes are expected to compile into
* v1 operators at this point in time.
*
- * <p>This class should be used statically via {@link #build(PlanNode, PlanRequestContext)}
+ * <p>This class should be used statically via {@link #walkPlanNode(PlanNode, PhysicalPlanContext)}
*/
-public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator, PlanRequestContext> {
+public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator, PhysicalPlanContext> {
private static final PhysicalPlanVisitor INSTANCE = new PhysicalPlanVisitor();
- public static OpChain build(PlanNode node, PlanRequestContext context) {
+ public static OpChain walkPlanNode(PlanNode node, PhysicalPlanContext context) {
MultiStageOperator root = node.visit(INSTANCE, context);
return new OpChain(context.getOpChainExecutionContext(), root, context.getReceivingMailboxIds());
}
@Override
- public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, PlanRequestContext context) {
+ public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, PhysicalPlanContext context) {
if (node.isSortOnReceiver()) {
SortedMailboxReceiveOperator sortedMailboxReceiveOperator =
new SortedMailboxReceiveOperator(context.getOpChainExecutionContext(), node.getExchangeType(),
@@ -86,7 +86,7 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
}
@Override
- public MultiStageOperator visitMailboxSend(MailboxSendNode node, PlanRequestContext context) {
+ public MultiStageOperator visitMailboxSend(MailboxSendNode node, PhysicalPlanContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
return new MailboxSendOperator(context.getOpChainExecutionContext(), nextOperator, node.getExchangeType(),
node.getPartitionKeySelector(), node.getCollationKeys(), node.getCollationDirections(), node.isSortOnSender(),
@@ -94,14 +94,14 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
}
@Override
- public MultiStageOperator visitAggregate(AggregateNode node, PlanRequestContext context) {
+ public MultiStageOperator visitAggregate(AggregateNode node, PhysicalPlanContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
return new AggregateOperator(context.getOpChainExecutionContext(), nextOperator, node.getDataSchema(),
node.getAggCalls(), node.getGroupSet(), node.getInputs().get(0).getDataSchema());
}
@Override
- public MultiStageOperator visitWindow(WindowNode node, PlanRequestContext context) {
+ public MultiStageOperator visitWindow(WindowNode node, PhysicalPlanContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
return new WindowAggregateOperator(context.getOpChainExecutionContext(), nextOperator, node.getGroupSet(),
node.getOrderSet(), node.getOrderSetDirection(), node.getOrderSetNullDirection(), node.getAggCalls(),
@@ -110,7 +110,7 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
}
@Override
- public MultiStageOperator visitSetOp(SetOpNode setOpNode, PlanRequestContext context) {
+ public MultiStageOperator visitSetOp(SetOpNode setOpNode, PhysicalPlanContext context) {
List<MultiStageOperator> inputs = new ArrayList<>();
for (PlanNode input : setOpNode.getInputs()) {
MultiStageOperator visited = input.visit(this, context);
@@ -132,19 +132,19 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
}
@Override
- public MultiStageOperator visitExchange(ExchangeNode exchangeNode, PlanRequestContext context) {
+ public MultiStageOperator visitExchange(ExchangeNode exchangeNode, PhysicalPlanContext context) {
throw new UnsupportedOperationException("ExchangeNode should not be visited");
}
@Override
- public MultiStageOperator visitFilter(FilterNode node, PlanRequestContext context) {
+ public MultiStageOperator visitFilter(FilterNode node, PhysicalPlanContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
return new FilterOperator(context.getOpChainExecutionContext(), nextOperator, node.getDataSchema(),
node.getCondition());
}
@Override
- public MultiStageOperator visitJoin(JoinNode node, PlanRequestContext context) {
+ public MultiStageOperator visitJoin(JoinNode node, PhysicalPlanContext context) {
PlanNode left = node.getInputs().get(0);
PlanNode right = node.getInputs().get(1);
@@ -156,14 +156,14 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
}
@Override
- public MultiStageOperator visitProject(ProjectNode node, PlanRequestContext context) {
+ public MultiStageOperator visitProject(ProjectNode node, PhysicalPlanContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
return new TransformOperator(context.getOpChainExecutionContext(), nextOperator, node.getDataSchema(),
node.getProjects(), node.getInputs().get(0).getDataSchema());
}
@Override
- public MultiStageOperator visitSort(SortNode node, PlanRequestContext context) {
+ public MultiStageOperator visitSort(SortNode node, PhysicalPlanContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context);
boolean isInputSorted = nextOperator instanceof SortedMailboxReceiveOperator;
return new SortOperator(context.getOpChainExecutionContext(), nextOperator, node.getCollationKeys(),
@@ -172,12 +172,12 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator,
}
@Override
- public MultiStageOperator visitTableScan(TableScanNode node, PlanRequestContext context) {
+ public MultiStageOperator visitTableScan(TableScanNode node, PhysicalPlanContext context) {
throw new UnsupportedOperationException("Stage node of type TableScanNode is not supported!");
}
@Override
- public MultiStageOperator visitValue(ValueNode node, PlanRequestContext context) {
+ public MultiStageOperator visitValue(ValueNode node, PhysicalPlanContext context) {
return new LiteralValueOperator(context.getOpChainExecutionContext(), node.getDataSchema(), node.getLiteralRows());
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerContext.java
new file mode 100644
index 0000000000..110c8e22e5
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerContext.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.pipeline;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+
+
+/**
+ * This class is used to record the pipeline breaker operators that need to be run before the main opChain.
+ */
+public class PipelineBreakerContext {
+ private final Map<Integer, PlanNode> _pipelineBreakerMap = new HashMap<>();
+ private final Map<PlanNode, Integer> _planNodeObjectToIdMap = new HashMap<>();
+
+ private final boolean _isLeafStage;
+ private int _currentNodeId = 0;
+
+ public PipelineBreakerContext(boolean isLeafStage) {
+ _isLeafStage = isLeafStage;
+ }
+
+ public void addPipelineBreaker(MailboxReceiveNode mailboxReceiveNode) {
+ int nodeId = _planNodeObjectToIdMap.get(mailboxReceiveNode);
+ _pipelineBreakerMap.put(nodeId, mailboxReceiveNode);
+ }
+
+ public void visitedNewPlanNode(PlanNode planNode) {
+ _planNodeObjectToIdMap.put(planNode, _currentNodeId);
+ _currentNodeId++;
+ }
+
+ public Map<PlanNode, Integer> getNodeIdMap() {
+ return _planNodeObjectToIdMap;
+ }
+
+ public Map<Integer, PlanNode> getPipelineBreakerMap() {
+ return _pipelineBreakerMap;
+ }
+
+ public boolean isLeafStage() {
+ return _isLeafStage;
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
new file mode 100644
index 0000000000..b000d8ee74
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.pipeline;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
+import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
+
+
+/**
+ * Static helper that runs the pipeline breakers of a stage and collects their results.
+ */
+public class PipelineBreakerExecutor {
+  private PipelineBreakerExecutor() {
+    // static utility, never instantiated.
+  }
+
+  /**
+   * Runs the pipeline breakers found in the given stage plan and collects the results synchronously.
+   *
+   * Only mailbox receive pipeline breakers can be executed at the moment.
+   *
+   * @return the collected results, or {@code null} when the stage plan contains no pipeline breaker.
+   */
+  public static PipelineBreakerResult executePipelineBreakers(OpChainSchedulerService scheduler,
+      MailboxService mailboxService, DistributedStagePlan distributedStagePlan, long timeoutMs, long deadlineMs,
+      long requestId, boolean isTraceEnabled)
+      throws Exception {
+    boolean leafStage = DistributedStagePlan.isLeafStage(distributedStagePlan);
+    PipelineBreakerContext breakerContext = new PipelineBreakerContext(leafStage);
+    PipelineBreakerVisitor.visitPlanRoot(distributedStagePlan.getStageRoot(), breakerContext);
+    if (breakerContext.getPipelineBreakerMap().isEmpty()) {
+      // Nothing to pre-compute for this stage.
+      return null;
+    }
+
+    PlanNode root = distributedStagePlan.getStageRoot();
+    // TODO: this plan context should be flagged as a pre-stage opChain so that it only listens to pre-stage
+    //   receive-mail callbacks.
+    //   see also: MailboxIdUtils TODOs, de-couple mailbox id from query information
+    PhysicalPlanContext planContext =
+        new PhysicalPlanContext(mailboxService, requestId, root.getPlanFragmentId(), timeoutMs, deadlineMs,
+            distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled);
+    Map<Integer, List<TransferableBlock>> results = execute(scheduler, breakerContext, planContext);
+    return new PipelineBreakerResult(breakerContext.getNodeIdMap(), results);
+  }
+
+  /**
+   * Builds one operator per pipeline breaker node, then runs them all and returns the blocks keyed by node id.
+   */
+  private static Map<Integer, List<TransferableBlock>> execute(OpChainSchedulerService scheduler,
+      PipelineBreakerContext context, PhysicalPlanContext physicalPlanContext)
+      throws Exception {
+    Map<Integer, Operator<TransferableBlock>> workersById = new HashMap<>();
+    for (Map.Entry<Integer, PlanNode> breakerEntry : context.getPipelineBreakerMap().entrySet()) {
+      int nodeId = breakerEntry.getKey();
+      PlanNode breakerNode = breakerEntry.getValue();
+      // TODO: support other pipeline breaker node types as well.
+      if (!(breakerNode instanceof MailboxReceiveNode)) {
+        throw new UnsupportedOperationException("Only MailboxReceiveNode is supported to run as pipeline breaker now");
+      }
+      OpChain nodeOpChain = PhysicalPlanVisitor.walkPlanNode(breakerNode, physicalPlanContext);
+      workersById.put(nodeId, nodeOpChain.getRoot());
+    }
+    return runMailboxReceivePipelineBreaker(scheduler, workersById, physicalPlanContext);
+  }
+
+  /**
+   * Wraps the workers into a single {@link PipelineBreakerOperator} opChain, registers it with the scheduler and
+   * returns whatever the operator reports as its result.
+   */
+  private static Map<Integer, List<TransferableBlock>> runMailboxReceivePipelineBreaker(
+      OpChainSchedulerService scheduler, Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap,
+      PhysicalPlanContext physicalPlanContext)
+      throws Exception {
+    PipelineBreakerOperator breakerOperator =
+        new PipelineBreakerOperator(physicalPlanContext.getOpChainExecutionContext(), pipelineWorkerMap);
+    OpChain breakerOpChain = new OpChain(physicalPlanContext.getOpChainExecutionContext(), breakerOperator,
+        physicalPlanContext.getReceivingMailboxIds());
+    scheduler.register(breakerOpChain);
+    return breakerOperator.getResult();
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
new file mode 100644
index 0000000000..61bc9c03b9
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.pipeline;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.MultiStageOperator;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+
+
+/**
+ * Operator that drains a set of pipeline breaker workers and buffers the data blocks they produce, keyed by the
+ * worker's id.
+ *
+ * <p>{@link #getNextBlock()} does the draining; {@link #getResult()} blocks its caller until draining has
+ * finished, an error block was seen, or the deadline of the execution context has passed.
+ */
+public class PipelineBreakerOperator extends MultiStageOperator {
+  private static final String EXPLAIN_NAME = "PIPELINE_BREAKER";
+  private final Deque<Map.Entry<Integer, Operator<TransferableBlock>>> _workerEntries;
+  private final Map<Integer, List<TransferableBlock>> _resultMap;
+  private final CountDownLatch _workerDoneLatch;
+  // Written before the latch is counted down and only read after a successful await.
+  private TransferableBlock _errorBlock;
+
+  public PipelineBreakerOperator(OpChainExecutionContext context,
+      Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap) {
+    super(context);
+    _resultMap = new HashMap<>();
+    // Give every worker an (initially empty) result list up front, so that a worker which reaches end-of-stream
+    // without emitting any data block still has an entry in the result map instead of a missing key.
+    for (Integer workerId : pipelineWorkerMap.keySet()) {
+      _resultMap.put(workerId, new ArrayList<>());
+    }
+    _workerEntries = new ArrayDeque<>();
+    _workerEntries.addAll(pipelineWorkerMap.entrySet());
+    _workerDoneLatch = new CountDownLatch(1);
+  }
+
+  /**
+   * Waits until all workers are drained (or the deadline passes) and returns the buffered blocks per worker id.
+   *
+   * @throws RuntimeException if the deadline passes before draining completes, or if an error block was recorded
+   * @throws InterruptedException if the waiting thread is interrupted
+   */
+  public Map<Integer, List<TransferableBlock>> getResult()
+      throws Exception {
+    long remainingMs = _context.getDeadlineMs() - System.currentTimeMillis();
+    boolean isWorkerDone = _workerDoneLatch.await(remainingMs, TimeUnit.MILLISECONDS);
+    if (!isWorkerDone) {
+      throw new RuntimeException("Unable to construct pipeline breaker results due to timeout.");
+    }
+    if (_errorBlock != null) {
+      // Do not report this as a timeout: the latch was released because an error block was recorded.
+      throw new RuntimeException("Unable to construct pipeline breaker results due to error block.");
+    }
+    return _resultMap;
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  @Override
+  protected TransferableBlock getNextBlock() {
+    if (System.currentTimeMillis() > _context.getDeadlineMs()) {
+      _errorBlock = TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
+      _workerDoneLatch.countDown();
+      return _errorBlock;
+    }
+
+    // Poll every remaining worker once, in round-robin fashion:
+    // - Buffer each data block under its worker id
+    // - Drop a worker from the queue once it returns a successful end-of-stream block
+    // - Stop immediately and surface the block if any worker returns an error block
+    int numWorkers = _workerEntries.size();
+    for (int i = 0; i < numWorkers; i++) {
+      Map.Entry<Integer, Operator<TransferableBlock>> worker = _workerEntries.remove();
+      TransferableBlock block = worker.getValue().nextBlock();
+
+      // Worker is finished: do not put it back into the queue.
+      if (block != null && block.isSuccessfulEndOfStreamBlock()) {
+        continue;
+      }
+
+      // Worker is not finished yet (or returned nothing this round): keep polling it.
+      _workerEntries.add(worker);
+      if (block != null) {
+        if (block.isErrorBlock()) {
+          _errorBlock = block;
+          _workerDoneLatch.countDown();
+          return _errorBlock;
+        }
+        List<TransferableBlock> blockList = _resultMap.computeIfAbsent(worker.getKey(), (k) -> new ArrayList<>());
+        // TODO: only data block is handled, we also need to handle metadata block from upstream in the future.
+        if (!block.isEndOfStreamBlock()) {
+          blockList.add(block);
+        }
+      }
+    }
+
+    if (_workerEntries.isEmpty()) {
+      // All workers are drained: notify the waiter in getResult() that results are ready.
+      _workerDoneLatch.countDown();
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    } else {
+      return TransferableBlockUtils.getNoOpTransferableBlock();
+    }
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java
new file mode 100644
index 0000000000..7ebd1d0c8d
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.pipeline;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+/**
+ * Holder for the outcome of a pipeline breaker run: the plan-node-to-id mapping together with the blocks
+ * collected per id.
+ */
+public class PipelineBreakerResult {
+  // blocks collected for each id
+  private final Map<Integer, List<TransferableBlock>> _resultMap;
+  // id assigned to each plan node
+  private final Map<PlanNode, Integer> _nodeIdMap;
+
+  public PipelineBreakerResult(Map<PlanNode, Integer> nodeIdMap, Map<Integer, List<TransferableBlock>> resultMap) {
+    _resultMap = resultMap;
+    _nodeIdMap = nodeIdMap;
+  }
+
+  /**
+   * Returns the map of blocks keyed by id, exactly as handed to the constructor.
+   */
+  public Map<Integer, List<TransferableBlock>> getResultMap() {
+    return _resultMap;
+  }
+
+  /**
+   * Returns the plan-node-to-id map, exactly as handed to the constructor.
+   */
+  public Map<PlanNode, Integer> getNodeIdMap() {
+    return _nodeIdMap;
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java
new file mode 100644
index 0000000000..03ea0b2115
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.pipeline;
+
+import org.apache.pinot.query.planner.plannode.DefaultPostOrderTraversalVisitor;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+
+
+/**
+ * Plan visitor that registers each processed node with the {@link PipelineBreakerContext} and flags the mailbox
+ * receive nodes of a leaf stage as pipeline breakers.
+ */
+public class PipelineBreakerVisitor extends DefaultPostOrderTraversalVisitor<Void, PipelineBreakerContext> {
+  private static final PlanNodeVisitor<Void, PipelineBreakerContext> INSTANCE = new PipelineBreakerVisitor();
+
+  /**
+   * Entry point: walks the plan starting from {@code root} with the shared visitor instance.
+   */
+  public static void visitPlanRoot(PlanNode root, PipelineBreakerContext context) {
+    root.visit(INSTANCE, context);
+  }
+
+  @Override
+  public Void process(PlanNode planNode, PipelineBreakerContext context) {
+    // Registration is what assigns the node its id inside the context.
+    context.visitedNewPlanNode(planNode);
+    return null;
+  }
+
+  @Override
+  public Void visitMailboxReceive(MailboxReceiveNode node, PipelineBreakerContext context) {
+    // Register first, the context needs the node id before it can be marked as a pipeline breaker.
+    process(node, context);
+    // TODO: actually implement pipeline breaker attribute in PlanNode
+    //   for now every mailbox receive node of a leaf stage is treated as a pipeline breaker node.
+    if (!context.isLeafStage()) {
+      return null;
+    }
+    context.addPipelineBreaker(node);
+    return null;
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
index 87bf3302b5..5c7bc12503 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
@@ -23,8 +23,9 @@ import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.routing.VirtualServerAddress;
-import org.apache.pinot.query.runtime.plan.PlanRequestContext;
+import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
import org.apache.pinot.query.runtime.plan.StageMetadata;
+import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
import org.apache.pinot.spi.config.table.TableType;
@@ -32,7 +33,7 @@ import org.apache.pinot.spi.config.table.TableType;
* Context class for converting a {@link org.apache.pinot.query.runtime.plan.DistributedStagePlan} into
* {@link PinotQuery} to execute on server.
*/
-public class ServerPlanRequestContext extends PlanRequestContext {
+public class ServerPlanRequestContext extends PhysicalPlanContext {
protected TableType _tableType;
protected TimeBoundaryInfo _timeBoundaryInfo;
@@ -40,9 +41,11 @@ public class ServerPlanRequestContext extends PlanRequestContext {
protected InstanceRequest _instanceRequest;
public ServerPlanRequestContext(MailboxService mailboxService, long requestId, int stageId, long timeoutMs,
- long deadlineMs, VirtualServerAddress server, StageMetadata stageMetadata, PinotQuery pinotQuery,
+ long deadlineMs, VirtualServerAddress server, StageMetadata stageMetadata,
+ PipelineBreakerResult pipelineBreakerResult, PinotQuery pinotQuery,
TableType tableType, TimeBoundaryInfo timeBoundaryInfo, boolean traceEnabled) {
- super(mailboxService, requestId, stageId, timeoutMs, deadlineMs, server, stageMetadata, traceEnabled);
+ super(mailboxService, requestId, stageId, timeoutMs, deadlineMs, server, stageMetadata, pipelineBreakerResult,
+ traceEnabled);
_pinotQuery = pinotQuery;
_tableType = tableType;
_timeBoundaryInfo = timeBoundaryInfo;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
similarity index 57%
rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
rename to pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
index a4bf0450ea..53846d257f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
@@ -16,48 +16,37 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.runtime.plan;
+package org.apache.pinot.query.runtime.plan.server;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.DataSource;
import org.apache.pinot.common.request.Expression;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.common.request.QuerySource;
+import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.query.optimizer.QueryOptimizer;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.parser.CalciteRexExpressionParser;
-import org.apache.pinot.query.planner.plannode.AggregateNode;
-import org.apache.pinot.query.planner.plannode.ExchangeNode;
-import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
import org.apache.pinot.query.planner.plannode.JoinNode;
-import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
-import org.apache.pinot.query.planner.plannode.MailboxSendNode;
-import org.apache.pinot.query.planner.plannode.PlanNode;
-import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
-import org.apache.pinot.query.planner.plannode.ProjectNode;
-import org.apache.pinot.query.planner.plannode.SetOpNode;
-import org.apache.pinot.query.planner.plannode.SortNode;
-import org.apache.pinot.query.planner.plannode.TableScanNode;
-import org.apache.pinot.query.planner.plannode.ValueNode;
-import org.apache.pinot.query.planner.plannode.WindowNode;
-import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
+import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.FilterKind;
import org.apache.pinot.sql.parsers.rewriter.NonAggregationGroupByToDistinctQueryRewriter;
import org.apache.pinot.sql.parsers.rewriter.PredicateComparisonRewriter;
@@ -67,18 +56,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * Plan visitor for direct leaf-stage server request.
- *
- * This should be merged with logics in {@link org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2} in the future
- * to directly produce operator chain.
- *
- * As of now, the reason why we use the plan visitor for server request is for additional support such as dynamic
- * filtering and other auxiliary functionalities.
- */
-public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPlanRequestContext> {
+public class ServerPlanRequestUtils {
private static final int DEFAULT_LEAF_NODE_LIMIT = 10_000_000;
- private static final Logger LOGGER = LoggerFactory.getLogger(ServerRequestPlanVisitor.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ServerPlanRequestUtils.class);
private static final List<String> QUERY_REWRITERS_CLASS_NAMES =
ImmutableList.of(PredicateComparisonRewriter.class.getName(),
NonAggregationGroupByToDistinctQueryRewriter.class.getName());
@@ -86,11 +66,14 @@ public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPla
new ArrayList<>(QueryRewriterFactory.getQueryRewriters(QUERY_REWRITERS_CLASS_NAMES));
private static final QueryOptimizer QUERY_OPTIMIZER = new QueryOptimizer();
- private static final ServerRequestPlanVisitor INSTANCE = new ServerRequestPlanVisitor();
+ private ServerPlanRequestUtils() {
+ // do not instantiate.
+ }
public static ServerPlanRequestContext build(MailboxService mailboxService, DistributedStagePlan stagePlan,
- Map<String, String> requestMetadataMap, TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo,
- TableType tableType, List<String> segmentList, long deadlineMs) {
+ Map<String, String> requestMetadataMap, PipelineBreakerResult pipelineBreakerResult,
+ TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo, TableType tableType,
+ List<String> segmentList, long deadlineMs) {
// Before-visit: construct the ServerPlanRequestContext baseline
// Making a unique requestId for leaf stages otherwise it causes problem on stats/metrics/tracing.
long requestId = (Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)) << 16) + (
@@ -108,10 +91,11 @@ public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPla
pinotQuery.setExplain(false);
ServerPlanRequestContext context =
new ServerPlanRequestContext(mailboxService, requestId, stagePlan.getStageId(), timeoutMs, deadlineMs,
- stagePlan.getServer(), stagePlan.getStageMetadata(), pinotQuery, tableType, timeBoundaryInfo, traceEnabled);
+ stagePlan.getServer(), stagePlan.getStageMetadata(), pipelineBreakerResult, pinotQuery, tableType,
+ timeBoundaryInfo, traceEnabled);
// visit the plan and create query physical plan.
- ServerRequestPlanVisitor.walkStageNode(stagePlan.getStageRoot(), context);
+ ServerPlanRequestVisitor.walkStageNode(stagePlan.getStageRoot(), context);
// Post-visit: finalize context.
// 1. global rewrite/optimize
@@ -148,6 +132,9 @@ public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPla
return context;
}
+ /**
+ * Helper method to update query options.
+ */
private static void updateQueryOptions(PinotQuery pinotQuery, Map<String, String> requestMetadataMap, long timeoutMs,
boolean traceEnabled) {
Map<String, String> queryOptions = new HashMap<>();
@@ -161,113 +148,6 @@ public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPla
pinotQuery.setQueryOptions(queryOptions);
}
- private static void walkStageNode(PlanNode node, ServerPlanRequestContext context) {
- node.visit(INSTANCE, context);
- }
-
- @Override
- public Void visitAggregate(AggregateNode node, ServerPlanRequestContext context) {
- visitChildren(node, context);
- // set group-by list
- context.getPinotQuery()
- .setGroupByList(CalciteRexExpressionParser.convertGroupByList(node.getGroupSet(), context.getPinotQuery()));
- // set agg list
- context.getPinotQuery().setSelectList(
- CalciteRexExpressionParser.addSelectList(context.getPinotQuery().getGroupByList(), node.getAggCalls(),
- context.getPinotQuery()));
- return null;
- }
-
- @Override
- public Void visitWindow(WindowNode node, ServerPlanRequestContext context) {
- throw new UnsupportedOperationException("Window not yet supported!");
- }
-
- @Override
- public Void visitSetOp(SetOpNode node, ServerPlanRequestContext context) {
- visitChildren(node, context);
- return null;
- }
-
- @Override
- public Void visitExchange(ExchangeNode exchangeNode, ServerPlanRequestContext context) {
- throw new UnsupportedOperationException("Exchange not yet supported!");
- }
-
- @Override
- public Void visitFilter(FilterNode node, ServerPlanRequestContext context) {
- visitChildren(node, context);
- context.getPinotQuery()
- .setFilterExpression(CalciteRexExpressionParser.toExpression(node.getCondition(), context.getPinotQuery()));
- return null;
- }
-
- @Override
- public Void visitJoin(JoinNode node, ServerPlanRequestContext context) {
- visitChildren(node, context);
- return null;
- }
-
- @Override
- public Void visitMailboxReceive(MailboxReceiveNode node, ServerPlanRequestContext context) {
- visitChildren(node, context);
- return null;
- }
-
- @Override
- public Void visitMailboxSend(MailboxSendNode node, ServerPlanRequestContext context) {
- visitChildren(node, context);
- return null;
- }
-
- @Override
- public Void visitProject(ProjectNode node, ServerPlanRequestContext context) {
- visitChildren(node, context);
- context.getPinotQuery()
- .setSelectList(CalciteRexExpressionParser.overwriteSelectList(node.getProjects(), context.getPinotQuery()));
- return null;
- }
-
- @Override
- public Void visitSort(SortNode node, ServerPlanRequestContext context) {
- visitChildren(node, context);
- PinotQuery pinotQuery = context.getPinotQuery();
- if (node.getCollationKeys().size() > 0) {
- pinotQuery.setOrderByList(CalciteRexExpressionParser.convertOrderByList(node, pinotQuery));
- }
- if (node.getFetch() > 0) {
- pinotQuery.setLimit(node.getFetch());
- }
- if (node.getOffset() > 0) {
- pinotQuery.setOffset(node.getOffset());
- }
- return null;
- }
-
- @Override
- public Void visitTableScan(TableScanNode node, ServerPlanRequestContext context) {
- DataSource dataSource = new DataSource();
- String tableNameWithType = TableNameBuilder.forType(context.getTableType())
- .tableNameWithType(TableNameBuilder.extractRawTableName(node.getTableName()));
- dataSource.setTableName(tableNameWithType);
- context.getPinotQuery().setDataSource(dataSource);
- context.getPinotQuery().setSelectList(
- node.getTableScanColumns().stream().map(RequestUtils::getIdentifierExpression).collect(Collectors.toList()));
- return null;
- }
-
- @Override
- public Void visitValue(ValueNode node, ServerPlanRequestContext context) {
- visitChildren(node, context);
- return null;
- }
-
- private void visitChildren(PlanNode node, ServerPlanRequestContext context) {
- for (PlanNode child : node.getInputs()) {
- child.visit(this, context);
- }
- }
-
/**
* Helper method to attach the time boundary to the given PinotQuery.
*/
@@ -289,4 +169,106 @@ public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPla
pinotQuery.setFilterExpression(timeFilterExpression);
}
}
+
+  /**
+   * Attaches a dynamic filter to the given PinotQuery: one IN expression per join key pair, all of them AND-ed
+   * onto the query's filter.
+   *
+   * For each key pair, the left operand is the select-list expression at the left key index, and the remaining
+   * operands are the literals computed from the right key column of {@code dataContainer}.
+   */
+  static void attachDynamicFilter(PinotQuery pinotQuery, JoinNode.JoinKeys joinKeys, List<Object[]> dataContainer,
+      DataSchema dataSchema) {
+    FieldSelectionKeySelector leftKeys = (FieldSelectionKeySelector) joinKeys.getLeftJoinKeySelector();
+    FieldSelectionKeySelector rightKeys = (FieldSelectionKeySelector) joinKeys.getRightJoinKeySelector();
+    List<Expression> inFilters = new ArrayList<>();
+    int keyIdx = 0;
+    while (keyIdx < leftKeys.getColumnIndices().size()) {
+      Expression leftOperand = pinotQuery.getSelectList().get(leftKeys.getColumnIndices().get(keyIdx));
+      int rightColIdx = rightKeys.getColumnIndices().get(keyIdx);
+      Expression inFilter = RequestUtils.getFunctionExpression(FilterKind.IN.name());
+      // +1 accounts for the left-hand operand that precedes the literal values.
+      List<Expression> inOperands = new ArrayList<>(dataContainer.size() + 1);
+      inOperands.add(leftOperand);
+      inOperands.addAll(computeInOperands(dataContainer, dataSchema, rightColIdx));
+      inFilter.getFunctionCall().setOperands(inOperands);
+      inFilters.add(inFilter);
+      keyIdx++;
+    }
+    attachFilterExpression(pinotQuery, FilterKind.AND, inFilters);
+  }
+
+  /**
+   * Converts one column of the given rows into literal expressions, sorted in ascending order.
+   *
+   * The column's stored type decides how values are read; INT, LONG, FLOAT, DOUBLE and STRING are supported.
+   *
+   * @param dataContainer rows to read from
+   * @param dataSchema schema describing the columns of each row
+   * @param colIdx index of the column to extract
+   * @return one literal expression per row, sorted by value
+   * @throws IllegalStateException if the column's stored type is not one of the supported types
+   */
+  private static List<Expression> computeInOperands(List<Object[]> dataContainer, DataSchema dataSchema, int colIdx) {
+    final DataSchema.ColumnDataType columnDataType = dataSchema.getColumnDataType(colIdx);
+    final FieldSpec.DataType storedType = columnDataType.getStoredType().toDataType();
+    final int numRows = dataContainer.size();
+    // Exactly one literal is produced per row.
+    List<Expression> expressions = new ArrayList<>(numRows);
+    switch (storedType) {
+      case INT:
+        int[] arrInt = new int[numRows];
+        for (int rowIdx = 0; rowIdx < numRows; rowIdx++) {
+          arrInt[rowIdx] = (int) dataContainer.get(rowIdx)[colIdx];
+        }
+        Arrays.sort(arrInt);
+        for (int rowIdx = 0; rowIdx < numRows; rowIdx++) {
+          expressions.add(RequestUtils.getLiteralExpression(arrInt[rowIdx]));
+        }
+        break;
+      case LONG:
+        long[] arrLong = new long[numRows];
+        for (int rowIdx = 0; rowIdx < numRows; rowIdx++) {
+          arrLong[rowIdx] = (long) dataContainer.get(rowIdx)[colIdx];
+        }
+        Arrays.sort(arrLong);
+        for (int rowIdx = 0; rowIdx < numRows; rowIdx++) {
+          expressions.add(RequestUtils.getLiteralExpression(arrLong[rowIdx]));
+        }
+        break;
+      case FLOAT:
+        float[] arrFloat = new float[numRows];
+        for (int rowIdx = 0; rowIdx < numRows; rowIdx++) {
+          arrFloat[rowIdx] = (float) dataContainer.get(rowIdx)[colIdx];
+        }
+        Arrays.sort(arrFloat);
+        for (int rowIdx = 0; rowIdx < numRows; rowIdx++) {
+          expressions.add(RequestUtils.getLiteralExpression(arrFloat[rowIdx]));
+        }
+        break;
+      case DOUBLE:
+        double[] arrDouble = new double[numRows];
+        for (int rowIdx = 0; rowIdx < numRows; rowIdx++) {
+          arrDouble[rowIdx] = (double) dataContainer.get(rowIdx)[colIdx];
+        }
+        Arrays.sort(arrDouble);
+        for (int rowIdx = 0; rowIdx < numRows; rowIdx++) {
+          expressions.add(RequestUtils.getLiteralExpression(arrDouble[rowIdx]));
+        }
+        break;
+      case STRING:
+        String[] arrString = new String[numRows];
+        for (int rowIdx = 0; rowIdx < numRows; rowIdx++) {
+          arrString[rowIdx] = (String) dataContainer.get(rowIdx)[colIdx];
+        }
+        Arrays.sort(arrString);
+        for (int rowIdx = 0; rowIdx < numRows; rowIdx++) {
+          expressions.add(RequestUtils.getLiteralExpression(arrString[rowIdx]));
+        }
+        break;
+      default:
+        throw new IllegalStateException("Illegal SV data type for dynamic filter IN operands: " + storedType);
+    }
+    return expressions;
+  }
+
+  /**
+   * Attaches filter expressions to the existing PinotQuery.
+   *
+   * The given expressions are combined with the query's current filter (if any) under {@code attachKind}. When
+   * only a single expression results, it is set directly without a wrapping AND/OR. When there is nothing to
+   * attach and the query has no filter, the query is left unchanged.
+   *
+   * @param pinotQuery query whose filter expression is updated in place
+   * @param attachKind how to combine the expressions; must be {@link FilterKind#AND} or {@link FilterKind#OR}
+   * @param exprs expressions to attach; may be empty
+   * @throws IllegalStateException if {@code attachKind} is neither AND nor OR
+   */
+  private static void attachFilterExpression(PinotQuery pinotQuery, FilterKind attachKind, List<Expression> exprs) {
+    Preconditions.checkState(attachKind == FilterKind.AND || attachKind == FilterKind.OR);
+    Expression filterExpression = pinotQuery.getFilterExpression();
+    List<Expression> arrayList = new ArrayList<>(exprs);
+    if (filterExpression != null) {
+      arrayList.add(filterExpression);
+    }
+    if (arrayList.isEmpty()) {
+      // No new expressions and no existing filter: nothing to set, and get(0) below would be out of bounds.
+      return;
+    }
+    if (arrayList.size() > 1) {
+      Expression attachFilterExpression = RequestUtils.getFunctionExpression(attachKind.name());
+      attachFilterExpression.getFunctionCall().setOperands(arrayList);
+      pinotQuery.setFilterExpression(attachFilterExpression);
+    } else {
+      pinotQuery.setFilterExpression(arrayList.get(0));
+    }
+  }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
new file mode 100644
index 0000000000..9b1f97426c
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.server;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.request.DataSource;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.query.parser.CalciteRexExpressionParser;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+
+/**
+ * Plan visitor for direct leaf-stage server request.
+ *
+ * This should be merged with logics in {@link org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2} in the future
+ * to directly produce operator chain.
+ *
+ * As of now, the reason why we use the plan visitor for server request is for additional support such as dynamic
+ * filtering and other auxiliary functionalities.
+ */
+public class ServerPlanRequestVisitor implements PlanNodeVisitor<Void, ServerPlanRequestContext> {
+ private static final ServerPlanRequestVisitor INSTANCE = new ServerPlanRequestVisitor();
+
+ static void walkStageNode(PlanNode node, ServerPlanRequestContext context) {
+ node.visit(INSTANCE, context);
+ }
+
+ @Override
+ public Void visitAggregate(AggregateNode node, ServerPlanRequestContext context) {
+ visitChildren(node, context);
+ // set group-by list
+ context.getPinotQuery()
+ .setGroupByList(CalciteRexExpressionParser.convertGroupByList(node.getGroupSet(), context.getPinotQuery()));
+ // set agg list
+ context.getPinotQuery().setSelectList(
+ CalciteRexExpressionParser.addSelectList(context.getPinotQuery().getGroupByList(), node.getAggCalls(),
+ context.getPinotQuery()));
+ return null;
+ }
+
+ @Override
+ public Void visitWindow(WindowNode node, ServerPlanRequestContext context) {
+ throw new UnsupportedOperationException("Window not yet supported!");
+ }
+
+ @Override
+ public Void visitSetOp(SetOpNode node, ServerPlanRequestContext context) {
+ visitChildren(node, context);
+ return null;
+ }
+
+ @Override
+ public Void visitExchange(ExchangeNode exchangeNode, ServerPlanRequestContext context) {
+ throw new UnsupportedOperationException("Exchange not yet supported!");
+ }
+
+ @Override
+ public Void visitFilter(FilterNode node, ServerPlanRequestContext context) {
+ visitChildren(node, context);
+ context.getPinotQuery()
+ .setFilterExpression(CalciteRexExpressionParser.toExpression(node.getCondition(), context.getPinotQuery()));
+ return null;
+ }
+
+ @Override
+ public Void visitJoin(JoinNode node, ServerPlanRequestContext context) {
+ // visit only the static side, turn the dynamic side into a lookup from the pipeline breaker resultDataContainer
+ PlanNode staticSide = node.getInputs().get(0);
+ PlanNode dynamicSide = node.getInputs().get(1);
+ if (staticSide instanceof MailboxReceiveNode) {
+ dynamicSide = node.getInputs().get(0);
+ staticSide = node.getInputs().get(1);
+ }
+ staticSide.visit(this, context);
+ int resultMapId = context.getPipelineBreakerResult().getNodeIdMap().get(dynamicSide);
+ List<TransferableBlock> transferableBlocks = context.getPipelineBreakerResult().getResultMap().get(resultMapId);
+ List<Object[]> resultDataContainer = new ArrayList<>();
+ DataSchema dataSchema = dynamicSide.getDataSchema();
+ for (TransferableBlock block : transferableBlocks) {
+ if (block.getType() == DataBlock.Type.ROW) {
+ resultDataContainer.addAll(block.getContainer());
+ }
+ }
+
+ if (resultDataContainer.size() > 0) {
+ // rewrite SEMI-JOIN as filter clause.
+ ServerPlanRequestUtils.attachDynamicFilter(context.getPinotQuery(), node.getJoinKeys(), resultDataContainer,
+ dataSchema);
+ } else {
+ // do not pull any data out, this is constant false filter.
+ context.getPinotQuery().setLimit(0);
+ }
+ return null;
+ }
+
+ @Override
+ public Void visitMailboxReceive(MailboxReceiveNode node, ServerPlanRequestContext context) {
+ visitChildren(node, context);
+ return null;
+ }
+
+ @Override
+ public Void visitMailboxSend(MailboxSendNode node, ServerPlanRequestContext context) {
+ visitChildren(node, context);
+ return null;
+ }
+
+ @Override
+ public Void visitProject(ProjectNode node, ServerPlanRequestContext context) {
+ visitChildren(node, context);
+ context.getPinotQuery()
+ .setSelectList(CalciteRexExpressionParser.overwriteSelectList(node.getProjects(), context.getPinotQuery()));
+ return null;
+ }
+
+ @Override
+ public Void visitSort(SortNode node, ServerPlanRequestContext context) {
+ visitChildren(node, context);
+ PinotQuery pinotQuery = context.getPinotQuery();
+ if (node.getCollationKeys().size() > 0) {
+ pinotQuery.setOrderByList(CalciteRexExpressionParser.convertOrderByList(node, pinotQuery));
+ }
+ if (node.getFetch() > 0) {
+ pinotQuery.setLimit(node.getFetch());
+ }
+ if (node.getOffset() > 0) {
+ pinotQuery.setOffset(node.getOffset());
+ }
+ return null;
+ }
+
+ @Override
+ public Void visitTableScan(TableScanNode node, ServerPlanRequestContext context) {
+ DataSource dataSource = new DataSource();
+ String tableNameWithType = TableNameBuilder.forType(context.getTableType())
+ .tableNameWithType(TableNameBuilder.extractRawTableName(node.getTableName()));
+ dataSource.setTableName(tableNameWithType);
+ context.getPinotQuery().setDataSource(dataSource);
+ context.getPinotQuery().setSelectList(
+ node.getTableScanColumns().stream().map(RequestUtils::getIdentifierExpression).collect(Collectors.toList()));
+ return null;
+ }
+
+ @Override
+ public Void visitValue(ValueNode node, ServerPlanRequestContext context) {
+ visitChildren(node, context);
+ return null;
+ }
+
+ private void visitChildren(PlanNode node, ServerPlanRequestContext context) {
+ for (PlanNode child : node.getInputs()) {
+ child.visit(this, context);
+ }
+ }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 75401ca07f..5042805948 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -20,12 +20,16 @@ package org.apache.pinot.query;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.NamedThreadFactory;
import org.apache.pinot.common.utils.SchemaUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.query.runtime.QueryRunner;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.service.QueryConfig;
@@ -65,6 +69,9 @@ public class QueryServerEnclosure {
private final HelixManager _helixManager;
private final QueryRunner _queryRunner;
+ private final ExecutorService _executor = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_RUNNER_THREADS,
+ new NamedThreadFactory("QueryServerTest_Server"));
+
public QueryServerEnclosure(MockInstanceDataManagerFactory factory) {
try {
@@ -124,6 +131,12 @@ public class QueryServerEnclosure {
}
public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) {
- _queryRunner.processQuery(distributedStagePlan, requestMetadataMap);
+ _executor.submit(() -> { // hands the work to _executor; the Future returned by submit() is not kept
+ try {
+ _queryRunner.processQuery(distributedStagePlan, requestMetadataMap);
+ } catch (Exception e) {
+ throw new RuntimeException("Error executing query!", e); // NOTE(review): presumably only seen via that Future
+ }
+ });
}
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index ca35b62a9c..88081cb342 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -215,6 +215,9 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
@DataProvider(name = "testDataWithSqlToFinalRowCount")
private Object[][] provideTestSqlAndRowCount() {
return new Object[][]{
+ new Object[]{"SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ col1 FROM a "
+ + " WHERE a.col1 IN (SELECT b.col2 FROM b WHERE b.col3 < 10) AND a.col3 > 0", 9},
+
// using join clause
new Object[]{"SELECT * FROM a JOIN b USING (col1)", 15},
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
index 8276c647aa..83dea7c82d 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
@@ -25,7 +25,7 @@ import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
-import org.apache.pinot.query.runtime.plan.PlanRequestContext;
+import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.mockito.Mock;
import org.mockito.Mockito;
@@ -41,7 +41,7 @@ public class LiteralValueOperatorTest {
private AutoCloseable _mocks;
@Mock
- private PlanRequestContext _context;
+ private PhysicalPlanContext _context;
@Mock
private VirtualServerAddress _serverAddress;
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
index ca24f97043..b099e5f807 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
@@ -67,9 +67,6 @@ public class QueryServerTest extends QueryTestSet {
private static final ExecutorService INTERM_WORKER_EXECUTOR_SERVICE =
Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS,
new NamedThreadFactory("QueryDispatcherTest_IntermWorker"));
- private static final ExecutorService RUNNER_EXECUTOR_SERVICE =
- Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_RUNNER_THREADS,
- new NamedThreadFactory("QueryServerTest_Runner"));
private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>();
private final Map<Integer, QueryRunner> _queryRunnerMap = new HashMap<>();
@@ -85,7 +82,6 @@ public class QueryServerTest extends QueryTestSet {
QueryRunner queryRunner = Mockito.mock(QueryRunner.class);
Mockito.when(queryRunner.getQueryWorkerLeafExecutorService()).thenReturn(LEAF_WORKER_EXECUTOR_SERVICE);
Mockito.when(queryRunner.getQueryWorkerIntermExecutorService()).thenReturn(INTERM_WORKER_EXECUTOR_SERVICE);
- Mockito.when(queryRunner.getQueryRunnerExecutorService()).thenReturn(RUNNER_EXECUTOR_SERVICE);
QueryServer queryServer = new QueryServer(availablePort, queryRunner);
queryServer.start();
_queryServerMap.put(availablePort, queryServer);
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
index 69d2dda026..98a54b5f6e 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
@@ -55,8 +55,6 @@ public class QueryDispatcherTest extends QueryTestSet {
ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new NamedThreadFactory("QueryDispatcherTest_LeafWorker"));
private static final ExecutorService INTERM_WORKER_EXECUTOR_SERVICE = Executors.newFixedThreadPool(
ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new NamedThreadFactory("QueryDispatcherTest_IntermWorker"));
- private static final ExecutorService RUNNER_EXECUTOR_SERVICE = Executors.newFixedThreadPool(
- ResourceManager.DEFAULT_QUERY_RUNNER_THREADS, new NamedThreadFactory("QueryDispatcherTest_Runner"));
private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>();
private final Map<Integer, QueryRunner> _queryRunnerMap = new HashMap<>();
@@ -73,7 +71,6 @@ public class QueryDispatcherTest extends QueryTestSet {
QueryRunner queryRunner = Mockito.mock(QueryRunner.class);
Mockito.when(queryRunner.getQueryWorkerLeafExecutorService()).thenReturn(LEAF_WORKER_EXECUTOR_SERVICE);
Mockito.when(queryRunner.getQueryWorkerIntermExecutorService()).thenReturn(INTERM_WORKER_EXECUTOR_SERVICE);
- Mockito.when(queryRunner.getQueryRunnerExecutorService()).thenReturn(RUNNER_EXECUTOR_SERVICE);
QueryServer queryServer = new QueryServer(availablePort, queryRunner);
queryServer = Mockito.spy(queryServer);
queryServer.start();
diff --git a/pinot-query-runtime/src/test/resources/queries/QueryHints.json b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
index 464dac98f5..2bc442fc09 100644
--- a/pinot-query-runtime/src/test/resources/queries/QueryHints.json
+++ b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
@@ -62,6 +62,26 @@
{
"description": "Colocated JOIN with partition column and group by non-partitioned column",
"sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true'), aggOptions(is_partitioned_by_group_by_keys='false') */ {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} JOIN {tbl2} ON {tbl1}.num = {tbl2}.num GROUP BY {tbl1}.name"
+ },
+ {
+ "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition column",
+ "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', is_colocated_by_join_keys='true') */ {tbl1}.num, {tbl1}.name FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val IN ('xxx', 'yyy'))"
+ },
+ {
+ "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition column and group by partition column",
+ "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', is_colocated_by_join_keys='true'), aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, COUNT({tbl1}.name) FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val IN ('xxx', 'yyy')) GROUP BY {tbl1}.num, {tbl1}.name"
+ },
+ {
+ "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition column and group by non-partitioned column",
+ "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', is_colocated_by_join_keys='true') */ {tbl1}.name, COUNT(*) FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val IN ('xxx', 'yyy')) GROUP BY {tbl1}.name"
+ },
+ {
+ "description": "Dynamic broadcast SEMI-JOIN with empty right table result",
+ "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ {tbl1}.name, COUNT(*) FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val = 'non-exist') GROUP BY {tbl1}.name"
+ },
+ {
+ "description": "Colocated, Dynamic broadcast SEMI-JOIN with partially empty right table result for some servers",
+ "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', is_colocated_by_join_keys='true') */ {tbl1}.name, COUNT(*) FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val = 'z') GROUP BY {tbl1}.name"
}
]
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org