You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/12/05 20:50:16 UTC

[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #6320: More efficient use of RoaringBitmap in OnHeapBitmapInvertedIndexCreator and OffHeapBitmapInvertedIndexCreator

Jackie-Jiang commented on a change in pull request #6320:
URL: https://github.com/apache/incubator-pinot/pull/6320#discussion_r536887337



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
##########
@@ -19,18 +19,25 @@
 package org.apache.pinot.core.segment.creator.impl.inv;
 
 import com.google.common.base.Preconditions;
-import java.io.BufferedOutputStream;
+
 import java.io.Closeable;
-import java.io.DataOutputStream;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.core.segment.creator.DictionaryBasedInvertedIndexCreator;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.core.util.CleanerUtil;
 import org.apache.pinot.spi.data.FieldSpec;
-import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.RoaringBitmapWriter;
+
+import static java.lang.Integer.reverseBytes;

Review comment:
       Don't use static imports in production classes.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
##########
@@ -181,33 +188,51 @@ public void seal()
     }
 
     // Create bitmaps from inverted index buffers and serialize them to file
-    try (DataOutputStream offsetDataStream = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(_invertedIndexFile)));
-        FileOutputStream bitmapFileStream = new FileOutputStream(_invertedIndexFile);
-        DataOutputStream bitmapDataStream = new DataOutputStream(new BufferedOutputStream(bitmapFileStream))) {
-      int bitmapOffset = (_cardinality + 1) * Integer.BYTES;
-      offsetDataStream.writeInt(bitmapOffset);
-      bitmapFileStream.getChannel().position(bitmapOffset);
-
+    ByteBuffer offsetBuffer = null;
+    ByteBuffer bitmapBuffer = null;
+    try (FileChannel channel = new RandomAccessFile(_invertedIndexFile, "rw").getChannel()) {
+      // map the offsets buffer
+      final int startOfBitmaps = (_cardinality + 1) * Integer.BYTES;
+      int bitmapOffset = startOfBitmaps;
+      offsetBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, bitmapOffset).order(LITTLE_ENDIAN);
+      offsetBuffer.putInt(reverseBytes(bitmapOffset));
+      RoaringBitmap[] bitmaps = new RoaringBitmap[_cardinality];

Review comment:
       For the off-heap creator, we don't want to keep all bitmaps on heap. We should try to create and serialize the bitmaps one by one. I'm not sure about the cost of mapping a buffer per bitmap, though.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
##########
@@ -181,33 +188,51 @@ public void seal()
     }
 
     // Create bitmaps from inverted index buffers and serialize them to file
-    try (DataOutputStream offsetDataStream = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(_invertedIndexFile)));
-        FileOutputStream bitmapFileStream = new FileOutputStream(_invertedIndexFile);
-        DataOutputStream bitmapDataStream = new DataOutputStream(new BufferedOutputStream(bitmapFileStream))) {
-      int bitmapOffset = (_cardinality + 1) * Integer.BYTES;
-      offsetDataStream.writeInt(bitmapOffset);
-      bitmapFileStream.getChannel().position(bitmapOffset);
-
+    ByteBuffer offsetBuffer = null;
+    ByteBuffer bitmapBuffer = null;
+    try (FileChannel channel = new RandomAccessFile(_invertedIndexFile, "rw").getChannel()) {
+      // map the offsets buffer
+      final int startOfBitmaps = (_cardinality + 1) * Integer.BYTES;
+      int bitmapOffset = startOfBitmaps;
+      offsetBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, bitmapOffset).order(LITTLE_ENDIAN);

Review comment:
       The offset buffer does not have to be little-endian (LE), as all the values are written as big-endian (BE).

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
##########
@@ -181,33 +188,51 @@ public void seal()
     }
 
     // Create bitmaps from inverted index buffers and serialize them to file
