You are viewing a plain-text version of this content. The canonical version is the commits@pinot.apache.org mailing-list archive entry (its hyperlink was not preserved in this copy).
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/09/24 01:53:44 UTC

[pinot] 01/01: Benchmark

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch acquire_release
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 57a971d8c50be484249341bd80d0893d8fcc95cc
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Thu Sep 23 18:53:07 2021 -0700

    Benchmark
---
 .../AcquireReleaseColumnsSegmentOperator.java      |  25 ++--
 .../core/operator/combine/BaseCombineOperator.java |  11 +-
 .../combine/GroupByOrderByCombineOperator.java     |  17 ++-
 .../plan/AcquireReleaseColumnsSegmentPlanNode.java |   2 +-
 .../pinot/perf/BenchmarkRoaringBitmapCreation.java | 164 +++++++++++++++++++++
 .../index/readers/NullValueVectorReaderImpl.java   |   9 +-
 .../index/readers/RangeIndexReaderImpl.java        |  36 -----
 7 files changed, 207 insertions(+), 57 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
index 127f38f..04dc79b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.operator;
 
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.plan.PlanNode;
 import org.apache.pinot.segment.spi.FetchContext;
 import org.apache.pinot.segment.spi.IndexSegment;
 
@@ -31,13 +32,13 @@ import org.apache.pinot.segment.spi.IndexSegment;
 public class AcquireReleaseColumnsSegmentOperator extends BaseOperator {
   private static final String OPERATOR_NAME = "AcquireReleaseColumnsSegmentOperator";
 
-  private final Operator _childOperator;
+  private final PlanNode _planNode;
   private final IndexSegment _indexSegment;
   private final FetchContext _fetchContext;
+  private Operator _childOperator;
 
-  public AcquireReleaseColumnsSegmentOperator(Operator childOperator, IndexSegment indexSegment,
-      FetchContext fetchContext) {
-    _childOperator = childOperator;
+  public AcquireReleaseColumnsSegmentOperator(PlanNode planNode, IndexSegment indexSegment, FetchContext fetchContext) {
+    _planNode = planNode;
     _indexSegment = indexSegment;
     _fetchContext = fetchContext;
   }
@@ -49,12 +50,16 @@ public class AcquireReleaseColumnsSegmentOperator extends BaseOperator {
    */
   @Override
   protected Block getNextBlock() {
+    _childOperator = _planNode.run();
+    return _childOperator.nextBlock();
+  }
+
+  public void acquire() {
     _indexSegment.acquire(_fetchContext);
-    try {
-      return _childOperator.nextBlock();
-    } finally {
-      _indexSegment.release(_fetchContext);
-    }
+  }
+
+  public void release() {
+    _indexSegment.release(_fetchContext);
   }
 
   @Override
@@ -64,6 +69,6 @@ public class AcquireReleaseColumnsSegmentOperator extends BaseOperator {
 
   @Override
   public ExecutionStatistics getExecutionStatistics() {
-    return _childOperator.getExecutionStatistics();
+    return _childOperator == null ? new ExecutionStatistics(0, 0, 0, 0) : _childOperator.getExecutionStatistics();
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index eeb27c2..89c6f1f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -146,8 +147,12 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
    */
   protected void processSegments(int taskIndex) {
     for (int operatorIndex = taskIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) {
+      Operator operator = _operators.get(operatorIndex);
       try {
-        IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock();
+        if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+          ((AcquireReleaseColumnsSegmentOperator) operator).acquire();
+        }
+        IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) operator.nextBlock();
         if (isQuerySatisfied(resultsBlock)) {
           // Query is satisfied, skip processing the remaining segments
           _blockingQueue.offer(resultsBlock);
@@ -164,6 +169,10 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
             e);
         _blockingQueue.offer(new IntermediateResultsBlock(e));
         return;
+      } finally {
+        if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+          ((AcquireReleaseColumnsSegmentOperator) operator).release();
+        }
       }
     }
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index b2d9373..d6c5dac 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -41,6 +41,7 @@ import org.apache.pinot.core.data.table.IntermediateRecord;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
@@ -125,9 +126,12 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
    */
   @Override
   protected void processSegments(int threadIndex) {
+    Operator operator = _operators.get(threadIndex);
     try {
-      IntermediateResultsBlock intermediateResultsBlock =
-          (IntermediateResultsBlock) _operators.get(threadIndex).nextBlock();
+      if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+        ((AcquireReleaseColumnsSegmentOperator) operator).acquire();
+      }
+      IntermediateResultsBlock intermediateResultsBlock = (IntermediateResultsBlock) operator.nextBlock();
 
       _initLock.lock();
       try {
@@ -186,9 +190,12 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
       // Early-terminated because query times out or is already satisfied
     } catch (Exception e) {
       LOGGER.error("Caught exception while processing and combining group-by order-by for index: {}, operator: {}, "
-          + "queryContext: {}", threadIndex, _operators.get(threadIndex).getClass().getName(), _queryContext, e);
+          + "queryContext: {}", threadIndex, operator.getClass().getName(), _queryContext, e);
       _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
     } finally {
+      if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
+        ((AcquireReleaseColumnsSegmentOperator) operator).release();
+      }
       _operatorLatch.countDown();
     }
   }
@@ -213,8 +220,8 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
     boolean opCompleted = _operatorLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
     if (!opCompleted) {
       // If this happens, the broker side should already timed out, just log the error and return
-      String errorMessage =
-          String.format("Timed out while combining group-by order-by results after %dms, queryContext = %s", timeoutMs,
+      String errorMessage = String
+          .format("Timed out while combining group-by order-by results after %dms, queryContext = %s", timeoutMs,
               _queryContext);
       LOGGER.error(errorMessage);
       return new IntermediateResultsBlock(new TimeoutException(errorMessage));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
index 5a9f506..5e517e1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java
@@ -41,6 +41,11 @@ public class AcquireReleaseColumnsSegmentPlanNode implements PlanNode {
 
+  /**
+   * Returns an operator that wraps the child plan node itself; the child plan is only run inside the operator's
+   * getNextBlock(). The operator does not acquire or release segment columns on its own: callers are expected to
+   * call acquire() before nextBlock() and release() afterwards (as BaseCombineOperator does).
+   */
   @Override
   public AcquireReleaseColumnsSegmentOperator run() {
-    return new AcquireReleaseColumnsSegmentOperator(_childPlanNode.run(), _indexSegment, _fetchContext);
+    return new AcquireReleaseColumnsSegmentOperator(_childPlanNode, _indexSegment, _fetchContext);
   }
 }
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRoaringBitmapCreation.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRoaringBitmapCreation.java
new file mode 100644
index 0000000..2b43508
--- /dev/null
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRoaringBitmapCreation.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.ref.SoftReference;
+import java.nio.ByteOrder;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.BitmapInvertedIndexWriter;
+import org.apache.pinot.segment.spi.memory.PinotByteBuffer;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.TimeValue;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+
+
+/**
+ * Compares two ways of obtaining {@link ImmutableRoaringBitmap}s from a serialized inverted index file:
+ * <ul>
+ *   <li>{@link #cacheReferences}: build each bitmap on first access and keep it behind {@link SoftReference}s, with
+ *   the cache emptied at the start of every invocation;</li>
+ *   <li>{@link #alwaysBuild}: construct a new bitmap over the buffer on every read.</li>
+ * </ul>
+ * Both benchmarks replay the same precomputed sequence of reads drawn from a small fixed set of dictIds, so the same
+ * bitmaps are requested many times per invocation.
+ */
+@State(Scope.Benchmark)
+@Fork(value = 1, jvmArgs = {"-server", "-Xmx8G", "-XX:MaxDirectMemorySize=16G"})
+public class BenchmarkRoaringBitmapCreation {
+
+  private static final int NUM_DOCS = 1_000_000;
+  private static final int NUM_READS = 10000;
+  // Number of distinct dictIds that reads are drawn from; kept small so the same bitmaps are read repeatedly.
+  private static final int NUM_DICT_IDS_TO_QUERY = 100;
+  private static final File TEMP_DIR =
+      new File(FileUtils.getTempDirectory(), "bitmap_creation_benchmark_" + System.currentTimeMillis());
+
+  @Param({"1000", "10000", "100000"}) // 1k, 10k, 100k
+  public int _cardinality;
+
+  private int _numBitmaps;
+  private SoftReference<SoftReference<ImmutableRoaringBitmap>[]> _bitmaps;
+  // Owning mapping of the index file; _offsetBuffer and _bitmapBuffer are views into it.
+  private PinotDataBuffer _dataBuffer;
+  private PinotDataBuffer _offsetBuffer;
+  private PinotDataBuffer _bitmapBuffer;
+  private int _firstOffset;
+  // dictIds to read, precomputed so random number generation is excluded from the measured time.
+  private int[] _readSequence;
+
+  @Setup
+  public void setup()
+      throws IOException {
+    _numBitmaps = _cardinality;
+    File bufferDir = new File(TEMP_DIR, "cardinality_" + _cardinality);
+    FileUtils.forceMkdir(bufferDir);
+    File bufferFile = new File(bufferDir, "buffer");
+    writeBitmaps(bufferFile);
+
+    // Map the file only after the writer has been closed, so the mapping sees the final file contents.
+    _dataBuffer = PinotByteBuffer.mapReadOnlyBigEndianFile(bufferFile);
+    long offsetBufferEndOffset = (long) (_numBitmaps + 1) * Integer.BYTES;
+    _offsetBuffer = _dataBuffer.view(0, offsetBufferEndOffset, ByteOrder.BIG_ENDIAN);
+    _bitmapBuffer = _dataBuffer.view(offsetBufferEndOffset, _dataBuffer.size());
+    _firstOffset = _offsetBuffer.getInt(0);
+
+    // A fixed set of dictIds to read. This ensures same bitmap accessed multiple times.
+    int[] dictIdsToQuery = new int[NUM_DICT_IDS_TO_QUERY];
+    for (int i = 0; i < NUM_DICT_IDS_TO_QUERY; i++) {
+      dictIdsToQuery[i] = RandomUtils.nextInt(_cardinality);
+    }
+    _readSequence = new int[NUM_READS];
+    for (int i = 0; i < NUM_READS; i++) {
+      _readSequence[i] = dictIdsToQuery[RandomUtils.nextInt(NUM_DICT_IDS_TO_QUERY)];
+    }
+  }
+
+  /**
+   * Writes {@code _numBitmaps} bitmaps to {@code bufferFile}; each is built from 10 + [0, 990) random docIds
+   * (duplicates collapse). The writer is always closed, even if writing fails.
+   */
+  private void writeBitmaps(File bufferFile)
+      throws IOException {
+    BitmapInvertedIndexWriter writer = new BitmapInvertedIndexWriter(bufferFile, _numBitmaps);
+    try {
+      for (int i = 0; i < _numBitmaps; i++) {
+        int size = 10 + RandomUtils.nextInt(990);
+        int[] data = new int[size];
+        for (int j = 0; j < size; j++) {
+          // docIds will repeat across bitmaps, but doesn't matter for purpose of this benchmark
+          data[j] = RandomUtils.nextInt(NUM_DOCS);
+        }
+        writer.add(RoaringBitmap.bitmapOf(data));
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
+  @TearDown
+  public void teardown()
+      throws IOException {
+    try {
+      // Unmap before deleting the backing file.
+      if (_dataBuffer != null) {
+        _dataBuffer.close();
+      }
+    } finally {
+      FileUtils.deleteQuietly(TEMP_DIR);
+    }
+  }
+
+  /**
+   * Reads through a soft-reference cache that is emptied at the start of every invocation.
+   */
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void cacheReferences(Blackhole blackhole) {
+    _bitmaps = null;
+    for (int dictId : _readSequence) {
+      // Consume the result so the JIT cannot eliminate the lookup as dead code.
+      blackhole.consume(getRoaringBitmapFromCache(dictId));
+    }
+  }
+
+  /**
+   * Builds a new bitmap for every read, without any caching.
+   */
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void alwaysBuild(Blackhole blackhole) {
+    for (int dictId : _readSequence) {
+      // Consume the result so the JIT cannot eliminate the bitmap construction as dead code.
+      blackhole.consume(buildRoaringBitmap(dictId));
+    }
+  }
+
+  /**
+   * Returns the bitmap for {@code dictId}, served from the soft-reference cache when present; on a miss the bitmap is
+   * built and cached while holding this object's monitor.
+   */
+  @SuppressWarnings("unchecked") // generic array creation for the per-bitmap reference array
+  private ImmutableRoaringBitmap getRoaringBitmapFromCache(int dictId) {
+    SoftReference<ImmutableRoaringBitmap>[] bitmapArrayReference = (_bitmaps != null) ? _bitmaps.get() : null;
+    if (bitmapArrayReference != null) {
+      SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[dictId];
+      ImmutableRoaringBitmap bitmap = (bitmapReference != null) ? bitmapReference.get() : null;
+      if (bitmap != null) {
+        return bitmap;
+      }
+    } else {
+      bitmapArrayReference = new SoftReference[_numBitmaps];
+      _bitmaps = new SoftReference<>(bitmapArrayReference);
+    }
+    synchronized (this) {
+      SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[dictId];
+      ImmutableRoaringBitmap bitmap = (bitmapReference != null) ? bitmapReference.get() : null;
+      if (bitmap == null) {
+        bitmap = buildRoaringBitmap(dictId);
+        bitmapArrayReference[dictId] = new SoftReference<>(bitmap);
+      }
+      return bitmap;
+    }
+  }
+
+  /**
+   * Wraps the serialized bitmap for {@code dictId}, whose start/end positions are read from the offset buffer.
+   */
+  private ImmutableRoaringBitmap buildRoaringBitmap(int dictId) {
+    int offset = _offsetBuffer.getInt(dictId * Integer.BYTES);
+    int length = _offsetBuffer.getInt((dictId + 1) * Integer.BYTES) - offset;
+    return new ImmutableRoaringBitmap(_bitmapBuffer.toDirectByteBuffer(offset - _firstOffset, length));
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+    ChainedOptionsBuilder opt = new OptionsBuilder().include(BenchmarkRoaringBitmapCreation.class.getSimpleName())
+        .warmupTime(TimeValue.seconds(10)).warmupIterations(1).measurementTime(TimeValue.seconds(5))
+        .measurementIterations(1).forks(1);
+    new Runner(opt.build()).run();
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/NullValueVectorReaderImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/NullValueVectorReaderImpl.java
index 7a60cd8..9dc6112 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/NullValueVectorReaderImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/NullValueVectorReaderImpl.java
@@ -25,18 +25,25 @@ import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 
 public class NullValueVectorReaderImpl implements NullValueVectorReader {
 
-  private final ImmutableRoaringBitmap _nullBitmap;
+  // Serialized null bitmap; wrapped in a new ImmutableRoaringBitmap on every access instead of being cached.
+  private final PinotDataBuffer _dataBuffer;
 
   public NullValueVectorReaderImpl(PinotDataBuffer dataBuffer) {
-    _nullBitmap = new ImmutableRoaringBitmap(dataBuffer.toDirectByteBuffer(0, (int) dataBuffer.size()));
+    _dataBuffer = dataBuffer;
   }
 
+  /**
+   * Returns whether {@code docId} is set in the null bitmap, which is rebuilt from the buffer on every call.
+   */
   public boolean isNull(int docId) {
-    return _nullBitmap.contains(docId);
+    return getNullBitmap().contains(docId);
   }
 
+  /**
+   * Returns a new {@link ImmutableRoaringBitmap} over the whole data buffer; nothing is cached between calls.
+   */
   @Override
   public ImmutableRoaringBitmap getNullBitmap() {
-    return _nullBitmap;
+    return new ImmutableRoaringBitmap(_dataBuffer.toDirectByteBuffer(0, (int) _dataBuffer.size()));
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java
index a2733ce..697fda1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.segment.local.segment.index.readers;
 
 import com.google.common.base.Preconditions;
-import java.lang.ref.SoftReference;
 import java.nio.ByteBuffer;
 import javax.annotation.Nullable;
 import org.apache.pinot.segment.spi.index.reader.RangeIndexReader;
@@ -43,8 +42,6 @@ public class RangeIndexReaderImpl implements RangeIndexReader<ImmutableRoaringBi
   private final Number[] _rangeStartArray;
   private final Number _lastRangeEnd;
 
-  private volatile SoftReference<SoftReference<ImmutableRoaringBitmap>[]> _bitmaps;
-
   public RangeIndexReaderImpl(PinotDataBuffer dataBuffer) {
     _dataBuffer = dataBuffer;
     long offset = 0;
@@ -177,39 +174,6 @@ public class RangeIndexReaderImpl implements RangeIndexReader<ImmutableRoaringBi
   }
 
   private ImmutableRoaringBitmap getDocIds(int rangeId) {
-    SoftReference<ImmutableRoaringBitmap>[] bitmapArrayReference = null;
-    // Return the bitmap if it's still on heap
-    if (_bitmaps != null) {
-      bitmapArrayReference = _bitmaps.get();
-      if (bitmapArrayReference != null) {
-        SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[rangeId];
-        if (bitmapReference != null) {
-          ImmutableRoaringBitmap value = bitmapReference.get();
-          if (value != null) {
-            return value;
-          }
-        }
-      } else {
-        bitmapArrayReference = new SoftReference[_numRanges];
-        _bitmaps = new SoftReference<SoftReference<ImmutableRoaringBitmap>[]>(bitmapArrayReference);
-      }
-    } else {
-      bitmapArrayReference = new SoftReference[_numRanges];
-      _bitmaps = new SoftReference<SoftReference<ImmutableRoaringBitmap>[]>(bitmapArrayReference);
-    }
-    synchronized (this) {
-      ImmutableRoaringBitmap value;
-      if (bitmapArrayReference[rangeId] == null || bitmapArrayReference[rangeId].get() == null) {
-        value = buildRoaringBitmapForIndex(rangeId);
-        bitmapArrayReference[rangeId] = new SoftReference<ImmutableRoaringBitmap>(value);
-      } else {
-        value = bitmapArrayReference[rangeId].get();
-      }
-      return value;
-    }
-  }
-
-  private synchronized ImmutableRoaringBitmap buildRoaringBitmapForIndex(final int rangeId) {
     final long currentOffset = getOffset(rangeId);
     final long nextOffset = getOffset(rangeId + 1);
     final int bufferLength = (int) (nextOffset - currentOffset);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org