Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/08/05 23:21:27 UTC

[GitHub] [incubator-pinot] Jackie-Jiang opened a new pull request #5816: Enhance VarByteChunkSVForwardIndexReader to directly read from data buffer for uncompressed data

Jackie-Jiang opened a new pull request #5816:
URL: https://github.com/apache/incubator-pinot/pull/5816


   ## Description
   Currently, for the var-byte raw index, we always pre-allocate the chunk buffer, even for uncompressed data (the buffer can be huge if the index contains large values). When reading values, we first copy the data into the chunk buffer and then read from that buffer. This causes unnecessary overhead from copying the data and allocating direct memory, and can even cause OOM if the buffer size is too big. The chunk buffer is needed for compressed data in order to decompress it, but it is not necessary for uncompressed data, since we can read directly from the data buffer.
   
   This PR enhances `VarByteChunkSVForwardIndexReader` to read directly from the data buffer for uncompressed data, avoiding the overhead of the chunk buffer.
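   
   As a rough illustration of the change, the two read paths differ as sketched below. This is a hedged sketch only, not the actual Pinot code: the method names and the plain `ByteBuffer` stand-in for `PinotDataBuffer` are made up for this example.
   
   ```java
   // Hedged sketch, not the actual Pinot implementation: method names and the
   // plain ByteBuffer stand-in for PinotDataBuffer are made up for illustration.
   import java.nio.ByteBuffer;
   
   class ReadPathSketch {
     // Before (and still, for compressed data): decompress/copy the whole chunk
     // into a pre-allocated chunk buffer, then slice the value out of it.
     static byte[] readViaChunkBuffer(ByteBuffer chunkBuffer, int valueStart, int valueEnd) {
       byte[] value = new byte[valueEnd - valueStart];
       chunkBuffer.position(valueStart);
       chunkBuffer.get(value);
       return value;
     }
   
     // After this PR, for uncompressed data: resolve the value's absolute offset
     // and copy it straight out of the underlying data buffer, skipping the
     // chunk-buffer allocation and the extra copy entirely.
     static byte[] readDirect(ByteBuffer dataBuffer, int valueStart, int length) {
       byte[] value = new byte[length];
       ByteBuffer slice = dataBuffer.duplicate();  // avoid mutating the shared buffer's position
       slice.position(valueStart);
       slice.get(value);
       return value;
     }
   }
   ```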


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #5816: Enhance VarByteChunkSVForwardIndexReader to directly read from data buffer for uncompressed data

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #5816:
URL: https://github.com/apache/incubator-pinot/pull/5816#discussion_r466083535



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
##########
@@ -19,92 +19,172 @@
 package org.apache.pinot.core.segment.index.readers.forward;
 
 import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.core.io.writer.impl.VarByteChunkSVForwardIndexWriter;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
 /**
- * Chunk-based single-value raw (non-dictionary-encoded) forward index reader for values of  of variable length data
- * type (STRING, BYTES).
+ * Chunk-based single-value raw (non-dictionary-encoded) forward index reader for values of variable length data type
+ * (STRING, BYTES).
  * <p>For data layout, please refer to the documentation for {@link VarByteChunkSVForwardIndexWriter}
  */
 public final class VarByteChunkSVForwardIndexReader extends BaseChunkSVForwardIndexReader {
+  private static final int ROW_OFFSET_SIZE = VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+
   private final int _maxChunkSize;
 
   // Thread local (reusable) byte[] to read bytes from data file.
   private final ThreadLocal<byte[]> _reusableBytes = ThreadLocal.withInitial(() -> new byte[_lengthOfLongestEntry]);
 
   public VarByteChunkSVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
     super(dataBuffer, valueType);
-    _maxChunkSize = _numDocsPerChunk * (VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE
-        + _lengthOfLongestEntry);
+    _maxChunkSize = _numDocsPerChunk * (ROW_OFFSET_SIZE + _lengthOfLongestEntry);
   }
 
+  @Nullable
   @Override
   public ChunkReaderContext createContext() {
-    return new ChunkReaderContext(_maxChunkSize);
+    if (_isCompressed) {
+      return new ChunkReaderContext(_maxChunkSize);
+    } else {
+      return null;
+    }
   }
 
   @Override
   public String getString(int docId, ChunkReaderContext context) {
+    if (_isCompressed) {
+      return getStringCompressed(docId, context);
+    } else {
+      return getStringUncompressed(docId);
+    }
+  }
+
+  /**
+   * Helper method to read STRING value from the compressed index.
+   */
+  private String getStringCompressed(int docId, ChunkReaderContext context) {
     int chunkRowId = docId % _numDocsPerChunk;
     ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
 
-    int rowOffset =
-        chunkBuffer.getInt(chunkRowId * VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE);
-    int nextRowOffset = getNextRowOffset(chunkRowId, chunkBuffer);
+    // These offsets are offset in the chunk buffer
+    int valueStartOffset = chunkBuffer.getInt(chunkRowId * ROW_OFFSET_SIZE);
+    int valueEndOffset = getValueEndOffset(chunkRowId, chunkBuffer);
 
-    int length = nextRowOffset - rowOffset;
+    int length = valueEndOffset - valueStartOffset;
     byte[] bytes = _reusableBytes.get();
-
-    chunkBuffer.position(rowOffset);
+    chunkBuffer.position(valueStartOffset);
     chunkBuffer.get(bytes, 0, length);
+    return StringUtil.decodeUtf8(bytes, 0, length);
+  }
+
+  /**
+   * Helper method to read STRING value from the uncompressed index.
+   */
+  private String getStringUncompressed(int docId) {
+    int chunkId = docId / _numDocsPerChunk;
+    int chunkRowId = docId % _numDocsPerChunk;
+
+    // These offsets are offset in the data buffer
+    long chunkStartOffset = getChunkPosition(chunkId);
+    long valueStartOffset = chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + chunkRowId * ROW_OFFSET_SIZE);
+    long valueEndOffset = getValueEndOffset(chunkId, chunkRowId, chunkStartOffset);
 
+    int length = (int) (valueEndOffset - valueStartOffset);
+    byte[] bytes = _reusableBytes.get();
+    _dataBuffer.copyTo(valueStartOffset, bytes, 0, length);
     return StringUtil.decodeUtf8(bytes, 0, length);
   }
 
   @Override
   public byte[] getBytes(int docId, ChunkReaderContext context) {
+    if (_isCompressed) {
+      return getBytesCompressed(docId, context);
+    } else {
+      return getBytesUncompressed(docId);
+    }
+  }
+
+  /**
+   * Helper method to read BYTES value from the compressed index.
+   */
+  private byte[] getBytesCompressed(int docId, ChunkReaderContext context) {
     int chunkRowId = docId % _numDocsPerChunk;
     ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
 
-    int rowOffset =
-        chunkBuffer.getInt(chunkRowId * VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE);
-    int nextRowOffset = getNextRowOffset(chunkRowId, chunkBuffer);
+    // These offsets are offset in the chunk buffer
+    int valueStartOffset = chunkBuffer.getInt(chunkRowId * ROW_OFFSET_SIZE);
+    int valueEndOffset = getValueEndOffset(chunkRowId, chunkBuffer);
 
-    int length = nextRowOffset - rowOffset;
-    byte[] bytes = new byte[length];
+    byte[] bytes = new byte[valueEndOffset - valueStartOffset];
+    chunkBuffer.position(valueStartOffset);
+    chunkBuffer.get(bytes);
+    return bytes;
+  }
 
-    chunkBuffer.position(rowOffset);
-    chunkBuffer.get(bytes, 0, length);
+  /**
+   * Helper method to read BYTES value from the uncompressed index.
+   */
+  private byte[] getBytesUncompressed(int docId) {
+    int chunkId = docId / _numDocsPerChunk;
+    int chunkRowId = docId % _numDocsPerChunk;
+
+    // These offsets are offset in the data buffer
+    long chunkStartOffset = getChunkPosition(chunkId);
+    long valueStartOffset = chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + chunkRowId * ROW_OFFSET_SIZE);
+    long valueEndOffset = getValueEndOffset(chunkId, chunkRowId, chunkStartOffset);
+
+    byte[] bytes = new byte[(int) (valueEndOffset - valueStartOffset)];
+    _dataBuffer.copyTo(valueStartOffset, bytes);
     return bytes;
   }
 
   /**
-   * Helper method to compute the offset of next row in the chunk buffer.
-   *
-   * @param currentRowId Current row id within the chunk buffer.
-   * @param chunkBuffer Chunk buffer containing the rows.
-   *
-   * @return Offset of next row within the chunk buffer. If current row is the last one,
-   * chunkBuffer.limit() is returned.
+   * Helper method to compute the end offset of the value in the chunk buffer.
    */
-  private int getNextRowOffset(int currentRowId, ByteBuffer chunkBuffer) {
-    int nextRowOffset;
+  private int getValueEndOffset(int rowId, ByteBuffer chunkBuffer) {
+    if (rowId == _numDocsPerChunk - 1) {
+      // Last row in the trunk
+      return chunkBuffer.limit();
+    } else {
+      int valueEndOffset = chunkBuffer.getInt((rowId + 1) * ROW_OFFSET_SIZE);
+      if (valueEndOffset == 0) {
+        // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows)
+        return chunkBuffer.limit();
+      } else {
+        return valueEndOffset;
+      }
+    }
+  }
 
-    if (currentRowId == _numDocsPerChunk - 1) {
-      // Last row in this trunk.
-      nextRowOffset = chunkBuffer.limit();
+  /**
+   * Helper method to compute the end offset of the value in the data buffer.
+   */
+  private long getValueEndOffset(int chunkId, int chunkRowId, long chunkStartOffset) {

Review comment:
       Why is the algorithm for getting the end offset (or the next row's start offset) different for the uncompressed case?






[GitHub] [incubator-pinot] Jackie-Jiang merged pull request #5816: Enhance VarByteChunkSVForwardIndexReader to directly read from data buffer for uncompressed data

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang merged pull request #5816:
URL: https://github.com/apache/incubator-pinot/pull/5816


   




[GitHub] [incubator-pinot] Jackie-Jiang commented on pull request #5816: Enhance VarByteChunkSVForwardIndexReader to directly read from data buffer for uncompressed data

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on pull request #5816:
URL: https://github.com/apache/incubator-pinot/pull/5816#issuecomment-669607425


   > Do we cover both compressed/uncompressed cases in the unit test?
   
   Yes, `VarByteChunkSVForwardIndexTest` covers both compressed and uncompressed indexes.
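   
   For intuition, such coverage typically just parameterizes the write/read round trip over the compression type, along the lines of the hedged sketch below (not the actual test source; `testWriteRead` is an assumed helper, and a TestNG-style test method is assumed):
   
   ```java
   // Hedged sketch of covering both paths; not the actual
   // VarByteChunkSVForwardIndexTest source. testWriteRead is an assumed helper
   // that writes values with the given compression type, reads them back through
   // VarByteChunkSVForwardIndexReader, and asserts equality.
   @Test
   public void testBothCompressionModes() throws Exception {
     for (ChunkCompressorFactory.CompressionType compressionType :
         new ChunkCompressorFactory.CompressionType[]{
             ChunkCompressorFactory.CompressionType.SNAPPY,       // compressed read path
             ChunkCompressorFactory.CompressionType.PASS_THROUGH  // uncompressed read path
         }) {
       testWriteRead(compressionType);
     }
   }
   ```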




[GitHub] [incubator-pinot] siddharthteotia commented on a change in pull request #5816: Enhance VarByteChunkSVForwardIndexReader to directly read from data buffer for uncompressed data

Posted by GitBox <gi...@apache.org>.
siddharthteotia commented on a change in pull request #5816:
URL: https://github.com/apache/incubator-pinot/pull/5816#discussion_r466081565



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
##########
@@ -19,92 +19,172 @@
...
   /**
-   * Helper method to compute the offset of next row in the chunk buffer.
-   *
-   * @param currentRowId Current row id within the chunk buffer.
-   * @param chunkBuffer Chunk buffer containing the rows.
-   *
-   * @return Offset of next row within the chunk buffer. If current row is the last one,
-   * chunkBuffer.limit() is returned.
+   * Helper method to compute the end offset of the value in the chunk buffer.
    */
-  private int getNextRowOffset(int currentRowId, ByteBuffer chunkBuffer) {
-    int nextRowOffset;
+  private int getValueEndOffset(int rowId, ByteBuffer chunkBuffer) {
+    if (rowId == _numDocsPerChunk - 1) {
+      // Last row in the trunk

Review comment:
       typo; trunk -> chunk






[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5816: Enhance VarByteChunkSVForwardIndexReader to directly read from data buffer for uncompressed data

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #5816:
URL: https://github.com/apache/incubator-pinot/pull/5816#discussion_r466085122



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
##########
@@ -19,92 +19,172 @@
...
   /**
-   * Helper method to compute the offset of next row in the chunk buffer.
-   *
-   * @param currentRowId Current row id within the chunk buffer.
-   * @param chunkBuffer Chunk buffer containing the rows.
-   *
-   * @return Offset of next row within the chunk buffer. If current row is the last one,
-   * chunkBuffer.limit() is returned.
+   * Helper method to compute the end offset of the value in the chunk buffer.
    */
-  private int getNextRowOffset(int currentRowId, ByteBuffer chunkBuffer) {
-    int nextRowOffset;
+  private int getValueEndOffset(int rowId, ByteBuffer chunkBuffer) {
+    if (rowId == _numDocsPerChunk - 1) {
+      // Last row in the trunk
+      return chunkBuffer.limit();
+    } else {
+      int valueEndOffset = chunkBuffer.getInt((rowId + 1) * ROW_OFFSET_SIZE);
+      if (valueEndOffset == 0) {
+        // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows)
+        return chunkBuffer.limit();
+      } else {
+        return valueEndOffset;
+      }
+    }
+  }
 
-    if (currentRowId == _numDocsPerChunk - 1) {
-      // Last row in this trunk.
-      nextRowOffset = chunkBuffer.limit();
+  /**
+   * Helper method to compute the end offset of the value in the data buffer.
+   */
+  private long getValueEndOffset(int chunkId, int chunkRowId, long chunkStartOffset) {

Review comment:
       The algorithm is slightly different because with the `chunkBuffer` we can get the chunk end offset directly via `chunkBuffer.limit()`, which is not the case for the uncompressed one. That is why we have a branch on the last chunk.
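    
    For intuition, the data-buffer variant plausibly looks something like the sketch below. This is reconstructed from the discussion, not copied from the PR (the method body is truncated in the quote above); `_numChunks` and using `_dataBuffer.size()` as the end of the last chunk are assumptions:
    
    ```java
    // Hedged sketch of the uncompressed end-offset computation; reconstructed
    // from the discussion, not copied from the PR. Relies on the surrounding
    // reader's fields; _numChunks is an assumed field.
    private long getValueEndOffset(int chunkId, int chunkRowId, long chunkStartOffset) {
      // Unlike the compressed path, there is no chunkBuffer.limit() to consult,
      // so the chunk end is derived from the next chunk's start (or the buffer
      // end for the last chunk) -- this is the extra branch on the last chunk.
      long chunkEndOffset =
          (chunkId == _numChunks - 1) ? _dataBuffer.size() : getChunkPosition(chunkId + 1);
      if (chunkRowId == _numDocsPerChunk - 1) {
        // Last row in the chunk: the value runs to the end of the chunk
        return chunkEndOffset;
      }
      int nextRowOffset = _dataBuffer.getInt(chunkStartOffset + (chunkRowId + 1) * ROW_OFFSET_SIZE);
      if (nextRowOffset == 0) {
        // Incomplete last chunk stores 0 as the offset for absent rows
        return chunkEndOffset;
      }
      return chunkStartOffset + nextRowOffset;
    }
    ```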






[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5816: Enhance VarByteChunkSVForwardIndexReader to directly read from data buffer for uncompressed data

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #5816:
URL: https://github.com/apache/incubator-pinot/pull/5816#discussion_r466083856



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
##########
@@ -19,92 +19,172 @@
...
   /**
-   * Helper method to compute the offset of next row in the chunk buffer.
-   *
-   * @param currentRowId Current row id within the chunk buffer.
-   * @param chunkBuffer Chunk buffer containing the rows.
-   *
-   * @return Offset of next row within the chunk buffer. If current row is the last one,
-   * chunkBuffer.limit() is returned.
+   * Helper method to compute the end offset of the value in the chunk buffer.
    */
-  private int getNextRowOffset(int currentRowId, ByteBuffer chunkBuffer) {
-    int nextRowOffset;
+  private int getValueEndOffset(int rowId, ByteBuffer chunkBuffer) {
+    if (rowId == _numDocsPerChunk - 1) {
+      // Last row in the trunk

Review comment:
       Good catch