-    try (DataOutputStream offsetDataStream = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(_invertedIndexFile)));
-        FileOutputStream bitmapFileStream = new FileOutputStream(_invertedIndexFile);
-        DataOutputStream bitmapDataStream = new DataOutputStream(new BufferedOutputStream(bitmapFileStream))) {
-      int bitmapOffset = (_cardinality + 1) * Integer.BYTES;
-      offsetDataStream.writeInt(bitmapOffset);
-      bitmapFileStream.getChannel().position(bitmapOffset);
-
+    ByteBuffer offsetBuffer = null;
+    ByteBuffer bitmapBuffer = null;
+    try (FileChannel channel = new RandomAccessFile(_invertedIndexFile, "rw").getChannel()) {
+      // map the offsets buffer
+      final int startOfBitmaps = (_cardinality + 1) * Integer.BYTES;

Review comment:
       (nit) We don't usually use `final` for local variables.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
##########
@@ -181,33 +188,51 @@ public void seal()
     }
 
     // Create bitmaps from inverted index buffers and serialize them to file
-    try (DataOutputStream offsetDataStream = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(_invertedIndexFile)));
-        FileOutputStream bitmapFileStream = new FileOutputStream(_invertedIndexFile);
-        DataOutputStream bitmapDataStream = new DataOutputStream(new BufferedOutputStream(bitmapFileStream))) {
-      int bitmapOffset = (_cardinality + 1) * Integer.BYTES;
-      offsetDataStream.writeInt(bitmapOffset);
-      bitmapFileStream.getChannel().position(bitmapOffset);
-
+    ByteBuffer offsetBuffer = null;
+    ByteBuffer bitmapBuffer = null;
+    try (FileChannel channel = new RandomAccessFile(_invertedIndexFile, "rw").getChannel()) {
+      // map the offsets buffer
+      final int startOfBitmaps = (_cardinality + 1) * Integer.BYTES;
+      int bitmapOffset = startOfBitmaps;
+      offsetBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, bitmapOffset).order(LITTLE_ENDIAN);
+      offsetBuffer.putInt(reverseBytes(bitmapOffset));
+      RoaringBitmap[] bitmaps = new RoaringBitmap[_cardinality];

Review comment:
       Do `RoaringBitmap` and `MutableRoaringBitmap` serialize to the same bytes?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
##########
@@ -181,33 +188,51 @@ public void seal()
     }
 
     // Create bitmaps from inverted index buffers and serialize them to file
-    try (DataOutputStream offsetDataStream = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(_invertedIndexFile)));
-        FileOutputStream bitmapFileStream = new FileOutputStream(_invertedIndexFile);
-        DataOutputStream bitmapDataStream = new DataOutputStream(new BufferedOutputStream(bitmapFileStream))) {
-      int bitmapOffset = (_cardinality + 1) * Integer.BYTES;
-      offsetDataStream.writeInt(bitmapOffset);
-      bitmapFileStream.getChannel().position(bitmapOffset);
-
+    ByteBuffer offsetBuffer = null;
+    ByteBuffer bitmapBuffer = null;
+    try (FileChannel channel = new RandomAccessFile(_invertedIndexFile, "rw").getChannel()) {
+      // map the offsets buffer
+      final int startOfBitmaps = (_cardinality + 1) * Integer.BYTES;
+      int bitmapOffset = startOfBitmaps;
+      offsetBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, bitmapOffset).order(LITTLE_ENDIAN);
+      offsetBuffer.putInt(reverseBytes(bitmapOffset));
+      RoaringBitmap[] bitmaps = new RoaringBitmap[_cardinality];
+      RoaringBitmapWriter<RoaringBitmap> writer = RoaringBitmapWriter.writer()
+              .initialCapacity(((_nextDocId - 1) >>> 16) / _cardinality).get();

Review comment:
       Why do you need to divide it by `_cardinality`? Also, should this be `.expectedRange(0, _nextDocId)` instead?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
##########
@@ -181,33 +188,51 @@ public void seal()
     }
 
     // Create bitmaps from inverted index buffers and serialize them to file
