You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/09/24 01:53:44 UTC
[pinot] 01/01: Benchmark
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch acquire_release
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 57a971d8c50be484249341bd80d0893d8fcc95cc
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Thu Sep 23 18:53:07 2021 -0700
Benchmark
---
.../AcquireReleaseColumnsSegmentOperator.java | 25 ++--
.../core/operator/combine/BaseCombineOperator.java | 11 +-
.../combine/GroupByOrderByCombineOperator.java | 17 ++-
.../plan/AcquireReleaseColumnsSegmentPlanNode.java | 2 +-
.../pinot/perf/BenchmarkRoaringBitmapCreation.java | 164 +++++++++++++++++++++
.../index/readers/NullValueVectorReaderImpl.java | 9 +-
.../index/readers/RangeIndexReaderImpl.java | 36 -----
7 files changed, 207 insertions(+), 57 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
index 127f38f..04dc79b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.operator;
import org.apache.pinot.core.common.Block;
import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.plan.PlanNode;
import org.apache.pinot.segment.spi.FetchContext;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -31,13 +32,13 @@ import org.apache.pinot.segment.spi.IndexSegment;
public class AcquireReleaseColumnsSegmentOperator extends BaseOperator {
private static final String OPERATOR_NAME = "AcquireReleaseColumnsSegmentOperator";
- private final Operator _childOperator;
+ private final PlanNode _planNode;
private final IndexSegment _indexSegment;
private final FetchContext _fetchContext;
+ private Operator _childOperator;
- public AcquireReleaseColumnsSegmentOperator(Operator childOperator, IndexSegment indexSegment,
- FetchContext fetchContext) {
- _childOperator = childOperator;
+ public AcquireReleaseColumnsSegmentOperator(PlanNode planNode, IndexSegment indexSegment, FetchContext fetchContext) {
+ _planNode = planNode;
_indexSegment = indexSegment;
_fetchContext = fetchContext;
}
@@ -49,12 +50,16 @@ public class AcquireReleaseColumnsSegmentOperator extends BaseOperator {
*/
@Override
protected Block getNextBlock() {
+ _childOperator = _planNode.run();
+ return _childOperator.nextBlock();
+ }
+
+ public void acquire() {
_indexSegment.acquire(_fetchContext);
- try {
- return _childOperator.nextBlock();
- } finally {
- _indexSegment.release(_fetchContext);
- }
+ }
+
+ public void release() {
+ _indexSegment.release(_fetchContext);
}
@Override
@@ -64,6 +69,6 @@ public class AcquireReleaseColumnsSegmentOperator extends BaseOperator {
@Override
public ExecutionStatistics getExecutionStatistics() {
- return _childOperator.getExecutionStatistics();
+ return _childOperator == null ? new ExecutionStatistics(0, 0, 0, 0) : _childOperator.getExecutionStatistics();
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index eeb27c2..89c6f1f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.query.request.context.QueryContext;
@@ -146,8 +147,12 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
*/
protected void processSegments(int taskIndex) {
for (int operatorIndex = taskIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
+ Operator operator = _operators.get(operatorIndex);
try {
- IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
+ if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+ ((AcquireReleaseColumnsSegmentOperator) operator).acquire();
+ }
+ IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) operator.nextBlock();
if (isQuerySatisfied(resultsBlock)) {
// Query is satisfied, skip processing the remaining segments
_blockingQueue.offer(resultsBlock);
@@ -164,6 +169,10 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
e);
_blockingQueue.offer(new IntermediateResultsBlock(e));
return;
+ } finally {
+ if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+ ((AcquireReleaseColumnsSegmentOperator) operator).release();
+ }
}
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index b2d9373..d6c5dac 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -41,6 +41,7 @@ import org.apache.pinot.core.data.table.IntermediateRecord;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
@@ -125,9 +126,12 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
*/
@Override
protected void processSegments(int threadIndex) {
+ Operator operator = _operators.get(threadIndex);
try {
- IntermediateResultsBlock intermediateResultsBlock =
- (IntermediateResultsBlock) _operators.get(threadIndex).nextBlock();
+ if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+ ((AcquireReleaseColumnsSegmentOperator) operator).acquire();
+ }
+ IntermediateResultsBlock intermediateResultsBlock = (IntermediateResultsBlock) operator.nextBlock();
_initLock.lock();
try {
@@ -186,9 +190,12 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
// Early-terminated because query times out or is already satisfied
} catch (Exception e) {
LOGGER.error("Caught exception while processing and combining group-by order-by for index: {}, operator: {}, "
- + "queryContext: {}", threadIndex, _operators.get(threadIndex).getClass().getName(), _queryContext, e);
+ + "queryContext: {}", threadIndex, operator.getClass().getName(), _queryContext, e);
_mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
} finally {
+ if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+ ((AcquireReleaseColumnsSegmentOperator) operator).release();
+ }
_operatorLatch.countDown();
}
}
@@ -213,8 +220,8 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
boolean opCompleted = _operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
if (!opCompleted) {
// If this happens, the broker side should already timed out, just log the error and return
- String errorMessage =
- String.format("Timed out while combining group-by order-by results after %dms, queryContext = %s", timeoutMs,
+ String errorMessage = String
+ .format("Timed out while combining group-by order-by results after %dms, queryContext = %s", timeoutMs,
_queryContext);
LOGGER.error(errorMessage);
return new IntermediateResultsBlock(new TimeoutException(errorMessage));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
index 5a9f506..5e517e1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
@@ -41,6 +41,6 @@ public class AcquireReleaseColumnsSegmentPlanNode implements PlanNode {
@Override
public AcquireReleaseColumnsSegmentOperator run() {
- return new AcquireReleaseColumnsSegmentOperator(_childPlanNode.run(), _indexSegment, _fetchContext);
+ return new AcquireReleaseColumnsSegmentOperator(_childPlanNode, _indexSegment, _fetchContext);
}
}
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRoaringBitmapCreation.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRoaringBitmapCreation.java
new file mode 100644
index 0000000..2b43508
--- /dev/null
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRoaringBitmapCreation.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.ref.SoftReference;
+import java.nio.ByteOrder;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.BitmapInvertedIndexWriter;
+import org.apache.pinot.segment.spi.memory.PinotByteBuffer;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.TimeValue;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+
+
+@State(Scope.Benchmark)
+@Fork(value = 1, jvmArgs = {"-server", "-Xmx8G", "-XX:MaxDirectMemorySize=16G"})
+public class BenchmarkRoaringBitmapCreation {
+
+ private static final int NUM_DOCS = 1_000_000;
+ private static final int NUM_READS = 10000;
+ private static final File TEMP_DIR =
+ new File(FileUtils.getTempDirectory(), "bitmap_creation_benchmark_" + System.currentTimeMillis());
+
+ @Param({"1000", "10000", "100000"}) // 1k, 10k, 100k
+ public int _cardinality;
+
+ private int _numBitmaps;
+ private BitmapInvertedIndexWriter _bitmapInvertedIndexWriter;
+ private SoftReference<SoftReference<ImmutableRoaringBitmap>[]> _bitmaps;
+ private PinotDataBuffer _offsetBuffer;
+ private PinotDataBuffer _bitmapBuffer;
+ private int _firstOffset;
+ private int[] _dictIdsToQuery;
+
+ @Setup
+ public void setup()
+ throws IllegalAccessException, InstantiationException, IOException {
+ _numBitmaps = _cardinality;
+ File bufferDir = new File(TEMP_DIR, "cardinality_" + _cardinality);
+ FileUtils.forceMkdir(bufferDir);
+ File bufferFile = new File(bufferDir, "buffer");
+ _bitmapInvertedIndexWriter = new BitmapInvertedIndexWriter(bufferFile, _numBitmaps);
+ // Insert between 10-1000 values per bitmap
+ for (int i = 0; i < _numBitmaps; i++) {
+ int size = 10 + RandomUtils.nextInt(990);
+ int[] data = new int[size];
+ for (int j = 0; j < size; j++) {
+ data[j] = RandomUtils.nextInt(NUM_DOCS); // docIds will repeat across bitmaps, but doesn't matter for purpose of this benchmark
+ }
+ RoaringBitmap bitmap = RoaringBitmap.bitmapOf(data);
+ _bitmapInvertedIndexWriter.add(bitmap);
+ }
+ PinotDataBuffer dataBuffer = PinotByteBuffer.mapReadOnlyBigEndianFile(bufferFile);
+ long offsetBufferEndOffset = (long) (_numBitmaps + 1) * Integer.BYTES;
+ _offsetBuffer = dataBuffer.view(0, offsetBufferEndOffset, ByteOrder.BIG_ENDIAN);
+ _bitmapBuffer = dataBuffer.view(offsetBufferEndOffset, dataBuffer.size());
+ _firstOffset = _offsetBuffer.getInt(0);
+
+ // A fixed set of dictIds to read. This ensures same bitmap accessed multiple times.
+ _dictIdsToQuery = new int[100];
+ for (int i = 0; i < 100; i++) {
+ _dictIdsToQuery[i] = RandomUtils.nextInt(_cardinality);
+ }
+ }
+
+ @TearDown
+ public void teardown()
+ throws IOException {
+ _bitmapInvertedIndexWriter.close();
+ FileUtils.deleteQuietly(TEMP_DIR);
+ }
+
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MICROSECONDS)
+ public void cacheReferences() {
+ _bitmaps = null;
+ for (int i = 0; i < NUM_READS; i++) {
+ int dictId = _dictIdsToQuery[RandomUtils.nextInt(_dictIdsToQuery.length)];
+ getRoaringBitmapFromCache(dictId);
+ }
+ }
+
+ @Benchmark
+ @BenchmarkMode(Mode.AverageTime)
+ @OutputTimeUnit(TimeUnit.MICROSECONDS)
+ public void alwaysBuild() {
+ for (int i = 0; i < NUM_READS; i++) {
+ int dictId = _dictIdsToQuery[RandomUtils.nextInt(_dictIdsToQuery.length)];
+ buildRoaringBitmap(dictId);
+ }
+ }
+
+ private ImmutableRoaringBitmap getRoaringBitmapFromCache(int dictId) {
+ SoftReference<ImmutableRoaringBitmap>[] bitmapArrayReference = (_bitmaps != null) ? _bitmaps.get() : null;
+ if (bitmapArrayReference != null) {
+ SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[dictId];
+ ImmutableRoaringBitmap bitmap = (bitmapReference != null) ? bitmapReference.get() : null;
+ if (bitmap != null) {
+ return bitmap;
+ }
+ } else {
+ bitmapArrayReference = new SoftReference[_numBitmaps];
+ _bitmaps = new SoftReference<>(bitmapArrayReference);
+ }
+ synchronized (this) {
+ SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[dictId];
+ ImmutableRoaringBitmap bitmap = (bitmapReference != null) ? bitmapReference.get() : null;
+ if (bitmap == null) {
+ bitmap = buildRoaringBitmap(dictId);
+ bitmapArrayReference[dictId] = new SoftReference<>(bitmap);
+ }
+ return bitmap;
+ }
+ }
+
+ private ImmutableRoaringBitmap buildRoaringBitmap(int dictId) {
+ int offset = _offsetBuffer.getInt(dictId * Integer.BYTES);
+ int length = _offsetBuffer.getInt((dictId + 1) * Integer.BYTES) - offset;
+ return new ImmutableRoaringBitmap(_bitmapBuffer.toDirectByteBuffer(offset - _firstOffset, length));
+ }
+
+ public static void main(String[] args)
+ throws Exception {
+ ChainedOptionsBuilder opt = new OptionsBuilder().include(BenchmarkRoaringBitmapCreation.class.getSimpleName())
+ .warmupTime(TimeValue.seconds(10)).warmupIterations(1).measurementTime(TimeValue.seconds(5))
+ .measurementIterations(1).forks(1);
+ new Runner(opt.build()).run();
+ }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/NullValueVectorReaderImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/NullValueVectorReaderImpl.java
index 7a60cd8..9dc6112 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/NullValueVectorReaderImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/NullValueVectorReaderImpl.java
@@ -25,18 +25,19 @@ import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
public class NullValueVectorReaderImpl implements NullValueVectorReader {
- private final ImmutableRoaringBitmap _nullBitmap;
+ private final PinotDataBuffer _dataBuffer;
public NullValueVectorReaderImpl(PinotDataBuffer dataBuffer) {
- _nullBitmap = new ImmutableRoaringBitmap(dataBuffer.toDirectByteBuffer(0, (int) dataBuffer.size()));
+ _dataBuffer = dataBuffer;
}
public boolean isNull(int docId) {
- return _nullBitmap.contains(docId);
+ ImmutableRoaringBitmap nullBitmap = new ImmutableRoaringBitmap(_dataBuffer.toDirectByteBuffer(0, (int) _dataBuffer.size()));
+ return nullBitmap.contains(docId);
}
@Override
public ImmutableRoaringBitmap getNullBitmap() {
- return _nullBitmap;
+ return new ImmutableRoaringBitmap(_dataBuffer.toDirectByteBuffer(0, (int) _dataBuffer.size()));
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java
index a2733ce..697fda1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java
@@ -19,7 +19,6 @@
package org.apache.pinot.segment.local.segment.index.readers;
import com.google.common.base.Preconditions;
-import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;
import org.apache.pinot.segment.spi.index.reader.RangeIndexReader;
@@ -43,8 +42,6 @@ public class RangeIndexReaderImpl implements RangeIndexReader<ImmutableRoaringBi
private final Number[] _rangeStartArray;
private final Number _lastRangeEnd;
- private volatile SoftReference<SoftReference<ImmutableRoaringBitmap>[]> _bitmaps;
-
public RangeIndexReaderImpl(PinotDataBuffer dataBuffer) {
_dataBuffer = dataBuffer;
long offset = 0;
@@ -177,39 +174,6 @@ public class RangeIndexReaderImpl implements RangeIndexReader<ImmutableRoaringBi
}
private ImmutableRoaringBitmap getDocIds(int rangeId) {
- SoftReference<ImmutableRoaringBitmap>[] bitmapArrayReference = null;
- // Return the bitmap if it's still on heap
- if (_bitmaps != null) {
- bitmapArrayReference = _bitmaps.get();
- if (bitmapArrayReference != null) {
- SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[rangeId];
- if (bitmapReference != null) {
- ImmutableRoaringBitmap value = bitmapReference.get();
- if (value != null) {
- return value;
- }
- }
- } else {
- bitmapArrayReference = new SoftReference[_numRanges];
- _bitmaps = new SoftReference<SoftReference<ImmutableRoaringBitmap>[]>(bitmapArrayReference);
- }
- } else {
- bitmapArrayReference = new SoftReference[_numRanges];
- _bitmaps = new SoftReference<SoftReference<ImmutableRoaringBitmap>[]>(bitmapArrayReference);
- }
- synchronized (this) {
- ImmutableRoaringBitmap value;
- if (bitmapArrayReference[rangeId] == null || bitmapArrayReference[rangeId].get() == null) {
- value = buildRoaringBitmapForIndex(rangeId);
- bitmapArrayReference[rangeId] = new SoftReference<ImmutableRoaringBitmap>(value);
- } else {
- value = bitmapArrayReference[rangeId].get();
- }
- return value;
- }
- }
-
- private synchronized ImmutableRoaringBitmap buildRoaringBitmapForIndex(final int rangeId) {
final long currentOffset = getOffset(rangeId);
final long nextOffset = getOffset(rangeId + 1);
final int bufferLength = (int) (nextOffset - currentOffset);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org