Posted to commits@pinot.apache.org by ja...@apache.org on 2021/11/10 03:40:48 UTC
[pinot] branch master updated: implement size balanced V4 raw chunk format (#7661)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 13c9ee9 implement size balanced V4 raw chunk format (#7661)
13c9ee9 is described below
commit 13c9ee9556498bb6dc4ab60734743edb8b89773c
Author: Richard Startin <ri...@startree.ai>
AuthorDate: Wed Nov 10 03:37:05 2021 +0000
implement size balanced V4 raw chunk format (#7661)
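For orientation, here is the V4 on-disk layout as reconstructed from the writer and reader below (a sketch, not part of the commit message; field names and widths follow the code):

    header, big endian, 4 ints: [version=4][target decompressed chunk size][compression type][chunks offset]
    metadata, little endian, 8 bytes per chunk: [first doc id, MSB flags a huge chunk][chunk start offset]
    chunk data, little endian: regular chunk = [num docs][one offset per doc][values]; huge chunk = one raw value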
---
.../impl/VarByteChunkSVForwardIndexWriter.java | 4 +-
.../impl/VarByteChunkSVForwardIndexWriterV4.java | 229 ++++++++++++++++
.../local/io/writer/impl/VarByteChunkWriter.java | 15 +-
.../fwd/MultiValueFixedByteRawIndexCreator.java | 10 +-
.../fwd/SingleValueVarByteRawIndexCreator.java | 10 +-
.../index/column/PhysicalColumnIndexContainer.java | 11 +-
.../forward/BaseChunkSVForwardIndexReader.java | 2 +-
.../VarByteChunkSVForwardIndexReaderV4.java | 296 +++++++++++++++++++++
.../segment/index/creator/VarByteChunkV4Test.java | 167 ++++++++++++
.../spi/compression/ChunkCompressionType.java | 9 +
.../pinot/segment/spi/memory/CleanerUtil.java | 10 +
11 files changed, 743 insertions(+), 20 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
index fed1200..f035a62 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
@@ -53,7 +53,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
* Only sequential writes are supported.
*/
@NotThreadSafe
-public class VarByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexWriter {
+public class VarByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexWriter implements VarByteChunkWriter {
public static final int CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE = Integer.BYTES;
@@ -87,10 +87,12 @@ public class VarByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexWri
_chunkDataOffSet = _chunkHeaderSize;
}
+ @Override
public void putString(String value) {
putBytes(value.getBytes(UTF_8));
}
+ @Override
public void putBytes(byte[] value) {
_chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet);
_chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterV4.java
new file mode 100644
index 0000000..ef24570
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterV4.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.writer.impl;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.ChunkCompressor;
+import org.apache.pinot.segment.spi.memory.CleanerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class to write out variable length bytes into a single column.
+ *
+ *
+ */
+@NotThreadSafe
+public class VarByteChunkSVForwardIndexWriterV4 implements VarByteChunkWriter {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(VarByteChunkSVForwardIndexWriterV4.class);
+
+ public static final int VERSION = 4;
+
+ private static final String DATA_BUFFER_SUFFIX = ".buf";
+
+ private final File _dataBuffer;
+ private final RandomAccessFile _output;
+ private final FileChannel _dataChannel;
+ private final ByteBuffer _chunkBuffer;
+ private final ChunkCompressor _chunkCompressor;
+
+ private int _docIdOffset = 0;
+ private int _nextDocId = 0;
+ private int _metadataSize = 0;
+ private long _chunkOffset = 0;
+
+ public VarByteChunkSVForwardIndexWriterV4(File file, ChunkCompressionType compressionType, int chunkSize)
+ throws IOException {
+ _dataBuffer = new File(file.getName() + DATA_BUFFER_SUFFIX);
+ _output = new RandomAccessFile(file, "rw");
+ _dataChannel = new RandomAccessFile(_dataBuffer, "rw").getChannel();
+ _chunkCompressor = ChunkCompressorFactory.getCompressor(compressionType, true);
+ _chunkBuffer = ByteBuffer.allocateDirect(chunkSize).order(ByteOrder.LITTLE_ENDIAN);
+ // reserve space for numDocs
+ _chunkBuffer.position(Integer.BYTES);
+ writeHeader(_chunkCompressor.compressionType(), chunkSize);
+ }
+
+ private void writeHeader(ChunkCompressionType compressionType, int targetDecompressedChunkSize)
+ throws IOException {
+ // keep metadata BE for backwards compatibility
+ // (e.g. the version needs to be read by a factory which assumes BE)
+ _output.writeInt(VERSION);
+ _output.writeInt(targetDecompressedChunkSize);
+ _output.writeInt(compressionType.getValue());
+ // reserve a slot to write the data offset into
+ _output.writeInt(0);
+ _metadataSize += 4 * Integer.BYTES;
+ }
+
+ @Override
+ public void putString(String string) {
+ putBytes(string.getBytes(StandardCharsets.UTF_8));
+ }
+
+ @Override
+ public void putBytes(byte[] bytes) {
+ Preconditions.checkState(_chunkOffset < (1L << 32), "exceeded 4GB of compressed chunks");
+ int sizeRequired = Integer.BYTES + bytes.length;
+ if (_chunkBuffer.position() > _chunkBuffer.capacity() - sizeRequired) {
+ flushChunk();
+ if (sizeRequired > _chunkBuffer.capacity() - Integer.BYTES) {
+ writeHugeChunk(bytes);
+ return;
+ }
+ }
+ _chunkBuffer.putInt(bytes.length);
+ _chunkBuffer.put(bytes);
+ _nextDocId++;
+ }
+
+ private void writeHugeChunk(byte[] bytes) {
+ // Huge values, whose length prefix plus payload don't fit into the buffer after the num-docs prefix of a regular
+ // chunk, are written out as a single chunk with no in-chunk header. Readers detect these chunks by the MSB set in
+ // the doc id offset of the metadata entry.
+ final ByteBuffer buffer;
+ if (_chunkCompressor.compressionType() == ChunkCompressionType.SNAPPY
+ || _chunkCompressor.compressionType() == ChunkCompressionType.ZSTANDARD) {
+ // the SNAPPY and ZSTANDARD codecs don't accept on-heap buffers,
+ // so the bytes must first be copied into a direct buffer
+ buffer = ByteBuffer.allocateDirect(bytes.length);
+ buffer.put(bytes);
+ buffer.flip();
+ } else {
+ buffer = ByteBuffer.wrap(bytes);
+ }
+ try {
+ _nextDocId++;
+ write(buffer, true);
+ } finally {
+ CleanerUtil.cleanQuietly(buffer);
+ }
+ }
+
+ private void flushChunk() {
+ if (_nextDocId > _docIdOffset) {
+ writeChunk();
+ }
+ }
+
+ private void writeChunk() {
+ /*
This method translates from the current state of the buffer, assuming there are 3 values of lengths a, b, and c:
+ [-][a][a bytes][b][b bytes][c][c bytes]
+ to:
+ [3][16][a+16][a+b+16][a bytes][b bytes][c bytes]
+ [------16-bytes-----][----a+b+c bytes----------]
+ */
+ int numDocs = _nextDocId - _docIdOffset;
+ _chunkBuffer.putInt(0, numDocs);
+ // collect offsets
+ int[] offsets = new int[numDocs];
+ int offset = Integer.BYTES;
+ for (int i = 0; i < numDocs; i++) {
+ offsets[i] = offset;
+ int size = _chunkBuffer.getInt(offset);
+ offset += size + Integer.BYTES;
+ }
+ // now iterate backwards over the values, shifting the variable length content towards the end of the buffer to
+ // make space for the offsets at the start; this pays for itself by giving readers random access
+ int limit = _chunkBuffer.position();
+ int accumulatedOffset = Integer.BYTES;
+ for (int i = numDocs - 2; i >= 0; i--) {
+ int length = _chunkBuffer.getInt(offsets[i]);
+ ByteBuffer source = _chunkBuffer.duplicate();
+ int copyFrom = offsets[i] + Integer.BYTES;
+ source.position(copyFrom).limit(copyFrom + length);
+ _chunkBuffer.position(copyFrom + accumulatedOffset);
+ _chunkBuffer.put(source);
+ offsets[i + 1] = _chunkBuffer.position();
+ accumulatedOffset += Integer.BYTES;
+ }
+ offsets[0] = Integer.BYTES * (numDocs + 1);
+ // write the offsets into the space created at the front
+ _chunkBuffer.position(Integer.BYTES);
+ _chunkBuffer.asIntBuffer().put(offsets);
+ _chunkBuffer.position(0);
+ _chunkBuffer.limit(limit);
+ write(_chunkBuffer, false);
+ clearChunkBuffer();
+ }
+
+ private void write(ByteBuffer buffer, boolean huge) {
+ int maxCompressedSize = _chunkCompressor.maxCompressedSize(buffer.limit());
+ ByteBuffer target = null;
+ try {
+ target = _dataChannel.map(FileChannel.MapMode.READ_WRITE, _chunkOffset, maxCompressedSize)
+ .order(ByteOrder.LITTLE_ENDIAN);
+ int compressedSize = _chunkCompressor.compress(buffer, target);
+ // reverse bytes here because the file writes BE and we want to read the metadata LE
+ _output.writeInt(Integer.reverseBytes(_docIdOffset | (huge ? 0x80000000 : 0)));
+ _output.writeInt(Integer.reverseBytes((int) (_chunkOffset & 0xFFFFFFFFL)));
+ _metadataSize += 8;
+ _chunkOffset += compressedSize;
+ _docIdOffset = _nextDocId;
+ } catch (IOException e) {
+ LOGGER.error("Exception caught while compressing/writing data chunk", e);
+ throw new RuntimeException(e);
+ } finally {
+ CleanerUtil.cleanQuietly(target);
+ }
+ }
+
+ private void clearChunkBuffer() {
+ _chunkBuffer.clear();
+ _chunkBuffer.position(Integer.BYTES);
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ flushChunk();
+ // write out where the chunks start into the slot reserved at offset 12
+ _output.seek(3 * Integer.BYTES);
+ _output.writeInt(_metadataSize);
+ _output.seek(_metadataSize);
+ _dataChannel.truncate(_chunkOffset);
+ _output.setLength(_metadataSize + _chunkOffset);
+ long total = _chunkOffset;
+ long position = 0;
+ while (total > 0) {
+ long transferred = _dataChannel.transferTo(position, total, _output.getChannel());
+ total -= transferred;
+ position += transferred;
+ }
+ _dataChannel.close();
+ _output.close();
+ CleanerUtil.cleanQuietly(_chunkBuffer);
+ FileUtils.deleteQuietly(_dataBuffer);
+ }
+}
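As a quick illustration of the API this new file introduces, a minimal, hedged usage sketch (the file path, compression choice, and values are made up; the constructor, put and close calls are the ones defined above):

    // illustrative only: drive the V4 writer the way the index creators below do
    File indexFile = new File("/tmp/column.fwd");   // hypothetical path
    try (VarByteChunkSVForwardIndexWriterV4 writer =
        new VarByteChunkSVForwardIndexWriterV4(indexFile, ChunkCompressionType.LZ4, 1024 * 1024)) {
      writer.putString("some value");               // buffered until the chunk fills
      writer.putBytes(new byte[]{1, 2, 3});
    }  // close() flushes the last chunk and appends the chunk data after the metadata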
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java
similarity index 73%
copy from pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
copy to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java
index 5a4775e..979a04f 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java
@@ -16,18 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.segment.spi.compression;
+package org.apache.pinot.segment.local.io.writer.impl;
-public enum ChunkCompressionType {
- PASS_THROUGH(0), SNAPPY(1), ZSTANDARD(2), LZ4(3), LZ4_LENGTH_PREFIXED(4);
+import java.io.Closeable;
- private final int _value;
- ChunkCompressionType(int value) {
- _value = value;
- }
+public interface VarByteChunkWriter extends Closeable {
+ void putString(String value);
- public int getValue() {
- return _value;
- }
+ void putBytes(byte[] value);
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
index c43f8b7..ef8483e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
@@ -23,6 +23,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkWriter;
import org.apache.pinot.segment.spi.V1Constants.Indexes;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
@@ -39,7 +41,7 @@ public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
private static final int DEFAULT_NUM_DOCS_PER_CHUNK = 1000;
private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
- private final VarByteChunkSVForwardIndexWriter _indexWriter;
+ private final VarByteChunkWriter _indexWriter;
private final DataType _valueType;
/**
@@ -79,8 +81,10 @@ public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
int numDocsPerChunk =
deriveNumDocsPerChunk ? Math.max(TARGET_MAX_CHUNK_SIZE / (totalMaxLength
+ VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE), 1) : DEFAULT_NUM_DOCS_PER_CHUNK;
- _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs,
- numDocsPerChunk, totalMaxLength, writerVersion);
+ _indexWriter = writerVersion < VarByteChunkSVForwardIndexWriterV4.VERSION
+ ? new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs, numDocsPerChunk, totalMaxLength,
+ writerVersion)
+ : new VarByteChunkSVForwardIndexWriterV4(file, compressionType, TARGET_MAX_CHUNK_SIZE);
_valueType = valueType;
}
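To make the sizing logic concrete: for writer versions below 4 the legacy writer still derives a fixed docs-per-chunk count, while V4 simply targets 1MB chunks and flushes whenever the next value would not fit. A hedged worked example of the legacy formula above (the row length is illustrative):

    int totalMaxLength = 10_000;  // longest serialized multi-value row, assumed for illustration
    int numDocsPerChunk = Math.max((1024 * 1024) / (totalMaxLength + Integer.BYTES), 1);  // = 104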
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java
index d2ecdb4..41c3622 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java
@@ -23,6 +23,8 @@ import java.io.File;
import java.io.IOException;
import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkWriter;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
@@ -37,7 +39,7 @@ public class SingleValueVarByteRawIndexCreator implements ForwardIndexCreator {
private static final int DEFAULT_NUM_DOCS_PER_CHUNK = 1000;
private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
- private final VarByteChunkSVForwardIndexWriter _indexWriter;
+ private final VarByteChunkWriter _indexWriter;
private final DataType _valueType;
/**
@@ -74,8 +76,10 @@ public class SingleValueVarByteRawIndexCreator implements ForwardIndexCreator {
throws IOException {
File file = new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
int numDocsPerChunk = deriveNumDocsPerChunk ? getNumDocsPerChunk(maxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
- _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs, numDocsPerChunk, maxLength,
- writerVersion);
+ _indexWriter = writerVersion < VarByteChunkSVForwardIndexWriterV4.VERSION
+ ? new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs, numDocsPerChunk, maxLength,
+ writerVersion)
+ : new VarByteChunkSVForwardIndexWriterV4(file, compressionType, TARGET_MAX_CHUNK_SIZE);
_valueType = valueType;
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
index 5d8657b..8dfd2c7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.util.Map;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.inv.RangeIndexCreator;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
@@ -49,6 +50,7 @@ import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChu
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4;
import org.apache.pinot.segment.local.segment.index.readers.geospatial.ImmutableH3IndexReader;
import org.apache.pinot.segment.local.segment.index.readers.json.ImmutableJsonIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.sorted.SortedIndexReaderImpl;
@@ -308,8 +310,13 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
: new FixedByteChunkMVForwardIndexReader(forwardIndexBuffer, storedType);
case STRING:
case BYTES:
- return isSingleValue ? new VarByteChunkSVForwardIndexReader(forwardIndexBuffer, storedType)
- : new VarByteChunkMVForwardIndexReader(forwardIndexBuffer, storedType);
+ if (isSingleValue) {
+ int version = forwardIndexBuffer.getInt(0);
+ return version < VarByteChunkSVForwardIndexWriterV4.VERSION
+ ? new VarByteChunkSVForwardIndexReader(forwardIndexBuffer, storedType)
+ : new VarByteChunkSVForwardIndexReaderV4(forwardIndexBuffer, storedType);
+ }
+ return new VarByteChunkMVForwardIndexReader(forwardIndexBuffer, storedType);
default:
throw new IllegalStateException("Illegal data type for raw forward index: " + dataType);
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java
index 46a0629..56090ca 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java
@@ -77,7 +77,7 @@ public abstract class BaseChunkSVForwardIndexReader
_dataBuffer.getInt(headerOffset); // Total docs
headerOffset += Integer.BYTES;
- ChunkCompressionType compressionType = ChunkCompressionType.values()[_dataBuffer.getInt(headerOffset)];
+ ChunkCompressionType compressionType = ChunkCompressionType.valueOf(_dataBuffer.getInt(headerOffset));
_chunkDecompressor = ChunkCompressorFactory.getDecompressor(compressionType);
_isCompressed = !compressionType.equals(ChunkCompressionType.PASS_THROUGH);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java
new file mode 100644
index 0000000..13870dd
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.segment.spi.memory.CleanerUtil;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class VarByteChunkSVForwardIndexReaderV4
+ implements ForwardIndexReader<VarByteChunkSVForwardIndexReaderV4.ReaderContext> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(VarByteChunkSVForwardIndexReaderV4.class);
+
+ private static final int METADATA_ENTRY_SIZE = 8;
+
+ private final FieldSpec.DataType _valueType;
+ private final int _targetDecompressedChunkSize;
+ private final ChunkDecompressor _chunkDecompressor;
+ private final ChunkCompressionType _chunkCompressionType;
+
+ private final PinotDataBuffer _metadata;
+ private final PinotDataBuffer _chunks;
+
+ public VarByteChunkSVForwardIndexReaderV4(PinotDataBuffer dataBuffer, FieldSpec.DataType valueType) {
+ if (dataBuffer.getInt(0) < VarByteChunkSVForwardIndexWriterV4.VERSION) {
+ throw new IllegalStateException("version " + dataBuffer.getInt(0) + " < "
+ + VarByteChunkSVForwardIndexWriterV4.VERSION);
+ }
+ _valueType = valueType;
+ _targetDecompressedChunkSize = dataBuffer.getInt(4);
+ _chunkCompressionType = ChunkCompressionType.valueOf(dataBuffer.getInt(8));
+ _chunkDecompressor = ChunkCompressorFactory.getDecompressor(_chunkCompressionType);
+ int chunksOffset = dataBuffer.getInt(12);
+ // the file has a BE header for compatibility reasons (version selection) but the content is LE
+ _metadata = dataBuffer.view(16, chunksOffset, ByteOrder.LITTLE_ENDIAN);
+ _chunks = dataBuffer.view(chunksOffset, dataBuffer.size(), ByteOrder.LITTLE_ENDIAN);
+ }
+
+ @Override
+ public boolean isDictionaryEncoded() {
+ return false;
+ }
+
+ @Override
+ public boolean isSingleValue() {
+ return true;
+ }
+
+ @Override
+ public FieldSpec.DataType getValueType() {
+ return _valueType;
+ }
+
+ @Override
+ public String getString(int docId, ReaderContext context) {
+ return new String(context.getValue(docId), StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public byte[] getBytes(int docId, ReaderContext context) {
+ return context.getValue(docId);
+ }
+
+ @Nullable
+ @Override
+ public ReaderContext createContext() {
+ return _chunkCompressionType == ChunkCompressionType.PASS_THROUGH
+ ? new UncompressedReaderContext(_metadata, _chunks)
+ : new CompressedReaderContext(_metadata, _chunks, _chunkDecompressor, _chunkCompressionType,
+ _targetDecompressedChunkSize);
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ }
+
+ public static abstract class ReaderContext implements ForwardIndexReaderContext {
+
+ protected final PinotDataBuffer _chunks;
+ protected final PinotDataBuffer _metadata;
+ protected int _docIdOffset;
+ protected int _nextDocIdOffset;
+ protected boolean _regularChunk;
+ protected int _numDocsInCurrentChunk;
+
+ protected ReaderContext(PinotDataBuffer metadata, PinotDataBuffer chunks) {
+ _chunks = chunks;
+ _metadata = metadata;
+ }
+
+ public byte[] getValue(int docId) {
+ if (docId >= _docIdOffset && docId < _nextDocIdOffset) {
+ return readSmallUncompressedValue(docId);
+ } else {
+ try {
+ return decompressAndRead(docId);
+ } catch (IOException e) {
+ LOGGER.error("Exception caught while decompressing data chunk", e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ protected long chunkIndexFor(int docId) {
+ long low = 0;
+ long high = (_metadata.size() / METADATA_ENTRY_SIZE) - 1;
+ while (low <= high) {
+ long mid = (low + high) >>> 1;
+ long position = mid * METADATA_ENTRY_SIZE;
+ int midDocId = _metadata.getInt(position) & 0x7FFFFFFF;
+ if (midDocId < docId) {
+ low = mid + 1;
+ } else if (midDocId > docId) {
+ high = mid - 1;
+ } else {
+ return position;
+ }
+ }
+ return (low - 1) * METADATA_ENTRY_SIZE;
+ }
+
+ protected abstract byte[] processChunkAndReadFirstValue(int docId, long offset, long limit)
+ throws IOException;
+
+ protected abstract byte[] readSmallUncompressedValue(int docId);
+
+ private byte[] decompressAndRead(int docId)
+ throws IOException {
+ long metadataEntry = chunkIndexFor(docId);
+ int info = _metadata.getInt(metadataEntry);
+ _docIdOffset = info & 0x7FFFFFFF;
+ _regularChunk = _docIdOffset == info;
+ long offset = _metadata.getInt(metadataEntry + Integer.BYTES) & 0xFFFFFFFFL;
+ long limit;
+ if (_metadata.size() - METADATA_ENTRY_SIZE > metadataEntry) {
+ _nextDocIdOffset = _metadata.getInt(metadataEntry + METADATA_ENTRY_SIZE) & 0x7FFFFFFF;
+ limit = _metadata.getInt(metadataEntry + METADATA_ENTRY_SIZE + Integer.BYTES) & 0xFFFFFFFFL;
+ } else {
+ _nextDocIdOffset = Integer.MAX_VALUE;
+ limit = _chunks.size();
+ }
+ return processChunkAndReadFirstValue(docId, offset, limit);
+ }
+ }
+
+ private static final class UncompressedReaderContext extends ReaderContext {
+
+ private ByteBuffer _chunk;
+
+ UncompressedReaderContext(PinotDataBuffer metadata, PinotDataBuffer chunks) {
+ super(metadata, chunks);
+ }
+
+ @Override
+ protected byte[] processChunkAndReadFirstValue(int docId, long offset, long limit) {
+ _chunk = _chunks.toDirectByteBuffer(offset, (int) (limit - offset));
+ if (!_regularChunk) {
+ return readHugeValue();
+ }
+ _numDocsInCurrentChunk = _chunk.getInt(0);
+ return readSmallUncompressedValue(docId);
+ }
+
+ private byte[] readHugeValue() {
+ byte[] value = new byte[_chunk.capacity()];
+ _chunk.get(value);
+ return value;
+ }
+
+ @Override
+ protected byte[] readSmallUncompressedValue(int docId) {
+ int index = docId - _docIdOffset;
+ int offset = _chunk.getInt((index + 1) * Integer.BYTES);
+ int nextOffset = index == _numDocsInCurrentChunk - 1
+ ? _chunk.limit()
+ : _chunk.getInt((index + 2) * Integer.BYTES);
+ byte[] bytes = new byte[nextOffset - offset];
+ _chunk.position(offset);
+ _chunk.get(bytes);
+ _chunk.position(0);
+ return bytes;
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ }
+ }
+
+ private static final class CompressedReaderContext extends ReaderContext {
+
+ private final ByteBuffer _decompressedBuffer;
+ private final ChunkDecompressor _chunkDecompressor;
+ private final ChunkCompressionType _chunkCompressionType;
+
+ CompressedReaderContext(PinotDataBuffer metadata, PinotDataBuffer chunks, ChunkDecompressor chunkDecompressor,
+ ChunkCompressionType chunkCompressionType, int targetChunkSize) {
+ super(metadata, chunks);
+ _chunkDecompressor = chunkDecompressor;
+ _chunkCompressionType = chunkCompressionType;
+ _decompressedBuffer = ByteBuffer.allocateDirect(targetChunkSize).order(ByteOrder.LITTLE_ENDIAN);
+ }
+
+ @Override
+ protected byte[] processChunkAndReadFirstValue(int docId, long offset, long limit)
+ throws IOException {
+ _decompressedBuffer.clear();
+ ByteBuffer compressed = _chunks.toDirectByteBuffer(offset, (int) (limit - offset));
+ if (_regularChunk) {
+ _chunkDecompressor.decompress(compressed, _decompressedBuffer);
+ _numDocsInCurrentChunk = _decompressedBuffer.getInt(0);
+ return readSmallUncompressedValue(docId);
+ }
+ // huge value, no benefit from buffering, return the whole thing
+ return readHugeCompressedValue(compressed, _chunkDecompressor.decompressedLength(compressed));
+ }
+
+ @Override
+ protected byte[] readSmallUncompressedValue(int docId) {
+ int index = docId - _docIdOffset;
+ int offset = _decompressedBuffer.getInt((index + 1) * Integer.BYTES);
+ int nextOffset = index == _numDocsInCurrentChunk - 1
+ ? _decompressedBuffer.limit()
+ : _decompressedBuffer.getInt((index + 2) * Integer.BYTES);
+ byte[] bytes = new byte[nextOffset - offset];
+ _decompressedBuffer.position(offset);
+ _decompressedBuffer.get(bytes);
+ _decompressedBuffer.position(0);
+ return bytes;
+ }
+
+ private byte[] readHugeCompressedValue(ByteBuffer compressed, int decompressedLength)
+ throws IOException {
+ // huge values don't have length prefixes; they occupy the entire chunk so are unambiguous
+ byte[] value = new byte[decompressedLength];
+ if (_chunkCompressionType == ChunkCompressionType.SNAPPY
+ || _chunkCompressionType == ChunkCompressionType.ZSTANDARD) {
+ // snappy and zstandard don't work without direct buffers
+ decompressViaDirectBuffer(compressed, value);
+ } else {
+ _chunkDecompressor.decompress(compressed, ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN));
+ }
+ return value;
+ }
+
+ private void decompressViaDirectBuffer(ByteBuffer compressed, byte[] target)
+ throws IOException {
+ ByteBuffer buffer = ByteBuffer.allocateDirect(target.length).order(ByteOrder.LITTLE_ENDIAN);
+ try {
+ _chunkDecompressor.decompress(compressed, buffer);
+ buffer.get(target);
+ } finally {
+ if (CleanerUtil.UNMAP_SUPPORTED) {
+ CleanerUtil.getCleaner().freeBuffer(buffer);
+ }
+ }
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ CleanerUtil.cleanQuietly(_decompressedBuffer);
+ }
+ }
+}
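The reader's metadata lookup above is a floor binary search over fixed-width entries: each 8-byte entry holds the first doc id of a chunk (MSB masked off) and the chunk's start offset. A self-contained, hedged sketch of the same search over a plain array of first doc ids:

    // illustrative restatement of chunkIndexFor: returns the index of the chunk containing docId
    static int chunkIndexFor(int docId, int[] firstDocIds) {
      int low = 0;
      int high = firstDocIds.length - 1;
      while (low <= high) {
        int mid = (low + high) >>> 1;
        if (firstDocIds[mid] < docId) {
          low = mid + 1;
        } else if (firstDocIds[mid] > docId) {
          high = mid - 1;
        } else {
          return mid;    // docId is the first doc of this chunk
        }
      }
      return low - 1;    // floor: the chunk whose doc id range contains docId
    }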
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
new file mode 100644
index 0000000..857e29b
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.creator;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class VarByteChunkV4Test {
+
+ private static final File TEST_DIR = new File(FileUtils.getTempDirectory(), "VarByteChunkV4Test");
+
+ private File _file;
+
+ @DataProvider
+ public Object[][] params() {
+ return new Object[][]{
+ {ChunkCompressionType.LZ4, 20, 1024},
+ {ChunkCompressionType.LZ4_LENGTH_PREFIXED, 20, 1024},
+ {ChunkCompressionType.PASS_THROUGH, 20, 1024},
+ {ChunkCompressionType.SNAPPY, 20, 1024},
+ {ChunkCompressionType.ZSTANDARD, 20, 1024},
+ {ChunkCompressionType.LZ4, 2048, 1024},
+ {ChunkCompressionType.LZ4_LENGTH_PREFIXED, 2048, 1024},
+ {ChunkCompressionType.PASS_THROUGH, 2048, 1024},
+ {ChunkCompressionType.SNAPPY, 2048, 1024},
+ {ChunkCompressionType.ZSTANDARD, 2048, 1024}
+ };
+ }
+
+ @BeforeClass
+ public void forceMkDir()
+ throws IOException {
+ FileUtils.forceMkdir(TEST_DIR);
+ }
+
+ @AfterClass
+ public void deleteDir() {
+ FileUtils.deleteQuietly(TEST_DIR);
+ }
+
+ @AfterMethod
+ public void after() {
+ if (_file != null) {
+ FileUtils.deleteQuietly(_file);
+ }
+ }
+
+ @Test(dataProvider = "params")
+ public void testStringSV(ChunkCompressionType compressionType, int longestEntry, int chunkSize)
+ throws IOException {
+ _file = new File(TEST_DIR, "testStringSV");
+ testSV(compressionType, longestEntry, chunkSize, FieldSpec.DataType.STRING, x -> x,
+ VarByteChunkSVForwardIndexWriterV4::putString, (reader, context, docId) -> reader.getString(docId, context));
+ }
+
+ @Test(dataProvider = "params")
+ public void testBytesSV(ChunkCompressionType compressionType, int longestEntry, int chunkSize)
+ throws IOException {
+ _file = new File(TEST_DIR, "testBytesSV");
+ testSV(compressionType, longestEntry, chunkSize, FieldSpec.DataType.BYTES, x -> x.getBytes(StandardCharsets.UTF_8),
+ VarByteChunkSVForwardIndexWriterV4::putBytes, (reader, context, docId) -> reader.getBytes(docId, context));
+ }
+
+ private <T> void testSV(ChunkCompressionType compressionType, int longestEntry, int chunkSize,
+ FieldSpec.DataType dataType, Function<String, T> forwardMapper,
+ BiConsumer<VarByteChunkSVForwardIndexWriterV4, T> write,
+ Read<T> read)
+ throws IOException {
+ List<T> values = randomStrings(1000, longestEntry).map(forwardMapper).collect(Collectors.toList());
+ try (VarByteChunkSVForwardIndexWriterV4 writer = new VarByteChunkSVForwardIndexWriterV4(_file, compressionType,
+ chunkSize)) {
+ for (T value : values) {
+ write.accept(writer, value);
+ }
+ }
+ try (PinotDataBuffer buffer = PinotDataBuffer.mapReadOnlyBigEndianFile(_file)) {
+ try (VarByteChunkSVForwardIndexReaderV4 reader = new VarByteChunkSVForwardIndexReaderV4(buffer, dataType);
+ VarByteChunkSVForwardIndexReaderV4.ReaderContext context = reader.createContext()) {
+ for (int i = 0; i < values.size(); i++) {
+ assertEquals(read.read(reader, context, i), values.get(i));
+ }
+ for (int i = 0; i < values.size(); i += 2) {
+ assertEquals(read.read(reader, context, i), values.get(i));
+ }
+ for (int i = 1; i < values.size(); i += 2) {
+ assertEquals(read.read(reader, context, i), values.get(i));
+ }
+ for (int i = 1; i < values.size(); i += 100) {
+ assertEquals(read.read(reader, context, i), values.get(i));
+ }
+ for (int i = values.size() - 1; i >= 0; i--) {
+ assertEquals(read.read(reader, context, i), values.get(i));
+ }
+ for (int i = values.size() - 1; i >= 0; i -= 2) {
+ assertEquals(read.read(reader, context, i), values.get(i));
+ }
+ for (int i = values.size() - 2; i >= 0; i -= 2) {
+ assertEquals(read.read(reader, context, i), values.get(i));
+ }
+ for (int i = values.size() - 1; i >= 0; i -= 100) {
+ assertEquals(read.read(reader, context, i), values.get(i));
+ }
+ }
+ }
+ }
+
+ private Stream<String> randomStrings(int count, int lengthOfLongestEntry) {
+ return IntStream.range(0, count)
+ .mapToObj(i -> {
+ int length = ThreadLocalRandom.current().nextInt(lengthOfLongestEntry);
+ byte[] bytes = new byte[length];
+ if (length != 0) {
+ bytes[bytes.length - 1] = 'c';
+ if (length > 2) {
+ Arrays.fill(bytes, 1, bytes.length - 1, (byte) 'b');
+ }
+ bytes[0] = 'a';
+ }
+ return new String(bytes, StandardCharsets.UTF_8);
+ });
+ }
+
+ @FunctionalInterface
+ interface Read<T> {
+ T read(VarByteChunkSVForwardIndexReaderV4 reader, VarByteChunkSVForwardIndexReaderV4.ReaderContext context,
+ int docId);
+ }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
index 5a4775e..97d7057 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressionType.java
@@ -21,6 +21,8 @@ package org.apache.pinot.segment.spi.compression;
public enum ChunkCompressionType {
PASS_THROUGH(0), SNAPPY(1), ZSTANDARD(2), LZ4(3), LZ4_LENGTH_PREFIXED(4);
+ private static final ChunkCompressionType[] VALUES = values();
+
private final int _value;
ChunkCompressionType(int value) {
@@ -30,4 +32,11 @@ public enum ChunkCompressionType {
public int getValue() {
return _value;
}
+
+ public static ChunkCompressionType valueOf(int ordinal) {
+ if (ordinal < 0 || ordinal >= VALUES.length) {
+ throw new IllegalArgumentException("invalid ordinal " + ordinal);
+ }
+ return VALUES[ordinal];
+ }
}
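A small, hedged usage note: readers persist the compression type as an int, and the new valueOf decodes it with bounds checking; caching values() in VALUES avoids the defensive array clone that values() makes on every call.

    ChunkCompressionType type = ChunkCompressionType.valueOf(3);  // LZ4
    assert type.getValue() == 3;
    // ChunkCompressionType.valueOf(99) throws IllegalArgumentException with a clear message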
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/CleanerUtil.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/CleanerUtil.java
index 026b0a3..0469e9e 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/CleanerUtil.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/CleanerUtil.java
@@ -175,6 +175,16 @@ public final class CleanerUtil {
};
}
+ public static void cleanQuietly(ByteBuffer buffer) {
+ if (UNMAP_SUPPORTED && buffer != null && buffer.isDirect()) {
+ try {
+ getCleaner().freeBuffer(buffer);
+ } catch (IOException e) {
+ LOGGER.debug("failed to clean buffer", e);
+ }
+ }
+ }
+
/**
* Pass in an implementation of this interface to cleanup ByteBuffers.
* CleanerUtil implements this to allow unmapping of bytebuffers
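Finally, a hedged sketch of how the new cleanQuietly helper is meant to be used: freeing a direct buffer eagerly instead of waiting for GC (the buffer and its use here are illustrative):

    ByteBuffer scratch = ByteBuffer.allocateDirect(1 << 20).order(ByteOrder.LITTLE_ENDIAN);
    try {
      // ... write to and read from scratch ...
    } finally {
      CleanerUtil.cleanQuietly(scratch);  // no-op for heap buffers or when unmapping is unsupported
    }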