You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2020/05/29 06:05:19 UTC
[incubator-pinot] 01/02: Derive num docs per chunk from max column value length for varbyte raw index creator (#5256)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch hotfix_chunkwriter_realtime
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 34080af27f2b94f24ea54aa488c767eff846879f
Author: Sidd <si...@gmail.com>
AuthorDate: Thu Apr 16 22:29:17 2020 -0700
Derive num docs per chunk from max column value length for varbyte raw index creator (#5256)
* Derive numDocsPerChunk from max column value length
for var byte raw forward index creator
* Address review comments
Co-authored-by: Siddharth Teotia <st...@steotia-mn1.linkedin.biz>
---
.../impl/v1/VarByteChunkSingleValueReader.java | 4 +-
.../impl/v1/VarByteChunkSingleValueWriter.java | 7 ++-
.../fwd/SingleValueVarByteRawIndexCreator.java | 11 +++-
.../defaultcolumn/BaseDefaultColumnHandler.java | 3 +-
.../loader/invertedindex/TextIndexHandler.java | 4 +-
.../VarByteChunkSingleValueReaderWriteTest.java | 66 ++++++++++++++++++++++
6 files changed, 85 insertions(+), 10 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/VarByteChunkSingleValueReader.java b/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/VarByteChunkSingleValueReader.java
index 1ebbe32..82fcb2b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/VarByteChunkSingleValueReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/VarByteChunkSingleValueReader.java
@@ -42,9 +42,7 @@ public class VarByteChunkSingleValueReader extends BaseChunkSingleValueReader {
*/
public VarByteChunkSingleValueReader(PinotDataBuffer pinotDataBuffer) {
super(pinotDataBuffer);
-
- int chunkHeaderSize = _numDocsPerChunk * Integer.BYTES;
- _maxChunkSize = chunkHeaderSize + (_lengthOfLongestEntry * _numDocsPerChunk);
+ _maxChunkSize = _numDocsPerChunk * (VarByteChunkSingleValueWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + _lengthOfLongestEntry);
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/VarByteChunkSingleValueWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/VarByteChunkSingleValueWriter.java
index 2c6a299..950c003 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/VarByteChunkSingleValueWriter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/v1/VarByteChunkSingleValueWriter.java
@@ -50,6 +50,7 @@ import org.apache.pinot.core.io.compression.ChunkCompressorFactory;
@NotThreadSafe
public class VarByteChunkSingleValueWriter extends BaseChunkSingleValueWriter {
private static final int CURRENT_VERSION = 2;
+ public static final int CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE = Integer.BYTES;
private final int _chunkHeaderSize;
private int _chunkHeaderOffset;
@@ -70,11 +71,11 @@ public class VarByteChunkSingleValueWriter extends BaseChunkSingleValueWriter {
throws FileNotFoundException {
super(file, compressionType, totalDocs, numDocsPerChunk,
- ((numDocsPerChunk * Integer.BYTES) + (lengthOfLongestEntry * numDocsPerChunk)), // chunkSize
+ numDocsPerChunk * (CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + lengthOfLongestEntry), // chunkSize
lengthOfLongestEntry, CURRENT_VERSION);
_chunkHeaderOffset = 0;
- _chunkHeaderSize = numDocsPerChunk * Integer.BYTES;
+ _chunkHeaderSize = numDocsPerChunk * CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
_chunkDataOffSet = _chunkHeaderSize;
}
@@ -87,7 +88,7 @@ public class VarByteChunkSingleValueWriter extends BaseChunkSingleValueWriter {
@Override
public void setBytes(int row, byte[] bytes) {
_chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet);
- _chunkHeaderOffset += Integer.BYTES;
+ _chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
_chunkBuffer.position(_chunkDataOffSet);
_chunkBuffer.put(bytes);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java
index 34879bd..a8d7e6b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/fwd/SingleValueVarByteRawIndexCreator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.segment.creator.impl.fwd;
+import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import org.apache.pinot.core.io.compression.ChunkCompressorFactory;
@@ -27,7 +28,7 @@ import org.apache.pinot.core.segment.creator.impl.V1Constants;
public class SingleValueVarByteRawIndexCreator extends BaseSingleValueRawIndexCreator {
- private static final int NUM_DOCS_PER_CHUNK = 1000; // TODO: Auto-derive this based on metadata.
+ private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
private final VarByteChunkSingleValueWriter _indexWriter;
@@ -35,7 +36,13 @@ public class SingleValueVarByteRawIndexCreator extends BaseSingleValueRawIndexCr
String column, int totalDocs, int maxLength)
throws IOException {
File file = new File(baseIndexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
- _indexWriter = new VarByteChunkSingleValueWriter(file, compressionType, totalDocs, NUM_DOCS_PER_CHUNK, maxLength);
+ _indexWriter = new VarByteChunkSingleValueWriter(file, compressionType, totalDocs, getNumDocsPerChunk(maxLength), maxLength);
+ }
+
+ @VisibleForTesting
+ public static int getNumDocsPerChunk(int lengthOfLongestEntry) {
+ int overheadPerEntry = lengthOfLongestEntry + VarByteChunkSingleValueWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+ return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1);
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 447489d..df8b6cf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.segment.index.loader.defaultcolumn;
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.Charset;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -338,7 +339,7 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
int totalDocs = _segmentMetadata.getTotalDocs();
Object defaultValue = fieldSpec.getDefaultNullValue();
String stringDefaultValue = (String) defaultValue;
- int lengthOfLongestEntry = stringDefaultValue.length();
+ int lengthOfLongestEntry = StringUtil.encodeUtf8(stringDefaultValue).length;
int dictionaryElementSize = 0;
SingleValueVarByteRawIndexCreator rawIndexCreator =
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/TextIndexHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/TextIndexHandler.java
index 1c1786f..a11b25c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/TextIndexHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/TextIndexHandler.java
@@ -44,6 +44,7 @@ import javax.annotation.Nonnull;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
import org.apache.pinot.core.io.reader.DataFileReader;
+import org.apache.pinot.core.io.reader.impl.ChunkReaderContext;
import org.apache.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader;
import org.apache.pinot.core.segment.creator.TextIndexType;
import org.apache.pinot.core.segment.creator.impl.inv.text.LuceneTextIndexCreator;
@@ -162,8 +163,9 @@ public class TextIndexHandler {
try (LuceneTextIndexCreator textIndexCreator = new LuceneTextIndexCreator(column, segmentDirectory, true)) {
try (DataFileReader forwardIndexReader = getForwardIndexReader(columnMetadata)) {
VarByteChunkSingleValueReader forwardIndex = (VarByteChunkSingleValueReader) forwardIndexReader;
+ ChunkReaderContext readerContext = forwardIndex.createContext();
for (int docID = 0; docID < numDocs; docID++) {
- Object docToAdd = forwardIndex.getString(docID);
+ Object docToAdd = forwardIndex.getString(docID, readerContext);
textIndexCreator.addDoc(docToAdd, docID);
}
textIndexCreator.seal();
diff --git a/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteChunkSingleValueReaderWriteTest.java b/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteChunkSingleValueReaderWriteTest.java
index d0d62e3..659bbb2 100644
--- a/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteChunkSingleValueReaderWriteTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/index/readerwriter/VarByteChunkSingleValueReaderWriteTest.java
@@ -29,6 +29,7 @@ import org.apache.pinot.core.io.compression.ChunkCompressorFactory;
import org.apache.pinot.core.io.reader.impl.ChunkReaderContext;
import org.apache.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader;
import org.apache.pinot.core.io.writer.impl.v1.VarByteChunkSingleValueWriter;
+import org.apache.pinot.core.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
import org.apache.pinot.core.segment.memory.PinotDataBuffer;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -136,4 +137,69 @@ public class VarByteChunkSingleValueReaderWriteTest {
}
}
}
+
+ @Test
+ public void testVarCharWithDifferentSizes() throws Exception {
+ testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 10, 1000);
+ testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 10, 1000);
+
+ testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 100, 1000);
+ testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 100, 1000);
+
+ testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 1000, 1000);
+ testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 1000, 1000);
+
+ testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 10000, 100);
+ testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 10000, 100);
+
+ testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 100000, 10);
+ testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 100000, 10);
+
+ testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 1000000, 10);
+ testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 1000000, 10);
+
+ testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.SNAPPY, 2000000, 10);
+ testLargeVarcharHelper(ChunkCompressorFactory.CompressionType.PASS_THROUGH, 2000000, 10);
+ }
+
+ private void testLargeVarcharHelper(ChunkCompressorFactory.CompressionType compressionType, int numChars, int numDocs)
+ throws Exception {
+ String[] expected = new String[numDocs];
+ Random random = new Random();
+
+ File outFile = new File(TEST_FILE);
+ FileUtils.deleteQuietly(outFile);
+
+ int maxStringLengthInBytes = 0;
+ for (int i = 0; i < numDocs; i++) {
+ expected[i] = RandomStringUtils.random(random.nextInt(numChars));
+ maxStringLengthInBytes = Math.max(maxStringLengthInBytes, expected[i].getBytes(UTF_8).length);
+ }
+
+ int numDocsPerChunk = SingleValueVarByteRawIndexCreator.getNumDocsPerChunk(maxStringLengthInBytes);
+ VarByteChunkSingleValueWriter writer =
+ new VarByteChunkSingleValueWriter(outFile, compressionType, numDocs, numDocsPerChunk,
+ maxStringLengthInBytes);
+
+ for (int i = 0; i < numDocs; i += 2) {
+ writer.setString(i, expected[i]);
+ writer.setBytes(i + 1, expected[i].getBytes(UTF_8));
+ }
+
+ writer.close();
+
+ try (VarByteChunkSingleValueReader reader = new VarByteChunkSingleValueReader(
+ PinotDataBuffer.mapReadOnlyBigEndianFile(outFile))) {
+ ChunkReaderContext context = reader.createContext();
+
+ for (int i = 0; i < numDocs; i += 2) {
+ String actual = reader.getString(i, context);
+ Assert.assertEquals(actual, expected[i]);
+ Assert.assertEquals(actual.getBytes(UTF_8), expected[i].getBytes(UTF_8));
+ Assert.assertEquals(reader.getBytes(i + 1), expected[i].getBytes(UTF_8));
+ }
+ }
+
+ FileUtils.deleteQuietly(outFile);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org