You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by "walterddr (via GitHub)" <gi...@apache.org> on 2023/05/18 00:48:34 UTC

[GitHub] [pinot] walterddr opened a new pull request, #10779: [multistage][mvp] pipeline breaker and dynamic broadcast runtime for semi-join

walterddr opened a new pull request, #10779:
URL: https://github.com/apache/pinot/pull/10779

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #10779: [multistage][mvp] pipeline breaker and dynamic broadcast runtime for semi-join

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10779:
URL: https://github.com/apache/pinot/pull/10779#discussion_r1198015757


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakOperator.java:
##########
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.pipeline;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.MultiStageOperator;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+
+
+public class PipelineBreakOperator extends MultiStageOperator {
+  private static final String EXPLAIN_NAME = "PIPELINE_BREAKER";
+  private final Deque<Map.Entry<Integer, Operator<TransferableBlock>>> _workerEntries;
+  private final Map<Integer, List<TransferableBlock>> _resultMap;
+  private final CountDownLatch _workerDoneLatch;
+  private TransferableBlock _errorBlock;
+
+
+  public PipelineBreakOperator(OpChainExecutionContext context,
+      Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap) {
+    super(context);
+    _resultMap = new HashMap<>();
+    _workerEntries = new ArrayDeque<>();
+    _workerEntries.addAll(pipelineWorkerMap.entrySet());
+    _workerDoneLatch = new CountDownLatch(1);
+  }
+
+  public Map<Integer, List<TransferableBlock>> getResult() {
+    try {
+      boolean isWorkerDone =
+          _workerDoneLatch.await(_context.getDeadlineMs() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+      if (isWorkerDone && _errorBlock == null) {
+        return _resultMap;
+      }
+    } catch (Exception e) {
+      _errorBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
+    }
+    return Collections.singletonMap(-1, Collections.singletonList(_errorBlock));
+  }
+
+  @Nullable
+  @Override
+  public String toExplainString() {
+    return EXPLAIN_NAME;
+  }
+
+  @Override
+  protected TransferableBlock getNextBlock() {
+    if (System.currentTimeMillis() > _context.getDeadlineMs()) {
+      _errorBlock = TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR);
+      _workerDoneLatch.countDown();
+      return _errorBlock;
+    }
+
+    // Poll from every mailbox operator in round-robin fashion:
+    // - Return the first content block
+    // - If no content block found but there are mailboxes not finished, return no-op block
+    // - If all content blocks are already returned, return end-of-stream block
+    int numWorkers = _workerEntries.size();
+    for (int i = 0; i < numWorkers; i++) {
+      Map.Entry<Integer, Operator<TransferableBlock>> worker = _workerEntries.remove();
+      TransferableBlock block = worker.getValue().nextBlock();
+
+      // Release the mailbox when the block is end-of-stream
+      if (block != null && block.isSuccessfulEndOfStreamBlock()) {
+        continue;
+      }
+
+      // Add the worker back to the queue if the block is not end-of-stream
+      _workerEntries.add(worker);
+      if (block != null) {
+        if (block.isErrorBlock()) {
+          _errorBlock = block;
+          _workerDoneLatch.countDown();
+          return _errorBlock;
+        }
+        List<TransferableBlock> blockList = _resultMap.computeIfAbsent(worker.getKey(), (k) -> new ArrayList<>());
+        blockList.add(block);

Review Comment:
   TODO:
   1. Only data blocks should be added, because non-data blocks are not useful for pipeline breaker results.
   2. Consider a way to pass around stats and metadata; currently this information is discarded.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #10779: [multistage][mvp] pipeline breaker and dynamic broadcast runtime for semi-join

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10779:
URL: https://github.com/apache/pinot/pull/10779#discussion_r1207492947


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -143,22 +148,51 @@ public void shutDown()
   }
 
   public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) {
+    try {
     long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
     long timeoutMs = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
     boolean isTraceEnabled =
         Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
     long deadlineMs = System.currentTimeMillis() + timeoutMs;
+    // run OpChain
     if (isLeafStage(distributedStagePlan)) {
-      OpChain rootOperator = compileLeafStage(distributedStagePlan, requestMetadataMap, timeoutMs, deadlineMs,
-          requestId);
+      // pre-stage execution for all pipeline breakers
+      // TODO: pipeline breaker is now only supported by leaf stage, to be supported by all OpChain
+      PipelineBreakerContext pipelineBreakerContext = executePipelineBreakers(_intermScheduler, distributedStagePlan,
+          timeoutMs, deadlineMs, requestId, isTraceEnabled);
+      OpChain rootOperator = compileLeafStage(distributedStagePlan, requestMetadataMap, pipelineBreakerContext,
+          timeoutMs, deadlineMs, requestId);
       _leafScheduler.register(rootOperator);
     } else {
       PlanNode stageRoot = distributedStagePlan.getStageRoot();
-      OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot,
+      OpChain rootOperator = PhysicalPlanVisitor.walkPlanNode(stageRoot,
           new PlanRequestContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs,
-              distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), isTraceEnabled));
+              distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled));
       _intermScheduler.register(rootOperator);
     }
