You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/08/06 04:25:12 UTC

[incubator-pinot] branch master updated: Enhance VarByteChunkSVForwardIndexReader to directly read from data buffer for uncompressed data (#5816)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f68b82e  Enhance VarByteChunkSVForwardIndexReader to directly read from data buffer for uncompressed data (#5816)
f68b82e is described below

commit f68b82e511a0776f6f9c449637efcd332ab434ea
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Aug 5 21:25:01 2020 -0700

    Enhance VarByteChunkSVForwardIndexReader to directly read from data buffer for uncompressed data (#5816)
    
    Currently, for the var-byte raw index, we always pre-allocate the chunk buffer, even for uncompressed data (the buffer could be huge if the index contains large values). When reading values, we first copy the data into the chunk buffer, then read from the buffer. This causes unnecessary overhead from copying the data and allocating direct memory, and can even cause OOM if the buffer size is too big. The chunk buffer is needed for compressed data in order to decompress it,  [...]
    
    This PR enhances the VarByteChunkSVForwardIndexReader to directly read from the data buffer for uncompressed data, and avoid the overhead of the chunk buffer.
---
 .../forward/BaseChunkSVForwardIndexReader.java     |  17 +--
 .../FixedByteChunkSVForwardIndexReader.java        |   4 +
 .../forward/VarByteChunkSVForwardIndexReader.java  | 152 ++++++++++++++++-----
 3 files changed, 127 insertions(+), 46 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java
index a573a4f..7921f41 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java
@@ -39,19 +39,17 @@ import org.slf4j.LoggerFactory;
 public abstract class BaseChunkSVForwardIndexReader implements ForwardIndexReader<BaseChunkSVForwardIndexReader.ChunkReaderContext> {
   private static final Logger LOGGER = LoggerFactory.getLogger(BaseChunkSVForwardIndexReader.class);
 
-  protected final int _chunkSize;
-  protected final int _numDocsPerChunk;
+  protected final PinotDataBuffer _dataBuffer;
+  protected final DataType _valueType;
   protected final int _numChunks;
+  protected final int _numDocsPerChunk;
   protected final int _lengthOfLongestEntry;
   protected final boolean _isCompressed;
+  protected final ChunkDecompressor _chunkDecompressor;
+  protected final PinotDataBuffer _dataHeader;
+  protected final int _headerEntryChunkOffsetSize;
   protected final PinotDataBuffer _rawData;
 
-  private final PinotDataBuffer _dataBuffer;
-  private final DataType _valueType;
-  private final PinotDataBuffer _dataHeader;
-  private final ChunkDecompressor _chunkDecompressor;
-  private final int _headerEntryChunkOffsetSize;
-
   public BaseChunkSVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
     _dataBuffer = dataBuffer;
     _valueType = valueType;
@@ -89,7 +87,6 @@ public abstract class BaseChunkSVForwardIndexReader implements ForwardIndexReade
       _chunkDecompressor = ChunkCompressorFactory.getDecompressor(ChunkCompressorFactory.CompressionType.SNAPPY);
     }
 
-    _chunkSize = (_lengthOfLongestEntry * _numDocsPerChunk);
     _headerEntryChunkOffsetSize = BaseChunkSVForwardIndexWriter.getHeaderEntryChunkOffsetSize(version);
 
     // Slice out the header from the data buffer.
@@ -146,7 +143,7 @@ public abstract class BaseChunkSVForwardIndexReader implements ForwardIndexReade
    * @param chunkId Id of the chunk for which to return the position.
    * @return Position (offset) of the chunk in the data.
    */
-  private long getChunkPosition(int chunkId) {
+  protected long getChunkPosition(int chunkId) {
     if (_headerEntryChunkOffsetSize == Integer.BYTES) {
       return _dataHeader.getInt(chunkId * _headerEntryChunkOffsetSize);
     } else {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/forward/FixedByteChunkSVForwardIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/forward/FixedByteChunkSVForwardIndexReader.java
index a02ea31..5aba57d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/forward/FixedByteChunkSVForwardIndexReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/forward/FixedByteChunkSVForwardIndexReader.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.segment.index.readers.forward;
 
 import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
 import org.apache.pinot.core.io.writer.impl.FixedByteChunkSVForwardIndexWriter;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -30,11 +31,14 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
  * <p>For data layout, please refer to the documentation for {@link FixedByteChunkSVForwardIndexWriter}
  */
 public final class FixedByteChunkSVForwardIndexReader extends BaseChunkSVForwardIndexReader {
+  private final int _chunkSize;
 
   public FixedByteChunkSVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
     super(dataBuffer, valueType);
+    _chunkSize = _numDocsPerChunk * _lengthOfLongestEntry;
   }
 
+  @Nullable
   @Override
   public ChunkReaderContext createContext() {
     if (_isCompressed) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
index e0785d8..e297c8a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.segment.index.readers.forward;
 
 import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.core.io.writer.impl.VarByteChunkSVForwardIndexWriter;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
@@ -26,11 +27,13 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
 /**
- * Chunk-based single-value raw (non-dictionary-encoded) forward index reader for values of  of variable length data
- * type (STRING, BYTES).
+ * Chunk-based single-value raw (non-dictionary-encoded) forward index reader for values of variable length data type
+ * (STRING, BYTES).
  * <p>For data layout, please refer to the documentation for {@link VarByteChunkSVForwardIndexWriter}
  */
 public final class VarByteChunkSVForwardIndexReader extends BaseChunkSVForwardIndexReader {
+  private static final int ROW_OFFSET_SIZE = VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+
   private final int _maxChunkSize;
 
   // Thread local (reusable) byte[] to read bytes from data file.
@@ -38,73 +41,150 @@ public final class VarByteChunkSVForwardIndexReader extends BaseChunkSVForwardIn
 
   public VarByteChunkSVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
     super(dataBuffer, valueType);
-    _maxChunkSize = _numDocsPerChunk * (VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE
-        + _lengthOfLongestEntry);
+    _maxChunkSize = _numDocsPerChunk * (ROW_OFFSET_SIZE + _lengthOfLongestEntry);
   }
 
+  @Nullable
   @Override
   public ChunkReaderContext createContext() {
-    return new ChunkReaderContext(_maxChunkSize);
+    if (_isCompressed) {
+      return new ChunkReaderContext(_maxChunkSize);
+    } else {
+      return null;
+    }
   }
 
   @Override
   public String getString(int docId, ChunkReaderContext context) {
+    if (_isCompressed) {
+      return getStringCompressed(docId, context);
+    } else {
+      return getStringUncompressed(docId);
+    }
+  }
+
+  /**
+   * Helper method to read STRING value from the compressed index.
+   */
+  private String getStringCompressed(int docId, ChunkReaderContext context) {
     int chunkRowId = docId % _numDocsPerChunk;
     ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
 
-    int rowOffset =
-        chunkBuffer.getInt(chunkRowId * VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE);
-    int nextRowOffset = getNextRowOffset(chunkRowId, chunkBuffer);
+    // These are offsets within the chunk buffer
+    int valueStartOffset = chunkBuffer.getInt(chunkRowId * ROW_OFFSET_SIZE);
+    int valueEndOffset = getValueEndOffset(chunkRowId, chunkBuffer);
 
-    int length = nextRowOffset - rowOffset;
+    int length = valueEndOffset - valueStartOffset;
     byte[] bytes = _reusableBytes.get();
-
-    chunkBuffer.position(rowOffset);
+    chunkBuffer.position(valueStartOffset);
     chunkBuffer.get(bytes, 0, length);
+    return StringUtil.decodeUtf8(bytes, 0, length);
+  }
+
+  /**
+   * Helper method to read STRING value from the uncompressed index.
+   */
+  private String getStringUncompressed(int docId) {
+    int chunkId = docId / _numDocsPerChunk;
+    int chunkRowId = docId % _numDocsPerChunk;
+
+    // These are offsets within the data buffer
+    long chunkStartOffset = getChunkPosition(chunkId);
+    long valueStartOffset = chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + chunkRowId * ROW_OFFSET_SIZE);
+    long valueEndOffset = getValueEndOffset(chunkId, chunkRowId, chunkStartOffset);
 
+    int length = (int) (valueEndOffset - valueStartOffset);
+    byte[] bytes = _reusableBytes.get();
+    _dataBuffer.copyTo(valueStartOffset, bytes, 0, length);
     return StringUtil.decodeUtf8(bytes, 0, length);
   }
 
   @Override
   public byte[] getBytes(int docId, ChunkReaderContext context) {
+    if (_isCompressed) {
+      return getBytesCompressed(docId, context);
+    } else {
+      return getBytesUncompressed(docId);
+    }
+  }
+
+  /**
+   * Helper method to read BYTES value from the compressed index.
+   */
+  private byte[] getBytesCompressed(int docId, ChunkReaderContext context) {
     int chunkRowId = docId % _numDocsPerChunk;
     ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
 
-    int rowOffset =
-        chunkBuffer.getInt(chunkRowId * VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE);
-    int nextRowOffset = getNextRowOffset(chunkRowId, chunkBuffer);
+    // These are offsets within the chunk buffer
+    int valueStartOffset = chunkBuffer.getInt(chunkRowId * ROW_OFFSET_SIZE);
+    int valueEndOffset = getValueEndOffset(chunkRowId, chunkBuffer);
 
-    int length = nextRowOffset - rowOffset;
-    byte[] bytes = new byte[length];
+    byte[] bytes = new byte[valueEndOffset - valueStartOffset];
+    chunkBuffer.position(valueStartOffset);
+    chunkBuffer.get(bytes);
+    return bytes;
+  }
 
-    chunkBuffer.position(rowOffset);
-    chunkBuffer.get(bytes, 0, length);
+  /**
+   * Helper method to read BYTES value from the uncompressed index.
+   */
+  private byte[] getBytesUncompressed(int docId) {
+    int chunkId = docId / _numDocsPerChunk;
+    int chunkRowId = docId % _numDocsPerChunk;
+
+    // These are offsets within the data buffer
+    long chunkStartOffset = getChunkPosition(chunkId);
+    long valueStartOffset = chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + chunkRowId * ROW_OFFSET_SIZE);
+    long valueEndOffset = getValueEndOffset(chunkId, chunkRowId, chunkStartOffset);
+
+    byte[] bytes = new byte[(int) (valueEndOffset - valueStartOffset)];
+    _dataBuffer.copyTo(valueStartOffset, bytes);
     return bytes;
   }
 
   /**
-   * Helper method to compute the offset of next row in the chunk buffer.
-   *
-   * @param currentRowId Current row id within the chunk buffer.
-   * @param chunkBuffer Chunk buffer containing the rows.
-   *
-   * @return Offset of next row within the chunk buffer. If current row is the last one,
-   * chunkBuffer.limit() is returned.
+   * Helper method to compute the end offset of the value in the chunk buffer.
    */
-  private int getNextRowOffset(int currentRowId, ByteBuffer chunkBuffer) {
-    int nextRowOffset;
+  private int getValueEndOffset(int rowId, ByteBuffer chunkBuffer) {
+    if (rowId == _numDocsPerChunk - 1) {
+      // Last row in the chunk
+      return chunkBuffer.limit();
+    } else {
+      int valueEndOffset = chunkBuffer.getInt((rowId + 1) * ROW_OFFSET_SIZE);
+      if (valueEndOffset == 0) {
+        // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows)
+        return chunkBuffer.limit();
+      } else {
+        return valueEndOffset;
+      }
+    }
+  }
 
-    if (currentRowId == _numDocsPerChunk - 1) {
-      // Last row in this trunk.
-      nextRowOffset = chunkBuffer.limit();
+  /**
+   * Helper method to compute the end offset of the value in the data buffer.
+   */
+  private long getValueEndOffset(int chunkId, int chunkRowId, long chunkStartOffset) {
+    if (chunkId == _numChunks - 1) {
+      // Last chunk
+      if (chunkRowId == _numDocsPerChunk - 1) {
+        // Last row in the last chunk
+        return _dataBuffer.size();
+      } else {
+        int valueEndOffsetInChunk = _dataBuffer.getInt(chunkStartOffset + (chunkRowId + 1) * ROW_OFFSET_SIZE);
+        if (valueEndOffsetInChunk == 0) {
+          // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows)
+          return _dataBuffer.size();
+        } else {
+          return chunkStartOffset + valueEndOffsetInChunk;
+        }
+      }
     } else {
-      nextRowOffset =
-          chunkBuffer.getInt((currentRowId + 1) * VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE);
-      // For incomplete chunks, the next string's offset will be 0 as row offset for absent rows are 0.
-      if (nextRowOffset == 0) {
-        nextRowOffset = chunkBuffer.limit();
+      if (chunkRowId == _numDocsPerChunk - 1) {
+        // Last row in the chunk
+        return getChunkPosition(chunkId + 1);
+      } else {
+        return chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + (chunkRowId + 1) * ROW_OFFSET_SIZE);
       }
     }
-    return nextRowOffset;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org