You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/12/08 05:04:05 UTC
[incubator-pinot] branch master updated: use RoaringBitmapWriter and direct to ByteBuffer serialization in BitmapInvertedIndexCreators (#6320)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fe2a63c use RoaringBitmapWriter and direct to ByteBuffer serialization in BitmapInvertedIndexCreators (#6320)
fe2a63c is described below
commit fe2a63ca6d21015cbb09500f00d98a702338d488
Author: Richard Startin <ri...@datadoghq.com>
AuthorDate: Tue Dec 8 05:03:45 2020 +0000
use RoaringBitmapWriter and direct to ByteBuffer serialization in BitmapInvertedIndexCreators (#6320)
This PR uses some more efficient APIs from RoaringBitmap.
- `RoaringBitmapWriter` is optimized for writing row ids into the bitmap sequentially. This class has a few options to control memory usage, but I've used its defaults.
- A `RoaringBitmap` can be serialized to a `ByteBuffer`, which is generally a lot faster than a `DataOutput`. However, this requires mapping a file outside of the control of `PinotByteBuffer`. The buffer holding the bitmap offsets also has to be big endian for backward compatibility with files previously written through `DataOutput`; the buffer holding the serialized bitmaps is mapped as little endian.
---
.../inv/OffHeapBitmapInvertedIndexCreator.java | 59 +++++++-----
.../impl/inv/OnHeapBitmapInvertedIndexCreator.java | 63 ++++++++-----
...BenchmarkOffheapBitmapInvertedIndexCreator.java | 105 +++++++++++++++++++++
3 files changed, 181 insertions(+), 46 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
index b43b330..1e72bc1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
@@ -18,19 +18,22 @@
*/
package org.apache.pinot.core.segment.creator.impl.inv;
-import com.google.common.base.Preconditions;
-import java.io.BufferedOutputStream;
import java.io.Closeable;
-import java.io.DataOutputStream;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+
import org.apache.commons.io.FileUtils;
import org.apache.pinot.core.segment.creator.DictionaryBasedInvertedIndexCreator;
import org.apache.pinot.core.segment.creator.impl.V1Constants;
import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.core.util.CleanerUtil;
import org.apache.pinot.spi.data.FieldSpec;
-import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.RoaringBitmapWriter;
/**
@@ -181,33 +184,45 @@ public final class OffHeapBitmapInvertedIndexCreator implements DictionaryBasedI
}
// Create bitmaps from inverted index buffers and serialize them to file
- try (DataOutputStream offsetDataStream = new DataOutputStream(
- new BufferedOutputStream(new FileOutputStream(_invertedIndexFile)));
- FileOutputStream bitmapFileStream = new FileOutputStream(_invertedIndexFile);
- DataOutputStream bitmapDataStream = new DataOutputStream(new BufferedOutputStream(bitmapFileStream))) {
- int bitmapOffset = (_cardinality + 1) * Integer.BYTES;
- offsetDataStream.writeInt(bitmapOffset);
- bitmapFileStream.getChannel().position(bitmapOffset);
-
+ ByteBuffer offsetBuffer = null;
+ ByteBuffer bitmapBuffer = null;
+ try (FileChannel channel = new RandomAccessFile(_invertedIndexFile, "rw").getChannel()) {
+ // map the offsets buffer
+ int startOfBitmaps = (_cardinality + 1) * Integer.BYTES;
+ offsetBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, startOfBitmaps)
+ .order(ByteOrder.BIG_ENDIAN);
+ offsetBuffer.putInt(startOfBitmaps);
+ bitmapBuffer = channel.map(FileChannel.MapMode.READ_WRITE, startOfBitmaps, Integer.MAX_VALUE - startOfBitmaps)
+ .order(ByteOrder.LITTLE_ENDIAN);
+ RoaringBitmapWriter<RoaringBitmap> writer = RoaringBitmapWriter.writer().runCompress(false).get();
int startIndex = 0;
for (int dictId = 0; dictId < _cardinality; dictId++) {
- MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
int endIndex = getInt(_invertedIndexLengthBuffer, dictId);
for (int i = startIndex; i < endIndex; i++) {
- bitmap.add(getInt(_invertedIndexValueBuffer, i));
+ writer.add(getInt(_invertedIndexValueBuffer, i));
}
+ RoaringBitmap bitmap = writer.get();
+ bitmap.serialize(bitmapBuffer);
+ // write offset into file
+ offsetBuffer.putInt(startOfBitmaps + bitmapBuffer.position());
startIndex = endIndex;
-
- // Write offset and bitmap into file
- bitmapOffset += bitmap.serializedSizeInBytes();
- // Check for int overflow
- Preconditions.checkState(bitmapOffset > 0, "Inverted index file: %s exceeds 2GB limit", _invertedIndexFile);
- offsetDataStream.writeInt(bitmapOffset);
- bitmap.serialize(bitmapDataStream);
+ writer.reset();
}
+ // we know how long the file should be now, so truncate it
+ channel.truncate(startOfBitmaps + bitmapBuffer.position());
} catch (Exception e) {
FileUtils.deleteQuietly(_invertedIndexFile);
throw e;
+ } finally {
+ if (CleanerUtil.UNMAP_SUPPORTED) {
+ CleanerUtil.BufferCleaner cleaner = CleanerUtil.getCleaner();
+ if (offsetBuffer != null) {
+ cleaner.freeBuffer(offsetBuffer);
+ }
+ if (bitmapBuffer != null) {
+ cleaner.freeBuffer(bitmapBuffer);
+ }
+ }
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java
index 84c9929..44f4588 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java
@@ -18,16 +18,20 @@
*/
package org.apache.pinot.core.segment.creator.impl.inv;
-import com.google.common.base.Preconditions;
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
+
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+
import org.apache.commons.io.FileUtils;
import org.apache.pinot.core.segment.creator.DictionaryBasedInvertedIndexCreator;
import org.apache.pinot.core.segment.creator.impl.V1Constants;
-import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.apache.pinot.core.util.CleanerUtil;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.RoaringBitmapWriter;
/**
@@ -35,26 +39,27 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap;
*/
public final class OnHeapBitmapInvertedIndexCreator implements DictionaryBasedInvertedIndexCreator {
private final File _invertedIndexFile;
- private final MutableRoaringBitmap[] _bitmaps;
+ private final RoaringBitmapWriter<RoaringBitmap>[] _bitmapWriters;
private int _nextDocId;
+ @SuppressWarnings("unchecked")
public OnHeapBitmapInvertedIndexCreator(File indexDir, String columnName, int cardinality) {
_invertedIndexFile = new File(indexDir, columnName + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION);
- _bitmaps = new MutableRoaringBitmap[cardinality];
+ _bitmapWriters = new RoaringBitmapWriter[cardinality];
for (int i = 0; i < cardinality; i++) {
- _bitmaps[i] = new MutableRoaringBitmap();
+ _bitmapWriters[i] = RoaringBitmapWriter.writer().runCompress(false).get();
}
}
@Override
public void add(int dictId) {
- _bitmaps[dictId].add(_nextDocId++);
+ _bitmapWriters[dictId].add(_nextDocId++);
}
@Override
public void add(int[] dictIds, int length) {
for (int i = 0; i < length; i++) {
- _bitmaps[dictIds[i]].add(_nextDocId);
+ _bitmapWriters[dictIds[i]].add(_nextDocId);
}
_nextDocId++;
}
@@ -62,25 +67,35 @@ public final class OnHeapBitmapInvertedIndexCreator implements DictionaryBasedIn
@Override
public void seal()
throws IOException {
- try (DataOutputStream out = new DataOutputStream(
- new BufferedOutputStream(new FileOutputStream(_invertedIndexFile)))) {
+ int startOfBitmaps = (_bitmapWriters.length + 1) * Integer.BYTES;
+ ByteBuffer bitmapBuffer = null;
+ ByteBuffer offsetBuffer = null;
+ try (FileChannel channel = new RandomAccessFile(_invertedIndexFile, "rw").getChannel()) {
+ // map the offsets buffer
+ offsetBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, startOfBitmaps)
+ .order(ByteOrder.BIG_ENDIAN);
+ bitmapBuffer = channel.map(FileChannel.MapMode.READ_WRITE, startOfBitmaps, Integer.MAX_VALUE - startOfBitmaps)
+ .order(ByteOrder.LITTLE_ENDIAN);
// Write bitmap offsets
- int bitmapOffset = (_bitmaps.length + 1) * Integer.BYTES;
- out.writeInt(bitmapOffset);
- for (MutableRoaringBitmap bitmap : _bitmaps) {
- bitmapOffset += bitmap.serializedSizeInBytes();
- // Check for int overflow
- Preconditions.checkState(bitmapOffset > 0, "Inverted index file: %s exceeds 2GB limit", _invertedIndexFile);
- out.writeInt(bitmapOffset);
- }
-
- // Write bitmap data
- for (MutableRoaringBitmap bitmap : _bitmaps) {
- bitmap.serialize(out);
+ offsetBuffer.putInt(startOfBitmaps);
+ for (RoaringBitmapWriter<RoaringBitmap> writer : _bitmapWriters) {
+ writer.get().serialize(bitmapBuffer);
+ offsetBuffer.putInt(startOfBitmaps + bitmapBuffer.position());
}
+ channel.truncate(startOfBitmaps + bitmapBuffer.position());
} catch (Exception e) {
FileUtils.deleteQuietly(_invertedIndexFile);
throw e;
+ } finally {
+ if (CleanerUtil.UNMAP_SUPPORTED) {
+ CleanerUtil.BufferCleaner cleaner = CleanerUtil.getCleaner();
+ if (bitmapBuffer != null) {
+ cleaner.freeBuffer(bitmapBuffer);
+ }
+ if (offsetBuffer != null) {
+ cleaner.freeBuffer(offsetBuffer);
+ }
+ }
}
}
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOffheapBitmapInvertedIndexCreator.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOffheapBitmapInvertedIndexCreator.java
new file mode 100644
index 0000000..0979c98
--- /dev/null
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOffheapBitmapInvertedIndexCreator.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.perf;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+@State(Scope.Benchmark)
+public class BenchmarkOffheapBitmapInvertedIndexCreator {
+
+ public enum Assignment {
+ ROUND_ROBIN {
+ @Override
+ void assign(OffHeapBitmapInvertedIndexCreator creator, int docs, int cardinality) {
+ for (int i = 0; i < docs; ++i) {
+ creator.add(i % cardinality);
+ }
+ }
+ },
+ SORTED_UNIFORM {
+ @Override
+ void assign(OffHeapBitmapInvertedIndexCreator creator, int docs, int cardinality) {
+ for (int i = 0; i < cardinality; ++i) {
+ for (int j = 0; j < docs / cardinality; ++j) {
+ creator.add(i);
+ }
+ }
+ }
+ };
+
+ abstract void assign(OffHeapBitmapInvertedIndexCreator creator, int docs, int cardinality);
+ }
+
+ private Path indexDir;
+ @Param({"10", "1000", "10000"})
+ int cardinality;
+
+ @Param({"1000000", "10000000", "100000000"})
+ int numDocs;
+
+ @Param
+ Assignment assignment;
+
+ private OffHeapBitmapInvertedIndexCreator creator;
+
+ @Setup(Level.Invocation)
+ public void setup() throws IOException {
+ indexDir = Files.createTempDirectory("index");
+ creator = new OffHeapBitmapInvertedIndexCreator(
+ indexDir.toFile(), new DimensionFieldSpec("foo", FieldSpec.DataType.STRING, true),
+ cardinality, numDocs, -1);
+ assignment.assign(creator, numDocs, cardinality);
+ }
+
+ @TearDown(Level.Invocation)
+ public void tearDown() throws IOException {
+ if (null != indexDir) {
+ FileUtils.deleteDirectory(indexDir.toFile());
+ }
+ creator.close();
+ }
+
+ @Benchmark
+ public Object seal() throws IOException {
+ creator.seal();
+ return creator;
+ }
+
+ public static void main(String[] args)
+ throws Exception {
+ ChainedOptionsBuilder opt =
+ new OptionsBuilder().include(BenchmarkOffheapBitmapInvertedIndexCreator.class.getSimpleName())
+ .mode(Mode.SingleShotTime)
+ .warmupIterations(8).measurementIterations(8).forks(5);
+
+ new Runner(opt.build()).run();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org