You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/12/08 05:04:05 UTC

[incubator-pinot] branch master updated: use RoaringBitmapWriter and direct to ByteBuffer serialization in BitmapInvertedIndexCreators (#6320)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fe2a63c  use RoaringBitmapWriter and direct to ByteBuffer serialization in BitmapInvertedIndexCreators (#6320)
fe2a63c is described below

commit fe2a63ca6d21015cbb09500f00d98a702338d488
Author: Richard Startin <ri...@datadoghq.com>
AuthorDate: Tue Dec 8 05:03:45 2020 +0000

    use RoaringBitmapWriter and direct to ByteBuffer serialization in BitmapInvertedIndexCreators (#6320)
    
    This PR uses some more efficient APIs from RoaringBitmap.
    - `RoaringBitmapWriter` is optimized for writing row ids into the bitmap sequentially. This class has a few options to control memory usage, but I've used its defaults.
    - A `RoaringBitmap` can be serialized directly to a `ByteBuffer`, which is generally a lot faster than serializing through a `DataOutput`. However, this requires mapping a file outside of the control of `PinotByteBuffer`. The offsets buffer also has to be big endian for backward compatibility with the offsets previously written via `DataOutput`.
---
 .../inv/OffHeapBitmapInvertedIndexCreator.java     |  59 +++++++-----
 .../impl/inv/OnHeapBitmapInvertedIndexCreator.java |  63 ++++++++-----
 ...BenchmarkOffheapBitmapInvertedIndexCreator.java | 105 +++++++++++++++++++++
 3 files changed, 181 insertions(+), 46 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
index b43b330..1e72bc1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
@@ -18,19 +18,22 @@
  */
 package org.apache.pinot.core.segment.creator.impl.inv;
 
-import com.google.common.base.Preconditions;
-import java.io.BufferedOutputStream;
 import java.io.Closeable;
-import java.io.DataOutputStream;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.core.segment.creator.DictionaryBasedInvertedIndexCreator;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.core.util.CleanerUtil;
 import org.apache.pinot.spi.data.FieldSpec;
-import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.RoaringBitmapWriter;
 
 
 /**
@@ -181,33 +184,45 @@ public final class OffHeapBitmapInvertedIndexCreator implements DictionaryBasedI
     }
 
     // Create bitmaps from inverted index buffers and serialize them to file
-    try (DataOutputStream offsetDataStream = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(_invertedIndexFile)));
-        FileOutputStream bitmapFileStream = new FileOutputStream(_invertedIndexFile);
-        DataOutputStream bitmapDataStream = new DataOutputStream(new BufferedOutputStream(bitmapFileStream))) {
-      int bitmapOffset = (_cardinality + 1) * Integer.BYTES;
-      offsetDataStream.writeInt(bitmapOffset);
-      bitmapFileStream.getChannel().position(bitmapOffset);
-
+    ByteBuffer offsetBuffer = null;
+    ByteBuffer bitmapBuffer = null;
+    try (FileChannel channel = new RandomAccessFile(_invertedIndexFile, "rw").getChannel()) {
+      // map the offsets buffer
+      int startOfBitmaps = (_cardinality + 1) * Integer.BYTES;
+      offsetBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, startOfBitmaps)
+              .order(ByteOrder.BIG_ENDIAN);
+      offsetBuffer.putInt(startOfBitmaps);
+      bitmapBuffer = channel.map(FileChannel.MapMode.READ_WRITE, startOfBitmaps, Integer.MAX_VALUE - startOfBitmaps)
+              .order(ByteOrder.LITTLE_ENDIAN);
+      RoaringBitmapWriter<RoaringBitmap> writer = RoaringBitmapWriter.writer().runCompress(false).get();
       int startIndex = 0;
       for (int dictId = 0; dictId < _cardinality; dictId++) {
-        MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
         int endIndex = getInt(_invertedIndexLengthBuffer, dictId);
         for (int i = startIndex; i < endIndex; i++) {
-          bitmap.add(getInt(_invertedIndexValueBuffer, i));
+          writer.add(getInt(_invertedIndexValueBuffer, i));
         }
+        RoaringBitmap bitmap = writer.get();
+        bitmap.serialize(bitmapBuffer);
+        // write offset into file
+        offsetBuffer.putInt(startOfBitmaps + bitmapBuffer.position());
         startIndex = endIndex;
-
-        // Write offset and bitmap into file
-        bitmapOffset += bitmap.serializedSizeInBytes();
-        // Check for int overflow
-        Preconditions.checkState(bitmapOffset > 0, "Inverted index file: %s exceeds 2GB limit", _invertedIndexFile);
-        offsetDataStream.writeInt(bitmapOffset);
-        bitmap.serialize(bitmapDataStream);
+        writer.reset();
       }
+      // we know how long the file should be now, so truncate it
+      channel.truncate(startOfBitmaps + bitmapBuffer.position());
     } catch (Exception e) {
       FileUtils.deleteQuietly(_invertedIndexFile);
       throw e;
+    } finally {
+      if (CleanerUtil.UNMAP_SUPPORTED) {
+        CleanerUtil.BufferCleaner cleaner = CleanerUtil.getCleaner();
+        if (offsetBuffer != null) {
+          cleaner.freeBuffer(offsetBuffer);
+        }
+        if (bitmapBuffer != null) {
+          cleaner.freeBuffer(bitmapBuffer);
+        }
+      }
     }
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java
index 84c9929..44f4588 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java
@@ -18,16 +18,20 @@
  */
 package org.apache.pinot.core.segment.creator.impl.inv;
 
-import com.google.common.base.Preconditions;
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
+
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.core.segment.creator.DictionaryBasedInvertedIndexCreator;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
-import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.apache.pinot.core.util.CleanerUtil;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.RoaringBitmapWriter;
 
 
 /**
@@ -35,26 +39,27 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap;
  */
 public final class OnHeapBitmapInvertedIndexCreator implements DictionaryBasedInvertedIndexCreator {
   private final File _invertedIndexFile;
-  private final MutableRoaringBitmap[] _bitmaps;
+  private final RoaringBitmapWriter<RoaringBitmap>[] _bitmapWriters;
   private int _nextDocId;
 
+  @SuppressWarnings("unchecked")
   public OnHeapBitmapInvertedIndexCreator(File indexDir, String columnName, int cardinality) {
     _invertedIndexFile = new File(indexDir, columnName + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION);
-    _bitmaps = new MutableRoaringBitmap[cardinality];
+    _bitmapWriters = new RoaringBitmapWriter[cardinality];
     for (int i = 0; i < cardinality; i++) {
-      _bitmaps[i] = new MutableRoaringBitmap();
+      _bitmapWriters[i] = RoaringBitmapWriter.writer().runCompress(false).get();
     }
   }
 
   @Override
   public void add(int dictId) {
-    _bitmaps[dictId].add(_nextDocId++);
+    _bitmapWriters[dictId].add(_nextDocId++);
   }
 
   @Override
   public void add(int[] dictIds, int length) {
     for (int i = 0; i < length; i++) {
-      _bitmaps[dictIds[i]].add(_nextDocId);
+      _bitmapWriters[dictIds[i]].add(_nextDocId);
     }
     _nextDocId++;
   }
@@ -62,25 +67,35 @@ public final class OnHeapBitmapInvertedIndexCreator implements DictionaryBasedIn
   @Override
   public void seal()
       throws IOException {
-    try (DataOutputStream out = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(_invertedIndexFile)))) {
+    int startOfBitmaps = (_bitmapWriters.length + 1) * Integer.BYTES;
+    ByteBuffer bitmapBuffer = null;
+    ByteBuffer offsetBuffer = null;
+    try (FileChannel channel = new RandomAccessFile(_invertedIndexFile, "rw").getChannel()) {
+      // map the offsets buffer
+      offsetBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, startOfBitmaps)
+              .order(ByteOrder.BIG_ENDIAN);
+      bitmapBuffer = channel.map(FileChannel.MapMode.READ_WRITE, startOfBitmaps, Integer.MAX_VALUE - startOfBitmaps)
+              .order(ByteOrder.LITTLE_ENDIAN);
       // Write bitmap offsets
-      int bitmapOffset = (_bitmaps.length + 1) * Integer.BYTES;
-      out.writeInt(bitmapOffset);
-      for (MutableRoaringBitmap bitmap : _bitmaps) {
-        bitmapOffset += bitmap.serializedSizeInBytes();
-        // Check for int overflow
-        Preconditions.checkState(bitmapOffset > 0, "Inverted index file: %s exceeds 2GB limit", _invertedIndexFile);
-        out.writeInt(bitmapOffset);
-      }
-
-      // Write bitmap data
-      for (MutableRoaringBitmap bitmap : _bitmaps) {
-        bitmap.serialize(out);
+      offsetBuffer.putInt(startOfBitmaps);
+      for (RoaringBitmapWriter<RoaringBitmap> writer : _bitmapWriters) {
+        writer.get().serialize(bitmapBuffer);
+        offsetBuffer.putInt(startOfBitmaps + bitmapBuffer.position());
       }
+      channel.truncate(startOfBitmaps + bitmapBuffer.position());
     } catch (Exception e) {
       FileUtils.deleteQuietly(_invertedIndexFile);
       throw e;
+    } finally {
+      if (CleanerUtil.UNMAP_SUPPORTED) {
+        CleanerUtil.BufferCleaner cleaner = CleanerUtil.getCleaner();
+        if (bitmapBuffer != null) {
+          cleaner.freeBuffer(bitmapBuffer);
+        }
+        if (offsetBuffer != null) {
+          cleaner.freeBuffer(offsetBuffer);
+        }
+      }
     }
   }
 
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOffheapBitmapInvertedIndexCreator.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOffheapBitmapInvertedIndexCreator.java
new file mode 100644
index 0000000..0979c98
--- /dev/null
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOffheapBitmapInvertedIndexCreator.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.perf;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+@State(Scope.Benchmark)
+public class BenchmarkOffheapBitmapInvertedIndexCreator {
+
+  public enum Assignment {
+    ROUND_ROBIN {
+      @Override
+      void assign(OffHeapBitmapInvertedIndexCreator creator, int docs, int cardinality) {
+        for (int i = 0; i < docs; ++i) {
+          creator.add(i % cardinality);
+        }
+      }
+    },
+    SORTED_UNIFORM {
+      @Override
+      void assign(OffHeapBitmapInvertedIndexCreator creator, int docs, int cardinality) {
+        for (int i = 0; i < cardinality; ++i) {
+          for (int j = 0; j < docs / cardinality; ++j) {
+            creator.add(i);
+          }
+        }
+      }
+    };
+
+    abstract void assign(OffHeapBitmapInvertedIndexCreator creator, int docs, int cardinality);
+  }
+
+  private Path indexDir;
+  @Param({"10", "1000", "10000"})
+  int cardinality;
+
+  @Param({"1000000", "10000000", "100000000"})
+  int numDocs;
+
+  @Param
+  Assignment assignment;
+
+  private OffHeapBitmapInvertedIndexCreator creator;
+
+  @Setup(Level.Invocation)
+  public void setup() throws IOException {
+    indexDir = Files.createTempDirectory("index");
+    creator = new OffHeapBitmapInvertedIndexCreator(
+            indexDir.toFile(), new DimensionFieldSpec("foo", FieldSpec.DataType.STRING, true),
+            cardinality, numDocs, -1);
+    assignment.assign(creator, numDocs, cardinality);
+  }
+
+  @TearDown(Level.Invocation)
+  public void tearDown() throws IOException {
+    if (null != indexDir) {
+      FileUtils.deleteDirectory(indexDir.toFile());
+    }
+    creator.close();
+  }
+
+  @Benchmark
+  public Object seal() throws IOException {
+    creator.seal();
+    return creator;
+  }
+
+  public static void main(String[] args)
+          throws Exception {
+    ChainedOptionsBuilder opt =
+            new OptionsBuilder().include(BenchmarkOffheapBitmapInvertedIndexCreator.class.getSimpleName())
+                    .mode(Mode.SingleShotTime)
+                    .warmupIterations(8).measurementIterations(8).forks(5);
+
+    new Runner(opt.build()).run();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org