You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/09/21 03:16:48 UTC

[pinot] branch acquire_release created (now 0f3ff19)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a change to branch acquire_release
in repository https://gitbox.apache.org/repos/asf/pinot.git.


      at 0f3ff19  Acquire-release

This branch includes the following new commits:

     new 0f3ff19  Acquire-release

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[pinot] 01/01: Acquire-release

Posted by ne...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch acquire_release
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 0f3ff1918fafad93149406d4f61f1d21ee959fd8
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Mon Sep 20 20:16:17 2021 -0700

    Acquire-release
---
 .../AcquireReleaseColumnsSegmentOperator.java      | 25 +++++++++++++---------
 .../core/operator/combine/BaseCombineOperator.java | 11 +++++++++-
 .../combine/GroupByOrderByCombineOperator.java     | 17 ++++++++++-----
 .../plan/AcquireReleaseColumnsSegmentPlanNode.java |  2 +-
 4 files changed, 38 insertions(+), 17 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
index 127f38f..04dc79b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.operator;
 
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.plan.PlanNode;
 import org.apache.pinot.segment.spi.FetchContext;
 import org.apache.pinot.segment.spi.IndexSegment;
 
@@ -31,13 +32,13 @@ import org.apache.pinot.segment.spi.IndexSegment;
 public class AcquireReleaseColumnsSegmentOperator extends BaseOperator {
   private static final String OPERATOR_NAME = "AcquireReleaseColumnsSegmentOperator";
 
-  private final Operator _childOperator;
+  private final PlanNode _planNode;
   private final IndexSegment _indexSegment;
   private final FetchContext _fetchContext;
+  private Operator _childOperator;
 
-  public AcquireReleaseColumnsSegmentOperator(Operator childOperator, IndexSegment indexSegment,
-      FetchContext fetchContext) {
-    _childOperator = childOperator;
+  public AcquireReleaseColumnsSegmentOperator(PlanNode planNode, IndexSegment indexSegment, FetchContext fetchContext) {
+    _planNode = planNode;
     _indexSegment = indexSegment;
     _fetchContext = fetchContext;
   }
@@ -49,12 +50,16 @@ public class AcquireReleaseColumnsSegmentOperator extends BaseOperator {
    */
   @Override
   protected Block getNextBlock() {
+    _childOperator = _planNode.run();
+    return _childOperator.nextBlock();
+  }
+
+  public void acquire() {
     _indexSegment.acquire(_fetchContext);
-    try {
-      return _childOperator.nextBlock();
-    } finally {
-      _indexSegment.release(_fetchContext);
-    }
+  }
+
+  public void release() {
+    _indexSegment.release(_fetchContext);
   }
 
   @Override
@@ -64,6 +69,6 @@ public class AcquireReleaseColumnsSegmentOperator extends BaseOperator {
 
   @Override
   public ExecutionStatistics getExecutionStatistics() {
-    return _childOperator.getExecutionStatistics();
+    return _childOperator == null ? new ExecutionStatistics(0, 0, 0, 0) : _childOperator.getExecutionStatistics();
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index eeb27c2..89c6f1f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -146,8 +147,12 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
    */
   protected void processSegments(int taskIndex) {
     for (int operatorIndex = taskIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
+      Operator operator = _operators.get(operatorIndex);
       try {
-        IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
+        if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+          ((AcquireReleaseColumnsSegmentOperator) operator).acquire();
+        }
+        IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) operator.nextBlock();
         if (isQuerySatisfied(resultsBlock)) {
           // Query is satisfied, skip processing the remaining segments
           _blockingQueue.offer(resultsBlock);
@@ -164,6 +169,10 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
             e);
         _blockingQueue.offer(new IntermediateResultsBlock(e));
         return;
+      } finally {
+        if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+          ((AcquireReleaseColumnsSegmentOperator) operator).release();
+        }
       }
     }
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index b2d9373..d6c5dac 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -41,6 +41,7 @@ import org.apache.pinot.core.data.table.IntermediateRecord;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
@@ -125,9 +126,12 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
    */
   @Override
   protected void processSegments(int threadIndex) {
+    Operator operator = _operators.get(threadIndex);
     try {
-      IntermediateResultsBlock intermediateResultsBlock =
-          (IntermediateResultsBlock) _operators.get(threadIndex).nextBlock();
+      if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+        ((AcquireReleaseColumnsSegmentOperator) operator).acquire();
+      }
+      IntermediateResultsBlock intermediateResultsBlock = (IntermediateResultsBlock) operator.nextBlock();
 
       _initLock.lock();
       try {
@@ -186,9 +190,12 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
       // Early-terminated because query times out or is already satisfied
     } catch (Exception e) {
       LOGGER.error("Caught exception while processing and combining group-by order-by for index: {}, operator: {}, "
-          + "queryContext: {}", threadIndex, _operators.get(threadIndex).getClass().getName(), _queryContext, e);
+          + "queryContext: {}", threadIndex, operator.getClass().getName(), _queryContext, e);
       _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
     } finally {
+      if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+        ((AcquireReleaseColumnsSegmentOperator) operator).release();
+      }
       _operatorLatch.countDown();
     }
   }
@@ -213,8 +220,8 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
     boolean opCompleted = _operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
     if (!opCompleted) {
       // If this happens, the broker side should already timed out, just log the error and return
-      String errorMessage =
-          String.format("Timed out while combining group-by order-by results after %dms, queryContext = %s", timeoutMs,
+      String errorMessage = String
+          .format("Timed out while combining group-by order-by results after %dms, queryContext = %s", timeoutMs,
               _queryContext);
       LOGGER.error(errorMessage);
       return new IntermediateResultsBlock(new TimeoutException(errorMessage));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
index 5a9f506..5e517e1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
@@ -41,6 +41,6 @@ public class AcquireReleaseColumnsSegmentPlanNode implements PlanNode {
 
   @Override
   public AcquireReleaseColumnsSegmentOperator run() {
-    return new AcquireReleaseColumnsSegmentOperator(_childPlanNode.run(), _indexSegment, _fetchContext);
+    return new AcquireReleaseColumnsSegmentOperator(_childPlanNode, _indexSegment, _fetchContext);
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org