+    } catch (Throwable t) {
+      LOGGER.error("", t);
+    }
+  }
+
+  private PipelineBreakerContext executePipelineBreakers(OpChainSchedulerService scheduler,
+      DistributedStagePlan distributedStagePlan, long timeoutMs, long deadlineMs, long requestId,
+      boolean isTraceEnabled) {
+    PipelineBreakerContext pipelineBreakerContext = new PipelineBreakerContext();
+    PipelineBreakerVisitor.visitPlanRoot(distributedStagePlan.getStageRoot(), pipelineBreakerContext);
+    if (pipelineBreakerContext.getPipelineBreakerMap().size() > 0) {
+      PlanNode stageRoot = distributedStagePlan.getStageRoot();
+      PlanRequestContext planRequestContext =
+          new PlanRequestContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs,
+              distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled);
+      Map<Integer, List<TransferableBlock>> resultMap =
+          PipelineBreakerExecutionUtils.execute(scheduler, pipelineBreakerContext, planRequestContext);
+      Preconditions.checkState(!resultMap.containsKey(-1), "Pipeline Breaker received error block!");

Review Comment:
   done. throwing instead



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #10779: [multistage][mvp] pipeline breaker and dynamic broadcast runtime for semi-join

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10779:
URL: https://github.com/apache/pinot/pull/10779#discussion_r1202880301


##########
pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java:
##########
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelDistributions;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.hint.PinotHintOptions;
+import org.apache.calcite.rel.hint.PinotHintStrategyTable;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.zookeeper.common.StringUtils;
+
+
+/**
+ * Special rule for Pinot, this rule transposing an INNER JOIN into dynamic broadcast join for the leaf stage.
+ *
+ * <p>Consider the following INNER JOIN plan
+ *
+ *                  ...                                     ...
+ *                   |                                       |
+ *             [ Transform ]                           [ Transform ]
+ *                   |                                       |
+ *             [ Inner Join ]                     [ Pass-through xChange ]
+ *             /            \                          /
+ *        [xChange]      [xChange]              [Inner Join] <----
+ *           /                \                      /            \
+ *     [ Transform ]     [ Transform ]         [ Transform ]       \
+ *          |                  |                    |               \
+ *     [Proj/Filter]     [Proj/Filter]         [Proj/Filter]  <------\
+ *          |                  |                    |                 \
+ *     [Table Scan ]     [Table Scan ]         [Table Scan ]       [xChange]
+ *                                                                      \
+ *                                                                 [ Transform ]
+ *                                                                       |
+ *                                                                 [Proj/Filter]
+ *                                                                       |
+ *                                                                 [Table Scan ]
+ *
+ * <p> This rule is one part of the overall mechanism and this rule only does the following
+ *
+ *                 ...                                      ...
+ *                  |                                        |
+ *            [ Transform ]                             [ Transform ]
+ *                  |                                       /
+ *            [ Inner Join ]                     [Pass-through xChange]
+ *            /            \                            /
+ *       [xChange]      [xChange]          |----[Dyn. Broadcast]   <-----
+ *          /                \             |           |                 \
+ *    [ Transform ]     [ Transform ]      |----> [ Inner Join ]      [xChange]
+ *         |                  |            |           |                   \
+ *    [Proj/Filter]     [Proj/Filter]      |      [ Transform ]       [ Transform ]
+ *         |                  |            |          |                    |
+ *    [Table Scan ]     [Table Scan ]      |----> [Proj/Filter]       [Proj/Filter]
+ *                                                     |                    |
+ *                                                [Table Scan ]       [Table Scan ]
+ *
+ *
+ *
+ * <p> The next part to extend the Dynamic broadcast into the Proj/Filter operator happens in the runtime.
+ *
+ * <p> This rewrite is only useful if we can ensure that:
+ * <ul>
+ *   <li>new plan can leverage the indexes and fast filtering of the leaf-stage Pinot server processing logic.</li>
+ *   <li>
+ *     data sending over xChange should be relatively small comparing to data would've been selected out if the dynamic
+ *     broadcast is not applied.
+ *   </li>
+ *   <li>
+ *     since leaf-stage operator can only process a typical Pinot V1 engine chain-operator chain. This means the entire
+ *     right-table will be ship over and persist in memory before the dynamic broadcast can occur. memory foot print
+ *     will be high so this rule should be use carefully until we have cost-based optimization.
+ *   </li>
+ *   <li>
+ *     if the dynamic broadcast stage is broadcasting to a left-table that's pre-partitioned with the join key, then the
+ *     right-table will be ship over and persist in memory using hash distribution. memory foot print will be
+ *     relatively low comparing to non-partitioned.
+ *   </li>
+ *   <li>
+ *     if the dynamic broadcast stage operating on a table with the same partition scheme as the left-table, then there
+ *     is not going to be any network shuffling overhead. both memory and latency overhead will be low.
+ *   </li>
+ * </ul>
+ *
+ * TODO #1: Only support SEMI-JOIN, once JOIN operator is supported by leaf-stage we should allow it to match
+ *   @see <a href="https://github.com/apache/pinot/pull/10565/>
+ * TODO #2: Only convert to dynamic broadcast from right-to-left, allow option to specify dynamic broadcast direction.
+ * TODO #3: Only convert to dynamic broadcast if left is leaf stage, allow the option for intermediate stage.
+ * TODO #4: currently adding a pass-through after join, support leaf-stage to chain arbitrary operator(s) next.
+ */
+public class PinotJoinToDynamicBroadcastRule extends RelOptRule {
+  public static final PinotJoinToDynamicBroadcastRule INSTANCE =
+      new PinotJoinToDynamicBroadcastRule(PinotRuleUtils.PINOT_REL_FACTORY);
+  private static final String DYNAMIC_BROADCAST_HINT_OPTION_VALUE = "dynamic_broadcast";
+
+  public PinotJoinToDynamicBroadcastRule(RelBuilderFactory factory) {
+    super(operand(LogicalJoin.class, any()), factory, null);
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    if (call.rels.length < 1 || !(call.rel(0) instanceof Join)) {
+      return false;
+    }
+    Join join = call.rel(0);
+    String joinStrategyString = PinotHintStrategyTable.getHintOption(join.getHints(),
+        PinotHintOptions.JOIN_HINT_OPTIONS, PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
+    List<String> joinStrategies = joinStrategyString != null ? StringUtils.split(joinStrategyString, ",")
+        : Collections.emptyList();
+    if (!joinStrategies.contains(DYNAMIC_BROADCAST_HINT_OPTION_VALUE)) {
+      return false;
+    }
+    JoinInfo joinInfo = join.analyzeCondition();
+    RelNode left = join.getLeft() instanceof HepRelVertex ? ((HepRelVertex) join.getLeft()).getCurrentRel()
+        : join.getLeft();
+    RelNode right = join.getRight() instanceof HepRelVertex ? ((HepRelVertex) join.getRight()).getCurrentRel()
+        : join.getRight();
+    return left instanceof LogicalExchange && right instanceof LogicalExchange
+        && PinotRuleUtils.noExchangeInSubtree(left.getInput(0))
+        && (join.getJoinType() == JoinRelType.SEMI && joinInfo.nonEquiConditions.isEmpty());
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    Join join = call.rel(0);
+    LogicalExchange left = (LogicalExchange) (join.getLeft() instanceof HepRelVertex
+        ? ((HepRelVertex) join.getLeft()).getCurrentRel() : join.getLeft());
+    LogicalExchange right = (LogicalExchange) (join.getRight() instanceof HepRelVertex
+        ? ((HepRelVertex) join.getRight()).getCurrentRel() : join.getRight());
+
+    // TODO: both optimized hash empty list should be singleton exchange. however it is broken. fix later.
+    boolean isColocatedJoin = PinotHintStrategyTable.containsHintOption(join.getHints(),
+        PinotHintOptions.JOIN_HINT_OPTIONS, PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
+    LogicalExchange dynamicBroadcastExchange = isColocatedJoin
+        ? LogicalExchange.create(right.getInput(), RelDistributions.hash(Collections.emptyList()))

Review Comment:
   singleton fix pending: https://github.com/apache/pinot/pull/10797



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #10779: [multistage][mvp] pipeline breaker and dynamic broadcast runtime for semi-join

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10779:
URL: https://github.com/apache/pinot/pull/10779#discussion_r1198000788


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -143,22 +148,51 @@ public void shutDown()
   }
 
   public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) {
+    try {
     long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
     long timeoutMs = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
     boolean isTraceEnabled =
         Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
     long deadlineMs = System.currentTimeMillis() + timeoutMs;
+    // run OpChain
     if (isLeafStage(distributedStagePlan)) {
-      OpChain rootOperator = compileLeafStage(distributedStagePlan, requestMetadataMap, timeoutMs, deadlineMs,
-          requestId);
+      // pre-stage execution for all pipeline breakers
+      // TODO: pipeline breaker is now only supported by leaf stage, to be supported by all OpChain
+      PipelineBreakerContext pipelineBreakerContext = executePipelineBreakers(_intermScheduler, distributedStagePlan,
+          timeoutMs, deadlineMs, requestId, isTraceEnabled);

Review Comment:
   TODO: this execution is done on a dedicated thread pool, but the gRPC runner main thread is parked while awaiting it.
   We should come up with a better wait-and-continue model.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr merged pull request #10779: [multistage] pipeline breaker and dynamic broadcast runtime for semi-join

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr merged PR #10779:
URL: https://github.com/apache/pinot/pull/10779


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #10779: [multistage][mvp] pipeline breaker and dynamic broadcast runtime for semi-join

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10779:
URL: https://github.com/apache/pinot/pull/10779#discussion_r1198026292


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java:
##########
@@ -291,4 +169,106 @@ private static void attachTimeBoundary(PinotQuery pinotQuery, TimeBoundaryInfo t
       pinotQuery.setFilterExpression(timeFilterExpression);
     }
   }
+
+  /**
+   * attach the dynamic filter to the given PinotQuery.
+   */
+  static void attachDynamicFilter(PinotQuery pinotQuery, JoinNode.JoinKeys joinKeys, List<Object[]> dataContainer,
+      DataSchema dataSchema) {

Review Comment:
   TODO: handle
   1. empty dataContainer,
   2. empty existing filter,
   3. null dataContainer or errored dataContainer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #10779: [multistage][mvp] pipeline breaker and dynamic broadcast runtime for semi-join

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10779:
URL: https://github.com/apache/pinot/pull/10779#discussion_r1202880069


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java:
##########
@@ -291,4 +169,106 @@ private static void attachTimeBoundary(PinotQuery pinotQuery, TimeBoundaryInfo t
       pinotQuery.setFilterExpression(timeFilterExpression);
     }
   }
+
+  /**
+   * attach the dynamic filter to the given PinotQuery.
+   */
+  static void attachDynamicFilter(PinotQuery pinotQuery, JoinNode.JoinKeys joinKeys, List<Object[]> dataContainer,
+      DataSchema dataSchema) {

Review Comment:
   [update]
   - 1 & 2 handled
   - 3 should be handled by error handling on the pipeline context level



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] codecov-commenter commented on pull request #10779: [multistage][mvp] pipeline breaker and dynamic broadcast runtime for semi-join

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #10779:
URL: https://github.com/apache/pinot/pull/10779#issuecomment-1552287266

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/10779?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#10779](https://app.codecov.io/gh/apache/pinot/pull/10779?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (f5517b0) into [master](https://app.codecov.io/gh/apache/pinot/commit/4f1564137937bade8be9c76020819815cf40125f?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (4f15641) will **decrease** coverage by `55.32%`.
   > The diff coverage is `0.00%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #10779       +/-   ##
   =============================================
   - Coverage     68.96%   13.65%   -55.32%     
   + Complexity     6483      439     -6044     
   =============================================
     Files          2157     2108       -49     
     Lines        116058   113770     -2288     
     Branches      17559    17290      -269     
   =============================================
   - Hits          80043    15534    -64509     
   - Misses        30410    96967    +66557     
   + Partials       5605     1269     -4336     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration2 | `?` | |
   | unittests1 | `?` | |
   | unittests2 | `13.65% <0.00%> (+<0.01%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://app.codecov.io/gh/apache/pinot/pull/10779?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...he/pinot/query/planner/plannode/AggregateNode.java](https://app.codecov.io/gh/apache/pinot/pull/10779?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcGxhbm5lci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcGxhbm5lci9wbGFubm9kZS9BZ2dyZWdhdGVOb2RlLmphdmE=) | `0.00% <0.00%> (-89.48%)` | :arrow_down: |
   | [...org/apache/pinot/query/mailbox/MailboxIdUtils.java](https://app.codecov.io/gh/apache/pinot/pull/10779?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvbWFpbGJveC9NYWlsYm94SWRVdGlscy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...va/org/apache/pinot/query/runtime/QueryRunner.java](https://app.codecov.io/gh/apache/pinot/pull/10779?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9RdWVyeVJ1bm5lci5qYXZh) | `0.00% <0.00%> (-88.29%)` | :arrow_down: |
   | [...apache/pinot/query/runtime/operator/OpChainId.java](https://app.codecov.io/gh/apache/pinot/pull/10779?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9vcGVyYXRvci9PcENoYWluSWQuamF2YQ==) | `0.00% <0.00%> (-64.29%)` | :arrow_down: |
   | [.../pinot/query/runtime/plan/PhysicalPlanVisitor.java](https://app.codecov.io/gh/apache/pinot/pull/10779?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9wbGFuL1BoeXNpY2FsUGxhblZpc2l0b3IuamF2YQ==) | `0.00% <ø> (-93.23%)` | :arrow_down: |
   | [...e/pinot/query/runtime/plan/PlanRequestContext.java](https://app.codecov.io/gh/apache/pinot/pull/10779?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9wbGFuL1BsYW5SZXF1ZXN0Q29udGV4dC5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...y/runtime/plan/pipeline/PipelineBreakOperator.java](https://app.codecov.io/gh/apache/pinot/pull/10779?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9wbGFuL3BpcGVsaW5lL1BpcGVsaW5lQnJlYWtPcGVyYXRvci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [.../runtime/plan/pipeline/PipelineBreakerContext.java](https://app.codecov.io/gh/apache/pinot/pull/10779?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9wbGFuL3BpcGVsaW5lL1BpcGVsaW5lQnJlYWtlckNvbnRleHQuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...e/plan/pipeline/PipelineBreakerExecutionUtils.java](https://app.codecov.io/gh/apache/pinot/pull/10779?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9wbGFuL3BpcGVsaW5lL1BpcGVsaW5lQnJlYWtlckV4ZWN1dGlvblV0aWxzLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [.../runtime/plan/pipeline/PipelineBreakerVisitor.java](https://app.codecov.io/gh/apache/pinot/pull/10779?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcXVlcnktcnVudGltZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcXVlcnkvcnVudGltZS9wbGFuL3BpcGVsaW5lL1BpcGVsaW5lQnJlYWtlclZpc2l0b3IuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | ... and [3 more](https://app.codecov.io/gh/apache/pinot/pull/10779?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | |
   
   ... and [1669 files with indirect coverage changes](https://app.codecov.io/gh/apache/pinot/pull/10779/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   :mega: We’re building smart automated test selection to slash your CI/CD build times. [Learn more](https://about.codecov.io/iterative-testing/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #10779: [multistage][mvp] pipeline breaker and dynamic broadcast runtime for semi-join

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10779:
URL: https://github.com/apache/pinot/pull/10779#discussion_r1207495051


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerRequestPlanVisitor.java:
##########
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.server;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.request.DataSource;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.query.parser.CalciteRexExpressionParser;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+
+/**
+ * Plan visitor for direct leaf-stage server request.
+ *
+ * This should be merged with logics in {@link org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2} in the future
+ * to directly produce operator chain.
+ *
+ * As of now, the reason why we use the plan visitor for server request is for additional support such as dynamic
+ * filtering and other auxiliary functionalities.
+ */
+public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPlanRequestContext> {
+  private static final ServerRequestPlanVisitor INSTANCE = new ServerRequestPlanVisitor();
+
+  static void walkStageNode(PlanNode node, ServerPlanRequestContext context) {
+    node.visit(INSTANCE, context);
+  }

Review Comment:
   done. catch all added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #10779: [multistage][mvp] pipeline breaker and dynamic broadcast runtime for semi-join

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10779:
URL: https://github.com/apache/pinot/pull/10779#discussion_r1198024063


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java:
##########
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.pipeline;
+
+import org.apache.pinot.query.planner.plannode.DefaultPostOrderTraversalVisitor;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+
+
+public class PipelineBreakerVisitor extends DefaultPostOrderTraversalVisitor<Void, PipelineBreakerContext> {
+  private static final PlanNodeVisitor<Void, PipelineBreakerContext> INSTANCE = new PipelineBreakerVisitor();
+
+  public static void visitPlanRoot(PlanNode root, PipelineBreakerContext context) {
+    root.visit(PipelineBreakerVisitor.INSTANCE, context);
+  }
+
+  @Override
+  public Void process(PlanNode planNode, PipelineBreakerContext context) {
+    context.visitedNewPlanNode(planNode);
+    return null;
+  }
+
+  @Override
+  public Void visitMailboxReceive(MailboxReceiveNode node, PipelineBreakerContext context) {
+    process(node, context);
+    // TODO: actually implement pipeline breaker attribute in PlanNode
+    // currently all mailbox receive node from leaf stage is considered as pipeline breaker node.
+    context.addPipelineBreaker(node);
+    return null;

Review Comment:
   TODO: @xiangfu0 we need a pipeline breaker indicator flag in planner to tell "this is a pipeline breaker node"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #10779: [multistage][mvp] pipeline breaker and dynamic broadcast runtime for semi-join

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10779:
URL: https://github.com/apache/pinot/pull/10779#discussion_r1198011452


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -180,12 +214,12 @@ public ExecutorService getQueryRunnerExecutorService() {
   }
 
   private OpChain compileLeafStage(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap,
-      long timeoutMs, long deadlineMs, long requestId) {
+      PipelineBreakerContext pipelineBreakerContext, long timeoutMs, long deadlineMs, long requestId) {

Review Comment:
   TODO: consider if this is the best way to pass around extra info for each visitor/builder 
   
   we already have
   - 3 plan contexts (1 more is added in this PR)
   - 3 visitors (1 more is added in this PR)
   
   all of these need to create a slightly different visitor context.
   this means that changing a function signature like the one on this line will impact 10+ places, most of which are pure pass-through



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #10779: [multistage][mvp] pipeline breaker and dynamic broadcast runtime for semi-join

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10779:
URL: https://github.com/apache/pinot/pull/10779#discussion_r1207493170


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -143,22 +148,51 @@ public void shutDown()
   }
 
   public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) {
+    try {
     long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
     long timeoutMs = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
     boolean isTraceEnabled =
         Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
     long deadlineMs = System.currentTimeMillis() + timeoutMs;
+    // run OpChain
     if (isLeafStage(distributedStagePlan)) {
-      OpChain rootOperator = compileLeafStage(distributedStagePlan, requestMetadataMap, timeoutMs, deadlineMs,
-          requestId);
+      // pre-stage execution for all pipeline breakers
+      // TODO: pipeline breaker is now only supported by leaf stage, to be supported by all OpChain
+      PipelineBreakerContext pipelineBreakerContext = executePipelineBreakers(_intermScheduler, distributedStagePlan,
+          timeoutMs, deadlineMs, requestId, isTraceEnabled);
+      OpChain rootOperator = compileLeafStage(distributedStagePlan, requestMetadataMap, pipelineBreakerContext,
+          timeoutMs, deadlineMs, requestId);
       _leafScheduler.register(rootOperator);
     } else {
       PlanNode stageRoot = distributedStagePlan.getStageRoot();
-      OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot,
+      OpChain rootOperator = PhysicalPlanVisitor.walkPlanNode(stageRoot,
           new PlanRequestContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs,
-              distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), isTraceEnabled));
+              distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled));
       _intermScheduler.register(rootOperator);
     }
+    } catch (Throwable t) {
+      LOGGER.error("", t);
+    }
+  }
+
+  private PipelineBreakerContext executePipelineBreakers(OpChainSchedulerService scheduler,
+      DistributedStagePlan distributedStagePlan, long timeoutMs, long deadlineMs, long requestId,
+      boolean isTraceEnabled) {
+    PipelineBreakerContext pipelineBreakerContext = new PipelineBreakerContext();

Review Comment:
   done. split into PipelineBreakerContext and PipelineBreakerResult



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #10779: [multistage][mvp] pipeline breaker and dynamic broadcast runtime for semi-join

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10779:
URL: https://github.com/apache/pinot/pull/10779#discussion_r1198021180


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutionUtils.java:
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.pipeline;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
+import org.apache.pinot.query.runtime.plan.PlanRequestContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class to run pipeline breaker execution and collects the results.
+ */
+public class PipelineBreakerExecutionUtils {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipelineBreakerExecutionUtils.class);
+
+  private PipelineBreakerExecutionUtils() {
+    // do not instantiate.
+  }
+
+  public static Map<Integer, List<TransferableBlock>> execute(OpChainSchedulerService scheduler,

Review Comment:
   TODO: echo back to the main point --> this currently runs on the QueryServer thread



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #10779: [multistage][mvp] pipeline breaker and dynamic broadcast runtime for semi-join

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10779:
URL: https://github.com/apache/pinot/pull/10779#discussion_r1198004291


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -143,22 +148,51 @@ public void shutDown()
   }
 
   public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) {
+    try {
     long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
     long timeoutMs = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
     boolean isTraceEnabled =
         Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
     long deadlineMs = System.currentTimeMillis() + timeoutMs;
+    // run OpChain
     if (isLeafStage(distributedStagePlan)) {
-      OpChain rootOperator = compileLeafStage(distributedStagePlan, requestMetadataMap, timeoutMs, deadlineMs,
-          requestId);
+      // pre-stage execution for all pipeline breakers
+      // TODO: pipeline breaker is now only supported by leaf stage, to be supported by all OpChain
+      PipelineBreakerContext pipelineBreakerContext = executePipelineBreakers(_intermScheduler, distributedStagePlan,
+          timeoutMs, deadlineMs, requestId, isTraceEnabled);
+      OpChain rootOperator = compileLeafStage(distributedStagePlan, requestMetadataMap, pipelineBreakerContext,
+          timeoutMs, deadlineMs, requestId);
       _leafScheduler.register(rootOperator);
     } else {
       PlanNode stageRoot = distributedStagePlan.getStageRoot();
-      OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot,
+      OpChain rootOperator = PhysicalPlanVisitor.walkPlanNode(stageRoot,
           new PlanRequestContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs,
-              distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), isTraceEnabled));
+              distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled));
       _intermScheduler.register(rootOperator);
     }
+    } catch (Throwable t) {
+      LOGGER.error("", t);
+    }
+  }
+
+  private PipelineBreakerContext executePipelineBreakers(OpChainSchedulerService scheduler,
+      DistributedStagePlan distributedStagePlan, long timeoutMs, long deadlineMs, long requestId,
+      boolean isTraceEnabled) {
+    PipelineBreakerContext pipelineBreakerContext = new PipelineBreakerContext();

Review Comment:
   PipelineBreakerContext does a lot of things:
   1. captures all the pipeline-breaking operators that need to be executed first
       - currently only the mailbox receive operator can trigger this, but later it should support others
   2. indexes the PlanNodes via post-order traversal, so that when the PhysicalPlanVisitor later hits one of these pipeline breaker PlanNodes, it knows which index to use to retrieve the PipelineBreaker results
   3. hosts the pipeline breaker results in a map
   
   all 3 should be separate (especially 2, which should also be done on its own for stats-collection purposes)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #10779: [multistage][mvp] pipeline breaker and dynamic broadcast runtime for semi-join

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10779:
URL: https://github.com/apache/pinot/pull/10779#discussion_r1198014832


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakOperator.java:
##########
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.pipeline;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.MultiStageOperator;
+import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+
+
+public class PipelineBreakOperator extends MultiStageOperator {
+  private static final String EXPLAIN_NAME = "PIPELINE_BREAKER";
+  private final Deque<Map.Entry<Integer, Operator<TransferableBlock>>> _workerEntries;
+  private final Map<Integer, List<TransferableBlock>> _resultMap;
+  private final CountDownLatch _workerDoneLatch;
+  private TransferableBlock _errorBlock;
+
+
+  public PipelineBreakOperator(OpChainExecutionContext context,
+      Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap) {
+    super(context);
+    _resultMap = new HashMap<>();
+    _workerEntries = new ArrayDeque<>();
+    _workerEntries.addAll(pipelineWorkerMap.entrySet());
+    _workerDoneLatch = new CountDownLatch(1);
+  }
+
+  public Map<Integer, List<TransferableBlock>> getResult() {
+    try {
+      boolean isWorkerDone =
+          _workerDoneLatch.await(_context.getDeadlineMs() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+      if (isWorkerDone && _errorBlock == null) {
+        return _resultMap;
+      }
+    } catch (Exception e) {
+      _errorBlock = TransferableBlockUtils.getErrorTransferableBlock(e);
+    }
+    return Collections.singletonMap(-1, Collections.singletonList(_errorBlock));

Review Comment:
   TODO: consider whether we should return an error block here or throw --> this depends on how the main OpChain uses the result
   1. if the main OpChain uses this pipeline breaker result as input to the plan visitor, then we should throw, b/c the main OpChain's plan visitor will fail and the mailbox send operator will not be created, so the error should be returned to the broker directly
   2. if the main OpChain uses it as a data block for input (e.g. faking the mailbox receive, whose data has already been received, as a sequential-access operator), then we should retain the error block, b/c the error block also contains metadata info useful for debugging



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #10779: [multistage][mvp] pipeline breaker and dynamic broadcast runtime for semi-join

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10779:
URL: https://github.com/apache/pinot/pull/10779#discussion_r1198077928


##########
pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java:
##########
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.RelDistributions;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.hint.PinotHintOptions;
+import org.apache.calcite.rel.hint.PinotHintStrategyTable;
+import org.apache.calcite.rel.logical.LogicalExchange;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.zookeeper.common.StringUtils;
+
+
+/**
+ * Special rule for Pinot, this rule transposing an INNER JOIN into dynamic broadcast join for the leaf stage.
+ *
+ * <p>Consider the following INNER JOIN plan
+ *
+ *                  ...                                     ...
+ *                   |                                       |
+ *             [ Transform ]                           [ Transform ]
+ *                   |                                       |
+ *             [ Inner Join ]                     [ Pass-through xChange ]
+ *             /            \                          /
+ *        [xChange]      [xChange]              [Inner Join] <----
+ *           /                \                      /            \
+ *     [ Transform ]     [ Transform ]         [ Transform ]       \
+ *          |                  |                    |               \
+ *     [Proj/Filter]     [Proj/Filter]         [Proj/Filter]  <------\
+ *          |                  |                    |                 \
+ *     [Table Scan ]     [Table Scan ]         [Table Scan ]       [xChange]
+ *                                                                      \
+ *                                                                 [ Transform ]
+ *                                                                       |
+ *                                                                 [Proj/Filter]
+ *                                                                       |
+ *                                                                 [Table Scan ]
+ *
+ * <p> This rule is one part of the overall mechanism and this rule only does the following
+ *
+ *                 ...                                      ...
+ *                  |                                        |
+ *            [ Transform ]                             [ Transform ]
+ *                  |                                       /
+ *            [ Inner Join ]                     [Pass-through xChange]
+ *            /            \                            /
+ *       [xChange]      [xChange]          |----[Dyn. Broadcast]   <-----
+ *          /                \             |           |                 \
+ *    [ Transform ]     [ Transform ]      |----> [ Inner Join ]      [xChange]
+ *         |                  |            |           |                   \
+ *    [Proj/Filter]     [Proj/Filter]      |      [ Transform ]       [ Transform ]
+ *         |                  |            |          |                    |
+ *    [Table Scan ]     [Table Scan ]      |----> [Proj/Filter]       [Proj/Filter]
+ *                                                     |                    |
+ *                                                [Table Scan ]       [Table Scan ]
+ *
+ *
+ *
+ * <p> The next part to extend the Dynamic broadcast into the Proj/Filter operator happens in the runtime.
+ *
+ * <p> This rewrite is only useful if we can ensure that:
+ * <ul>
+ *   <li>new plan can leverage the indexes and fast filtering of the leaf-stage Pinot server processing logic.</li>
+ *   <li>
+ *     data sending over xChange should be relatively small comparing to data would've been selected out if the dynamic
+ *     broadcast is not applied.
+ *   </li>
+ *   <li>
+ *     since leaf-stage operator can only process a typical Pinot V1 engine chain-operator chain. This means the entire
+ *     right-table will be ship over and persist in memory before the dynamic broadcast can occur. memory foot print
+ *     will be high so this rule should be use carefully until we have cost-based optimization.
+ *   </li>
+ *   <li>
+ *     if the dynamic broadcast stage is broadcasting to a left-table that's pre-partitioned with the join key, then the
+ *     right-table will be ship over and persist in memory using hash distribution. memory foot print will be
+ *     relatively low comparing to non-partitioned.
+ *   </li>
+ *   <li>
+ *     if the dynamic broadcast stage operating on a table with the same partition scheme as the left-table, then there
+ *     is not going to be any network shuffling overhead. both memory and latency overhead will be low.
+ *   </li>
+ * </ul>
+ *
+ * TODO #1: Only support SEMI-JOIN, once JOIN operator is supported by leaf-stage we should allow it to match
+ *   @see <a href="https://github.com/apache/pinot/pull/10565/>
+ * TODO #2: Only convert to dynamic broadcast from right-to-left, allow option to specify dynamic broadcast direction.
+ * TODO #3: Only convert to dynamic broadcast if left is leaf stage, allow the option for intermediate stage.
+ * TODO #4: currently adding a pass-through after join, support leaf-stage to chain arbitrary operator(s) next.
+ */
+public class PinotJoinToDynamicBroadcastRule extends RelOptRule {
+  public static final PinotJoinToDynamicBroadcastRule INSTANCE =
+      new PinotJoinToDynamicBroadcastRule(PinotRuleUtils.PINOT_REL_FACTORY);
+  private static final String DYNAMIC_BROADCAST_HINT_OPTION_VALUE = "dynamic_broadcast";
+
+  public PinotJoinToDynamicBroadcastRule(RelBuilderFactory factory) {
+    super(operand(LogicalJoin.class, any()), factory, null);
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    if (call.rels.length < 1 || !(call.rel(0) instanceof Join)) {
+      return false;
+    }
+    Join join = call.rel(0);
+    String joinStrategyString = PinotHintStrategyTable.getHintOption(join.getHints(),
+        PinotHintOptions.JOIN_HINT_OPTIONS, PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
+    List<String> joinStrategies = joinStrategyString != null ? StringUtils.split(joinStrategyString, ",")
+        : Collections.emptyList();
+    if (!joinStrategies.contains(DYNAMIC_BROADCAST_HINT_OPTION_VALUE)) {
+      return false;
+    }
+    JoinInfo joinInfo = join.analyzeCondition();
+    RelNode left = join.getLeft() instanceof HepRelVertex ? ((HepRelVertex) join.getLeft()).getCurrentRel()
+        : join.getLeft();
+    RelNode right = join.getRight() instanceof HepRelVertex ? ((HepRelVertex) join.getRight()).getCurrentRel()
+        : join.getRight();
+    return left instanceof LogicalExchange && right instanceof LogicalExchange
+        && PinotRuleUtils.noExchangeInSubtree(left.getInput(0))
+        && (join.getJoinType() == JoinRelType.SEMI && joinInfo.nonEquiConditions.isEmpty());
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    Join join = call.rel(0);
+    LogicalExchange left = (LogicalExchange) (join.getLeft() instanceof HepRelVertex
+        ? ((HepRelVertex) join.getLeft()).getCurrentRel() : join.getLeft());
+    LogicalExchange right = (LogicalExchange) (join.getRight() instanceof HepRelVertex
+        ? ((HepRelVertex) join.getRight()).getCurrentRel() : join.getRight());
+
+    // TODO: both optimized hash empty list should be singleton exchange. however it is broken. fix later.
+    boolean isColocatedJoin = PinotHintStrategyTable.containsHintOption(join.getHints(),
+        PinotHintOptions.JOIN_HINT_OPTIONS, PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
+    LogicalExchange dynamicBroadcastExchange = isColocatedJoin
+        ? LogicalExchange.create(right.getInput(), RelDistributions.hash(Collections.emptyList()))

Review Comment:
   TODO: we have to move this to hash with empty List (deterministically to 1-server) instead of singleton. we need to clean up singleton before this can work. CC @xiangfu0 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #10779: [multistage][mvp] pipeline breaker and dynamic broadcast runtime for semi-join

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10779:
URL: https://github.com/apache/pinot/pull/10779#discussion_r1198009335


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -143,22 +148,51 @@ public void shutDown()
   }
 
   public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) {
+    try {
     long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
     long timeoutMs = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
     boolean isTraceEnabled =
         Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
     long deadlineMs = System.currentTimeMillis() + timeoutMs;
+    // run OpChain
     if (isLeafStage(distributedStagePlan)) {
-      OpChain rootOperator = compileLeafStage(distributedStagePlan, requestMetadataMap, timeoutMs, deadlineMs,
-          requestId);
+      // pre-stage execution for all pipeline breakers
+      // TODO: pipeline breaker is now only supported by leaf stage, to be supported by all OpChain
+      PipelineBreakerContext pipelineBreakerContext = executePipelineBreakers(_intermScheduler, distributedStagePlan,
+          timeoutMs, deadlineMs, requestId, isTraceEnabled);
+      OpChain rootOperator = compileLeafStage(distributedStagePlan, requestMetadataMap, pipelineBreakerContext,
+          timeoutMs, deadlineMs, requestId);
       _leafScheduler.register(rootOperator);
     } else {
       PlanNode stageRoot = distributedStagePlan.getStageRoot();
-      OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot,
+      OpChain rootOperator = PhysicalPlanVisitor.walkPlanNode(stageRoot,
           new PlanRequestContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs,
-              distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), isTraceEnabled));
+              distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled));
       _intermScheduler.register(rootOperator);
     }
+    } catch (Throwable t) {
+      LOGGER.error("", t);
+    }
+  }
+
+  private PipelineBreakerContext executePipelineBreakers(OpChainSchedulerService scheduler,
+      DistributedStagePlan distributedStagePlan, long timeoutMs, long deadlineMs, long requestId,
+      boolean isTraceEnabled) {
+    PipelineBreakerContext pipelineBreakerContext = new PipelineBreakerContext();
+    PipelineBreakerVisitor.visitPlanRoot(distributedStagePlan.getStageRoot(), pipelineBreakerContext);
+    if (pipelineBreakerContext.getPipelineBreakerMap().size() > 0) {
+      PlanNode stageRoot = distributedStagePlan.getStageRoot();
+      PlanRequestContext planRequestContext =
+          new PlanRequestContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs,
+              distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled);

Review Comment:
   PlanRequestContext is created/shared between the main opChain and the PipelineBreaker opChain --> this means at a given time only one of them can run. 
   
   1. The problem we stated earlier: the mailboxID-to-opChainID conversion is a hard-coded extraction. We should:
       - use different opChainIDs for the pre-stage pipeline breaker and the main-stage opChain
       - decouple it by specifically initializing ReceivingMailbox with the appropriate opChainID to wake up
   2. The visitor pattern of the physicalPlanVisitor/ServerPlanVisitor needs to be cleaned up - currently the common info is filled in twice (once in the physical plan visitor, once in the pipeline breaker visitor) and we rely on the fact that those common pieces remain the same. We should:
       - fill in the common pieces only once
       - fill in the separate pipeline-breaker info and main info respectively in each individual run



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #10779: [multistage][mvp] pipeline breaker and dynamic broadcast runtime for semi-join

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10779:
URL: https://github.com/apache/pinot/pull/10779#discussion_r1198025615


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerRequestPlanVisitor.java:
##########
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.server;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.request.DataSource;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.query.parser.CalciteRexExpressionParser;
+import org.apache.pinot.query.planner.plannode.AggregateNode;
+import org.apache.pinot.query.planner.plannode.ExchangeNode;
+import org.apache.pinot.query.planner.plannode.FilterNode;
+import org.apache.pinot.query.planner.plannode.JoinNode;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.PlanNodeVisitor;
+import org.apache.pinot.query.planner.plannode.ProjectNode;
+import org.apache.pinot.query.planner.plannode.SetOpNode;
+import org.apache.pinot.query.planner.plannode.SortNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
+import org.apache.pinot.query.planner.plannode.ValueNode;
+import org.apache.pinot.query.planner.plannode.WindowNode;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+
+/**
+ * Plan visitor for direct leaf-stage server request.
+ *
+ * This should be merged with logics in {@link org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2} in the future
+ * to directly produce operator chain.
+ *
+ * As of now, the reason why we use the plan visitor for server request is for additional support such as dynamic
+ * filtering and other auxiliary functionalities.
+ */
+public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPlanRequestContext> {
+  private static final ServerRequestPlanVisitor INSTANCE = new ServerRequestPlanVisitor();
+
+  static void walkStageNode(PlanNode node, ServerPlanRequestContext context) {
+    node.visit(INSTANCE, context);
+  }

Review Comment:
   TODO: both this and `PhysicalPlanVisitor.walkPlanNode` require a catch-all-and-throw.
   
   Previously, if a planFragment could be generated, it was guaranteed to correctly create an opChain. This is no longer true b/c the data received from the pipeline breaker can be arbitrary (e.g. empty, null, single-row expected but multiple-row received, etc.), so we need a way to log the error and return it back to the broker.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #10779: [multistage][mvp] pipeline breaker and dynamic broadcast runtime for semi-join

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10779:
URL: https://github.com/apache/pinot/pull/10779#discussion_r1198023915


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutionUtils.java:
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.pipeline;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
+import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
+import org.apache.pinot.query.runtime.plan.PlanRequestContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class to run pipeline breaker execution and collects the results.
+ */
+public class PipelineBreakerExecutionUtils {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipelineBreakerExecutionUtils.class);
+
+  private PipelineBreakerExecutionUtils() {
+    // do not instantiate.
+  }
+
+  public static Map<Integer, List<TransferableBlock>> execute(OpChainSchedulerService scheduler,
+      PipelineBreakerContext context, PlanRequestContext planRequestContext) {
+    Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap = new HashMap<>();
+    for (Map.Entry<Integer, PlanNode> e : context.getPipelineBreakerMap().entrySet()) {
+      int key = e.getKey();
+      PlanNode planNode = e.getValue();
+      if (!(planNode instanceof MailboxReceiveNode)) {
+        throw new UnsupportedOperationException("Only MailboxReceiveNode is supported to run as pipeline breaker now");
+      }
+      OpChain tempOpChain = PhysicalPlanVisitor.walkPlanNode(planNode, planRequestContext);
+      pipelineWorkerMap.put(key, tempOpChain.getRoot());

Review Comment:
   TODO: technically this tempOpChain can run any intermediate-stage opChain, but this hasn't been tested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #10779: [multistage][mvp] pipeline breaker and dynamic broadcast runtime for semi-join

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10779:
URL: https://github.com/apache/pinot/pull/10779#discussion_r1207267751


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -143,22 +148,51 @@ public void shutDown()
   }
 
   public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) {
+    try {
     long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
     long timeoutMs = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
     boolean isTraceEnabled =
         Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
     long deadlineMs = System.currentTimeMillis() + timeoutMs;
+    // run OpChain
     if (isLeafStage(distributedStagePlan)) {
-      OpChain rootOperator = compileLeafStage(distributedStagePlan, requestMetadataMap, timeoutMs, deadlineMs,
-          requestId);
+      // pre-stage execution for all pipeline breakers
+      // TODO: pipeline breaker is now only supported by leaf stage, to be supported by all OpChain
+      PipelineBreakerContext pipelineBreakerContext = executePipelineBreakers(_intermScheduler, distributedStagePlan,
+          timeoutMs, deadlineMs, requestId, isTraceEnabled);

Review Comment:
   this is addressed via #10791



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] walterddr commented on a diff in pull request #10779: [multistage][mvp] pipeline breaker and dynamic broadcast runtime for semi-join

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #10779:
URL: https://github.com/apache/pinot/pull/10779#discussion_r1198009809


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java:
##########
@@ -143,22 +148,51 @@ public void shutDown()
   }
 
   public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) {
+    try {
     long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
     long timeoutMs = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
     boolean isTraceEnabled =
         Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));
     long deadlineMs = System.currentTimeMillis() + timeoutMs;
+    // run OpChain
     if (isLeafStage(distributedStagePlan)) {
-      OpChain rootOperator = compileLeafStage(distributedStagePlan, requestMetadataMap, timeoutMs, deadlineMs,
-          requestId);
+      // pre-stage execution for all pipeline breakers
+      // TODO: pipeline breaker is now only supported by leaf stage, to be supported by all OpChain
+      PipelineBreakerContext pipelineBreakerContext = executePipelineBreakers(_intermScheduler, distributedStagePlan,
+          timeoutMs, deadlineMs, requestId, isTraceEnabled);
+      OpChain rootOperator = compileLeafStage(distributedStagePlan, requestMetadataMap, pipelineBreakerContext,
+          timeoutMs, deadlineMs, requestId);
       _leafScheduler.register(rootOperator);
     } else {
       PlanNode stageRoot = distributedStagePlan.getStageRoot();
-      OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot,
+      OpChain rootOperator = PhysicalPlanVisitor.walkPlanNode(stageRoot,
           new PlanRequestContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs,
-              distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), isTraceEnabled));
+              distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled));
       _intermScheduler.register(rootOperator);
     }
+    } catch (Throwable t) {
+      LOGGER.error("", t);
+    }
+  }
+
+  private PipelineBreakerContext executePipelineBreakers(OpChainSchedulerService scheduler,
+      DistributedStagePlan distributedStagePlan, long timeoutMs, long deadlineMs, long requestId,
+      boolean isTraceEnabled) {
+    PipelineBreakerContext pipelineBreakerContext = new PipelineBreakerContext();
+    PipelineBreakerVisitor.visitPlanRoot(distributedStagePlan.getStageRoot(), pipelineBreakerContext);
+    if (pipelineBreakerContext.getPipelineBreakerMap().size() > 0) {
+      PlanNode stageRoot = distributedStagePlan.getStageRoot();
+      PlanRequestContext planRequestContext =
+          new PlanRequestContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs,
+              distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled);
+      Map<Integer, List<TransferableBlock>> resultMap =
+          PipelineBreakerExecutionUtils.execute(scheduler, pipelineBreakerContext, planRequestContext);
+      Preconditions.checkState(!resultMap.containsKey(-1), "Pipeline Breaker received error block!");

Review Comment:
   TODO: provide better error message handling.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org