Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/08/06 00:47:32 UTC

[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #5816: Enhance VarByteChunkSVForwardIndexReader to directly read from data buffer for uncompressed data

siddharthteotia commented on a change in pull request #5816:
URL: https://github.com/apache/incubator-pinot/pull/5816#discussion_r466081565
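
For context on the change: when chunks are not compressed, they sit verbatim in the
data buffer, so the reader can resolve a value's absolute position and copy its bytes
directly instead of first materializing the whole chunk into a per-context ByteBuffer
(which is why createContext() below returns null in the uncompressed case, and callers
may pass that null context straight through to getString()/getBytes()). The diff relies
on the chunk layout produced by VarByteChunkSVForwardIndexWriter: a header of one 4-byte
entry per row, each holding the offset of that row's value within the chunk, followed by
the concatenated variable-length values. A minimal sketch of that layout (illustrative
only; ChunkLayoutSketch and writeChunk are hypothetical names, and the real writer also
handles compression and the chunk position index consulted by getChunkPosition()):

    import java.nio.ByteBuffer;

    // Illustrative layout: [ offset(row 0) | ... | offset(row n-1) | value(row 0) | ... ]
    // Each header entry is an int (matching CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE, 4 bytes),
    // holding the start of that row's value relative to the start of the chunk.
    public final class ChunkLayoutSketch {

      public static ByteBuffer writeChunk(byte[][] values) {
        int headerSize = values.length * Integer.BYTES;
        int dataSize = 0;
        for (byte[] value : values) {
          dataSize += value.length;
        }
        ByteBuffer chunk = ByteBuffer.allocate(headerSize + dataSize);
        int valueOffset = headerSize; // the first value starts right after the header
        for (int rowId = 0; rowId < values.length; rowId++) {
          chunk.putInt(rowId * Integer.BYTES, valueOffset); // header entry for this row
          chunk.position(valueOffset);
          chunk.put(values[rowId]);
          valueOffset += values[rowId].length;
        }
        chunk.flip();
        return chunk;
      }
    }

A value's length is simply the gap between consecutive header entries, which is why the
reader below derives valueEndOffset from the next row's offset (or from the chunk/buffer
boundary for the last row).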



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
##########
@@ -19,92 +19,172 @@
 package org.apache.pinot.core.segment.index.readers.forward;
 
 import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.core.io.writer.impl.VarByteChunkSVForwardIndexWriter;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
 /**
- * Chunk-based single-value raw (non-dictionary-encoded) forward index reader for values of  of variable length data
- * type (STRING, BYTES).
+ * Chunk-based single-value raw (non-dictionary-encoded) forward index reader for values of variable length data type
+ * (STRING, BYTES).
  * <p>For data layout, please refer to the documentation for {@link VarByteChunkSVForwardIndexWriter}
  */
 public final class VarByteChunkSVForwardIndexReader extends BaseChunkSVForwardIndexReader {
+  private static final int ROW_OFFSET_SIZE = VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+
   private final int _maxChunkSize;
 
   // Thread local (reusable) byte[] to read bytes from data file.
   private final ThreadLocal<byte[]> _reusableBytes = ThreadLocal.withInitial(() -> new byte[_lengthOfLongestEntry]);
 
   public VarByteChunkSVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
     super(dataBuffer, valueType);
-    _maxChunkSize = _numDocsPerChunk * (VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE
-        + _lengthOfLongestEntry);
+    _maxChunkSize = _numDocsPerChunk * (ROW_OFFSET_SIZE + _lengthOfLongestEntry);
   }
 
+  @Nullable
   @Override
   public ChunkReaderContext createContext() {
-    return new ChunkReaderContext(_maxChunkSize);
+    if (_isCompressed) {
+      return new ChunkReaderContext(_maxChunkSize);
+    } else {
+      return null;
+    }
   }
 
   @Override
   public String getString(int docId, ChunkReaderContext context) {
+    if (_isCompressed) {
+      return getStringCompressed(docId, context);
+    } else {
+      return getStringUncompressed(docId);
+    }
+  }
+
+  /**
+   * Helper method to read STRING value from the compressed index.
+   */
+  private String getStringCompressed(int docId, ChunkReaderContext context) {
     int chunkRowId = docId % _numDocsPerChunk;
     ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
 
-    int rowOffset =
-        chunkBuffer.getInt(chunkRowId * VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE);
-    int nextRowOffset = getNextRowOffset(chunkRowId, chunkBuffer);
+    // These offsets are relative to the chunk buffer
+    int valueStartOffset = chunkBuffer.getInt(chunkRowId * ROW_OFFSET_SIZE);
+    int valueEndOffset = getValueEndOffset(chunkRowId, chunkBuffer);
 
-    int length = nextRowOffset - rowOffset;
+    int length = valueEndOffset - valueStartOffset;
     byte[] bytes = _reusableBytes.get();
-
-    chunkBuffer.position(rowOffset);
+    chunkBuffer.position(valueStartOffset);
     chunkBuffer.get(bytes, 0, length);
+    return StringUtil.decodeUtf8(bytes, 0, length);
+  }
+
+  /**
+   * Helper method to read STRING value from the uncompressed index.
+   */
+  private String getStringUncompressed(int docId) {
+    int chunkId = docId / _numDocsPerChunk;
+    int chunkRowId = docId % _numDocsPerChunk;
+
+    // These offsets are relative to the data buffer
+    long chunkStartOffset = getChunkPosition(chunkId);
+    long valueStartOffset = chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + chunkRowId * ROW_OFFSET_SIZE);
+    long valueEndOffset = getValueEndOffset(chunkId, chunkRowId, chunkStartOffset);
 
+    int length = (int) (valueEndOffset - valueStartOffset);
+    byte[] bytes = _reusableBytes.get();
+    _dataBuffer.copyTo(valueStartOffset, bytes, 0, length);
     return StringUtil.decodeUtf8(bytes, 0, length);
   }
 
   @Override
   public byte[] getBytes(int docId, ChunkReaderContext context) {
+    if (_isCompressed) {
+      return getBytesCompressed(docId, context);
+    } else {
+      return getBytesUncompressed(docId);
+    }
+  }
+
+  /**
+   * Helper method to read BYTES value from the compressed index.
+   */
+  private byte[] getBytesCompressed(int docId, ChunkReaderContext context) {
     int chunkRowId = docId % _numDocsPerChunk;
     ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
 
-    int rowOffset =
-        chunkBuffer.getInt(chunkRowId * VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE);
-    int nextRowOffset = getNextRowOffset(chunkRowId, chunkBuffer);
+    // These offsets are relative to the chunk buffer
+    int valueStartOffset = chunkBuffer.getInt(chunkRowId * ROW_OFFSET_SIZE);
+    int valueEndOffset = getValueEndOffset(chunkRowId, chunkBuffer);
 
-    int length = nextRowOffset - rowOffset;
-    byte[] bytes = new byte[length];
+    byte[] bytes = new byte[valueEndOffset - valueStartOffset];
+    chunkBuffer.position(valueStartOffset);
+    chunkBuffer.get(bytes);
+    return bytes;
+  }
 
-    chunkBuffer.position(rowOffset);
-    chunkBuffer.get(bytes, 0, length);
+  /**
+   * Helper method to read BYTES value from the uncompressed index.
+   */
+  private byte[] getBytesUncompressed(int docId) {
+    int chunkId = docId / _numDocsPerChunk;
+    int chunkRowId = docId % _numDocsPerChunk;
+
+    // These offsets are relative to the data buffer
+    long chunkStartOffset = getChunkPosition(chunkId);
+    long valueStartOffset = chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + chunkRowId * ROW_OFFSET_SIZE);
+    long valueEndOffset = getValueEndOffset(chunkId, chunkRowId, chunkStartOffset);
+
+    byte[] bytes = new byte[(int) (valueEndOffset - valueStartOffset)];
+    _dataBuffer.copyTo(valueStartOffset, bytes);
     return bytes;
   }
 
   /**
-   * Helper method to compute the offset of next row in the chunk buffer.
-   *
-   * @param currentRowId Current row id within the chunk buffer.
-   * @param chunkBuffer Chunk buffer containing the rows.
-   *
-   * @return Offset of next row within the chunk buffer. If current row is the last one,
-   * chunkBuffer.limit() is returned.
+   * Helper method to compute the end offset of the value in the chunk buffer.
    */
-  private int getNextRowOffset(int currentRowId, ByteBuffer chunkBuffer) {
-    int nextRowOffset;
+  private int getValueEndOffset(int rowId, ByteBuffer chunkBuffer) {
+    if (rowId == _numDocsPerChunk - 1) {
+      // Last row in the trunk

Review comment:
       typo; trunk -> chunk
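
(Note: the hunk above is truncated at the commented line. For completeness, a sketch of
how the end offset is presumably resolved on the compressed path, following the removed
getNextRowOffset() Javadoc, which returned chunkBuffer.limit() for the last row; this is
an illustration, not necessarily the PR's exact code.)

    // Sketch: end offset of the value at rowId within a decompressed chunk buffer.
    private int getValueEndOffset(int rowId, ByteBuffer chunkBuffer) {
      if (rowId == _numDocsPerChunk - 1) {
        // Last row in the chunk: there is no next header entry, so the value runs
        // to the end of the chunk (per the removed getNextRowOffset() Javadoc).
        return chunkBuffer.limit();
      }
      // Otherwise the value ends where the next row's value begins.
      return chunkBuffer.getInt((rowId + 1) * ROW_OFFSET_SIZE);
    }

The uncompressed overload getValueEndOffset(chunkId, chunkRowId, chunkStartOffset) would
analogously fall back to the next chunk's start position (or to the end of the data for
the last chunk), and a complete implementation also has to account for a partially
filled last chunk.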



