You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/09/17 05:29:42 UTC

[pinot] branch master updated: FetchContext to hold configs in acquire/release calls (#7447)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b23a1f3  FetchContext to hold configs in acquire/release calls (#7447)
b23a1f3 is described below

commit b23a1f39e35e807c81886209a058c216e80db453
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Thu Sep 16 22:29:29 2021 -0700

    FetchContext to hold configs in acquire/release calls (#7447)
    
    * FetchContext to hold configs in acquire/release calls
    
    * Review comments
---
 .../AcquireReleaseColumnsSegmentOperator.java      | 13 +++++----
 .../core/operator/InstanceResponseOperator.java    | 33 +++++++++++++++++++--
 .../plan/AcquireReleaseColumnsSegmentPlanNode.java | 11 +++----
 .../pinot/core/plan/InstanceResponsePlanNode.java  | 12 ++++++--
 .../core/plan/maker/InstancePlanMakerImplV2.java   | 33 ++++++++++++---------
 .../immutable/ImmutableSegmentImpl.java            | 13 +++++----
 .../org/apache/pinot/segment/spi/FetchContext.java | 34 ++++++++++++++++------
 .../org/apache/pinot/segment/spi/IndexSegment.java | 20 ++++++-------
 .../segment/spi/store/ColumnIndexDirectory.java    | 13 +++++----
 .../pinot/segment/spi/store/SegmentDirectory.java  | 26 +++++++++--------
 10 files changed, 137 insertions(+), 71 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
index f67b42a..127f38f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
@@ -18,9 +18,9 @@
  */
 package org.apache.pinot.core.operator;
 
-import java.util.Set;
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.segment.spi.FetchContext;
 import org.apache.pinot.segment.spi.IndexSegment;
 
 
@@ -33,12 +33,13 @@ public class AcquireReleaseColumnsSegmentOperator extends BaseOperator {
 
   private final Operator _childOperator;
   private final IndexSegment _indexSegment;
-  private final Set<String> _columns;
+  private final FetchContext _fetchContext;
 
-  public AcquireReleaseColumnsSegmentOperator(Operator childOperator, IndexSegment indexSegment, Set<String> columns) {
+  public AcquireReleaseColumnsSegmentOperator(Operator childOperator, IndexSegment indexSegment,
+      FetchContext fetchContext) {
     _childOperator = childOperator;
     _indexSegment = indexSegment;
-    _columns = columns;
+    _fetchContext = fetchContext;
   }
 
   /**
@@ -48,11 +49,11 @@ public class AcquireReleaseColumnsSegmentOperator extends BaseOperator {
    */
   @Override
   protected Block getNextBlock() {
-    _indexSegment.acquire(_columns);
+    _indexSegment.acquire(_fetchContext);
     try {
       return _childOperator.nextBlock();
     } finally {
-      _indexSegment.release(_columns);
+      _indexSegment.release(_fetchContext);
     }
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
index 4c5b84b..15be068 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
@@ -18,26 +18,43 @@
  */
 package org.apache.pinot.core.operator;
 
+import java.util.List;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.common.utils.DataTable.MetadataKey;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.segment.spi.FetchContext;
+import org.apache.pinot.segment.spi.IndexSegment;
 
 
 public class InstanceResponseOperator extends BaseOperator<InstanceResponseBlock> {
   private static final String OPERATOR_NAME = "InstanceResponseOperator";
 
   private final Operator _operator;
+  private final List<IndexSegment> _indexSegments;
+  private final List<FetchContext> _fetchContexts;
+  private final int _fetchContextSize;
 
-  public InstanceResponseOperator(Operator combinedOperator) {
+  public InstanceResponseOperator(Operator combinedOperator, List<IndexSegment> indexSegments,
+      List<FetchContext> fetchContexts) {
     _operator = combinedOperator;
+    _indexSegments = indexSegments;
+    _fetchContexts = fetchContexts;
+    _fetchContextSize = fetchContexts.size();
   }
 
   @Override
   protected InstanceResponseBlock getNextBlock() {
     long startWallClockTimeNs = System.nanoTime();
-    IntermediateResultsBlock intermediateResultsBlock = (IntermediateResultsBlock) _operator.nextBlock();
+
+    IntermediateResultsBlock intermediateResultsBlock;
+    try {
+      prefetchAll();
+      intermediateResultsBlock = (IntermediateResultsBlock) _operator.nextBlock();
+    } finally {
+      releaseAll();
+    }
     InstanceResponseBlock instanceResponseBlock = new InstanceResponseBlock(intermediateResultsBlock);
     DataTable dataTable = instanceResponseBlock.getInstanceResponseDataTable();
     long endWallClockTimeNs = System.nanoTime();
@@ -93,4 +110,16 @@ public class InstanceResponseOperator extends BaseOperator<InstanceResponseBlock
   public String getOperatorName() {
     return OPERATOR_NAME;
   }
+
+  private void prefetchAll() {
+    for (int i = 0; i < _fetchContextSize; i++) {
+      _indexSegments.get(i).prefetch(_fetchContexts.get(i));
+    }
+  }
+
+  private void releaseAll() {
+    for (int i = 0; i < _fetchContextSize; i++) {
+      _indexSegments.get(i).release(_fetchContexts.get(i));
+    }
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
index 9662549..5a9f506 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pinot.core.plan;
 
-import java.util.Set;
 import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
+import org.apache.pinot.segment.spi.FetchContext;
 import org.apache.pinot.segment.spi.IndexSegment;
 
 
@@ -30,16 +30,17 @@ public class AcquireReleaseColumnsSegmentPlanNode implements PlanNode {
 
   private final PlanNode _childPlanNode;
   private final IndexSegment _indexSegment;
-  private final Set<String> _columns;
+  private final FetchContext _fetchContext;
 
-  public AcquireReleaseColumnsSegmentPlanNode(PlanNode childPlanNode, IndexSegment indexSegment, Set<String> columns) {
+  public AcquireReleaseColumnsSegmentPlanNode(PlanNode childPlanNode, IndexSegment indexSegment,
+      FetchContext fetchContext) {
     _childPlanNode = childPlanNode;
     _indexSegment = indexSegment;
-    _columns = columns;
+    _fetchContext = fetchContext;
   }
 
   @Override
   public AcquireReleaseColumnsSegmentOperator run() {
-    return new AcquireReleaseColumnsSegmentOperator(_childPlanNode.run(), _indexSegment, _columns);
+    return new AcquireReleaseColumnsSegmentOperator(_childPlanNode.run(), _indexSegment, _fetchContext);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/InstanceResponsePlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/InstanceResponsePlanNode.java
index 887031c..c88edef 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/InstanceResponsePlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/InstanceResponsePlanNode.java
@@ -18,18 +18,26 @@
  */
 package org.apache.pinot.core.plan;
 
+import java.util.List;
 import org.apache.pinot.core.operator.InstanceResponseOperator;
+import org.apache.pinot.segment.spi.FetchContext;
+import org.apache.pinot.segment.spi.IndexSegment;
 
 
 public class InstanceResponsePlanNode implements PlanNode {
   private final CombinePlanNode _combinePlanNode;
+  private final List<IndexSegment> _indexSegments;
+  private final List<FetchContext> _fetchContexts;
 
-  public InstanceResponsePlanNode(CombinePlanNode combinePlanNode) {
+  public InstanceResponsePlanNode(CombinePlanNode combinePlanNode, List<IndexSegment> indexSegments,
+      List<FetchContext> fetchContexts) {
     _combinePlanNode = combinePlanNode;
+    _indexSegments = indexSegments;
+    _fetchContexts = fetchContexts;
   }
 
   @Override
   public InstanceResponseOperator run() {
-    return new InstanceResponseOperator(_combinePlanNode.run());
+    return new InstanceResponseOperator(_combinePlanNode.run(), _indexSegments, _fetchContexts);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 1d48cc1..39fc9da 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -22,9 +22,11 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import io.grpc.stub.StreamObserver;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.request.context.ExpressionContext;
@@ -53,6 +55,7 @@ import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
 import org.apache.pinot.core.util.GroupByUtils;
 import org.apache.pinot.core.util.QueryOptions;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.segment.spi.FetchContext;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -82,8 +85,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
   // set as pinot.server.query.executor.groupby.trim.threshold
   public static final String GROUPBY_TRIM_THRESHOLD_KEY = "groupby.trim.threshold";
   public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000;
-  public static final String ENABLE_BUFFER_ACQUIRE_RELEASE = "enable.buffer.acquire.release";
-  public static final boolean DEFAULT_ENABLE_BUFFER_ACQUIRE_AND_RELEASE = false;
+  public static final String ENABLE_PREFETCH = "enable.prefetch";
+  public static final boolean DEFAULT_ENABLE_PREFETCH = false;
 
   private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class);
   private final int _maxInitialResultHolderCapacity;
@@ -93,7 +96,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
   private final int _minSegmentGroupTrimSize;
   private final int _minServerGroupTrimSize;
   private final int _groupByTrimThreshold;
-  private final boolean _enableBufferAcquireRelease;
+  private final boolean _enablePrefetch;
 
   @VisibleForTesting
   public InstancePlanMakerImplV2() {
@@ -102,7 +105,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
     _minSegmentGroupTrimSize = DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE;
     _minServerGroupTrimSize = DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE;
     _groupByTrimThreshold = DEFAULT_GROUPBY_TRIM_THRESHOLD;
-    _enableBufferAcquireRelease = DEFAULT_ENABLE_BUFFER_ACQUIRE_AND_RELEASE;
+    _enablePrefetch = DEFAULT_ENABLE_PREFETCH;
   }
 
   @VisibleForTesting
@@ -113,7 +116,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
     _minSegmentGroupTrimSize = minSegmentGroupTrimSize;
     _minServerGroupTrimSize = minServerGroupTrimSize;
     _groupByTrimThreshold = groupByTrimThreshold;
-    _enableBufferAcquireRelease = DEFAULT_ENABLE_BUFFER_ACQUIRE_AND_RELEASE;
+    _enablePrefetch = DEFAULT_ENABLE_PREFETCH;
   }
 
   /**
@@ -139,19 +142,21 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
     Preconditions
         .checkState(_groupByTrimThreshold > 0, "Invalid configurable: groupByTrimThreshold: %d must be positive",
             _groupByTrimThreshold);
-    _enableBufferAcquireRelease = Boolean.parseBoolean(config.getProperty(ENABLE_BUFFER_ACQUIRE_RELEASE));
+    _enablePrefetch = Boolean.parseBoolean(config.getProperty(ENABLE_PREFETCH));
     LOGGER.info("Initializing plan maker with maxInitialResultHolderCapacity: {}, numGroupsLimit: {}, "
-            + "minSegmentGroupTrimSize: {}, minServerGroupTrimSize: {}, enableBufferAcquireRelease: {}",
+            + "minSegmentGroupTrimSize: {}, minServerGroupTrimSize: {}, enablePrefetch: {}",
         _maxInitialResultHolderCapacity, _numGroupsLimit, _minSegmentGroupTrimSize, _minServerGroupTrimSize,
-        _enableBufferAcquireRelease);
+        _enablePrefetch);
   }
 
   @Override
   public Plan makeInstancePlan(List<IndexSegment> indexSegments, QueryContext queryContext,
       ExecutorService executorService, long endTimeMs) {
     List<PlanNode> planNodes = new ArrayList<>(indexSegments.size());
+    List<FetchContext> fetchContexts;
 
-    if (_enableBufferAcquireRelease) {
+    if (_enablePrefetch) {
+      fetchContexts = new ArrayList<>(indexSegments.size());
       List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions();
       for (IndexSegment indexSegment : indexSegments) {
         Set<String> columns;
@@ -160,12 +165,14 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
         } else {
           columns = queryContext.getColumns();
         }
-        indexSegment.prefetch(columns);
+        FetchContext fetchContext = new FetchContext(UUID.randomUUID(), columns);
+        fetchContexts.add(fetchContext);
         planNodes.add(
             new AcquireReleaseColumnsSegmentPlanNode(makeSegmentPlanNode(indexSegment, queryContext), indexSegment,
-                columns));
+                fetchContext));
       }
     } else {
+      fetchContexts = Collections.emptyList();
       for (IndexSegment indexSegment : indexSegments) {
         planNodes.add(makeSegmentPlanNode(indexSegment, queryContext));
       }
@@ -174,7 +181,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
     CombinePlanNode combinePlanNode =
         new CombinePlanNode(planNodes, queryContext, executorService, endTimeMs, _numGroupsLimit,
             _minServerGroupTrimSize, _groupByTrimThreshold, null);
-    return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode));
+    return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode, indexSegments, fetchContexts));
   }
 
   @Override
@@ -223,7 +230,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
     CombinePlanNode combinePlanNode =
         new CombinePlanNode(planNodes, queryContext, executorService, endTimeMs, _numGroupsLimit,
             _minServerGroupTrimSize, _groupByTrimThreshold, streamObserver);
-    return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode));
+    return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode, indexSegments, Collections.emptyList()));
   }
 
   @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
index 33fe26f..c0972ff 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -29,6 +29,7 @@ import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.local.startree.v2.store.StarTreeIndexContainer;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
 import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.FetchContext;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
@@ -128,18 +129,18 @@ public class ImmutableSegmentImpl implements ImmutableSegment {
   }
 
   @Override
-  public void prefetch(Set<String> columns) {
-    _segmentDirectory.prefetch(columns);
+  public void prefetch(FetchContext fetchContext) {
+    _segmentDirectory.prefetch(fetchContext);
   }
 
   @Override
-  public void acquire(Set<String> columns) {
-    _segmentDirectory.acquire(columns);
+  public void acquire(FetchContext fetchContext) {
+    _segmentDirectory.acquire(fetchContext);
   }
 
   @Override
-  public void release(Set<String> columns) {
-    _segmentDirectory.release(columns);
+  public void release(FetchContext fetchContext) {
+    _segmentDirectory.release(fetchContext);
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/InstanceResponsePlanNode.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/FetchContext.java
similarity index 57%
copy from pinot-core/src/main/java/org/apache/pinot/core/plan/InstanceResponsePlanNode.java
copy to pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/FetchContext.java
index 887031c..03e0c68 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/InstanceResponsePlanNode.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/FetchContext.java
@@ -16,20 +16,36 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.plan;
+package org.apache.pinot.segment.spi;
 
-import org.apache.pinot.core.operator.InstanceResponseOperator;
+import java.util.Set;
+import java.util.UUID;
 
 
-public class InstanceResponsePlanNode implements PlanNode {
-  private final CombinePlanNode _combinePlanNode;
+/**
+ * The context for fetching buffers of a segment during query.
+ */
+public class FetchContext {
+  private final UUID _fetchId;
+  private final Set<String> _columns;
+
+  public FetchContext(UUID fetchId, Set<String> columns) {
+    _fetchId = fetchId;
+    _columns = columns;
+  }
 
-  public InstanceResponsePlanNode(CombinePlanNode combinePlanNode) {
-    _combinePlanNode = combinePlanNode;
+  /**
+   * An id to uniquely identify the fetch request
+   * @return unique uuid
+   */
+  public UUID getFetchId() {
+    return _fetchId;
   }
 
-  @Override
-  public InstanceResponseOperator run() {
-    return new InstanceResponseOperator(_combinePlanNode.run());
+  /**
+   * Columns to be fetched as part of this request
+   */
+  public Set<String> getColumns() {
+    return _columns;
   }
 }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
index 5085c10..1f64f2e 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
@@ -88,27 +88,27 @@ public interface IndexSegment {
 
   /**
    * Hints the segment to begin prefetching buffers for specified columns.
-   * Typically, this should be an async call made in the planning phase,
-   * in preparation for reading the data in the execution phase
-   * @param columns columns to prefetch
+   * Typically, this should be an async call made before operating on the segment.
+   * @param fetchContext context for this segment's fetch
    */
-  default void prefetch(Set<String> columns) {
+  default void prefetch(FetchContext fetchContext) {
   }
 
   /**
    * Instructs the segment to fetch buffers for specified columns.
-   * Typically, this should be a blocking call made before the data is read
-   * @param columns columns to acquire
+   * When enabled, this should be a blocking call made before operating on the segment.
+   * @param fetchContext context for this segment's fetch
    */
-  default void acquire(Set<String> columns) {
+  default void acquire(FetchContext fetchContext) {
   }
 
   /**
    * Instructs the segment to release buffers for specified columns.
-   * Typically, this should be a call made after the data is read
-   * @param columns columns to release
+   * When enabled, this should be a call made after operating on the segment.
+   * It is possible that this is called multiple times.
+   * @param fetchContext context for this segment's fetch
    */
-  default void release(Set<String> columns) {
+  default void release(FetchContext fetchContext) {
   }
 
   /**
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java
index ae6c127..ca343db 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.util.Set;
+import org.apache.pinot.segment.spi.FetchContext;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 
@@ -87,20 +88,20 @@ public abstract class ColumnIndexDirectory implements Closeable {
   public abstract Set<String> getColumnsWithIndex(ColumnIndexType type);
 
   /**
-   * Hint to prefetch the buffer for this column
+   * A hint to prefetch the buffers for columns in the context, in preparation for operating on the segment.
    */
-  public void prefetchBuffer(String columns) {
+  public void prefetchBuffer(FetchContext fetchContext) {
   }
 
   /**
-   * Fetch the buffer for this column
+   * An instruction to fetch the buffers for columns in the context, in order to operate on the segment.
    */
-  public void acquireBuffer(String column) {
+  public void acquireBuffer(FetchContext fetchContext) {
   }
 
   /**
-   * Release the buffer for this column
+   * An instruction to release the fetched buffers for columns in this context, after operating on this segment.
    */
-  public void releaseBuffer(String column) {
+  public void releaseBuffer(FetchContext fetchContext) {
   }
 }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
index e10d864..60de836 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
@@ -24,6 +24,7 @@ import java.net.URI;
 import java.nio.file.Path;
 import java.util.Set;
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.pinot.segment.spi.FetchContext;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 
@@ -115,27 +116,28 @@ public abstract class SegmentDirectory implements Closeable {
   public abstract Set<String> getColumnsWithIndex(ColumnIndexType type);
 
   /**
-   * This is a hint to the segment directory, to begin prefetching buffers for specified columns.
-   * Typically, this should be an async call hooked up from the planning phase,
-   * in preparation for reading data in execution phase
-   * @param columns columns to prefetch
+   * This is a hint to the segment directory, to begin prefetching buffers for given context.
+   * Typically, this should be an async call made before operating on the segment.
+   * @param fetchContext context for this segment's fetch
    */
-  public void prefetch(Set<String> columns) {
+  public void prefetch(FetchContext fetchContext) {
   }
 
   /**
-   * This is an instruction to the segment directory, to fetch buffers for specified column.
-   * Typically this should be a blocking call made before the data is read
-   * @param columns columns to acquire
+   * This is an instruction to the segment directory, to fetch buffers for the given context.
+   * When enabled, this should be a blocking call made before operating on the segment.
+   * @param fetchContext context for this segment's fetch
    */
-  public void acquire(Set<String> columns) {
+  public void acquire(FetchContext fetchContext) {
   }
 
   /**
-   * This is an instruction to the segment directory to release the fetched buffers for the specified column.
-   * @param columns columns to release
+   * This is an instruction to the segment directory to release the fetched buffers for given context.
+   * When enabled, this should be a call made after operating on the segment.
+   * It is possible that this is called multiple times.
+   * @param fetchContext context for this segment's fetch
    */
-  public void release(Set<String> columns) {
+  public void release(FetchContext fetchContext) {
   }
 
   /**

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org