You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/09/29 19:57:22 UTC

[pinot] branch master updated: Move acquire/release to right before/after execution call (#7493)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d638444  Move acquire/release to right before/after execution call (#7493)
d638444 is described below

commit d638444e7254bdc881a299f37ac0909a5664f0ca
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Wed Sep 29 12:56:48 2021 -0700

    Move acquire/release to right before/after execution call (#7493)
---
 .../AcquireReleaseColumnsSegmentOperator.java      |  43 ++--
 .../core/operator/combine/BaseCombineOperator.java |  14 +-
 .../combine/GroupByOrderByCombineOperator.java     |  96 +++++----
 .../plan/AcquireReleaseColumnsSegmentPlanNode.java |  13 +-
 .../core/plan/maker/InstancePlanMakerImplV2.java   |   2 +-
 .../pinot/perf/BenchmarkRoaringBitmapCreation.java | 217 +++++++++++++++++++++
 .../index/readers/BitmapInvertedIndexReader.java   |  29 ---
 .../index/readers/NullValueVectorReaderImpl.java   |   8 +-
 .../index/readers/RangeIndexReaderImpl.java        |  36 ----
 .../org/apache/pinot/segment/spi/FetchContext.java |  11 +-
 10 files changed, 339 insertions(+), 130 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
index 127f38f..422374f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
@@ -20,41 +20,56 @@ package org.apache.pinot.core.operator;
 
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.plan.PlanNode;
 import org.apache.pinot.segment.spi.FetchContext;
 import org.apache.pinot.segment.spi.IndexSegment;
 
 
 /**
  * A common wrapper around the segment-level operator.
- * Provides an opportunity to acquire and release column buffers before reading data
+ * NOTE: This is only used if <code>pinot.server.query.executor.enable.prefetch</code> is true
+ * This creates a mechanism to acquire and release column buffers before reading data.
+ * This Operator is different from others in the following way:
+ * It expects the PlanNode of the execution, instead of the Operator.
+ * It runs the plan to get the operator, before it begins execution.
+ * The reason this is done is that the planners access segment buffers,
+ * and we need to acquire the segment before any access is made to the buffers.
  */
 public class AcquireReleaseColumnsSegmentOperator extends BaseOperator {
   private static final String OPERATOR_NAME = "AcquireReleaseColumnsSegmentOperator";
 
-  private final Operator _childOperator;
+  private final PlanNode _planNode;
   private final IndexSegment _indexSegment;
   private final FetchContext _fetchContext;
+  private Operator _childOperator;
 
-  public AcquireReleaseColumnsSegmentOperator(Operator childOperator, IndexSegment indexSegment,
-      FetchContext fetchContext) {
-    _childOperator = childOperator;
+  public AcquireReleaseColumnsSegmentOperator(PlanNode planNode, IndexSegment indexSegment, FetchContext fetchContext) {
+    _planNode = planNode;
     _indexSegment = indexSegment;
     _fetchContext = fetchContext;
   }
 
   /**
-   * Makes a call to acquire column buffers from {@link IndexSegment} before getting nextBlock from childOperator,
-   * and
-   * a call to release the column buffers from {@link IndexSegment} after.
+   * Runs the planNode to get the childOperator, and then proceeds with execution.
    */
   @Override
   protected Block getNextBlock() {
+    _childOperator = _planNode.run();
+    return _childOperator.nextBlock();
+  }
+
+  /**
+   * Acquires the indexSegment using the provided fetchContext
+   */
+  public void acquire() {
     _indexSegment.acquire(_fetchContext);
-    try {
-      return _childOperator.nextBlock();
-    } finally {
-      _indexSegment.release(_fetchContext);
-    }
+  }
+
+  /**
+   * Releases the indexSegment using the provided fetchContext
+   */
+  public void release() {
+    _indexSegment.release(_fetchContext);
   }
 
   @Override
@@ -64,6 +79,6 @@ public class AcquireReleaseColumnsSegmentOperator extends BaseOperator {
 
   @Override
   public ExecutionStatistics getExecutionStatistics() {
-    return _childOperator.getExecutionStatistics();
+    return _childOperator == null ? new ExecutionStatistics(0, 0, 0, 0) : _childOperator.getExecutionStatistics();
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index 4d93c02..eb77329 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -151,7 +152,18 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
    */
   protected void processSegments(int taskIndex) {
     for (int operatorIndex = taskIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
-      IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
+      Operator operator = _operators.get(operatorIndex);
+      IntermediateResultsBlock resultsBlock;
+      try {
+        if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+          ((AcquireReleaseColumnsSegmentOperator) operator).acquire();
+        }
+        resultsBlock = (IntermediateResultsBlock) operator.nextBlock();
+      } finally {
+        if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+          ((AcquireReleaseColumnsSegmentOperator) operator).release();
+        }
+      }
       if (isQuerySatisfied(resultsBlock)) {
         // Query is satisfied, skip processing the remaining segments
         _blockingQueue.offer(resultsBlock);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index 2ece042..e4203bc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -39,6 +39,7 @@ import org.apache.pinot.core.data.table.IntermediateRecord;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
@@ -121,58 +122,67 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
   @Override
   protected void processSegments(int taskIndex) {
     for (int operatorIndex = taskIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
-      IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
-
-      if (_indexedTable == null) {
-        synchronized (this) {
-          if (_indexedTable == null) {
-            DataSchema dataSchema = resultsBlock.getDataSchema();
-            // NOTE: Use trimSize as resultSize on server size.
-            if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
-              // special case of trim threshold where it is set to max value.
-              // there won't be any trimming during upsert in this case.
-              // thus we can avoid the overhead of read-lock and write-lock
-              // in the upsert method.
-              _indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize);
-            } else {
-              _indexedTable =
-                  new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold);
+      Operator operator = _operators.get(operatorIndex);
+      try {
+        if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+          ((AcquireReleaseColumnsSegmentOperator) operator).acquire();
+        }
+        IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) operator.nextBlock();
+        if (_indexedTable == null) {
+          synchronized (this) {
+            if (_indexedTable == null) {
+              DataSchema dataSchema = resultsBlock.getDataSchema();
+              // NOTE: Use trimSize as resultSize on server side.
+              if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
+                // special case of trim threshold where it is set to max value.
+                // there won't be any trimming during upsert in this case.
+                // thus we can avoid the overhead of read-lock and write-lock
+                // in the upsert method.
+                _indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize);
+              } else {
+                _indexedTable =
+                    new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold);
+              }
             }
           }
         }
-      }
 
-      // Merge processing exceptions.
-      List<ProcessingException> processingExceptionsToMerge = resultsBlock.getProcessingExceptions();
-      if (processingExceptionsToMerge != null) {
-        _mergedProcessingExceptions.addAll(processingExceptionsToMerge);
-      }
+        // Merge processing exceptions.
+        List<ProcessingException> processingExceptionsToMerge = resultsBlock.getProcessingExceptions();
+        if (processingExceptionsToMerge != null) {
+          _mergedProcessingExceptions.addAll(processingExceptionsToMerge);
+        }
 
-      // Merge aggregation group-by result.
-      // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable
-      Collection<IntermediateRecord> intermediateRecords = resultsBlock.getIntermediateRecords();
-      // For now, only GroupBy OrderBy query has pre-constructed intermediate records
-      if (intermediateRecords == null) {
         // Merge aggregation group-by result.
-        AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult();
-        if (aggregationGroupByResult != null) {
-          // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable
-          Iterator<GroupKeyGenerator.GroupKey> dicGroupKeyIterator = aggregationGroupByResult.getGroupKeyIterator();
-          while (dicGroupKeyIterator.hasNext()) {
-            GroupKeyGenerator.GroupKey groupKey = dicGroupKeyIterator.next();
-            Object[] keys = groupKey._keys;
-            Object[] values = Arrays.copyOf(keys, _numColumns);
-            int groupId = groupKey._groupId;
-            for (int i = 0; i < _numAggregationFunctions; i++) {
-              values[_numGroupByExpressions + i] = aggregationGroupByResult.getResultForGroupId(i, groupId);
+        // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable
+        Collection<IntermediateRecord> intermediateRecords = resultsBlock.getIntermediateRecords();
+        // For now, only GroupBy OrderBy query has pre-constructed intermediate records
+        if (intermediateRecords == null) {
+          // Merge aggregation group-by result.
+          AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult();
+          if (aggregationGroupByResult != null) {
+            // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable
+            Iterator<GroupKeyGenerator.GroupKey> dicGroupKeyIterator = aggregationGroupByResult.getGroupKeyIterator();
+            while (dicGroupKeyIterator.hasNext()) {
+              GroupKeyGenerator.GroupKey groupKey = dicGroupKeyIterator.next();
+              Object[] keys = groupKey._keys;
+              Object[] values = Arrays.copyOf(keys, _numColumns);
+              int groupId = groupKey._groupId;
+              for (int i = 0; i < _numAggregationFunctions; i++) {
+                values[_numGroupByExpressions + i] = aggregationGroupByResult.getResultForGroupId(i, groupId);
+              }
+              _indexedTable.upsert(new Key(keys), new Record(values));
             }
-            _indexedTable.upsert(new Key(keys), new Record(values));
+          }
+        } else {
+          for (IntermediateRecord intermediateResult : intermediateRecords) {
+            //TODO: change upsert api so that it accepts intermediateRecord directly
+            _indexedTable.upsert(intermediateResult._key, intermediateResult._record);
           }
         }
-      } else {
-        for (IntermediateRecord intermediateResult : intermediateRecords) {
-          //TODO: change upsert api so that it accepts intermediateRecord directly
-          _indexedTable.upsert(intermediateResult._key, intermediateResult._record);
+      } finally {
+        if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+          ((AcquireReleaseColumnsSegmentOperator) operator).release();
         }
       }
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
index 5a9f506..73406c2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
@@ -25,6 +25,13 @@ import org.apache.pinot.segment.spi.IndexSegment;
 
 /**
  * A common wrapper for the segment-level plan node.
+ * NOTE: This is only used if <code>pinot.server.query.executor.enable.prefetch</code> is true
+ * This PlanNode differs from the other PlanNodes in the following way:
+ * This PlanNode does not invoke a <code>run</code> on the childOperator in its run method.
+ * Instead, it passes the childPlanNode as is, to the {@link AcquireReleaseColumnsSegmentOperator},
+ * and it is that operator's responsibility to run the childPlanNode and get the childOperator before execution.
+ * The reason this is done is that the planners access segment buffers,
+ * and we need to acquire the segment before any access is made to the buffers.
  */
 public class AcquireReleaseColumnsSegmentPlanNode implements PlanNode {
 
@@ -39,8 +46,12 @@ public class AcquireReleaseColumnsSegmentPlanNode implements PlanNode {
     _fetchContext = fetchContext;
   }
 
+  /**
+   * Doesn't run the childPlan,
+   * but instead just creates a {@link AcquireReleaseColumnsSegmentOperator} and passes the plan to it
+   */
   @Override
   public AcquireReleaseColumnsSegmentOperator run() {
-    return new AcquireReleaseColumnsSegmentOperator(_childPlanNode.run(), _indexSegment, _fetchContext);
+    return new AcquireReleaseColumnsSegmentOperator(_childPlanNode, _indexSegment, _fetchContext);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index ea54327..f5567a4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -175,7 +175,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
         } else {
           columns = queryContext.getColumns();
         }
-        FetchContext fetchContext = new FetchContext(UUID.randomUUID(), columns);
+        FetchContext fetchContext = new FetchContext(UUID.randomUUID(), indexSegment.getSegmentName(), columns);
         fetchContexts.add(fetchContext);
         planNodes.add(
             new AcquireReleaseColumnsSegmentPlanNode(makeSegmentPlanNode(indexSegment, queryContext), indexSegment,
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRoaringBitmapCreation.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRoaringBitmapCreation.java
new file mode 100644
index 0000000..2d85652
--- /dev/null
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRoaringBitmapCreation.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.ref.SoftReference;
+import java.nio.ByteOrder;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.BitmapInvertedIndexWriter;
+import org.apache.pinot.segment.spi.memory.PinotByteBuffer;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.profile.GCProfiler;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.TimeValue;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+
+
+/**
+ * Benchmark created to test the impact of removing the SoftReference array cache for ImmutableRoaringBitmap
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+public class BenchmarkRoaringBitmapCreation {
+
+  private static final int NUM_DOCS = 1_000_000;
+  private static final int CARDINALITY = 100_000;
+  private static final File TEMP_DIR =
+      new File(FileUtils.getTempDirectory(), "bitmap_creation_benchmark_" + System.currentTimeMillis());
+
+  @Param({"100", "10000", "99999"}) // the higher this value, the less often the cache is accessed
+  public int _dictIdsToRead;
+
+  private int _numBitmaps;
+  private BitmapInvertedIndexWriter _bitmapInvertedIndexWriter;
+  private SoftReference<SoftReference<ImmutableRoaringBitmap>[]> _bitmapsArrayReference = null;
+  private SoftReference<SoftReference<Pair<Integer, Integer>>[]> _offsetLengthPairsArrayReference = null;
+  private PinotDataBuffer _offsetLengthBuffer;
+  private PinotDataBuffer _bitmapBuffer;
+  private int _firstOffset;
+
+  @Setup
+  public void setup()
+      throws IllegalAccessException, InstantiationException, IOException {
+    _numBitmaps = CARDINALITY;
+
+    File bufferDir = new File(TEMP_DIR, "cardinality_" + CARDINALITY);
+    FileUtils.forceMkdir(bufferDir);
+    File bufferFile = new File(bufferDir, "buffer");
+    _bitmapInvertedIndexWriter = new BitmapInvertedIndexWriter(bufferFile, _numBitmaps);
+    // Insert between 10-1000 values per bitmap
+    for (int i = 0; i < _numBitmaps; i++) {
+      int size = 10 + RandomUtils.nextInt(990);
+      int[] data = new int[size];
+      for (int j = 0; j < size; j++) {
+        data[j] = RandomUtils
+            .nextInt(NUM_DOCS); // docIds will repeat across bitmaps, but that doesn't matter for the purpose of this benchmark
+      }
+      RoaringBitmap bitmap = RoaringBitmap.bitmapOf(data);
+      _bitmapInvertedIndexWriter.add(bitmap);
+    }
+
+    PinotDataBuffer dataBuffer = PinotByteBuffer.mapReadOnlyBigEndianFile(bufferFile);
+    long offsetBufferEndOffset = (long) (_numBitmaps + 1) * Integer.BYTES;
+    _offsetLengthBuffer = dataBuffer.view(0, offsetBufferEndOffset, ByteOrder.BIG_ENDIAN);
+    _bitmapBuffer = dataBuffer.view(offsetBufferEndOffset, dataBuffer.size());
+    _firstOffset = _offsetLengthBuffer.getInt(0);
+  }
+
+  @TearDown
+  public void teardown()
+      throws IOException {
+    _bitmapInvertedIndexWriter.close();
+    FileUtils.deleteQuietly(TEMP_DIR);
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public boolean cacheReferences() {
+    int dictId = RandomUtils.nextInt(_dictIdsToRead);
+    ImmutableRoaringBitmap roaringBitmapFromCache = getRoaringBitmapFromCache(dictId);
+    return roaringBitmapFromCache.isEmpty();
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public boolean alwaysBuild() {
+    int dictId = RandomUtils.nextInt(_dictIdsToRead);
+    ImmutableRoaringBitmap immutableRoaringBitmap = buildRoaringBitmap(dictId);
+    return immutableRoaringBitmap.isEmpty();
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public boolean alwaysBuildCachedOffsetAndLength() {
+    int dictId = RandomUtils.nextInt(_dictIdsToRead);
+    ImmutableRoaringBitmap immutableRoaringBitmap = buildRoaringBitmapUsingOffsetPairFromCache(dictId);
+    return immutableRoaringBitmap.isEmpty();
+  }
+
+  /**
+   * Code as of before this commit, using an array of SoftReference for the ImmutableRoaringBitmap
+   */
+  private ImmutableRoaringBitmap getRoaringBitmapFromCache(int dictId) {
+    SoftReference<ImmutableRoaringBitmap>[] bitmapArrayReference =
+        (_bitmapsArrayReference != null) ? _bitmapsArrayReference.get() : null;
+    if (bitmapArrayReference != null) {
+      SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[dictId];
+      ImmutableRoaringBitmap bitmap = (bitmapReference != null) ? bitmapReference.get() : null;
+      if (bitmap != null) {
+        return bitmap;
+      }
+    } else {
+      bitmapArrayReference = new SoftReference[_numBitmaps];
+      _bitmapsArrayReference = new SoftReference<>(bitmapArrayReference);
+    }
+    synchronized (this) {
+      SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[dictId];
+      ImmutableRoaringBitmap bitmap = (bitmapReference != null) ? bitmapReference.get() : null;
+      if (bitmap == null) {
+        bitmap = buildRoaringBitmap(dictId);
+        bitmapArrayReference[dictId] = new SoftReference<>(bitmap);
+      }
+      return bitmap;
+    }
+  }
+
+  private ImmutableRoaringBitmap buildRoaringBitmap(int dictId) {
+    Pair<Integer, Integer> offsetLengthPair = buildOffsetLengthPair(dictId);
+    return buildRoaringBitmap(offsetLengthPair);
+  }
+
+  private Pair<Integer, Integer> buildOffsetLengthPair(int dictId) {
+    int offset = _offsetLengthBuffer.getInt(dictId * Integer.BYTES);
+    int length = _offsetLengthBuffer.getInt((dictId + 1) * Integer.BYTES) - offset;
+    return Pair.of(offset, length);
+  }
+
+  private ImmutableRoaringBitmap buildRoaringBitmap(Pair<Integer, Integer> offsetLengthPair) {
+    return new ImmutableRoaringBitmap(
+        _bitmapBuffer.toDirectByteBuffer(offsetLengthPair.getLeft() - _firstOffset, offsetLengthPair.getRight()));
+  }
+
+  private ImmutableRoaringBitmap buildRoaringBitmapUsingOffsetPairFromCache(int dictId) {
+    return buildRoaringBitmap(getOffsetLengthPairFromCache(dictId));
+  }
+
+  private Pair<Integer, Integer> getOffsetLengthPairFromCache(int dictId) {
+
+    SoftReference<Pair<Integer, Integer>>[] offsetLengthPairArrayReference =
+        (_offsetLengthPairsArrayReference != null) ? _offsetLengthPairsArrayReference.get() : null;
+    if (offsetLengthPairArrayReference != null) {
+      SoftReference<Pair<Integer, Integer>> offsetLengthPairReference = offsetLengthPairArrayReference[dictId];
+      Pair<Integer, Integer> offsetLengthPair =
+          (offsetLengthPairReference != null) ? offsetLengthPairReference.get() : null;
+      if (offsetLengthPair != null) {
+        return offsetLengthPair;
+      }
+    } else {
+      offsetLengthPairArrayReference = new SoftReference[_numBitmaps];
+      _offsetLengthPairsArrayReference = new SoftReference<>(offsetLengthPairArrayReference);
+    }
+    synchronized (this) {
+      SoftReference<Pair<Integer, Integer>> offsetLengthPairReference = offsetLengthPairArrayReference[dictId];
+      Pair<Integer, Integer> offsetLengthPair =
+          (offsetLengthPairReference != null) ? offsetLengthPairReference.get() : null;
+      if (offsetLengthPair == null) {
+        offsetLengthPair = buildOffsetLengthPair(dictId);
+        offsetLengthPairArrayReference[dictId] = new SoftReference<>(offsetLengthPair);
+      }
+      return offsetLengthPair;
+    }
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+    ChainedOptionsBuilder opt = new OptionsBuilder().include(BenchmarkRoaringBitmapCreation.class.getSimpleName())
+        .warmupTime(TimeValue.seconds(10)).warmupIterations(1).measurementTime(TimeValue.seconds(60))
+        .measurementIterations(1).forks(1).addProfiler(GCProfiler.class);
+    new Runner(opt.build()).run();
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BitmapInvertedIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BitmapInvertedIndexReader.java
index 491fe67..48905a7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BitmapInvertedIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BitmapInvertedIndexReader.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.segment.local.segment.index.readers;
 
-import java.lang.ref.SoftReference;
 import java.nio.ByteOrder;
 import org.apache.pinot.segment.local.segment.creator.impl.inv.BitmapInvertedIndexWriter;
 import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
@@ -35,7 +34,6 @@ import org.slf4j.LoggerFactory;
 public class BitmapInvertedIndexReader implements InvertedIndexReader<ImmutableRoaringBitmap> {
   public static final Logger LOGGER = LoggerFactory.getLogger(BitmapInvertedIndexReader.class);
 
-  private final int _numBitmaps;
   private final PinotDataBuffer _offsetBuffer;
   private final PinotDataBuffer _bitmapBuffer;
 
@@ -44,11 +42,7 @@ public class BitmapInvertedIndexReader implements InvertedIndexReader<ImmutableR
   //   2. Offset buffer stores the offsets within the bitmap buffer
   private final int _firstOffset;
 
-  private volatile SoftReference<SoftReference<ImmutableRoaringBitmap>[]> _bitmaps;
-
   public BitmapInvertedIndexReader(PinotDataBuffer dataBuffer, int numBitmaps) {
-    _numBitmaps = numBitmaps;
-
     long offsetBufferEndOffset = (long) (numBitmaps + 1) * Integer.BYTES;
     _offsetBuffer = dataBuffer.view(0, offsetBufferEndOffset, ByteOrder.BIG_ENDIAN);
     _bitmapBuffer = dataBuffer.view(offsetBufferEndOffset, dataBuffer.size());
@@ -59,29 +53,6 @@ public class BitmapInvertedIndexReader implements InvertedIndexReader<ImmutableR
   @SuppressWarnings("unchecked")
   @Override
   public ImmutableRoaringBitmap getDocIds(int dictId) {
-    SoftReference<ImmutableRoaringBitmap>[] bitmapArrayReference = (_bitmaps != null) ? _bitmaps.get() : null;
-    if (bitmapArrayReference != null) {
-      SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[dictId];
-      ImmutableRoaringBitmap bitmap = (bitmapReference != null) ? bitmapReference.get() : null;
-      if (bitmap != null) {
-        return bitmap;
-      }
-    } else {
-      bitmapArrayReference = new SoftReference[_numBitmaps];
-      _bitmaps = new SoftReference<>(bitmapArrayReference);
-    }
-    synchronized (this) {
-      SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[dictId];
-      ImmutableRoaringBitmap bitmap = (bitmapReference != null) ? bitmapReference.get() : null;
-      if (bitmap == null) {
-        bitmap = buildRoaringBitmap(dictId);
-        bitmapArrayReference[dictId] = new SoftReference<>(bitmap);
-      }
-      return bitmap;
-    }
-  }
-
-  private ImmutableRoaringBitmap buildRoaringBitmap(int dictId) {
     int offset = _offsetBuffer.getInt(dictId * Integer.BYTES);
     int length = _offsetBuffer.getInt((dictId + 1) * Integer.BYTES) - offset;
     return new ImmutableRoaringBitmap(_bitmapBuffer.toDirectByteBuffer(offset - _firstOffset, length));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/NullValueVectorReaderImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/NullValueVectorReaderImpl.java
index 7a60cd8..ccd79a2 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/NullValueVectorReaderImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/NullValueVectorReaderImpl.java
@@ -25,18 +25,18 @@ import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 
 public class NullValueVectorReaderImpl implements NullValueVectorReader {
 
-  private final ImmutableRoaringBitmap _nullBitmap;
+  private final PinotDataBuffer _dataBuffer;
 
   public NullValueVectorReaderImpl(PinotDataBuffer dataBuffer) {
-    _nullBitmap = new ImmutableRoaringBitmap(dataBuffer.toDirectByteBuffer(0, (int) dataBuffer.size()));
+    _dataBuffer = dataBuffer;
   }
 
   public boolean isNull(int docId) {
-    return _nullBitmap.contains(docId);
+    return getNullBitmap().contains(docId);
   }
 
   @Override
   public ImmutableRoaringBitmap getNullBitmap() {
-    return _nullBitmap;
+    return new ImmutableRoaringBitmap(_dataBuffer.toDirectByteBuffer(0, (int) _dataBuffer.size()));
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java
index 09bb619..510fa80 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.segment.local.segment.index.readers;
 
 import com.google.common.base.Preconditions;
-import java.lang.ref.SoftReference;
 import java.nio.ByteBuffer;
 import javax.annotation.Nullable;
 import org.apache.pinot.segment.local.segment.creator.impl.inv.RangeIndexCreator;
@@ -44,8 +43,6 @@ public class RangeIndexReaderImpl implements RangeIndexReader<ImmutableRoaringBi
   private final Number[] _rangeStartArray;
   private final Number _lastRangeEnd;
 
-  private volatile SoftReference<SoftReference<ImmutableRoaringBitmap>[]> _bitmaps;
-
   public RangeIndexReaderImpl(PinotDataBuffer dataBuffer) {
     _dataBuffer = dataBuffer;
     long offset = 0;
@@ -179,39 +176,6 @@ public class RangeIndexReaderImpl implements RangeIndexReader<ImmutableRoaringBi
   }
 
   private ImmutableRoaringBitmap getDocIds(int rangeId) {
-    SoftReference<ImmutableRoaringBitmap>[] bitmapArrayReference = null;
-    // Return the bitmap if it's still on heap
-    if (_bitmaps != null) {
-      bitmapArrayReference = _bitmaps.get();
-      if (bitmapArrayReference != null) {
-        SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[rangeId];
-        if (bitmapReference != null) {
-          ImmutableRoaringBitmap value = bitmapReference.get();
-          if (value != null) {
-            return value;
-          }
-        }
-      } else {
-        bitmapArrayReference = new SoftReference[_numRanges];
-        _bitmaps = new SoftReference<SoftReference<ImmutableRoaringBitmap>[]>(bitmapArrayReference);
-      }
-    } else {
-      bitmapArrayReference = new SoftReference[_numRanges];
-      _bitmaps = new SoftReference<SoftReference<ImmutableRoaringBitmap>[]>(bitmapArrayReference);
-    }
-    synchronized (this) {
-      ImmutableRoaringBitmap value;
-      if (bitmapArrayReference[rangeId] == null || bitmapArrayReference[rangeId].get() == null) {
-        value = buildRoaringBitmapForIndex(rangeId);
-        bitmapArrayReference[rangeId] = new SoftReference<ImmutableRoaringBitmap>(value);
-      } else {
-        value = bitmapArrayReference[rangeId].get();
-      }
-      return value;
-    }
-  }
-
-  private synchronized ImmutableRoaringBitmap buildRoaringBitmapForIndex(final int rangeId) {
     final long currentOffset = getOffset(rangeId);
     final long nextOffset = getOffset(rangeId + 1);
     final int bufferLength = (int) (nextOffset - currentOffset);
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/FetchContext.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/FetchContext.java
index 03e0c68..e00ed08 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/FetchContext.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/FetchContext.java
@@ -27,10 +27,12 @@ import java.util.UUID;
  */
 public class FetchContext {
   private final UUID _fetchId;
+  private final String _segmentName;
   private final Set<String> _columns;
 
-  public FetchContext(UUID fetchId, Set<String> columns) {
+  public FetchContext(UUID fetchId, String segmentName, Set<String> columns) {
     _fetchId = fetchId;
+    _segmentName = segmentName;
     _columns = columns;
   }
 
@@ -43,6 +45,13 @@ public class FetchContext {
   }
 
   /**
+   * Segment name associated with this fetch context
+   */
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
+  /**
    * Columns to be fetched as part of this request
    */
   public Set<String> getColumns() {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org