You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/09/21 03:16:49 UTC

[pinot] 01/01: Acquire-release

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch acquire_release
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 0f3ff1918fafad93149406d4f61f1d21ee959fd8
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Mon Sep 20 20:16:17 2021 -0700

    Acquire-release
---
 .../AcquireReleaseColumnsSegmentOperator.java      | 25 +++++++++++++---------
 .../core/operator/combine/BaseCombineOperator.java | 11 +++++++++-
 .../combine/GroupByOrderByCombineOperator.java     | 17 ++++++++++-----
 .../plan/AcquireReleaseColumnsSegmentPlanNode.java |  2 +-
 4 files changed, 38 insertions(+), 17 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
index 127f38f..04dc79b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.operator;
 
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.plan.PlanNode;
 import org.apache.pinot.segment.spi.FetchContext;
 import org.apache.pinot.segment.spi.IndexSegment;
 
@@ -31,13 +32,13 @@ import org.apache.pinot.segment.spi.IndexSegment;
 public class AcquireReleaseColumnsSegmentOperator extends BaseOperator {
   private static final String OPERATOR_NAME = "AcquireReleaseColumnsSegmentOperator";
 
-  private final Operator _childOperator;
+  private final PlanNode _planNode;
   private final IndexSegment _indexSegment;
   private final FetchContext _fetchContext;
+  private Operator _childOperator;
 
-  public AcquireReleaseColumnsSegmentOperator(Operator childOperator, IndexSegment indexSegment,
-      FetchContext fetchContext) {
-    _childOperator = childOperator;
+  public AcquireReleaseColumnsSegmentOperator(PlanNode planNode, IndexSegment indexSegment, FetchContext fetchContext) {
+    _planNode = planNode;
     _indexSegment = indexSegment;
     _fetchContext = fetchContext;
   }
@@ -49,12 +50,16 @@ public class AcquireReleaseColumnsSegmentOperator extends BaseOperator {
    */
   @Override
   protected Block getNextBlock() {
+    _childOperator = _planNode.run();
+    return _childOperator.nextBlock();
+  }
+
+  public void acquire() {
     _indexSegment.acquire(_fetchContext);
-    try {
-      return _childOperator.nextBlock();
-    } finally {
-      _indexSegment.release(_fetchContext);
-    }
+  }
+
+  public void release() {
+    _indexSegment.release(_fetchContext);
   }
 
   @Override
@@ -64,6 +69,6 @@ public class AcquireReleaseColumnsSegmentOperator extends BaseOperator {
 
   @Override
   public ExecutionStatistics getExecutionStatistics() {
-    return _childOperator.getExecutionStatistics();
+    return _childOperator == null ? new ExecutionStatistics(0, 0, 0, 0) : _childOperator.getExecutionStatistics();
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index eeb27c2..89c6f1f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -146,8 +147,12 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
    */
   protected void processSegments(int taskIndex) {
     for (int operatorIndex = taskIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
+      Operator operator = _operators.get(operatorIndex);
       try {
-        IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
+        if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+          ((AcquireReleaseColumnsSegmentOperator) operator).acquire();
+        }
+        IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) operator.nextBlock();
         if (isQuerySatisfied(resultsBlock)) {
           // Query is satisfied, skip processing the remaining segments
           _blockingQueue.offer(resultsBlock);
@@ -164,6 +169,10 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
             e);
         _blockingQueue.offer(new IntermediateResultsBlock(e));
         return;
+      } finally {
+        if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+          ((AcquireReleaseColumnsSegmentOperator) operator).release();
+        }
       }
     }
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index b2d9373..d6c5dac 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -41,6 +41,7 @@ import org.apache.pinot.core.data.table.IntermediateRecord;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
@@ -125,9 +126,12 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
    */
   @Override
   protected void processSegments(int threadIndex) {
+    Operator operator = _operators.get(threadIndex);
     try {
-      IntermediateResultsBlock intermediateResultsBlock =
-          (IntermediateResultsBlock) _operators.get(threadIndex).nextBlock();
+      if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+        ((AcquireReleaseColumnsSegmentOperator) operator).acquire();
+      }
+      IntermediateResultsBlock intermediateResultsBlock = (IntermediateResultsBlock) operator.nextBlock();
 
       _initLock.lock();
       try {
@@ -186,9 +190,12 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
       // Early-terminated because query times out or is already satisfied
     } catch (Exception e) {
       LOGGER.error("Caught exception while processing and combining group-by order-by for index: {}, operator: {}, "
-          + "queryContext: {}", threadIndex, _operators.get(threadIndex).getClass().getName(), _queryContext, e);
+          + "queryContext: {}", threadIndex, operator.getClass().getName(), _queryContext, e);
       _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
     } finally {
+      if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+        ((AcquireReleaseColumnsSegmentOperator) operator).release();
+      }
       _operatorLatch.countDown();
     }
   }
@@ -213,8 +220,8 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
     boolean opCompleted = _operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
     if (!opCompleted) {
       // If this happens, the broker side should already timed out, just log the error and return
-      String errorMessage =
-          String.format("Timed out while combining group-by order-by results after %dms, queryContext = %s", timeoutMs,
+      String errorMessage = String
+          .format("Timed out while combining group-by order-by results after %dms, queryContext = %s", timeoutMs,
               _queryContext);
       LOGGER.error(errorMessage);
       return new IntermediateResultsBlock(new TimeoutException(errorMessage));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
index 5a9f506..5e517e1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
@@ -41,6 +41,6 @@ public class AcquireReleaseColumnsSegmentPlanNode implements PlanNode {
 
   @Override
   public AcquireReleaseColumnsSegmentOperator run() {
-    return new AcquireReleaseColumnsSegmentOperator(_childPlanNode.run(), _indexSegment, _fetchContext);
+    return new AcquireReleaseColumnsSegmentOperator(_childPlanNode, _indexSegment, _fetchContext);
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org