You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/09/29 19:57:22 UTC
[pinot] branch master updated: Move acquire/release to right
before/after execution call (#7493)
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d638444 Move acquire/release to right before/after execution call (#7493)
d638444 is described below
commit d638444e7254bdc881a299f37ac0909a5664f0ca
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Wed Sep 29 12:56:48 2021 -0700
Move acquire/release to right before/after execution call (#7493)
---
.../AcquireReleaseColumnsSegmentOperator.java | 43 ++--
.../core/operator/combine/BaseCombineOperator.java | 14 +-
.../combine/GroupByOrderByCombineOperator.java | 96 +++++----
.../plan/AcquireReleaseColumnsSegmentPlanNode.java | 13 +-
.../core/plan/maker/InstancePlanMakerImplV2.java | 2 +-
.../pinot/perf/BenchmarkRoaringBitmapCreation.java | 217 +++++++++++++++++++++
.../index/readers/BitmapInvertedIndexReader.java | 29 ---
.../index/readers/NullValueVectorReaderImpl.java | 8 +-
.../index/readers/RangeIndexReaderImpl.java | 36 ----
.../org/apache/pinot/segment/spi/FetchContext.java | 11 +-
10 files changed, 339 insertions(+), 130 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
index 127f38f..422374f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
@@ -20,41 +20,56 @@ package org.apache.pinot.core.operator;
import org.apache.pinot.core.common.Block;
import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.plan.PlanNode;
import org.apache.pinot.segment.spi.FetchContext;
import org.apache.pinot.segment.spi.IndexSegment;
/**
* A common wrapper around the segment-level operator.
- * Provides an opportunity to acquire and release column buffers before reading data
+ * NOTE: This is only used if <code>pinot.server.query.executor.enable.prefetch</code> is true
+ * This creates a mechanism to acquire and release column buffers before reading data.
+ * This Operator is different from others in the following way:
+ * It expects the PlanNode of the execution, instead of the Operator.
+ * It runs the plan to get the operator, before it begins execution.
+ * The reason this is done is the planners access segment buffers,
+ * and we need to acquire the segment before any access is made to the buffers.
*/
public class AcquireReleaseColumnsSegmentOperator extends BaseOperator {
private static final String OPERATOR_NAME = "AcquireReleaseColumnsSegmentOperator";
- private final Operator _childOperator;
+ private final PlanNode _planNode;
private final IndexSegment _indexSegment;
private final FetchContext _fetchContext;
+ private Operator _childOperator;
- public AcquireReleaseColumnsSegmentOperator(Operator childOperator, IndexSegment indexSegment,
- FetchContext fetchContext) {
- _childOperator = childOperator;
+ public AcquireReleaseColumnsSegmentOperator(PlanNode planNode, IndexSegment indexSegment, FetchContext fetchContext) {
+ _planNode = planNode;
_indexSegment = indexSegment;
_fetchContext = fetchContext;
}
/**
- * Makes a call to acquire column buffers from {@link IndexSegment} before getting nextBlock from childOperator,
- * and
- * a call to release the column buffers from {@link IndexSegment} after.
+ * Runs the planNode to get the childOperator, and then proceeds with execution.
*/
@Override
protected Block getNextBlock() {
+ _childOperator = _planNode.run();
+ return _childOperator.nextBlock();
+ }
+
+ /**
+ * Acquires the indexSegment using the provided fetchContext
+ */
+ public void acquire() {
_indexSegment.acquire(_fetchContext);
- try {
- return _childOperator.nextBlock();
- } finally {
- _indexSegment.release(_fetchContext);
- }
+ }
+
+ /**
+ * Releases the indexSegment using the provided fetchContext
+ */
+ public void release() {
+ _indexSegment.release(_fetchContext);
}
@Override
@@ -64,6 +79,6 @@ public class AcquireReleaseColumnsSegmentOperator extends BaseOperator {
@Override
public ExecutionStatistics getExecutionStatistics() {
- return _childOperator.getExecutionStatistics();
+ return _childOperator == null ? new ExecutionStatistics(0, 0, 0, 0) : _childOperator.getExecutionStatistics();
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index 4d93c02..eb77329 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.query.request.context.QueryContext;
@@ -151,7 +152,18 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
*/
protected void processSegments(int taskIndex) {
for (int operatorIndex = taskIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
- IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
+ Operator operator = _operators.get(operatorIndex);
+ IntermediateResultsBlock resultsBlock;
+ try {
+ if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+ ((AcquireReleaseColumnsSegmentOperator) operator).acquire();
+ }
+ resultsBlock = (IntermediateResultsBlock) operator.nextBlock();
+ } finally {
+ if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+ ((AcquireReleaseColumnsSegmentOperator) operator).release();
+ }
+ }
if (isQuerySatisfied(resultsBlock)) {
// Query is satisfied, skip processing the remaining segments
_blockingQueue.offer(resultsBlock);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index 2ece042..e4203bc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -39,6 +39,7 @@ import org.apache.pinot.core.data.table.IntermediateRecord;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
@@ -121,58 +122,67 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
@Override
protected void processSegments(int taskIndex) {
for (int operatorIndex = taskIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
- IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
-
- if (_indexedTable == null) {
- synchronized (this) {
- if (_indexedTable == null) {
- DataSchema dataSchema = resultsBlock.getDataSchema();
- // NOTE: Use trimSize as resultSize on server size.
- if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
- // special case of trim threshold where it is set to max value.
- // there won't be any trimming during upsert in this case.
- // thus we can avoid the overhead of read-lock and write-lock
- // in the upsert method.
- _indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize);
- } else {
- _indexedTable =
- new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold);
+ Operator operator = _operators.get(operatorIndex);
+ try {
+ if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+ ((AcquireReleaseColumnsSegmentOperator) operator).acquire();
+ }
+ IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) operator.nextBlock();
+ if (_indexedTable == null) {
+ synchronized (this) {
+ if (_indexedTable == null) {
+ DataSchema dataSchema = resultsBlock.getDataSchema();
+ // NOTE: Use trimSize as resultSize on server side.
+ if (_trimThreshold >= MAX_TRIM_THRESHOLD) {
+ // special case of trim threshold where it is set to max value.
+ // there won't be any trimming during upsert in this case.
+ // thus we can avoid the overhead of read-lock and write-lock
+ // in the upsert method.
+ _indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize);
+ } else {
+ _indexedTable =
+ new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold);
+ }
}
}
}
- }
- // Merge processing exceptions.
- List<ProcessingException> processingExceptionsToMerge = resultsBlock.getProcessingExceptions();
- if (processingExceptionsToMerge != null) {
- _mergedProcessingExceptions.addAll(processingExceptionsToMerge);
- }
+ // Merge processing exceptions.
+ List<ProcessingException> processingExceptionsToMerge = resultsBlock.getProcessingExceptions();
+ if (processingExceptionsToMerge != null) {
+ _mergedProcessingExceptions.addAll(processingExceptionsToMerge);
+ }
- // Merge aggregation group-by result.
- // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable
- Collection<IntermediateRecord> intermediateRecords = resultsBlock.getIntermediateRecords();
- // For now, only GroupBy OrderBy query has pre-constructed intermediate records
- if (intermediateRecords == null) {
// Merge aggregation group-by result.
- AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult();
- if (aggregationGroupByResult != null) {
- // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable
- Iterator<GroupKeyGenerator.GroupKey> dicGroupKeyIterator = aggregationGroupByResult.getGroupKeyIterator();
- while (dicGroupKeyIterator.hasNext()) {
- GroupKeyGenerator.GroupKey groupKey = dicGroupKeyIterator.next();
- Object[] keys = groupKey._keys;
- Object[] values = Arrays.copyOf(keys, _numColumns);
- int groupId = groupKey._groupId;
- for (int i = 0; i < _numAggregationFunctions; i++) {
- values[_numGroupByExpressions + i] = aggregationGroupByResult.getResultForGroupId(i, groupId);
+ // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable
+ Collection<IntermediateRecord> intermediateRecords = resultsBlock.getIntermediateRecords();
+ // For now, only GroupBy OrderBy query has pre-constructed intermediate records
+ if (intermediateRecords == null) {
+ // Merge aggregation group-by result.
+ AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult();
+ if (aggregationGroupByResult != null) {
+ // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable
+ Iterator<GroupKeyGenerator.GroupKey> dicGroupKeyIterator = aggregationGroupByResult.getGroupKeyIterator();
+ while (dicGroupKeyIterator.hasNext()) {
+ GroupKeyGenerator.GroupKey groupKey = dicGroupKeyIterator.next();
+ Object[] keys = groupKey._keys;
+ Object[] values = Arrays.copyOf(keys, _numColumns);
+ int groupId = groupKey._groupId;
+ for (int i = 0; i < _numAggregationFunctions; i++) {
+ values[_numGroupByExpressions + i] = aggregationGroupByResult.getResultForGroupId(i, groupId);
+ }
+ _indexedTable.upsert(new Key(keys), new Record(values));
}
- _indexedTable.upsert(new Key(keys), new Record(values));
+ }
+ } else {
+ for (IntermediateRecord intermediateResult : intermediateRecords) {
+ //TODO: change upsert api so that it accepts intermediateRecord directly
+ _indexedTable.upsert(intermediateResult._key, intermediateResult._record);
}
}
- } else {
- for (IntermediateRecord intermediateResult : intermediateRecords) {
- //TODO: change upsert api so that it accepts intermediateRecord directly
- _indexedTable.upsert(intermediateResult._key, intermediateResult._record);
+ } finally {
+ if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+ ((AcquireReleaseColumnsSegmentOperator) operator).release();
}
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
index 5a9f506..73406c2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
@@ -25,6 +25,13 @@ import org.apache.pinot.segment.spi.IndexSegment;
/**
* A common wrapper for the segment-level plan node.
+ * NOTE: This is only used if <code>pinot.server.query.executor.enable.prefetch</code> is true
+ * This PlanNode differs from the other PlanNodes in the following way:
+ * This PlanNode does not invoke a <code>run</code> on the childOperator in its run method.
+ * Instead, it passes the childPlanNode as is, to the {@link AcquireReleaseColumnsSegmentOperator},
+ * and it is that operator's responsibility to run the childPlanNode and get the childOperator before execution.
+ * The reason this is done is that the planners access segment buffers,
+ * and we need to acquire the segment before any access is made to the buffers.
*/
public class AcquireReleaseColumnsSegmentPlanNode implements PlanNode {
@@ -39,8 +46,12 @@ public class AcquireReleaseColumnsSegmentPlanNode implements PlanNode {
_fetchContext = fetchContext;
}
+ /**
+ * Doesn't run the childPlan,
+ * but instead just creates a {@link AcquireReleaseColumnsSegmentOperator} and passes the plan to it
+ */
@Override
public AcquireReleaseColumnsSegmentOperator run() {
- return new AcquireReleaseColumnsSegmentOperator(_childPlanNode.run(), _indexSegment, _fetchContext);
+ return new AcquireReleaseColumnsSegmentOperator(_childPlanNode, _indexSegment, _fetchContext);
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index ea54327..f5567a4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -175,7 +175,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
} else {
columns = queryContext.getColumns();
}
- FetchContext fetchContext = new FetchContext(UUID.randomUUID(), columns);
+ FetchContext fetchContext = new FetchContext(UUID.randomUUID(), indexSegment.getSegmentName(), columns);
fetchContexts.add(fetchContext);
planNodes.add(
new AcquireReleaseColumnsSegmentPlanNode(makeSegmentPlanNode(indexSegment, queryContext), indexSegment,
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRoaringBitmapCreation.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRoaringBitmapCreation.java
new file mode 100644
index 0000000..2d85652
--- /dev/null
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRoaringBitmapCreation.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.ref.SoftReference;
+import java.nio.ByteOrder;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.BitmapInvertedIndexWriter;
+import org.apache.pinot.segment.spi.memory.PinotByteBuffer;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.profile.GCProfiler;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.TimeValue;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+
+
+/**
+ * Benchmark created to test the impact of removing the SoftReference array cache for ImmutableRoaringBitmap
+ */
+@State(Scope.Benchmark)
+@Fork(1)
+public class BenchmarkRoaringBitmapCreation {
+
+ private static final int NUM_DOCS = 1_000_000;
+ private static final int CARDINALITY = 100_000;
+ private static final File TEMP_DIR =
+ new File(FileUtils.getTempDirectory(), "bitmap_creation_benchmark_" + System.currentTimeMillis());
+
+ @Param({"100", "10000", "99999"}) // the higher this is, the less often the cache is hit
+ public int _dictIdsToRead;
+
+ private int _numBitmaps;
+ private BitmapInvertedIndexWriter _bitmapInvertedIndexWriter;
+ private SoftReference<SoftReference<ImmutableRoaringBitmap>[]> _bitmapsArrayReference = null;
+ private SoftReference<SoftReference<Pair<Integer, Integer>>[]> _offsetLengthPairsArrayReference = null;
+ private PinotDataBuffer _offsetLengthBuffer;
+ private PinotDataBuffer _bitmapBuffer;
+ private int _firstOffset;
+
+ @Setup
+ public void setup()
+ throws IllegalAccessException, InstantiationException, IOException {
+ _numBitmaps = CARDINALITY;
+
+ File bufferDir = new File(TEMP_DIR, "cardinality_" + CARDINALITY);
+ FileUtils.forceMkdir(bufferDir);
+ File bufferFile = new File(bufferDir, "buffer");
+ _bitmapInvertedIndexWriter = new BitmapInvertedIndexWriter(bufferFile, _numBitmaps);
+ // Insert between 10-1000 values per bitmap
+ for (int i = 0; i < _numBitmaps; i++) {
+ int size = 10 + RandomUtils.nextInt(990);
+ int[] data = new int[size];
+ for (int j = 0; j < size; j++) {
+ data[j] = RandomUtils
+ .nextInt(NUM_DOCS); // docIds will repeat across bitmaps, but doesn't matter for purpose of this benchmark
+ }
+ RoaringBitmap bitmap = RoaringBitmap.bitmapOf(data);
+ _bitmapInvertedIndexWriter.add(bitmap);
+ }
+
+ PinotDataBuffer dataBuffer = PinotByteBuffer.mapReadOnlyBigEndianFile(bufferFile);
+ long offsetBufferEndOffset = (long) (_numBitmaps + 1) * Integer.BYTES;
+ _offsetLengthBuffer = dataBuffer.view(0, offsetBufferEndOffset, ByteOrder.BIG_ENDIAN);
+ _bitmapBuffer = dataBuffer.view(offsetBufferEndOffset, dataBuffer.size());
+ _firstOffset = _offsetLengthBuffer.getInt(0);
+ }
+
+ @TearDown
+ public void teardown()
+ throws IOException {
+ _bitmapInvertedIndexWriter.close();
+ FileUtils.deleteQuietly(TEMP_DIR);
+ }
+
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MICROSECONDS)
+ public boolean cacheReferences() {
+ int dictId = RandomUtils.nextInt(_dictIdsToRead);
+ ImmutableRoaringBitmap roaringBitmapFromCache = getRoaringBitmapFromCache(dictId);
+ return roaringBitmapFromCache.isEmpty();
+ }
+
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MICROSECONDS)
+ public boolean alwaysBuild() {
+ int dictId = RandomUtils.nextInt(_dictIdsToRead);
+ ImmutableRoaringBitmap immutableRoaringBitmap = buildRoaringBitmap(dictId);
+ return immutableRoaringBitmap.isEmpty();
+ }
+
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MICROSECONDS)
+ public boolean alwaysBuildCachedOffsetAndLength() {
+ int dictId = RandomUtils.nextInt(_dictIdsToRead);
+ ImmutableRoaringBitmap immutableRoaringBitmap = buildRoaringBitmapUsingOffsetPairFromCache(dictId);
+ return immutableRoaringBitmap.isEmpty();
+ }
+
+ /**
+ * Code as of before this commit, using an array of SoftReference for the ImmutableRoaringBitmap
+ */
+ private ImmutableRoaringBitmap getRoaringBitmapFromCache(int dictId) {
+ SoftReference<ImmutableRoaringBitmap>[] bitmapArrayReference =
+ (_bitmapsArrayReference != null) ? _bitmapsArrayReference.get() : null;
+ if (bitmapArrayReference != null) {
+ SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[dictId];
+ ImmutableRoaringBitmap bitmap = (bitmapReference != null) ? bitmapReference.get() : null;
+ if (bitmap != null) {
+ return bitmap;
+ }
+ } else {
+ bitmapArrayReference = new SoftReference[_numBitmaps];
+ _bitmapsArrayReference = new SoftReference<>(bitmapArrayReference);
+ }
+ synchronized (this) {
+ SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[dictId];
+ ImmutableRoaringBitmap bitmap = (bitmapReference != null) ? bitmapReference.get() : null;
+ if (bitmap == null) {
+ bitmap = buildRoaringBitmap(dictId);
+ bitmapArrayReference[dictId] = new SoftReference<>(bitmap);
+ }
+ return bitmap;
+ }
+ }
+
+ private ImmutableRoaringBitmap buildRoaringBitmap(int dictId) {
+ Pair<Integer, Integer> offsetLengthPair = buildOffsetLengthPair(dictId);
+ return buildRoaringBitmap(offsetLengthPair);
+ }
+
+ private Pair<Integer, Integer> buildOffsetLengthPair(int dictId) {
+ int offset = _offsetLengthBuffer.getInt(dictId * Integer.BYTES);
+ int length = _offsetLengthBuffer.getInt((dictId + 1) * Integer.BYTES) - offset;
+ return Pair.of(offset, length);
+ }
+
+ private ImmutableRoaringBitmap buildRoaringBitmap(Pair<Integer, Integer> offsetLengthPair) {
+ return new ImmutableRoaringBitmap(
+ _bitmapBuffer.toDirectByteBuffer(offsetLengthPair.getLeft() - _firstOffset, offsetLengthPair.getRight()));
+ }
+
+ private ImmutableRoaringBitmap buildRoaringBitmapUsingOffsetPairFromCache(int dictId) {
+ return buildRoaringBitmap(getOffsetLengthPairFromCache(dictId));
+ }
+
+ private Pair<Integer, Integer> getOffsetLengthPairFromCache(int dictId) {
+
+ SoftReference<Pair<Integer, Integer>>[] offsetLengthPairArrayReference =
+ (_offsetLengthPairsArrayReference != null) ? _offsetLengthPairsArrayReference.get() : null;
+ if (offsetLengthPairArrayReference != null) {
+ SoftReference<Pair<Integer, Integer>> offsetLengthPairReference = offsetLengthPairArrayReference[dictId];
+ Pair<Integer, Integer> offsetLengthPair =
+ (offsetLengthPairReference != null) ? offsetLengthPairReference.get() : null;
+ if (offsetLengthPair != null) {
+ return offsetLengthPair;
+ }
+ } else {
+ offsetLengthPairArrayReference = new SoftReference[_numBitmaps];
+ _offsetLengthPairsArrayReference = new SoftReference<>(offsetLengthPairArrayReference);
+ }
+ synchronized (this) {
+ SoftReference<Pair<Integer, Integer>> offsetLengthPairReference = offsetLengthPairArrayReference[dictId];
+ Pair<Integer, Integer> offsetLengthPair =
+ (offsetLengthPairReference != null) ? offsetLengthPairReference.get() : null;
+ if (offsetLengthPair == null) {
+ offsetLengthPair = buildOffsetLengthPair(dictId);
+ offsetLengthPairArrayReference[dictId] = new SoftReference<>(offsetLengthPair);
+ }
+ return offsetLengthPair;
+ }
+ }
+
+ public static void main(String[] args)
+ throws Exception {
+ ChainedOptionsBuilder opt = new OptionsBuilder().include(BenchmarkRoaringBitmapCreation.class.getSimpleName())
+ .warmupTime(TimeValue.seconds(10)).warmupIterations(1).measurementTime(TimeValue.seconds(60))
+ .measurementIterations(1).forks(1).addProfiler(GCProfiler.class);
+ new Runner(opt.build()).run();
+ }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BitmapInvertedIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BitmapInvertedIndexReader.java
index 491fe67..48905a7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BitmapInvertedIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BitmapInvertedIndexReader.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.segment.local.segment.index.readers;
-import java.lang.ref.SoftReference;
import java.nio.ByteOrder;
import org.apache.pinot.segment.local.segment.creator.impl.inv.BitmapInvertedIndexWriter;
import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
@@ -35,7 +34,6 @@ import org.slf4j.LoggerFactory;
public class BitmapInvertedIndexReader implements InvertedIndexReader<ImmutableRoaringBitmap> {
public static final Logger LOGGER = LoggerFactory.getLogger(BitmapInvertedIndexReader.class);
- private final int _numBitmaps;
private final PinotDataBuffer _offsetBuffer;
private final PinotDataBuffer _bitmapBuffer;
@@ -44,11 +42,7 @@ public class BitmapInvertedIndexReader implements InvertedIndexReader<ImmutableR
// 2. Offset buffer stores the offsets within the bitmap buffer
private final int _firstOffset;
- private volatile SoftReference<SoftReference<ImmutableRoaringBitmap>[]> _bitmaps;
-
public BitmapInvertedIndexReader(PinotDataBuffer dataBuffer, int numBitmaps) {
- _numBitmaps = numBitmaps;
-
long offsetBufferEndOffset = (long) (numBitmaps + 1) * Integer.BYTES;
_offsetBuffer = dataBuffer.view(0, offsetBufferEndOffset, ByteOrder.BIG_ENDIAN);
_bitmapBuffer = dataBuffer.view(offsetBufferEndOffset, dataBuffer.size());
@@ -59,29 +53,6 @@ public class BitmapInvertedIndexReader implements InvertedIndexReader<ImmutableR
@SuppressWarnings("unchecked")
@Override
public ImmutableRoaringBitmap getDocIds(int dictId) {
- SoftReference<ImmutableRoaringBitmap>[] bitmapArrayReference = (_bitmaps != null) ? _bitmaps.get() : null;
- if (bitmapArrayReference != null) {
- SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[dictId];
- ImmutableRoaringBitmap bitmap = (bitmapReference != null) ? bitmapReference.get() : null;
- if (bitmap != null) {
- return bitmap;
- }
- } else {
- bitmapArrayReference = new SoftReference[_numBitmaps];
- _bitmaps = new SoftReference<>(bitmapArrayReference);
- }
- synchronized (this) {
- SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[dictId];
- ImmutableRoaringBitmap bitmap = (bitmapReference != null) ? bitmapReference.get() : null;
- if (bitmap == null) {
- bitmap = buildRoaringBitmap(dictId);
- bitmapArrayReference[dictId] = new SoftReference<>(bitmap);
- }
- return bitmap;
- }
- }
-
- private ImmutableRoaringBitmap buildRoaringBitmap(int dictId) {
int offset = _offsetBuffer.getInt(dictId * Integer.BYTES);
int length = _offsetBuffer.getInt((dictId + 1) * Integer.BYTES) - offset;
return new ImmutableRoaringBitmap(_bitmapBuffer.toDirectByteBuffer(offset - _firstOffset, length));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/NullValueVectorReaderImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/NullValueVectorReaderImpl.java
index 7a60cd8..ccd79a2 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/NullValueVectorReaderImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/NullValueVectorReaderImpl.java
@@ -25,18 +25,18 @@ import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
public class NullValueVectorReaderImpl implements NullValueVectorReader {
- private final ImmutableRoaringBitmap _nullBitmap;
+ private final PinotDataBuffer _dataBuffer;
public NullValueVectorReaderImpl(PinotDataBuffer dataBuffer) {
- _nullBitmap = new ImmutableRoaringBitmap(dataBuffer.toDirectByteBuffer(0, (int) dataBuffer.size()));
+ _dataBuffer = dataBuffer;
}
public boolean isNull(int docId) {
- return _nullBitmap.contains(docId);
+ return getNullBitmap().contains(docId);
}
@Override
public ImmutableRoaringBitmap getNullBitmap() {
- return _nullBitmap;
+ return new ImmutableRoaringBitmap(_dataBuffer.toDirectByteBuffer(0, (int) _dataBuffer.size()));
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java
index 09bb619..510fa80 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java
@@ -19,7 +19,6 @@
package org.apache.pinot.segment.local.segment.index.readers;
import com.google.common.base.Preconditions;
-import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;
import org.apache.pinot.segment.local.segment.creator.impl.inv.RangeIndexCreator;
@@ -44,8 +43,6 @@ public class RangeIndexReaderImpl implements RangeIndexReader<ImmutableRoaringBi
private final Number[] _rangeStartArray;
private final Number _lastRangeEnd;
- private volatile SoftReference<SoftReference<ImmutableRoaringBitmap>[]> _bitmaps;
-
public RangeIndexReaderImpl(PinotDataBuffer dataBuffer) {
_dataBuffer = dataBuffer;
long offset = 0;
@@ -179,39 +176,6 @@ public class RangeIndexReaderImpl implements RangeIndexReader<ImmutableRoaringBi
}
private ImmutableRoaringBitmap getDocIds(int rangeId) {
- SoftReference<ImmutableRoaringBitmap>[] bitmapArrayReference = null;
- // Return the bitmap if it's still on heap
- if (_bitmaps != null) {
- bitmapArrayReference = _bitmaps.get();
- if (bitmapArrayReference != null) {
- SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[rangeId];
- if (bitmapReference != null) {
- ImmutableRoaringBitmap value = bitmapReference.get();
- if (value != null) {
- return value;
- }
- }
- } else {
- bitmapArrayReference = new SoftReference[_numRanges];
- _bitmaps = new SoftReference<SoftReference<ImmutableRoaringBitmap>[]>(bitmapArrayReference);
- }
- } else {
- bitmapArrayReference = new SoftReference[_numRanges];
- _bitmaps = new SoftReference<SoftReference<ImmutableRoaringBitmap>[]>(bitmapArrayReference);
- }
- synchronized (this) {
- ImmutableRoaringBitmap value;
- if (bitmapArrayReference[rangeId] == null || bitmapArrayReference[rangeId].get() == null) {
- value = buildRoaringBitmapForIndex(rangeId);
- bitmapArrayReference[rangeId] = new SoftReference<ImmutableRoaringBitmap>(value);
- } else {
- value = bitmapArrayReference[rangeId].get();
- }
- return value;
- }
- }
-
- private synchronized ImmutableRoaringBitmap buildRoaringBitmapForIndex(final int rangeId) {
final long currentOffset = getOffset(rangeId);
final long nextOffset = getOffset(rangeId + 1);
final int bufferLength = (int) (nextOffset - currentOffset);
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/FetchContext.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/FetchContext.java
index 03e0c68..e00ed08 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/FetchContext.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/FetchContext.java
@@ -27,10 +27,12 @@ import java.util.UUID;
*/
public class FetchContext {
private final UUID _fetchId;
+ private final String _segmentName;
private final Set<String> _columns;
- public FetchContext(UUID fetchId, Set<String> columns) {
+ public FetchContext(UUID fetchId, String segmentName, Set<String> columns) {
_fetchId = fetchId;
+ _segmentName = segmentName;
_columns = columns;
}
@@ -43,6 +45,13 @@ public class FetchContext {
}
/**
+ * Segment name associated with this fetch context
+ */
+ public String getSegmentName() {
+ return _segmentName;
+ }
+
+ /**
* Columns to be fetched as part of this request
*/
public Set<String> getColumns() {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org