You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/09/21 03:16:49 UTC
[pinot] 01/01: Acquire-release
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch acquire_release
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 0f3ff1918fafad93149406d4f61f1d21ee959fd8
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Mon Sep 20 20:16:17 2021 -0700
Acquire-release
---
.../AcquireReleaseColumnsSegmentOperator.java | 25 +++++++++++++---------
.../core/operator/combine/BaseCombineOperator.java | 11 +++++++++-
.../combine/GroupByOrderByCombineOperator.java | 17 ++++++++++-----
.../plan/AcquireReleaseColumnsSegmentPlanNode.java | 2 +-
4 files changed, 38 insertions(+), 17 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
index 127f38f..04dc79b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.operator;
import org.apache.pinot.core.common.Block;
import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.plan.PlanNode;
import org.apache.pinot.segment.spi.FetchContext;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -31,13 +32,13 @@ import org.apache.pinot.segment.spi.IndexSegment;
public class AcquireReleaseColumnsSegmentOperator extends BaseOperator {
private static final String OPERATOR_NAME = "AcquireReleaseColumnsSegmentOperator";
- private final Operator _childOperator;
+ private final PlanNode _planNode;
private final IndexSegment _indexSegment;
private final FetchContext _fetchContext;
+ private Operator _childOperator;
- public AcquireReleaseColumnsSegmentOperator(Operator childOperator, IndexSegment indexSegment,
- FetchContext fetchContext) {
- _childOperator = childOperator;
+ public AcquireReleaseColumnsSegmentOperator(PlanNode planNode, IndexSegment indexSegment, FetchContext fetchContext) {
+ _planNode = planNode;
_indexSegment = indexSegment;
_fetchContext = fetchContext;
}
@@ -49,12 +50,16 @@ public class AcquireReleaseColumnsSegmentOperator extends BaseOperator {
*/
@Override
protected Block getNextBlock() {
+ _childOperator = _planNode.run();
+ return _childOperator.nextBlock();
+ }
+
+ public void acquire() {
_indexSegment.acquire(_fetchContext);
- try {
- return _childOperator.nextBlock();
- } finally {
- _indexSegment.release(_fetchContext);
- }
+ }
+
+ public void release() {
+ _indexSegment.release(_fetchContext);
}
@Override
@@ -64,6 +69,6 @@ public class AcquireReleaseColumnsSegmentOperator extends BaseOperator {
@Override
public ExecutionStatistics getExecutionStatistics() {
- return _childOperator.getExecutionStatistics();
+ return _childOperator == null ? new ExecutionStatistics(0, 0, 0, 0) : _childOperator.getExecutionStatistics();
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index eeb27c2..89c6f1f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.query.request.context.QueryContext;
@@ -146,8 +147,12 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
*/
protected void processSegments(int taskIndex) {
for (int operatorIndex = taskIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
+ Operator operator = _operators.get(operatorIndex);
try {
- IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
+ if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+ ((AcquireReleaseColumnsSegmentOperator) operator).acquire();
+ }
+ IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) operator.nextBlock();
if (isQuerySatisfied(resultsBlock)) {
// Query is satisfied, skip processing the remaining segments
_blockingQueue.offer(resultsBlock);
@@ -164,6 +169,10 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
e);
_blockingQueue.offer(new IntermediateResultsBlock(e));
return;
+ } finally {
+ if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+ ((AcquireReleaseColumnsSegmentOperator) operator).release();
+ }
}
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index b2d9373..d6c5dac 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -41,6 +41,7 @@ import org.apache.pinot.core.data.table.IntermediateRecord;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
@@ -125,9 +126,12 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
*/
@Override
protected void processSegments(int threadIndex) {
+ Operator operator = _operators.get(threadIndex);
try {
- IntermediateResultsBlock intermediateResultsBlock =
- (IntermediateResultsBlock) _operators.get(threadIndex).nextBlock();
+ if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+ ((AcquireReleaseColumnsSegmentOperator) operator).acquire();
+ }
+ IntermediateResultsBlock intermediateResultsBlock = (IntermediateResultsBlock) operator.nextBlock();
_initLock.lock();
try {
@@ -186,9 +190,12 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
// Early-terminated because query times out or is already satisfied
} catch (Exception e) {
LOGGER.error("Caught exception while processing and combining group-by order-by for index: {}, operator: {}, "
- + "queryContext: {}", threadIndex, _operators.get(threadIndex).getClass().getName(), _queryContext, e);
+ + "queryContext: {}", threadIndex, operator.getClass().getName(), _queryContext, e);
_mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
} finally {
+ if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+ ((AcquireReleaseColumnsSegmentOperator) operator).release();
+ }
_operatorLatch.countDown();
}
}
@@ -213,8 +220,8 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
boolean opCompleted = _operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
if (!opCompleted) {
// If this happens, the broker side should already timed out, just log the error and return
- String errorMessage =
- String.format("Timed out while combining group-by order-by results after %dms, queryContext = %s", timeoutMs,
+ String errorMessage = String
+ .format("Timed out while combining group-by order-by results after %dms, queryContext = %s", timeoutMs,
_queryContext);
LOGGER.error(errorMessage);
return new IntermediateResultsBlock(new TimeoutException(errorMessage));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
index 5a9f506..5e517e1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
@@ -41,6 +41,6 @@ public class AcquireReleaseColumnsSegmentPlanNode implements PlanNode {
@Override
public AcquireReleaseColumnsSegmentOperator run() {
- return new AcquireReleaseColumnsSegmentOperator(_childPlanNode.run(), _indexSegment, _fetchContext);
+ return new AcquireReleaseColumnsSegmentOperator(_childPlanNode, _indexSegment, _fetchContext);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org