You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/06/21 04:28:11 UTC

[pinot] branch master updated: [multistage][cleanup] clean up PhysicalPlanContext usage (#10950)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 73830ae3ce [multistage][cleanup] clean up PhysicalPlanContext usage (#10950)
73830ae3ce is described below

commit 73830ae3ce94428b7e9991d1bd160bd62a14b7c1
Author: Rong Rong <ro...@apache.org>
AuthorDate: Tue Jun 20 21:28:05 2023 -0700

    [multistage][cleanup] clean up PhysicalPlanContext usage (#10950)
    
    * [clean up] ServerPlanRequestContext should not extend PhysicalPlanContext; it should encapsulate one instead
    * [clean up] refactor OpChainExecutionContext so that production code constructs it directly from a PhysicalPlanContext (the field-by-field constructor is kept as @VisibleForTesting)
    * [clean up] remove timeoutMs as argument and member var from plan context
---
 .../apache/pinot/query/runtime/QueryRunner.java    | 31 ++++----
 .../runtime/plan/OpChainExecutionContext.java      | 12 +--
 .../query/runtime/plan/PhysicalPlanContext.java    | 13 +---
 .../plan/pipeline/PipelineBreakerExecutor.java     |  4 +-
 .../plan/server/ServerPlanRequestContext.java      | 28 +++----
 .../plan/server/ServerPlanRequestUtils.java        | 21 ++----
 .../plan/server/ServerPlanRequestVisitor.java      |  6 +-
 .../query/service/dispatch/QueryDispatcher.java    | 14 ++--
 .../executor/OpChainSchedulerServiceTest.java      |  2 +-
 .../runtime/executor/RoundRobinSchedulerTest.java  |  2 +-
 .../operator/MailboxReceiveOperatorTest.java       | 87 ++++++++--------------
 .../runtime/operator/MailboxSendOperatorTest.java  |  2 +-
 .../pinot/query/runtime/operator/OpChainTest.java  | 41 ++++------
 .../query/runtime/operator/OperatorTestUtil.java   | 10 +--
 .../operator/SortedMailboxReceiveOperatorTest.java | 85 +++++++--------------
 15 files changed, 134 insertions(+), 224 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 914c6c2ed0..aa16f0b220 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -151,7 +151,7 @@ public class QueryRunner {
     PipelineBreakerResult pipelineBreakerResult;
     try {
       pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService,
-          distributedStagePlan, timeoutMs, deadlineMs, requestId, isTraceEnabled);
+          distributedStagePlan, deadlineMs, requestId, isTraceEnabled);
     } catch (Exception e) {
       LOGGER.error("Error executing pre-stage pipeline breaker for: {}:{}", requestId,
           distributedStagePlan.getStageId(), e);
@@ -174,7 +174,7 @@ public class QueryRunner {
       try {
         PlanNode stageRoot = distributedStagePlan.getStageRoot();
         OpChain rootOperator = PhysicalPlanVisitor.walkPlanNode(stageRoot,
-            new PhysicalPlanContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs,
+            new PhysicalPlanContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), deadlineMs,
                 distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled));
         _scheduler.register(rootOperator);
       } catch (Exception e) {
@@ -203,19 +203,18 @@ public class QueryRunner {
       PipelineBreakerResult pipelineBreakerResult, long timeoutMs, long deadlineMs, long requestId) {
     boolean isTraceEnabled =
         Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
+    PhysicalPlanContext planContext = new PhysicalPlanContext(_mailboxService, requestId,
+        distributedStagePlan.getStageId(), deadlineMs, distributedStagePlan.getServer(),
+        distributedStagePlan.getStageMetadata(), pipelineBreakerResult, isTraceEnabled);
     List<ServerPlanRequestContext> serverPlanRequestContexts =
-        constructServerQueryRequests(distributedStagePlan, requestMetadataMap, pipelineBreakerResult,
-            _helixPropertyStore, _mailboxService, deadlineMs);
+        constructServerQueryRequests(planContext, distributedStagePlan, requestMetadataMap, _helixPropertyStore);
     List<ServerQueryRequest> serverQueryRequests = new ArrayList<>(serverPlanRequestContexts.size());
     for (ServerPlanRequestContext requestContext : serverPlanRequestContexts) {
       serverQueryRequests.add(new ServerQueryRequest(requestContext.getInstanceRequest(),
           new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), System.currentTimeMillis()));
     }
     MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot();
-    OpChainExecutionContext opChainExecutionContext =
-        new OpChainExecutionContext(_mailboxService, requestId, sendNode.getPlanFragmentId(),
-            distributedStagePlan.getServer(), timeoutMs, deadlineMs, distributedStagePlan.getStageMetadata(),
-            isTraceEnabled);
+    OpChainExecutionContext opChainExecutionContext = new OpChainExecutionContext(planContext);
     MultiStageOperator leafStageOperator =
         new LeafStageTransferableBlockOperator(opChainExecutionContext, this::processServerQueryRequest,
             serverQueryRequests, sendNode.getDataSchema());
@@ -226,9 +225,9 @@ public class QueryRunner {
     return new OpChain(opChainExecutionContext, mailboxSendOperator, Collections.emptyList());
   }
 
-  private static List<ServerPlanRequestContext> constructServerQueryRequests(DistributedStagePlan distributedStagePlan,
-      Map<String, String> requestMetadataMap, PipelineBreakerResult pipelineBreakerResult,
-      ZkHelixPropertyStore<ZNRecord> helixPropertyStore, MailboxService mailboxService, long deadlineMs) {
+  private static List<ServerPlanRequestContext> constructServerQueryRequests(PhysicalPlanContext planContext,
+      DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap,
+      ZkHelixPropertyStore<ZNRecord> helixPropertyStore) {
     StageMetadata stageMetadata = distributedStagePlan.getStageMetadata();
     WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata();
     String rawTableName = StageMetadata.getTableName(stageMetadata);
@@ -244,17 +243,15 @@ public class QueryRunner {
             TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
         Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
             TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
-        requests.add(ServerPlanRequestUtils.build(mailboxService, distributedStagePlan, requestMetadataMap,
-            pipelineBreakerResult, tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata),
-            TableType.OFFLINE, tableEntry.getValue(), deadlineMs));
+        requests.add(ServerPlanRequestUtils.build(planContext, distributedStagePlan, requestMetadataMap, tableConfig,
+            schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE, tableEntry.getValue()));
       } else if (TableType.REALTIME.name().equals(tableType)) {
         TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore,
             TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
         Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
             TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
-        requests.add(ServerPlanRequestUtils.build(mailboxService, distributedStagePlan, requestMetadataMap,
-            pipelineBreakerResult, tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata),
-            TableType.REALTIME, tableEntry.getValue(), deadlineMs));
+        requests.add(ServerPlanRequestUtils.build(planContext, distributedStagePlan, requestMetadataMap, tableConfig,
+            schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME, tableEntry.getValue()));
       } else {
         throw new IllegalArgumentException("Unsupported table type key: " + tableType);
       }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index 5da20c4e70..b23f41400a 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.runtime.plan;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.function.Consumer;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.routing.VirtualServerAddress;
@@ -35,21 +36,20 @@ public class OpChainExecutionContext {
   private final long _requestId;
   private final int _stageId;
   private final VirtualServerAddress _server;
-  private final long _timeoutMs;
   private final long _deadlineMs;
   private final StageMetadata _stageMetadata;
   private final OpChainId _id;
   private final OpChainStats _stats;
   private final boolean _traceEnabled;
 
+  @VisibleForTesting
   public OpChainExecutionContext(MailboxService mailboxService, long requestId, int stageId,
-      VirtualServerAddress server, long timeoutMs, long deadlineMs, StageMetadata stageMetadata,
+      VirtualServerAddress server, long deadlineMs, StageMetadata stageMetadata,
       boolean traceEnabled) {
     _mailboxService = mailboxService;
     _requestId = requestId;
     _stageId = stageId;
     _server = server;
-    _timeoutMs = timeoutMs;
     _deadlineMs = deadlineMs;
     _stageMetadata = stageMetadata;
     _id = new OpChainId(requestId, server.workerId(), stageId);
@@ -59,7 +59,7 @@ public class OpChainExecutionContext {
 
   public OpChainExecutionContext(PhysicalPlanContext physicalPlanContext) {
     this(physicalPlanContext.getMailboxService(), physicalPlanContext.getRequestId(), physicalPlanContext.getStageId(),
-        physicalPlanContext.getServer(), physicalPlanContext.getTimeoutMs(), physicalPlanContext.getDeadlineMs(),
+        physicalPlanContext.getServer(), physicalPlanContext.getDeadlineMs(),
         physicalPlanContext.getStageMetadata(), physicalPlanContext.isTraceEnabled());
   }
 
@@ -83,10 +83,6 @@ public class OpChainExecutionContext {
     return _server;
   }
 
-  public long getTimeoutMs() {
-    return _timeoutMs;
-  }
-
   public long getDeadlineMs() {
     return _deadlineMs;
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java
index 2da5772930..a2fb6f4643 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java
@@ -29,8 +29,6 @@ public class PhysicalPlanContext {
   protected final MailboxService _mailboxService;
   protected final long _requestId;
   protected final int _stageId;
-  // TODO: Timeout is not needed since deadline is already present.
-  private final long _timeoutMs;
   private final long _deadlineMs;
   protected final VirtualServerAddress _server;
   protected final StageMetadata _stageMetadata;
@@ -39,13 +37,12 @@ public class PhysicalPlanContext {
   private final OpChainExecutionContext _opChainExecutionContext;
   private final boolean _traceEnabled;
 
-  public PhysicalPlanContext(MailboxService mailboxService, long requestId, int stageId, long timeoutMs,
-      long deadlineMs, VirtualServerAddress server, StageMetadata stageMetadata,
-      PipelineBreakerResult pipelineBreakerResult, boolean traceEnabled) {
+  public PhysicalPlanContext(MailboxService mailboxService, long requestId, int stageId, long deadlineMs,
+      VirtualServerAddress server, StageMetadata stageMetadata, PipelineBreakerResult pipelineBreakerResult,
+      boolean traceEnabled) {
     _mailboxService = mailboxService;
     _requestId = requestId;
     _stageId = stageId;
-    _timeoutMs = timeoutMs;
     _deadlineMs = deadlineMs;
     _server = server;
     _stageMetadata = stageMetadata;
@@ -62,10 +59,6 @@ public class PhysicalPlanContext {
     return _stageId;
   }
 
-  public long getTimeoutMs() {
-    return _timeoutMs;
-  }
-
   public long getDeadlineMs() {
     return _deadlineMs;
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
index a59a310fed..0202127314 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
@@ -53,7 +53,7 @@ public class PipelineBreakerExecutor {
    * Currently, pipeline breaker executor can only execute mailbox receive pipeline breaker.
    */
   public static PipelineBreakerResult executePipelineBreakers(OpChainSchedulerService scheduler,
-      MailboxService mailboxService, DistributedStagePlan distributedStagePlan, long timeoutMs, long deadlineMs,
+      MailboxService mailboxService, DistributedStagePlan distributedStagePlan, long deadlineMs,
       long requestId, boolean isTraceEnabled)
       throws Exception {
     PipelineBreakerContext pipelineBreakerContext = new PipelineBreakerContext(
@@ -65,7 +65,7 @@ public class PipelineBreakerExecutor {
       //     receive-mail callbacks.
       // see also: MailboxIdUtils TODOs, de-couple mailbox id from query information
       PhysicalPlanContext physicalPlanContext =
-          new PhysicalPlanContext(mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs,
+          new PhysicalPlanContext(mailboxService, requestId, stageRoot.getPlanFragmentId(), deadlineMs,
               distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled);
       Map<Integer, List<TransferableBlock>> resultMap =
           PipelineBreakerExecutor.execute(scheduler, pipelineBreakerContext, physicalPlanContext);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
index 5c7bc12503..fb1c3af0c1 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
@@ -20,12 +20,7 @@ package org.apache.pinot.query.runtime.plan.server;
 
 import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.common.request.PinotQuery;
-import org.apache.pinot.core.routing.TimeBoundaryInfo;
-import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
-import org.apache.pinot.query.runtime.plan.StageMetadata;
-import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
 import org.apache.pinot.spi.config.table.TableType;
 
 
@@ -33,22 +28,21 @@ import org.apache.pinot.spi.config.table.TableType;
  * Context class for converting a {@link org.apache.pinot.query.runtime.plan.DistributedStagePlan} into
  * {@link PinotQuery} to execute on server.
  */
-public class ServerPlanRequestContext extends PhysicalPlanContext {
-  protected TableType _tableType;
-  protected TimeBoundaryInfo _timeBoundaryInfo;
+public class ServerPlanRequestContext {
+  private final PhysicalPlanContext _planContext;
+  private final TableType _tableType;
 
-  protected PinotQuery _pinotQuery;
-  protected InstanceRequest _instanceRequest;
+  private PinotQuery _pinotQuery;
+  private InstanceRequest _instanceRequest;
 
-  public ServerPlanRequestContext(MailboxService mailboxService, long requestId, int stageId, long timeoutMs,
-      long deadlineMs, VirtualServerAddress server, StageMetadata stageMetadata,
-      PipelineBreakerResult pipelineBreakerResult, PinotQuery pinotQuery,
-      TableType tableType, TimeBoundaryInfo timeBoundaryInfo, boolean traceEnabled) {
-    super(mailboxService, requestId, stageId, timeoutMs, deadlineMs, server, stageMetadata, pipelineBreakerResult,
-        traceEnabled);
+  public ServerPlanRequestContext(PhysicalPlanContext planContext, PinotQuery pinotQuery, TableType tableType) {
+    _planContext = planContext;
     _pinotQuery = pinotQuery;
     _tableType = tableType;
-    _timeBoundaryInfo = timeBoundaryInfo;
+  }
+
+  public PhysicalPlanContext getPlanContext() {
+    return _planContext;
   }
 
   public TableType getTableType() {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
index 7a0454798b..13659ae810 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
@@ -36,11 +36,10 @@ import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.query.optimizer.QueryOptimizer;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
-import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
 import org.apache.pinot.query.planner.plannode.JoinNode;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
-import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
+import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
 import org.apache.pinot.query.service.QueryConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -70,10 +69,9 @@ public class ServerPlanRequestUtils {
     // do not instantiate.
   }
 
-  public static ServerPlanRequestContext build(MailboxService mailboxService, DistributedStagePlan stagePlan,
-      Map<String, String> requestMetadataMap, PipelineBreakerResult pipelineBreakerResult,
-      TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo, TableType tableType,
-      List<String> segmentList, long deadlineMs) {
+  public static ServerPlanRequestContext build(PhysicalPlanContext planContext, DistributedStagePlan stagePlan,
+      Map<String, String> requestMetadataMap, TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo,
+      TableType tableType, List<String> segmentList) {
     // Before-visit: construct the ServerPlanRequestContext baseline
     // Making a unique requestId for leaf stages otherwise it causes problem on stats/metrics/tracing.
     long requestId = (Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)) << 16) + (
@@ -89,13 +87,10 @@ public class ServerPlanRequestUtils {
     }
     LOGGER.debug("QueryID" + requestId + " leafNodeLimit:" + leafNodeLimit);
     pinotQuery.setExplain(false);
-    ServerPlanRequestContext context =
-        new ServerPlanRequestContext(mailboxService, requestId, stagePlan.getStageId(), timeoutMs, deadlineMs,
-            stagePlan.getServer(), stagePlan.getStageMetadata(), pipelineBreakerResult, pinotQuery, tableType,
-            timeBoundaryInfo, traceEnabled);
+    ServerPlanRequestContext serverContext = new ServerPlanRequestContext(planContext, pinotQuery, tableType);
 
     // visit the plan and create query physical plan.
-    ServerPlanRequestVisitor.walkStageNode(stagePlan.getStageRoot(), context);
+    ServerPlanRequestVisitor.walkStageNode(stagePlan.getStageRoot(), serverContext);
 
     // Post-visit: finalize context.
     // 1. global rewrite/optimize
@@ -128,8 +123,8 @@ public class ServerPlanRequestUtils {
     instanceRequest.setSearchSegments(segmentList);
     instanceRequest.setQuery(brokerRequest);
 
-    context.setInstanceRequest(instanceRequest);
-    return context;
+    serverContext.setInstanceRequest(instanceRequest);
+    return serverContext;
   }
 
   /**
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
index 7cda76bfa2..8fa6df2756 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java
@@ -43,6 +43,7 @@ import org.apache.pinot.query.planner.plannode.TableScanNode;
 import org.apache.pinot.query.planner.plannode.ValueNode;
 import org.apache.pinot.query.planner.plannode.WindowNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 
@@ -109,8 +110,9 @@ public class ServerPlanRequestVisitor implements PlanNodeVisitor<Void, ServerPla
       staticSide = node.getInputs().get(1);
     }
     staticSide.visit(this, context);
-    int resultMapId = context.getPipelineBreakerResult().getNodeIdMap().get(dynamicSide);
-    List<TransferableBlock> transferableBlocks = context.getPipelineBreakerResult().getResultMap().getOrDefault(
+    PipelineBreakerResult pipelineBreakerResult = context.getPlanContext().getPipelineBreakerResult();
+    int resultMapId = pipelineBreakerResult.getNodeIdMap().get(dynamicSide);
+    List<TransferableBlock> transferableBlocks = pipelineBreakerResult.getResultMap().getOrDefault(
         resultMapId, Collections.emptyList());
     List<Object[]> resultDataContainer = new ArrayList<>();
     DataSchema dataSchema = dynamicSide.getDataSchema();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index d7eb25a944..fb1dc05619 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -58,6 +58,7 @@ import org.apache.pinot.query.runtime.operator.OpChainStats;
 import org.apache.pinot.query.runtime.operator.OperatorStats;
 import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.PhysicalPlanContext;
 import org.apache.pinot.query.runtime.plan.StageMetadata;
 import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
 import org.apache.pinot.query.service.QueryConfig;
@@ -193,12 +194,13 @@ public class QueryDispatcher {
     DispatchablePlanFragment reduceStagePlanFragment = dispatchableSubPlan.getQueryStageList().get(reduceStageId);
     MailboxReceiveNode reduceNode = (MailboxReceiveNode) reduceStagePlanFragment.getPlanFragment().getFragmentRoot();
     VirtualServerAddress server = new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getPort(), 0);
-    OpChainExecutionContext context =
-        new OpChainExecutionContext(mailboxService, requestId, reduceStageId, server, timeoutMs,
-            System.currentTimeMillis() + timeoutMs,
-            new StageMetadata.Builder().setWorkerMetadataList(reduceStagePlanFragment.getWorkerMetadataList())
-                .addCustomProperties(reduceStagePlanFragment.getCustomProperties()).build(),
-            traceEnabled);
+    StageMetadata brokerStageMetadata = new StageMetadata.Builder()
+        .setWorkerMetadataList(reduceStagePlanFragment.getWorkerMetadataList())
+        .addCustomProperties(reduceStagePlanFragment.getCustomProperties())
+        .build();
+    PhysicalPlanContext planContext = new PhysicalPlanContext(mailboxService, requestId, reduceStageId,
+        System.currentTimeMillis() + timeoutMs, server, brokerStageMetadata, null, traceEnabled);
+    OpChainExecutionContext context = new OpChainExecutionContext(planContext);
     MailboxReceiveOperator mailboxReceiveOperator = createReduceStageOperator(context, reduceNode.getSenderStageId());
     List<DataBlock> resultDataBlocks =
         reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, statsAggregatorMap, dispatchableSubPlan,
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index fbbab9e4d9..332ceaaaab 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -74,7 +74,7 @@ public class OpChainSchedulerServiceTest {
 
   private OpChain getChain(MultiStageOperator operator) {
     VirtualServerAddress address = new VirtualServerAddress("localhost", 1234, 1);
-    OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, address, 0, 0, null, true);
+    OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, address, 0, null, true);
     return new OpChain(context, operator, ImmutableList.of());
   }
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
index 6589fe079f..798eb534ba 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
@@ -180,6 +180,6 @@ public class RoundRobinSchedulerTest {
 
   private OpChainExecutionContext getOpChainExecutionContext(long requestId, int stageId, int virtualServerId) {
     return new OpChainExecutionContext(null, requestId, stageId,
-        new VirtualServerAddress("localhost", 1234, virtualServerId), 0, 0, null, true);
+        new VirtualServerAddress("localhost", 1234, virtualServerId), 0, null, true);
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
index bcec439843..fed5ce422e 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java
@@ -74,40 +74,21 @@ public class MailboxReceiveOperatorTest {
     when(_mailboxService.getPort()).thenReturn(123);
     VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0);
     VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1);
-    _stageMetadataBoth = new StageMetadata.Builder()
-        .setWorkerMetadataList(Stream.of(server1, server2).map(
-                s -> new WorkerMetadata.Builder()
-                    .setVirtualServerAddress(s)
-                    .addMailBoxInfoMap(0,
-                        new MailboxMetadata(
-                            ImmutableList.of(
-                                org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
-                                org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
-                            ImmutableList.of(server1, server2), ImmutableMap.of()))
-                    .addMailBoxInfoMap(1,
-                        new MailboxMetadata(
-                            ImmutableList.of(
-                                org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
-                                org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
-                            ImmutableList.of(server1, server2), ImmutableMap.of()))
-                    .build())
-            .collect(Collectors.toList()))
-        .build();
+    _stageMetadataBoth = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1, server2).map(
+        s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, new MailboxMetadata(
+            ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
+                org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
+            ImmutableList.of(server1, server2), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata(
+            ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
+                org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
+            ImmutableList.of(server1, server2), ImmutableMap.of())).build()).collect(Collectors.toList())).build();
     // sending stage is 0, receiving stage is 1
-    _stageMetadata1 = new StageMetadata.Builder()
-        .setWorkerMetadataList(Stream.of(server1).map(
-            s -> new WorkerMetadata.Builder()
-                .setVirtualServerAddress(s)
-                .addMailBoxInfoMap(0, new MailboxMetadata(
-                    ImmutableList.of(
-                        org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
-                    ImmutableList.of(server1), ImmutableMap.of()))
-                .addMailBoxInfoMap(1, new MailboxMetadata(
-                    ImmutableList.of(
-                        org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
-                    ImmutableList.of(server1), ImmutableMap.of()))
-                .build()).collect(Collectors.toList()))
-        .build();
+    _stageMetadata1 = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1).map(
+        s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, new MailboxMetadata(
+            ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
+            ImmutableList.of(server1), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata(
+            ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
+            ImmutableList.of(server1), ImmutableMap.of())).build()).collect(Collectors.toList())).build();
   }
 
   @AfterMethod
@@ -120,13 +101,11 @@ public class MailboxReceiveOperatorTest {
   public void shouldThrowSingletonNoMatchMailboxServer() {
     VirtualServerAddress server1 = new VirtualServerAddress("localhost", 456, 0);
     VirtualServerAddress server2 = new VirtualServerAddress("localhost", 789, 1);
-    StageMetadata stageMetadata = new StageMetadata.Builder()
-        .setWorkerMetadataList(Stream.of(server1, server2).map(
-            s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build()).collect(Collectors.toList()))
-        .build();
+    StageMetadata stageMetadata = new StageMetadata.Builder().setWorkerMetadataList(
+        Stream.of(server1, server2).map(s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build())
+            .collect(Collectors.toList())).build();
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            stageMetadata, false);
+        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, stageMetadata, false);
     //noinspection resource
     new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1);
   }
@@ -134,8 +113,7 @@ public class MailboxReceiveOperatorTest {
   @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
   public void shouldThrowRangeDistributionNotSupported() {
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            null, false);
+        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, null, false);
     //noinspection resource
     new MailboxReceiveOperator(context, RelDistribution.Type.RANGE_DISTRIBUTED, 1);
   }
@@ -147,7 +125,7 @@ public class MailboxReceiveOperatorTest {
 
     // Short timeoutMs should result in timeout
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10L, System.currentTimeMillis() + 10L,
+        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L,
             _stageMetadata1, false);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
       Thread.sleep(100L);
@@ -158,8 +136,8 @@ public class MailboxReceiveOperatorTest {
     }
 
     // Longer timeout or default timeout (10s) doesn't result in timeout
-    context = new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10_000L,
-        System.currentTimeMillis() + 10_000L, _stageMetadata1, false);
+    context = new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10_000L,
+        _stageMetadata1, false);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
       Thread.sleep(100L);
       TransferableBlock mailbox = receiveOp.nextBlock();
@@ -172,8 +150,7 @@ public class MailboxReceiveOperatorTest {
     when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
 
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadata1, false);
+        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
       assertTrue(receiveOp.nextBlock().isNoOpBlock());
     }
@@ -185,8 +162,7 @@ public class MailboxReceiveOperatorTest {
     when(_mailbox1.poll()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadata1, false);
+        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
       assertTrue(receiveOp.nextBlock().isEndOfStreamBlock());
     }
@@ -200,8 +176,7 @@ public class MailboxReceiveOperatorTest {
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadata1, false);
+        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
       List<Object[]> actualRows = receiveOp.nextBlock().getContainer();
       assertEquals(actualRows.size(), 1);
@@ -218,8 +193,7 @@ public class MailboxReceiveOperatorTest {
         TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(errorMessage)));
 
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadata1, false);
+        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, 1)) {
       TransferableBlock block = receiveOp.nextBlock();
       assertTrue(block.isErrorBlock());
@@ -237,8 +211,7 @@ public class MailboxReceiveOperatorTest {
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadataBoth, false);
+        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
         1)) {
       List<Object[]> actualRows = receiveOp.nextBlock().getContainer();
@@ -261,8 +234,7 @@ public class MailboxReceiveOperatorTest {
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadataBoth, false);
+        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
         1)) {
       // Receive first block from server1
@@ -287,8 +259,7 @@ public class MailboxReceiveOperatorTest {
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadataBoth, false);
+        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
     try (MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, RelDistribution.Type.HASH_DISTRIBUTED,
         1)) {
       TransferableBlock block = receiveOp.nextBlock();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index ade517f1a1..2b74bb727a 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -183,7 +183,7 @@ public class MailboxSendOperatorTest {
         .setWorkerMetadataList(Collections.singletonList(
             new WorkerMetadata.Builder().setVirtualServerAddress(_server).build())).build();
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, SENDER_STAGE_ID, _server, Long.MAX_VALUE, Long.MAX_VALUE,
+        new OpChainExecutionContext(_mailboxService, 0, SENDER_STAGE_ID, _server, Long.MAX_VALUE,
             stageMetadata, false);
     return new MailboxSendOperator(context, _sourceOperator, _exchange, null, null, false);
   }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
index 4af33067fa..910cc38523 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java
@@ -91,21 +91,14 @@ public class OpChainTest {
   public void setUp() {
     _mocks = MockitoAnnotations.openMocks(this);
     _serverAddress = new VirtualServerAddress("localhost", 123, 0);
-    _receivingStageMetadata = new StageMetadata.Builder()
-        .setWorkerMetadataList(Stream.of(_serverAddress).map(
-            s -> new WorkerMetadata.Builder()
-                .setVirtualServerAddress(s)
-                .addMailBoxInfoMap(0, new MailboxMetadata(
-                    ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)),
-                    ImmutableList.of(s), ImmutableMap.of()))
-                .addMailBoxInfoMap(1, new MailboxMetadata(
-                    ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)),
-                    ImmutableList.of(s), ImmutableMap.of()))
-                .addMailBoxInfoMap(2, new MailboxMetadata(
-                    ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)),
-                    ImmutableList.of(s), ImmutableMap.of()))
-                .build()).collect(Collectors.toList()))
-        .build();
+    _receivingStageMetadata = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(_serverAddress).map(
+        s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0,
+            new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)), ImmutableList.of(s),
+                ImmutableMap.of())).addMailBoxInfoMap(1,
+            new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)), ImmutableList.of(s),
+                ImmutableMap.of())).addMailBoxInfoMap(2,
+            new MailboxMetadata(ImmutableList.of(MailboxIdUtils.toPlanMailboxId(0, 0, 0, 0)), ImmutableList.of(s),
+                ImmutableMap.of())).build()).collect(Collectors.toList())).build();
 
     when(_mailboxService1.getReceivingMailbox(any())).thenReturn(_mailbox1);
     when(_mailboxService2.getReceivingMailbox(any())).thenReturn(_mailbox2);
@@ -198,9 +191,8 @@ public class OpChainTest {
 
     int receivedStageId = 2;
     int senderStageId = 1;
-    OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress, 1000,
-            System.currentTimeMillis() + 1000, _receivingStageMetadata, true);
+    OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress,
+        System.currentTimeMillis() + 1000, _receivingStageMetadata, true);
 
     Stack<MultiStageOperator> operators =
         getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
@@ -214,7 +206,7 @@ public class OpChainTest {
 
     OpChainExecutionContext secondStageContext =
         new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress,
-            1000, System.currentTimeMillis() + 1000, _receivingStageMetadata, true);
+            System.currentTimeMillis() + 1000, _receivingStageMetadata, true);
 
     MailboxReceiveOperator secondStageReceiveOp =
         new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId + 1);
@@ -238,9 +230,8 @@ public class OpChainTest {
 
     int receivedStageId = 2;
     int senderStageId = 1;
-    OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress, 1000,
-            System.currentTimeMillis() + 1000, _receivingStageMetadata, false);
+    OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress,
+        System.currentTimeMillis() + 1000, _receivingStageMetadata, false);
 
     Stack<MultiStageOperator> operators =
         getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime);
@@ -251,7 +242,7 @@ public class OpChainTest {
     opChain.getStats().queued();
 
     OpChainExecutionContext secondStageContext =
-        new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress, 1000,
+        new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress,
             System.currentTimeMillis() + 1000, _receivingStageMetadata, false);
     MailboxReceiveOperator secondStageReceiveOp =
         new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId);
@@ -290,8 +281,8 @@ public class OpChainTest {
     List<InstanceResponseBlock> resultsBlockList = Collections.singletonList(new InstanceResponseBlock(
         new SelectionResultsBlock(upStreamSchema, Arrays.asList(new Object[]{1}, new Object[]{2})), queryContext));
     LeafStageTransferableBlockOperator leafOp = new LeafStageTransferableBlockOperator(context,
-            LeafStageTransferableBlockOperatorTest.getStaticBlockProcessor(resultsBlockList),
-            Collections.singletonList(mock(ServerQueryRequest.class)), upStreamSchema);
+        LeafStageTransferableBlockOperatorTest.getStaticBlockProcessor(resultsBlockList),
+        Collections.singletonList(mock(ServerQueryRequest.class)), upStreamSchema);
 
     //Transform operator
     RexExpression.InputRef ref0 = new RexExpression.InputRef(0);
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
index b5196e812f..7ecd344ab3 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
@@ -27,6 +27,7 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.query.testutils.MockDataBlockOperatorFactory;
 
+
 public class OperatorTestUtil {
   // simple key-value collision schema/data test set: "Aa" and "BB" have same hash code in java.
   private static final List<List<Object[]>> SIMPLE_KV_DATA_ROWS =
@@ -63,19 +64,16 @@ public class OperatorTestUtil {
 
   public static OpChainExecutionContext getDefaultContext() {
     VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0);
-    return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, Long.MAX_VALUE,
-        null, true);
+    return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, null, true);
   }
 
   public static OpChainExecutionContext getDefaultContextWithTracingDisabled() {
     VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0);
-    return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, Long.MAX_VALUE,
-        null, false);
+    return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, null, false);
   }
 
   public static OpChainExecutionContext getContext(long requestId, int stageId,
       VirtualServerAddress virtualServerAddress) {
-    return new OpChainExecutionContext(null, requestId, stageId, virtualServerAddress, Long.MAX_VALUE, Long.MAX_VALUE,
-        null, true);
+    return new OpChainExecutionContext(null, requestId, stageId, virtualServerAddress, Long.MAX_VALUE, null, true);
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
index 5bd973f4c2..25ef09f279 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java
@@ -84,40 +84,21 @@ public class SortedMailboxReceiveOperatorTest {
     when(_mailboxService.getPort()).thenReturn(123);
     VirtualServerAddress server1 = new VirtualServerAddress("localhost", 123, 0);
     VirtualServerAddress server2 = new VirtualServerAddress("localhost", 123, 1);
-    _stageMetadataBoth = new StageMetadata.Builder()
-        .setWorkerMetadataList(Stream.of(server1, server2).map(
-                s -> new WorkerMetadata.Builder()
-                    .setVirtualServerAddress(s)
-                    .addMailBoxInfoMap(0,
-                        new MailboxMetadata(
-                            ImmutableList.of(
-                                org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
-                                org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
-                            ImmutableList.of(server1, server2), ImmutableMap.of()))
-                    .addMailBoxInfoMap(1,
-                        new MailboxMetadata(
-                            ImmutableList.of(
-                                org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
-                                org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
-                            ImmutableList.of(server1, server2), ImmutableMap.of()))
-                    .build())
-            .collect(Collectors.toList()))
-        .build();
+    _stageMetadataBoth = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1, server2).map(
+        s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, new MailboxMetadata(
+            ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
+                org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
+            ImmutableList.of(server1, server2), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata(
+            ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0),
+                org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 1, 0, 0)),
+            ImmutableList.of(server1, server2), ImmutableMap.of())).build()).collect(Collectors.toList())).build();
     // sending stage is 0, receiving stage is 1
-    _stageMetadata1 = new StageMetadata.Builder()
-        .setWorkerMetadataList(Stream.of(server1).map(
-            s -> new WorkerMetadata.Builder()
-                .setVirtualServerAddress(s)
-                .addMailBoxInfoMap(0, new MailboxMetadata(
-                    ImmutableList.of(
-                        org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
-                    ImmutableList.of(server1), ImmutableMap.of()))
-                .addMailBoxInfoMap(1, new MailboxMetadata(
-                    ImmutableList.of(
-                        org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
-                    ImmutableList.of(server1), ImmutableMap.of()))
-                .build()).collect(Collectors.toList()))
-        .build();
+    _stageMetadata1 = new StageMetadata.Builder().setWorkerMetadataList(Stream.of(server1).map(
+        s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).addMailBoxInfoMap(0, new MailboxMetadata(
+            ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
+            ImmutableList.of(server1), ImmutableMap.of())).addMailBoxInfoMap(1, new MailboxMetadata(
+            ImmutableList.of(org.apache.pinot.query.planner.physical.MailboxIdUtils.toPlanMailboxId(1, 0, 0, 0)),
+            ImmutableList.of(server1), ImmutableMap.of())).build()).collect(Collectors.toList())).build();
   }
 
   @AfterMethod
@@ -134,8 +115,7 @@ public class SortedMailboxReceiveOperatorTest {
         Stream.of(server1, server2).map(s -> new WorkerMetadata.Builder().setVirtualServerAddress(s).build())
             .collect(Collectors.toList())).build();
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            stageMetadata, false);
+        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, stageMetadata, false);
     //noinspection resource
     new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS,
         COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1);
@@ -144,8 +124,7 @@ public class SortedMailboxReceiveOperatorTest {
   @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*")
   public void shouldThrowRangeDistributionNotSupported() {
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE, null,
-            false);
+        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, null, false);
     //noinspection resource
     new SortedMailboxReceiveOperator(context, RelDistribution.Type.RANGE_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS,
         COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1);
@@ -155,7 +134,7 @@ public class SortedMailboxReceiveOperatorTest {
   public void shouldThrowOnEmptyCollationKey() {
     when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10L, System.currentTimeMillis() + 10L,
+        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L,
             _stageMetadata1, false);
     //noinspection resource
     new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, Collections.emptyList(),
@@ -168,7 +147,7 @@ public class SortedMailboxReceiveOperatorTest {
     when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
     // Short timeoutMs should result in timeout
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10L, System.currentTimeMillis() + 10L,
+        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L,
             _stageMetadata1, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
@@ -181,8 +160,8 @@ public class SortedMailboxReceiveOperatorTest {
     }
 
     // Longer timeout or default timeout (10s) doesn't result in timeout
-    context = new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, 10_000L,
-        System.currentTimeMillis() + 10_000L, _stageMetadata1, false);
+    context = new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, System.currentTimeMillis() + 10_000L,
+        _stageMetadata1, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
         false, 1)) {
@@ -196,8 +175,7 @@ public class SortedMailboxReceiveOperatorTest {
   public void shouldReceiveSingletonNullMailbox() {
     when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadata1, false);
+        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
         false, 1)) {
@@ -210,8 +188,7 @@ public class SortedMailboxReceiveOperatorTest {
     when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1);
     when(_mailbox1.poll()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadata1, false);
+        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
         false, 1)) {
@@ -226,8 +203,7 @@ public class SortedMailboxReceiveOperatorTest {
     when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row),
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadata1, false);
+        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
         false, 1)) {
@@ -245,8 +221,7 @@ public class SortedMailboxReceiveOperatorTest {
     when(_mailbox1.poll()).thenReturn(
         TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException(errorMessage)));
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadata1, false);
+        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.SINGLETON, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS,
         false, 1)) {
@@ -265,8 +240,7 @@ public class SortedMailboxReceiveOperatorTest {
     when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row),
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadataBoth, false);
+        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS,
         COLLATION_NULL_DIRECTIONS, false, 1)) {
@@ -289,8 +263,7 @@ public class SortedMailboxReceiveOperatorTest {
     when(_mailbox2.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row),
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadataBoth, false);
+        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS,
         COLLATION_NULL_DIRECTIONS, false, 1)) {
@@ -315,8 +288,7 @@ public class SortedMailboxReceiveOperatorTest {
         OperatorTestUtil.block(DATA_SCHEMA, row4), OperatorTestUtil.block(DATA_SCHEMA, row5),
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadataBoth, false);
+        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.HASH_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS,
         COLLATION_NULL_DIRECTIONS, false, 1)) {
@@ -347,8 +319,7 @@ public class SortedMailboxReceiveOperatorTest {
         TransferableBlockUtils.getEndOfStreamTransferableBlock());
 
     OpChainExecutionContext context =
-        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, Long.MAX_VALUE,
-            _stageMetadataBoth, false);
+        new OpChainExecutionContext(_mailboxService, 0, 0, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadataBoth, false);
     try (SortedMailboxReceiveOperator receiveOp = new SortedMailboxReceiveOperator(context,
         RelDistribution.Type.HASH_DISTRIBUTED, dataSchema, collationKeys, collationDirections, collationNullDirections,
         false, 1)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org