-    try (DataOutputStream offsetDataStream = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(_invertedIndexFile)));
-        FileOutputStream bitmapFileStream = new FileOutputStream(_invertedIndexFile);
-        DataOutputStream bitmapDataStream = new DataOutputStream(new BufferedOutputStream(bitmapFileStream))) {
-      int bitmapOffset = (_cardinality + 1) * Integer.BYTES;
-      offsetDataStream.writeInt(bitmapOffset);
-      bitmapFileStream.getChannel().position(bitmapOffset);
-
+    ByteBuffer offsetBuffer = null;
+    ByteBuffer bitmapBuffer = null;
+    try (FileChannel channel = new RandomAccessFile(_invertedIndexFile, "rw").getChannel()) {
+      // map the offsets buffer
+      final int startOfBitmaps = (_cardinality + 1) * Integer.BYTES;
+      int bitmapOffset = startOfBitmaps;
+      offsetBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, bitmapOffset).order(LITTLE_ENDIAN);
+      offsetBuffer.putInt(reverseBytes(bitmapOffset));
+      RoaringBitmap[] bitmaps = new RoaringBitmap[_cardinality];
+      RoaringBitmapWriter<RoaringBitmap> writer = RoaringBitmapWriter.writer()
+              .initialCapacity(((_nextDocId - 1) >>> 16) / _cardinality).get();
       int startIndex = 0;
       for (int dictId = 0; dictId < _cardinality; dictId++) {
-        MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
         int endIndex = getInt(_invertedIndexLengthBuffer, dictId);
         for (int i = startIndex; i < endIndex; i++) {
-          bitmap.add(getInt(_invertedIndexValueBuffer, i));
+          writer.add(getInt(_invertedIndexValueBuffer, i));
         }
-        startIndex = endIndex;
-
-        // Write offset and bitmap into file
-        bitmapOffset += bitmap.serializedSizeInBytes();
+        bitmaps[dictId] = writer.get();
+        writer.reset();
+        int serializedSize = bitmaps[dictId].serializedSizeInBytes();
+        bitmapOffset += serializedSize;
         // Check for int overflow
         Preconditions.checkState(bitmapOffset > 0, "Inverted index file: %s exceeds 2GB limit", _invertedIndexFile);
-        offsetDataStream.writeInt(bitmapOffset);
-        bitmap.serialize(bitmapDataStream);
+        // write offset into file
+        offsetBuffer.putInt(reverseBytes(bitmapOffset));
+        startIndex = endIndex;
+      }
+      // we know how long the file should be now, so can map it
+      bitmapBuffer = channel.map(FileChannel.MapMode.READ_WRITE, startOfBitmaps, bitmapOffset - startOfBitmaps);
+      for (RoaringBitmap bitmap : bitmaps) {
+        bitmap.serialize(bitmapBuffer);
       }
     } catch (Exception e) {
       FileUtils.deleteQuietly(_invertedIndexFile);
       throw e;
+    } finally {
+      if (CleanerUtil.UNMAP_SUPPORTED) {
+        CleanerUtil.BufferCleaner cleaner = CleanerUtil.getCleaner();
+        if (null != offsetBuffer) {

Review comment:
       (nit) same for other places
   ```suggestion
           if (offsetBuffer != null) {
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java
##########
@@ -19,68 +19,85 @@
 package org.apache.pinot.core.segment.creator.impl.inv;
 
 import com.google.common.base.Preconditions;
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
+
+
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.core.segment.creator.DictionaryBasedInvertedIndexCreator;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.util.CleanerUtil;
+import org.roaringbitmap.RoaringBitmapWriter;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
+import static java.nio.ByteOrder.LITTLE_ENDIAN;
+
 
 /**
  * Implementation of {@link DictionaryBasedInvertedIndexCreator} that uses on-heap memory.
  */
 public final class OnHeapBitmapInvertedIndexCreator implements DictionaryBasedInvertedIndexCreator {
   private final File _invertedIndexFile;
-  private final MutableRoaringBitmap[] _bitmaps;
+  private final RoaringBitmapWriter<MutableRoaringBitmap>[] _bitmapWriters;
   private int _nextDocId;
 
+  @SuppressWarnings("unchecked")
   public OnHeapBitmapInvertedIndexCreator(File indexDir, String columnName, int cardinality) {
     _invertedIndexFile = new File(indexDir, columnName + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION);
-    _bitmaps = new MutableRoaringBitmap[cardinality];
+    _bitmapWriters = new RoaringBitmapWriter[cardinality];
     for (int i = 0; i < cardinality; i++) {
-      _bitmaps[i] = new MutableRoaringBitmap();
+      _bitmapWriters[i] = RoaringBitmapWriter.bufferWriter().get();
     }
   }
 
   @Override
   public void add(int dictId) {
-    _bitmaps[dictId].add(_nextDocId++);
+    _bitmapWriters[dictId].add(_nextDocId++);
   }
 
   @Override
   public void add(int[] dictIds, int length) {
     for (int i = 0; i < length; i++) {
-      _bitmaps[dictIds[i]].add(_nextDocId);
+      _bitmapWriters[dictIds[i]].add(_nextDocId);
     }
     _nextDocId++;
   }
 
   @Override
   public void seal()
       throws IOException {
-    try (DataOutputStream out = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(_invertedIndexFile)))) {
+    // calculate file size
+    int size = (_bitmapWriters.length + 1) * Integer.BYTES;
+    for (RoaringBitmapWriter<MutableRoaringBitmap> writer : _bitmapWriters) {
+      size += writer.get().serializedSizeInBytes();
+      // Check for int overflow
+      Preconditions.checkState(size > 0, "Inverted index file: %s exceeds 2GB limit", _invertedIndexFile);
+    }
+    ByteBuffer buffer = null;
+    try (FileChannel channel = new RandomAccessFile(_invertedIndexFile, "rw").getChannel()) {
+      buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, size).order(LITTLE_ENDIAN);
       // Write bitmap offsets
-      int bitmapOffset = (_bitmaps.length + 1) * Integer.BYTES;
-      out.writeInt(bitmapOffset);
-      for (MutableRoaringBitmap bitmap : _bitmaps) {
-        bitmapOffset += bitmap.serializedSizeInBytes();
-        // Check for int overflow
-        Preconditions.checkState(bitmapOffset > 0, "Inverted index file: %s exceeds 2GB limit", _invertedIndexFile);
-        out.writeInt(bitmapOffset);
+      int bitmapOffset = (_bitmapWriters.length + 1) * Integer.BYTES;
+      buffer.putInt(Integer.reverseBytes(bitmapOffset));
+      for (RoaringBitmapWriter<MutableRoaringBitmap> writer : _bitmapWriters) {
+        bitmapOffset += writer.getUnderlying().serializedSizeInBytes();

Review comment:
       Do you need to `flush()` before `getUnderlying()`? Or just use `get()` to retrieve the bitmap?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java
##########
@@ -19,68 +19,85 @@
 package org.apache.pinot.core.segment.creator.impl.inv;
 
 import com.google.common.base.Preconditions;
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
+
+
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.core.segment.creator.DictionaryBasedInvertedIndexCreator;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.util.CleanerUtil;
+import org.roaringbitmap.RoaringBitmapWriter;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
+import static java.nio.ByteOrder.LITTLE_ENDIAN;
+
 
 /**
  * Implementation of {@link DictionaryBasedInvertedIndexCreator} that uses on-heap memory.
  */
 public final class OnHeapBitmapInvertedIndexCreator implements DictionaryBasedInvertedIndexCreator {
   private final File _invertedIndexFile;
-  private final MutableRoaringBitmap[] _bitmaps;
+  private final RoaringBitmapWriter<MutableRoaringBitmap>[] _bitmapWriters;
   private int _nextDocId;
 
+  @SuppressWarnings("unchecked")
   public OnHeapBitmapInvertedIndexCreator(File indexDir, String columnName, int cardinality) {
     _invertedIndexFile = new File(indexDir, columnName + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION);
-    _bitmaps = new MutableRoaringBitmap[cardinality];
+    _bitmapWriters = new RoaringBitmapWriter[cardinality];
     for (int i = 0; i < cardinality; i++) {
-      _bitmaps[i] = new MutableRoaringBitmap();
+      _bitmapWriters[i] = RoaringBitmapWriter.bufferWriter().get();
     }
   }
 
   @Override
   public void add(int dictId) {
-    _bitmaps[dictId].add(_nextDocId++);
+    _bitmapWriters[dictId].add(_nextDocId++);
   }
 
   @Override
   public void add(int[] dictIds, int length) {
     for (int i = 0; i < length; i++) {
-      _bitmaps[dictIds[i]].add(_nextDocId);
+      _bitmapWriters[dictIds[i]].add(_nextDocId);
     }
     _nextDocId++;
   }
 
   @Override
   public void seal()
       throws IOException {
-    try (DataOutputStream out = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(_invertedIndexFile)))) {
+    // calculate file size
+    int size = (_bitmapWriters.length + 1) * Integer.BYTES;
+    for (RoaringBitmapWriter<MutableRoaringBitmap> writer : _bitmapWriters) {
+      size += writer.get().serializedSizeInBytes();
+      // Check for int overflow
+      Preconditions.checkState(size > 0, "Inverted index file: %s exceeds 2GB limit", _invertedIndexFile);
+    }
+    ByteBuffer buffer = null;
+    try (FileChannel channel = new RandomAccessFile(_invertedIndexFile, "rw").getChannel()) {
+      buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, size).order(LITTLE_ENDIAN);

Review comment:
       Maybe keep two buffers, similar to the off-heap creator?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org