Posted to commits@pinot.apache.org by ja...@apache.org on 2021/11/10 03:40:48 UTC

[pinot] branch master updated: implement size balanced V4 raw chunk format (#7661)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 13c9ee9  implement size balanced V4 raw chunk format (#7661)
13c9ee9 is described below

commit 13c9ee9556498bb6dc4ab60734743edb8b89773c
Author: Richard Startin <ri...@startree.ai>
AuthorDate: Wed Nov 10 03:37:05 2021 +0000

    implement size balanced V4 raw chunk format (#7661)
---
 .../impl/VarByteChunkSVForwardIndexWriter.java     |   4 +-
 .../impl/VarByteChunkSVForwardIndexWriterV4.java   | 229 ++++++++++++++++
 .../local/io/writer/impl/VarByteChunkWriter.java   |  15 +-
 .../fwd/MultiValueFixedByteRawIndexCreator.java    |  10 +-
 .../fwd/SingleValueVarByteRawIndexCreator.java     |  10 +-
 .../index/column/PhysicalColumnIndexContainer.java |  11 +-
 .../forward/BaseChunkSVForwardIndexReader.java     |   2 +-
 .../VarByteChunkSVForwardIndexReaderV4.java        | 296 +++++++++++++++++++++
 .../segment/index/creator/VarByteChunkV4Test.java  | 167 ++++++++++++
 .../spi/compression/ChunkCompressionType.java      |   9 +
 .../pinot/segment/spi/memory/CleanerUtil.java      |  10 +
 11 files changed, 743 insertions(+), 20 deletions(-)
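
For orientation, the V4 on-disk layout implied by the writer and reader below is roughly as follows
(a sketch reconstructed from VarByteChunkSVForwardIndexWriterV4.writeHeader/write/close and the
VarByteChunkSVForwardIndexReaderV4 constructor, not an authoritative spec):

    header   (big endian, 16 bytes):  version (4), targetDecompressedChunkSize, compressionType ordinal, chunksOffset
    metadata (little endian, bytes 16..chunksOffset, 8 bytes per chunk):
             docIdOffset of the chunk's first value (MSB set marks a "huge" single-value chunk),
             chunk start offset relative to chunksOffset
    chunks   (bytes chunksOffset..EOF): compressed chunks; a regular chunk decompresses to
             [numDocs][numDocs value offsets][value bytes], a huge chunk decompresses to the raw value bytes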

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
index fed1200..f035a62 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
@@ -53,7 +53,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
  * Only sequential writes are supported.
  */
 @NotThreadSafe
-public class VarByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexWriter {
+public class VarByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexWriter implements VarByteChunkWriter {
 
   public static final int CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE = Integer.BYTES;
 
@@ -87,10 +87,12 @@ public class VarByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexWri
     _chunkDataOffSet = _chunkHeaderSize;
   }
 
+  @Override
   public void putString(String value) {
     putBytes(value.getBytes(UTF_8));
   }
 
+  @Override
   public void putBytes(byte[] value) {
     _chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet);
     _chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterV4.java
new file mode 100644
index 0000000..ef24570
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterV4.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.writer.impl;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.ChunkCompressor;
+import org.apache.pinot.segment.spi.memory.CleanerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class to write out variable length bytes into a single column.
+ *
+ *
+ * Only sequential writes are supported.
+ */
+@NotThreadSafe
+public class VarByteChunkSVForwardIndexWriterV4 implements VarByteChunkWriter {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(VarByteChunkSVForwardIndexWriterV4.class);
+
+  public static final int VERSION = 4;
+
+  private static final String DATA_BUFFER_SUFFIX = ".buf";
+
+  private final File _dataBuffer;
+  private final RandomAccessFile _output;
+  private final FileChannel _dataChannel;
+  private final ByteBuffer _chunkBuffer;
+  private final ChunkCompressor _chunkCompressor;
+
+  private int _docIdOffset = 0;
+  private int _nextDocId = 0;
+  private int _metadataSize = 0;
+  private long _chunkOffset = 0;
+
+  public VarByteChunkSVForwardIndexWriterV4(File file, ChunkCompressionType compressionType, int chunkSize)
+      throws IOException {
+    _dataBuffer = new File(file.getName() + DATA_BUFFER_SUFFIX);
+    _output = new RandomAccessFile(file, "rw");
+    _dataChannel = new RandomAccessFile(_dataBuffer, "rw").getChannel();
+    _chunkCompressor = ChunkCompressorFactory.getCompressor(compressionType, true);
+    _chunkBuffer = ByteBuffer.allocateDirect(chunkSize).order(ByteOrder.LITTLE_ENDIAN);
+    // reserve space for numDocs
+    _chunkBuffer.position(Integer.BYTES);
+    writeHeader(_chunkCompressor.compressionType(), chunkSize);
+  }
+
+  private void writeHeader(ChunkCompressionType compressionType, int targetDecompressedChunkSize)
+      throws IOException {
+    // keep metadata BE for backwards compatibility
+    // (e.g. the version needs to be read by a factory which assumes BE)
+    _output.writeInt(VERSION);
+    _output.writeInt(targetDecompressedChunkSize);
+    _output.writeInt(compressionType.getValue());
+    // reserve a slot to write the data offset into
+    _output.writeInt(0);
+    _metadataSize += 4 * Integer.BYTES;
+  }
+
+  @Override
+  public void putString(String string) {
+    putBytes(string.getBytes(StandardCharsets.UTF_8));
+  }
+
+  @Override
+  public void putBytes(byte[] bytes) {
+    Preconditions.checkState(_chunkOffset < (1L << 32), "exceeded 4GB of compressed chunks");
+    int sizeRequired = Integer.BYTES + bytes.length;
+    if (_chunkBuffer.position() > _chunkBuffer.capacity() - sizeRequired) {
+      flushChunk();
+      if (sizeRequired > _chunkBuffer.capacity() - Integer.BYTES) {
+        writeHugeChunk(bytes);
+        return;
+      }
+    }
+    _chunkBuffer.putInt(bytes.length);
+    _chunkBuffer.put(bytes);
+    _nextDocId++;
+  }
+
+  private void writeHugeChunk(byte[] bytes) {
+    // Huge values (those whose bytes plus length prefix don't fit into the buffer after the numDocs prefix of a
+    // regular chunk) are written as a bare single value without the in-chunk offset header; such chunks are
+    // detected by setting the MSB of the doc id offset.
+    final ByteBuffer buffer;
+    if (_chunkCompressor.compressionType() == ChunkCompressionType.SNAPPY
+        || _chunkCompressor.compressionType() == ChunkCompressionType.ZSTANDARD) {
+      // the SNAPPY and ZSTANDARD libraries don't work with on-heap buffers,
+      // so the already allocated byte array is not good enough
+      buffer = ByteBuffer.allocateDirect(bytes.length);
+      buffer.put(bytes);
+      buffer.flip();
+    } else {
+      buffer = ByteBuffer.wrap(bytes);
+    }
+    try {
+      _nextDocId++;
+      write(buffer, true);
+    } finally {
+      CleanerUtil.cleanQuietly(buffer);
+    }
+  }
+
+  private void flushChunk() {
+    if (_nextDocId > _docIdOffset) {
+      writeChunk();
+    }
+  }
+
+  private void writeChunk() {
+    /*
+    This method translates from the current state of the buffer, assuming there are 3 values of lengths a, b, and c:
+    [-][a][a bytes][b][b bytes][c][c bytes]
+    to:
+    [3][16][a+16][a+b+16][a bytes][b bytes][c bytes]
+    [------16-bytes-----][----a+b+c bytes----------]
+     */
+    int numDocs = _nextDocId - _docIdOffset;
+    _chunkBuffer.putInt(0, numDocs);
+    // collect offsets
+    int[] offsets = new int[numDocs];
+    int offset = Integer.BYTES;
+    for (int i = 0; i < numDocs; i++) {
+      offsets[i] = offset;
+      int size = _chunkBuffer.getInt(offset);
+      offset += size + Integer.BYTES;
+    }
+    // now iterate backwards shifting variable length content backwards to make space for prefixes at the start
+    // this pays for itself by allowing random access to readers
+    int limit = _chunkBuffer.position();
+    int accumulatedOffset = Integer.BYTES;
+    for (int i = numDocs - 2; i >= 0; i--) {
+      int length = _chunkBuffer.getInt(offsets[i]);
+      ByteBuffer source = _chunkBuffer.duplicate();
+      int copyFrom = offsets[i] + Integer.BYTES;
+      source.position(copyFrom).limit(copyFrom + length);
+      _chunkBuffer.position(copyFrom + accumulatedOffset);
+      _chunkBuffer.put(source);
+      offsets[i + 1] = _chunkBuffer.position();
+      accumulatedOffset += Integer.BYTES;
+    }
+    offsets[0] = Integer.BYTES * (numDocs + 1);
+    // write the offsets into the space created at the front
+    _chunkBuffer.position(Integer.BYTES);
+    _chunkBuffer.asIntBuffer().put(offsets);
+    _chunkBuffer.position(0);
+    _chunkBuffer.limit(limit);
+    write(_chunkBuffer, false);
+    clearChunkBuffer();
+  }
+
+  private void write(ByteBuffer buffer, boolean huge) {
+    int maxCompressedSize = _chunkCompressor.maxCompressedSize(buffer.limit());
+    ByteBuffer target = null;
+    try {
+      target = _dataChannel.map(FileChannel.MapMode.READ_WRITE, _chunkOffset, maxCompressedSize)
+          .order(ByteOrder.LITTLE_ENDIAN);
+      int compressedSize = _chunkCompressor.compress(buffer, target);
+      // reverse bytes here because the file writes BE and we want to read the metadata LE
+      _output.writeInt(Integer.reverseBytes(_docIdOffset | (huge ? 0x80000000 : 0)));
+      _output.writeInt(Integer.reverseBytes((int) (_chunkOffset & 0xFFFFFFFFL)));
+      _metadataSize += 8;
+      _chunkOffset += compressedSize;
+      _docIdOffset = _nextDocId;
+    } catch (IOException e) {
+      LOGGER.error("Exception caught while compressing/writing data chunk", e);
+      throw new RuntimeException(e);
+    } finally {
+      CleanerUtil.cleanQuietly(target);
+    }
+  }
+
+  private void clearChunkBuffer() {
+    _chunkBuffer.clear();
+    _chunkBuffer.position(Integer.BYTES);
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    flushChunk();
+    // write out where the chunks start into slot reserved at offset 12
+    _output.seek(3 * Integer.BYTES);
+    _output.writeInt(_metadataSize);
+    _output.seek(_metadataSize);
+    _dataChannel.truncate(_chunkOffset);
+    _output.setLength(_metadataSize + _chunkOffset);
+    long total = _chunkOffset;
+    long position = 0;
+    while (total > 0) {
+      long transferred = _dataChannel.transferTo(position, total, _output.getChannel());
+      total -= transferred;
+      position += transferred;
+    }
+    _dataChannel.close();
+    _output.close();
+    CleanerUtil.cleanQuietly(_chunkBuffer);
+    FileUtils.deleteQuietly(_dataBuffer);
+  }
+}
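
For reference, a minimal usage sketch of the writer added above (hypothetical, not part of the patch;
the real call sites are the raw index creators further down in this diff, and the test at the bottom
exercises the same flow):

    import java.io.File;
    import java.io.IOException;
    import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
    import org.apache.pinot.segment.spi.compression.ChunkCompressionType;

    public class V4WriterSketch {
      public static void main(String[] args) throws IOException {
        File indexFile = new File("/tmp/myColumn.fwd");  // illustrative path
        int targetChunkSize = 1024 * 1024;               // same target the creators use (TARGET_MAX_CHUNK_SIZE)
        try (VarByteChunkSVForwardIndexWriterV4 writer =
            new VarByteChunkSVForwardIndexWriterV4(indexFile, ChunkCompressionType.LZ4, targetChunkSize)) {
          // append values in docId order; chunks are flushed when the buffer fills,
          // and any single value larger than the chunk buffer is written as a "huge" chunk
          writer.putString("hello");
          writer.putBytes(new byte[]{1, 2, 3});
        }
      }
    }
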
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java
similarity index 73%
copy from pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
copy to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java
index 5a4775e..979a04f 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java
@@ -16,18 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.segment.spi.compression;
+package org.apache.pinot.segment.local.io.writer.impl;
 
-public enum ChunkCompressionType {
-  PASS_THROUGH(0), SNAPPY(1), ZSTANDARD(2), LZ4(3), LZ4_LENGTH_PREFIXED(4);
+import java.io.Closeable;
 
-  private final int _value;
 
-  ChunkCompressionType(int value) {
-    _value = value;
-  }
+public interface VarByteChunkWriter extends Closeable {
+  void putString(String value);
 
-  public int getValue() {
-    return _value;
-  }
+  void putBytes(byte[] value);
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
index c43f8b7..ef8483e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
@@ -23,6 +23,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
 import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkWriter;
 import org.apache.pinot.segment.spi.V1Constants.Indexes;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
@@ -39,7 +41,7 @@ public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
   private static final int DEFAULT_NUM_DOCS_PER_CHUNK = 1000;
   private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
 
-  private final VarByteChunkSVForwardIndexWriter _indexWriter;
+  private final VarByteChunkWriter _indexWriter;
   private final DataType _valueType;
 
   /**
@@ -79,8 +81,10 @@ public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
     int numDocsPerChunk =
         deriveNumDocsPerChunk ? Math.max(TARGET_MAX_CHUNK_SIZE / (totalMaxLength
             + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE), 1) : DEFAULT_NUM_DOCS_PER_CHUNK;
-    _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs,
-        numDocsPerChunk, totalMaxLength, writerVersion);
+    _indexWriter = writerVersion < VarByteChunkSVForwardIndexWriterV4.VERSION
+        ? new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs, numDocsPerChunk, totalMaxLength,
+        writerVersion)
+        : new VarByteChunkSVForwardIndexWriterV4(file, compressionType, TARGET_MAX_CHUNK_SIZE);
     _valueType = valueType;
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java
index d2ecdb4..41c3622 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java
@@ -23,6 +23,8 @@ import java.io.File;
 import java.io.IOException;
 import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
 import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkWriter;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
@@ -37,7 +39,7 @@ public class SingleValueVarByteRawIndexCreator implements ForwardIndexCreator {
   private static final int DEFAULT_NUM_DOCS_PER_CHUNK = 1000;
   private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
 
-  private final VarByteChunkSVForwardIndexWriter _indexWriter;
+  private final VarByteChunkWriter _indexWriter;
   private final DataType _valueType;
 
   /**
@@ -74,8 +76,10 @@ public class SingleValueVarByteRawIndexCreator implements ForwardIndexCreator {
       throws IOException {
     File file = new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
     int numDocsPerChunk = deriveNumDocsPerChunk ? getNumDocsPerChunk(maxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
-    _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs, numDocsPerChunk, maxLength,
-        writerVersion);
+    _indexWriter = writerVersion < VarByteChunkSVForwardIndexWriterV4.VERSION
+        ? new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs, numDocsPerChunk, maxLength,
+        writerVersion)
+        : new VarByteChunkSVForwardIndexWriterV4(file, compressionType, TARGET_MAX_CHUNK_SIZE);
     _valueType = valueType;
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
index 5d8657b..8dfd2c7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
 import java.util.Map;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
 import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.inv.RangeIndexCreator;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
@@ -49,6 +50,7 @@ import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChu
 import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4;
 import org.apache.pinot.segment.local.segment.index.readers.geospatial.ImmutableH3IndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.json.ImmutableJsonIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.sorted.SortedIndexReaderImpl;
@@ -308,8 +310,13 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
             : new FixedByteChunkMVForwardIndexReader(forwardIndexBuffer, storedType);
       case STRING:
       case BYTES:
-        return isSingleValue ? new VarByteChunkSVForwardIndexReader(forwardIndexBuffer, storedType)
-            : new VarByteChunkMVForwardIndexReader(forwardIndexBuffer, storedType);
+        if (isSingleValue) {
+          int version = forwardIndexBuffer.getInt(0);
+          return version < VarByteChunkSVForwardIndexWriterV4.VERSION
+              ? new VarByteChunkSVForwardIndexReader(forwardIndexBuffer, storedType)
+              : new VarByteChunkSVForwardIndexReaderV4(forwardIndexBuffer, storedType);
+        }
+        return new VarByteChunkMVForwardIndexReader(forwardIndexBuffer, storedType);
       default:
         throw new IllegalStateException("Illegal data type for raw forward index: " + dataType);
     }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java
index 46a0629..56090ca 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java
@@ -77,7 +77,7 @@ public abstract class BaseChunkSVForwardIndexReader
       _dataBuffer.getInt(headerOffset); // Total docs
       headerOffset += Integer.BYTES;
 
-      ChunkCompressionType compressionType = ChunkCompressionType.values()[_dataBuffer.getInt(headerOffset)];
+      ChunkCompressionType compressionType = ChunkCompressionType.valueOf(_dataBuffer.getInt(headerOffset));
       _chunkDecompressor = ChunkCompressorFactory.getDecompressor(compressionType);
       _isCompressed = !compressionType.equals(ChunkCompressionType.PASS_THROUGH);
 
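
The ChunkCompressionType.valueOf(int) used here is the bounds-checked lookup added to the enum at the
bottom of this diff, so a corrupt or unrecognized ordinal read from a chunk header now fails with a
descriptive message rather than an array index error. Illustratively (an out-of-range ordinal, not
taken from the patch):

    ChunkCompressionType t1 = ChunkCompressionType.values()[7];  // before: ArrayIndexOutOfBoundsException
    ChunkCompressionType t2 = ChunkCompressionType.valueOf(7);   // after:  IllegalArgumentException: invalid ordinal 7
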
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java
new file mode 100644
index 0000000..13870dd
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.segment.spi.memory.CleanerUtil;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class VarByteChunkSVForwardIndexReaderV4
+    implements ForwardIndexReader<VarByteChunkSVForwardIndexReaderV4.ReaderContext> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(VarByteChunkSVForwardIndexReaderV4.class);
+
+  private static final int METADATA_ENTRY_SIZE = 8;
+
+  private final FieldSpec.DataType _valueType;
+  private final int _targetDecompressedChunkSize;
+  private final ChunkDecompressor _chunkDecompressor;
+  private final ChunkCompressionType _chunkCompressionType;
+
+  private final PinotDataBuffer _metadata;
+  private final PinotDataBuffer _chunks;
+
+  public VarByteChunkSVForwardIndexReaderV4(PinotDataBuffer dataBuffer, FieldSpec.DataType valueType) {
+    if (dataBuffer.getInt(0) < VarByteChunkSVForwardIndexWriterV4.VERSION) {
+      throw new IllegalStateException("version " + dataBuffer.getInt(0) + " < "
+          + VarByteChunkSVForwardIndexWriterV4.VERSION);
+    }
+    _valueType = valueType;
+    _targetDecompressedChunkSize = dataBuffer.getInt(4);
+    _chunkCompressionType = ChunkCompressionType.valueOf(dataBuffer.getInt(8));
+    _chunkDecompressor = ChunkCompressorFactory.getDecompressor(_chunkCompressionType);
+    int chunksOffset = dataBuffer.getInt(12);
+    // the file has a BE header for compatibility reasons (version selection) but the content is LE
+    _metadata = dataBuffer.view(16, chunksOffset, ByteOrder.LITTLE_ENDIAN);
+    _chunks = dataBuffer.view(chunksOffset, dataBuffer.size(), ByteOrder.LITTLE_ENDIAN);
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return _valueType;
+  }
+
+  @Override
+  public String getString(int docId, ReaderContext context) {
+    return new String(context.getValue(docId), StandardCharsets.UTF_8);
+  }
+
+  @Override
+  public byte[] getBytes(int docId, ReaderContext context) {
+    return context.getValue(docId);
+  }
+
+  @Nullable
+  @Override
+  public ReaderContext createContext() {
+    return _chunkCompressionType == ChunkCompressionType.PASS_THROUGH
+        ? new UncompressedReaderContext(_chunks, _metadata)
+        : new CompressedReaderContext(_metadata, _chunks, _chunkDecompressor, _chunkCompressionType,
+            _targetDecompressedChunkSize);
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+  }
+
+  public static abstract class ReaderContext implements ForwardIndexReaderContext {
+
+    protected final PinotDataBuffer _chunks;
+    protected final PinotDataBuffer _metadata;
+    protected int _docIdOffset;
+    protected int _nextDocIdOffset;
+    protected boolean _regularChunk;
+    protected int _numDocsInCurrentChunk;
+
+    protected ReaderContext(PinotDataBuffer metadata, PinotDataBuffer chunks) {
+      _chunks = chunks;
+      _metadata = metadata;
+    }
+
+    public byte[] getValue(int docId) {
+      if (docId >= _docIdOffset && docId < _nextDocIdOffset) {
+        return readSmallUncompressedValue(docId);
+      } else {
+        try {
+          return decompressAndRead(docId);
+        } catch (IOException e) {
+          LOGGER.error("Exception caught while decompressing data chunk", e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    protected long chunkIndexFor(int docId) {
+      long low = 0;
+      long high = (_metadata.size() / METADATA_ENTRY_SIZE) - 1;
+      while (low <= high) {
+        long mid = (low + high) >>> 1;
+        long position = mid * METADATA_ENTRY_SIZE;
+        int midDocId = _metadata.getInt(position) & 0x7FFFFFFF;
+        if (midDocId < docId) {
+          low = mid + 1;
+        } else if (midDocId > docId) {
+          high = mid - 1;
+        } else {
+          return position;
+        }
+      }
+      return (low - 1) * METADATA_ENTRY_SIZE;
+    }
+
+    protected abstract byte[] processChunkAndReadFirstValue(int docId, long offset, long limit)
+        throws IOException;
+
+    protected abstract byte[] readSmallUncompressedValue(int docId);
+
+    private byte[] decompressAndRead(int docId)
+        throws IOException {
+      long metadataEntry = chunkIndexFor(docId);
+      int info = _metadata.getInt(metadataEntry);
+      _docIdOffset = info & 0x7FFFFFFF;
+      _regularChunk = _docIdOffset == info;
+      long offset = _metadata.getInt(metadataEntry + Integer.BYTES) & 0xFFFFFFFFL;
+      long limit;
+      if (_metadata.size() - METADATA_ENTRY_SIZE > metadataEntry) {
+        _nextDocIdOffset = _metadata.getInt(metadataEntry + METADATA_ENTRY_SIZE) & 0x7FFFFFFF;
+        limit = _metadata.getInt(metadataEntry + METADATA_ENTRY_SIZE + Integer.BYTES) & 0xFFFFFFFFL;
+      } else {
+        _nextDocIdOffset = Integer.MAX_VALUE;
+        limit = _chunks.size();
+      }
+      return processChunkAndReadFirstValue(docId, offset, limit);
+    }
+  }
+
+  private static final class UncompressedReaderContext extends ReaderContext {
+
+    private ByteBuffer _chunk;
+
+    UncompressedReaderContext(PinotDataBuffer metadata, PinotDataBuffer chunks) {
+      super(chunks, metadata);
+    }
+
+    @Override
+    protected byte[] processChunkAndReadFirstValue(int docId, long offset, long limit) {
+      _chunk = _chunks.toDirectByteBuffer(offset, (int) (limit - offset));
+      if (!_regularChunk) {
+        return readHugeValue();
+      }
+      _numDocsInCurrentChunk = _chunk.getInt(0);
+      return readSmallUncompressedValue(docId);
+    }
+
+    private byte[] readHugeValue() {
+      byte[] value = new byte[_chunk.capacity()];
+      _chunk.get(value);
+      return value;
+    }
+
+    @Override
+    protected byte[] readSmallUncompressedValue(int docId) {
+      int index = docId - _docIdOffset;
+      int offset = _chunk.getInt((index + 1) * Integer.BYTES);
+      int nextOffset = index == _numDocsInCurrentChunk - 1
+          ? _chunk.limit()
+          : _chunk.getInt((index + 2) * Integer.BYTES);
+      byte[] bytes = new byte[nextOffset - offset];
+      _chunk.position(offset);
+      _chunk.get(bytes);
+      _chunk.position(0);
+      return bytes;
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+    }
+  }
+
+  private static final class CompressedReaderContext extends ReaderContext {
+
+    private final ByteBuffer _decompressedBuffer;
+    private final ChunkDecompressor _chunkDecompressor;
+    private final ChunkCompressionType _chunkCompressionType;
+
+    CompressedReaderContext(PinotDataBuffer metadata, PinotDataBuffer chunks, ChunkDecompressor chunkDecompressor,
+        ChunkCompressionType chunkCompressionType, int targetChunkSize) {
+      super(metadata, chunks);
+      _chunkDecompressor = chunkDecompressor;
+      _chunkCompressionType = chunkCompressionType;
+      _decompressedBuffer = ByteBuffer.allocateDirect(targetChunkSize).order(ByteOrder.LITTLE_ENDIAN);
+    }
+
+    @Override
+    protected byte[] processChunkAndReadFirstValue(int docId, long offset, long limit)
+        throws IOException {
+      _decompressedBuffer.clear();
+      ByteBuffer compressed = _chunks.toDirectByteBuffer(offset, (int) (limit - offset));
+      if (_regularChunk) {
+        _chunkDecompressor.decompress(compressed, _decompressedBuffer);
+        _numDocsInCurrentChunk = _decompressedBuffer.getInt(0);
+        return readSmallUncompressedValue(docId);
+      }
+      // huge value, no benefit from buffering, return the whole thing
+      return readHugeCompressedValue(compressed, _chunkDecompressor.decompressedLength(compressed));
+    }
+
+    @Override
+    protected byte[] readSmallUncompressedValue(int docId) {
+      int index = docId - _docIdOffset;
+      int offset = _decompressedBuffer.getInt((index + 1) * Integer.BYTES);
+      int nextOffset = index == _numDocsInCurrentChunk - 1
+          ? _decompressedBuffer.limit()
+          : _decompressedBuffer.getInt((index + 2) * Integer.BYTES);
+      byte[] bytes = new byte[nextOffset - offset];
+      _decompressedBuffer.position(offset);
+      _decompressedBuffer.get(bytes);
+      _decompressedBuffer.position(0);
+      return bytes;
+    }
+
+    private byte[] readHugeCompressedValue(ByteBuffer compressed, int decompressedLength)
+        throws IOException {
+      // huge values don't have length prefixes; they occupy the entire chunk so are unambiguous
+      byte[] value = new byte[decompressedLength];
+      if (_chunkCompressionType == ChunkCompressionType.SNAPPY
+          || _chunkCompressionType == ChunkCompressionType.ZSTANDARD) {
+        // snappy and zstandard don't work without direct buffers
+        decompressViaDirectBuffer(compressed, value);
+      } else {
+        _chunkDecompressor.decompress(compressed, ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN));
+      }
+      return value;
+    }
+
+    private void decompressViaDirectBuffer(ByteBuffer compressed, byte[] target)
+        throws IOException {
+      ByteBuffer buffer = ByteBuffer.allocateDirect(target.length).order(ByteOrder.LITTLE_ENDIAN);
+      try {
+        _chunkDecompressor.decompress(compressed, buffer);
+        buffer.get(target);
+      } finally {
+        if (CleanerUtil.UNMAP_SUPPORTED) {
+          CleanerUtil.getCleaner().freeBuffer(buffer);
+        }
+      }
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+      CleanerUtil.cleanQuietly(_decompressedBuffer);
+    }
+  }
+}
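
As a worked example of the metadata lookup in ReaderContext above (illustrative numbers, not taken
from the patch): suppose the metadata section holds three (docIdOffset, chunkOffset) entries

    (0, 0)                      regular chunk covering docIds [0, 120)
    (120, 40000)                regular chunk covering docIds [120, 250)
    (250 | 0x80000000, 81000)   huge chunk holding the single value for docId 250

A read of docId 200 binary-searches the masked docIdOffsets (0, 120, 250), lands on the second entry,
decompresses the chunk stored at bytes [40000, 81000) of the chunk region, and reads value index
200 - 120 = 80 through the chunk's offset table; subsequent reads of docIds in [120, 250) are then
served from the already decompressed buffer without touching the metadata again.
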
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
new file mode 100644
index 0000000..857e29b
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.creator;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class VarByteChunkV4Test {
+
+  private static final File TEST_DIR = new File(FileUtils.getTempDirectory(), "VarByteChunkV4Test");
+
+  private File _file;
+
+  @DataProvider
+  public Object[][] params() {
+    return new Object[][]{
+        {ChunkCompressionType.LZ4, 20, 1024},
+        {ChunkCompressionType.LZ4_LENGTH_PREFIXED, 20, 1024},
+        {ChunkCompressionType.PASS_THROUGH, 20, 1024},
+        {ChunkCompressionType.SNAPPY, 20, 1024},
+        {ChunkCompressionType.ZSTANDARD, 20, 1024},
+        {ChunkCompressionType.LZ4, 2048, 1024},
+        {ChunkCompressionType.LZ4_LENGTH_PREFIXED, 2048, 1024},
+        {ChunkCompressionType.PASS_THROUGH, 2048, 1024},
+        {ChunkCompressionType.SNAPPY, 2048, 1024},
+        {ChunkCompressionType.ZSTANDARD, 2048, 1024}
+    };
+  }
+
+  @BeforeClass
+  public void forceMkDir()
+      throws IOException {
+    FileUtils.forceMkdir(TEST_DIR);
+  }
+
+  @AfterClass
+  public void deleteDir() {
+    FileUtils.deleteQuietly(TEST_DIR);
+  }
+
+  @AfterMethod
+  public void after() {
+    if (_file != null) {
+      FileUtils.deleteQuietly(_file);
+    }
+  }
+
+  @Test(dataProvider = "params")
+  public void testStringSV(ChunkCompressionType compressionType, int longestEntry, int chunkSize)
+      throws IOException {
+    _file = new File(TEST_DIR, "testStringSV");
+    testSV(compressionType, longestEntry, chunkSize, FieldSpec.DataType.STRING, x -> x,
+        VarByteChunkSVForwardIndexWriterV4::putString, (reader, context, docId) -> reader.getString(docId, context));
+  }
+
+  @Test(dataProvider = "params")
+  public void testBytesSV(ChunkCompressionType compressionType, int longestEntry, int chunkSize)
+      throws IOException {
+    _file = new File(TEST_DIR, "testBytesSV");
+    testSV(compressionType, longestEntry, chunkSize, FieldSpec.DataType.BYTES, x -> x.getBytes(StandardCharsets.UTF_8),
+        VarByteChunkSVForwardIndexWriterV4::putBytes, (reader, context, docId) -> reader.getBytes(docId, context));
+  }
+
+  private <T> void testSV(ChunkCompressionType compressionType, int longestEntry, int chunkSize,
+      FieldSpec.DataType dataType, Function<String, T> forwardMapper,
+      BiConsumer<VarByteChunkSVForwardIndexWriterV4, T> write,
+      Read<T> read)
+      throws IOException {
+    List<T> values = randomStrings(1000, longestEntry).map(forwardMapper).collect(Collectors.toList());
+    try (VarByteChunkSVForwardIndexWriterV4 writer = new VarByteChunkSVForwardIndexWriterV4(_file, compressionType,
+        chunkSize)) {
+      for (T value : values) {
+        write.accept(writer, value);
+      }
+    }
+    try (PinotDataBuffer buffer = PinotDataBuffer.mapReadOnlyBigEndianFile(_file)) {
+      try (VarByteChunkSVForwardIndexReaderV4 reader = new VarByteChunkSVForwardIndexReaderV4(buffer, dataType);
+          VarByteChunkSVForwardIndexReaderV4.ReaderContext context = reader.createContext()) {
+        for (int i = 0; i < values.size(); i++) {
+          assertEquals(read.read(reader, context, i), values.get(i));
+        }
+        for (int i = 0; i < values.size(); i += 2) {
+          assertEquals(read.read(reader, context, i), values.get(i));
+        }
+        for (int i = 1; i < values.size(); i += 2) {
+          assertEquals(read.read(reader, context, i), values.get(i));
+        }
+        for (int i = 1; i < values.size(); i += 100) {
+          assertEquals(read.read(reader, context, i), values.get(i));
+        }
+        for (int i = values.size() - 1; i >= 0; i--) {
+          assertEquals(read.read(reader, context, i), values.get(i));
+        }
+        for (int i = values.size() - 1; i >= 0; i -= 2) {
+          assertEquals(read.read(reader, context, i), values.get(i));
+        }
+        for (int i = values.size() - 2; i >= 0; i -= 2) {
+          assertEquals(read.read(reader, context, i), values.get(i));
+        }
+        for (int i = values.size() - 1; i >= 0; i -= 100) {
+          assertEquals(read.read(reader, context, i), values.get(i));
+        }
+      }
+    }
+  }
+
+  private Stream<String> randomStrings(int count, int lengthOfLongestEntry) {
+    return IntStream.range(0, count)
+        .mapToObj(i -> {
+          int length = ThreadLocalRandom.current().nextInt(lengthOfLongestEntry);
+          byte[] bytes = new byte[length];
+          if (length != 0) {
+            bytes[bytes.length - 1] = 'c';
+            if (length > 2) {
+              Arrays.fill(bytes, 1, bytes.length - 1, (byte) 'b');
+            }
+            bytes[0] = 'a';
+          }
+          return new String(bytes, StandardCharsets.UTF_8);
+        });
+  }
+
+  @FunctionalInterface
+  interface Read<T> {
+    T read(VarByteChunkSVForwardIndexReaderV4 reader, VarByteChunkSVForwardIndexReaderV4.ReaderContext context,
+        int docId);
+  }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
index 5a4775e..97d7057 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
@@ -21,6 +21,8 @@ package org.apache.pinot.segment.spi.compression;
 public enum ChunkCompressionType {
   PASS_THROUGH(0), SNAPPY(1), ZSTANDARD(2), LZ4(3), LZ4_LENGTH_PREFIXED(4);
 
+  private static final ChunkCompressionType[] VALUES = values();
+
   private final int _value;
 
   ChunkCompressionType(int value) {
@@ -30,4 +32,11 @@ public enum ChunkCompressionType {
   public int getValue() {
     return _value;
   }
+
+  public static ChunkCompressionType valueOf(int ordinal) {
+    if (ordinal < 0 || ordinal >= VALUES.length) {
+      throw new IllegalArgumentException("invalid ordinal " + ordinal);
+    }
+    return VALUES[ordinal];
+  }
 }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/CleanerUtil.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/CleanerUtil.java
index 026b0a3..0469e9e 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/CleanerUtil.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/CleanerUtil.java
@@ -175,6 +175,16 @@ public final class CleanerUtil {
     };
   }
 
+  public static void cleanQuietly(ByteBuffer buffer) {
+    if (UNMAP_SUPPORTED && buffer != null && buffer.isDirect()) {
+      try {
+        getCleaner().freeBuffer(buffer);
+      } catch (IOException e) {
+        LOGGER.debug("failed to clean buffer", e);
+      }
+    }
+  }
+
   /**
    * Pass in an implementation of this interface to cleanup ByteBuffers.
    * CleanerUtil implements this to allow unmapping of bytebuffers

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org