You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2023/03/15 01:14:24 UTC
[druid] branch master updated: better FrontCodedIndexed (#13854)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new ed57c5c853 better FrontCodedIndexed (#13854)
ed57c5c853 is described below
commit ed57c5c8539bcd61a50cca6a337761307fd8e540
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Tue Mar 14 18:14:11 2023 -0700
better FrontCodedIndexed (#13854)
* Adds new implementation of 'frontCoded' string encoding strategy, which writes out a v1 FrontCodedIndexed whose bucket values store a prefix length relative to the immediately preceding value, instead of relative to the first value in the bucket
---
.../benchmark/FrontCodedIndexedBenchmark.java | 68 +++++-
.../segment/column/StringEncodingStrategies.java | 4 +-
.../segment/column/StringEncodingStrategy.java | 12 +-
.../druid/segment/data/FrontCodedIndexed.java | 261 ++++++++++++++++++---
.../segment/data/FrontCodedIndexedWriter.java | 156 +++++++++++-
.../druid/segment/data/FrontCodedIndexedTest.java | 53 +++--
6 files changed, 486 insertions(+), 68 deletions(-)
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/FrontCodedIndexedBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/FrontCodedIndexedBenchmark.java
index 3065663065..00be920253 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/FrontCodedIndexedBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/FrontCodedIndexedBenchmark.java
@@ -86,19 +86,29 @@ public class FrontCodedIndexedBenchmark
@Param({"16"})
public int width;
- @Param({"generic", "front-coded-4", "front-coded-16"})
+ @Param({
+ "generic",
+ "front-coded-4",
+ "front-coded-16",
+ "front-coded-incremental-buckets-4",
+ "front-coded-incremental-buckets-16"
+ })
public String indexType;
@Param({"10000"})
public int numOperations;
private File fileFrontCoded;
+ private File fileFrontCodedIncrementalBuckets;
private File fileGeneric;
private File smooshDirFrontCoded;
+ private File smooshDirFrontCodedIncrementalBuckets;
private File smooshDirGeneric;
private GenericIndexed<ByteBuffer> genericIndexed;
private FrontCodedIndexed frontCodedIndexed;
+ private FrontCodedIndexed frontCodedIndexedIncrementalBuckets;
+
private Indexed<ByteBuffer> indexed;
private String[] values;
@@ -128,31 +138,54 @@ public class FrontCodedIndexedBenchmark
FrontCodedIndexedWriter frontCodedIndexedWriter = new FrontCodedIndexedWriter(
new OnHeapMemorySegmentWriteOutMedium(),
ByteOrder.nativeOrder(),
- "front-coded-4".equals(indexType) ? 4 : 16
+ "front-coded-4".equals(indexType) ? 4 : 16,
+ false
);
frontCodedIndexedWriter.open();
+ FrontCodedIndexedWriter frontCodedIndexedWriterIncrementalBuckets = new FrontCodedIndexedWriter(
+ new OnHeapMemorySegmentWriteOutMedium(),
+ ByteOrder.nativeOrder(),
+ "front-coded-incremental-buckets-4".equals(indexType) ? 4 : 16,
+ true
+ );
+ frontCodedIndexedWriterIncrementalBuckets.open();
+
int count = 0;
while (iterator.hasNext()) {
final String next = iterator.next();
values[count++] = next;
frontCodedIndexedWriter.write(StringUtils.toUtf8Nullable(next));
genericIndexedWriter.write(next);
+ frontCodedIndexedWriterIncrementalBuckets.write(StringUtils.toUtf8Nullable(next));
}
smooshDirFrontCoded = FileUtils.createTempDir();
fileFrontCoded = File.createTempFile("frontCodedIndexedBenchmark", "meta");
+
smooshDirGeneric = FileUtils.createTempDir();
fileGeneric = File.createTempFile("genericIndexedBenchmark", "meta");
+ smooshDirFrontCodedIncrementalBuckets = FileUtils.createTempDir();
+ fileFrontCodedIncrementalBuckets = File.createTempFile("frontCodedIndexedBenchmarkIncrementalBuckets", "meta");
+
EncodingSizeProfiler.encodedSize = (int) ("generic".equals(indexType)
? genericIndexedWriter.getSerializedSize()
- : frontCodedIndexedWriter.getSerializedSize());
+ : indexType.startsWith("front-coded-incremental-buckets")
+ ? frontCodedIndexedWriterIncrementalBuckets.getSerializedSize()
+ : frontCodedIndexedWriter.getSerializedSize());
try (
FileChannel fileChannelFrontCoded = FileChannel.open(
fileFrontCoded.toPath(),
StandardOpenOption.CREATE, StandardOpenOption.WRITE
);
FileSmoosher fileSmoosherFrontCoded = new FileSmoosher(smooshDirFrontCoded);
+
+ FileChannel fileChannelFrontCodedIncrementalBuckets = FileChannel.open(
+ fileFrontCodedIncrementalBuckets.toPath(),
+ StandardOpenOption.CREATE, StandardOpenOption.WRITE
+ );
+ FileSmoosher fileSmoosherFrontCodedIncrementalBuckets = new FileSmoosher(smooshDirFrontCodedIncrementalBuckets);
+
FileChannel fileChannelGeneric = FileChannel.open(
fileGeneric.toPath(),
StandardOpenOption.CREATE, StandardOpenOption.WRITE
@@ -161,6 +194,10 @@ public class FrontCodedIndexedBenchmark
) {
frontCodedIndexedWriter.writeTo(fileChannelFrontCoded, fileSmoosherFrontCoded);
genericIndexedWriter.writeTo(fileChannelGeneric, fileSmoosherGeneric);
+ frontCodedIndexedWriterIncrementalBuckets.writeTo(
+ fileChannelFrontCodedIncrementalBuckets,
+ fileSmoosherFrontCodedIncrementalBuckets
+ );
}
FileChannel fileChannelGeneric = FileChannel.open(fileGeneric.toPath());
@@ -172,6 +209,13 @@ public class FrontCodedIndexedBenchmark
fileFrontCoded.length()
);
+ FileChannel fileChannelFrontCodedIncrementalBuckets = FileChannel.open(fileFrontCodedIncrementalBuckets.toPath());
+ MappedByteBuffer byteBufferFrontCodedIncrementalBuckets = fileChannelFrontCodedIncrementalBuckets.map(
+ FileChannel.MapMode.READ_ONLY,
+ 0,
+ fileFrontCodedIncrementalBuckets.length()
+ );
+
genericIndexed = GenericIndexed.read(
byteBufferGeneric,
GenericIndexed.UTF8_STRATEGY,
@@ -181,19 +225,29 @@ public class FrontCodedIndexedBenchmark
byteBufferFrontCoded.order(ByteOrder.nativeOrder()),
ByteOrder.nativeOrder()
).get();
+ frontCodedIndexedIncrementalBuckets = FrontCodedIndexed.read(
+ byteBufferFrontCodedIncrementalBuckets.order(ByteOrder.nativeOrder()),
+ ByteOrder.nativeOrder()
+ ).get();
// sanity test
for (int i = 0; i < numElements; i++) {
final String expected = StringUtils.fromUtf8Nullable(genericIndexed.get(i));
final String actual = StringUtils.fromUtf8Nullable(frontCodedIndexed.get(i));
+ final String actual2 = StringUtils.fromUtf8Nullable(frontCodedIndexedIncrementalBuckets.get(i));
Preconditions.checkArgument(
Objects.equals(expected, actual),
"elements not equal: " + i + " " + expected + " " + actual
);
+ // the incremental-bucket (V1) index must agree with GenericIndexed; report its own value (actual2) on mismatch
+ Preconditions.checkArgument(
+ Objects.equals(expected, actual2),
+ "elements not equal (incremental buckets): " + i + " " + expected + " " + actual2
+ );
}
Iterator<ByteBuffer> genericIterator = genericIndexed.iterator();
Iterator<ByteBuffer> frontCodedIterator = frontCodedIndexed.iterator();
+ Iterator<ByteBuffer> frontCodedIteratorIncrementalBuckets = frontCodedIndexedIncrementalBuckets.iterator();
Iterator<String> frontCodedStringIterator =
new StringEncodingStrategies.Utf8ToStringIndexed(frontCodedIndexed).iterator();
@@ -202,6 +256,7 @@ public class FrontCodedIndexedBenchmark
final String expected = StringUtils.fromUtf8Nullable(genericIterator.next());
final String actual = StringUtils.fromUtf8Nullable(frontCodedIterator.next());
final String actual2 = frontCodedStringIterator.next();
+ final String actual3 = StringUtils.fromUtf8Nullable(frontCodedIteratorIncrementalBuckets.next());
Preconditions.checkArgument(
Objects.equals(expected, actual),
"elements not equal: " + counter + " " + expected + " " + actual
@@ -210,11 +265,16 @@ public class FrontCodedIndexedBenchmark
Objects.equals(expected, actual2),
"elements not equal: " + counter + " " + expected + " " + actual
);
+ // iterating the incremental-bucket (V1) index must agree with GenericIndexed; report its own value (actual3)
+ Preconditions.checkArgument(
+ Objects.equals(expected, actual3),
+ "elements not equal (incremental buckets): " + counter + " " + expected + " " + actual3
+ );
counter++;
}
Preconditions.checkArgument(counter == numElements);
Preconditions.checkArgument(genericIterator.hasNext() == frontCodedIterator.hasNext());
Preconditions.checkArgument(genericIterator.hasNext() == frontCodedStringIterator.hasNext());
+ Preconditions.checkArgument(genericIterator.hasNext() == frontCodedIteratorIncrementalBuckets.hasNext());
elementsToSearch = new String[numOperations];
for (int i = 0; i < numOperations; i++) {
@@ -226,6 +286,8 @@ public class FrontCodedIndexedBenchmark
}
if ("generic".equals(indexType)) {
indexed = genericIndexed.singleThreaded();
+ } else if (indexType.startsWith("front-coded-incremental-buckets")) {
+ indexed = frontCodedIndexedIncrementalBuckets;
} else {
indexed = frontCodedIndexed;
}
diff --git a/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategies.java b/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategies.java
index 2427658b66..0c04f0288c 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategies.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategies.java
@@ -53,10 +53,12 @@ public class StringEncodingStrategies
// writing the dictionary itself
DictionaryWriter<byte[]> writer;
if (StringEncodingStrategy.FRONT_CODED.equals(encodingStrategy.getType())) {
+ StringEncodingStrategy.FrontCoded strategy = (StringEncodingStrategy.FrontCoded) encodingStrategy;
writer = new FrontCodedIndexedWriter(
writeoutMedium,
IndexIO.BYTE_ORDER,
- ((StringEncodingStrategy.FrontCoded) encodingStrategy).getBucketSize()
+ strategy.getBucketSize(),
+ true
);
} else {
throw new ISE("Unknown encoding strategy: %s", encodingStrategy.getType());
diff --git a/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategy.java b/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategy.java
index fedabb206a..e850b0c362 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategy.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategy.java
@@ -86,12 +86,6 @@ public interface StringEncodingStrategy
{
public static final int DEFAULT_BUCKET_SIZE = 4;
- @JsonProperty
- public int getBucketSize()
- {
- return bucketSize;
- }
-
@JsonProperty
private final int bucketSize;
@@ -106,6 +100,12 @@ public interface StringEncodingStrategy
}
}
+ /**
+ * Number of values grouped together into each front-coded bucket; this is the bucket size handed to the
+ * front-coded dictionary writer when the strategy is applied.
+ */
+ @JsonProperty
+ public int getBucketSize()
+ {
+ return this.bucketSize;
+ }
+
@Override
public String getType()
{
diff --git a/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java b/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java
index 2596f7ec2b..cc0f280405 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java
@@ -21,6 +21,7 @@ package org.apache.druid.segment.data;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
+import org.apache.druid.annotations.SuppressFBWarnings;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@@ -38,37 +39,39 @@ import java.util.NoSuchElementException;
* sorted and unique, using 'front coding'. Front coding is a type of delta encoding for byte arrays, where sorted
* values are grouped into buckets. The first value of the bucket is written entirely, and remaining values are stored
* as a pair of an integer which indicates how much of the first byte array of the bucket to use as a prefix, followed
- * by the remaining bytes after the prefix to complete the value.
- *
+ * by the remaining bytes after the prefix to complete the value. If using 'incremental' buckets (version 1), the
+ * prefix is instead computed against the immediately preceding value in the bucket rather than the first value of
+ * the bucket.
+ * <p>
* front coded indexed layout:
* | version | bucket size | has null? | number of values | size of "offsets" + "buckets" | "offsets" | "buckets" |
* | ------- | ----------- | --------- | ---------------- | ----------------------------- | --------- | --------- |
* | byte | byte | byte | vbyte int | vbyte int | int[] | bucket[] |
- *
+ * <p>
* "offsets" are the ending offsets of each bucket stored in order, stored as plain integers for easy random access.
- *
+ * <p>
* bucket layout:
* | first value | prefix length | fragment | ... | prefix length | fragment |
* | ----------- | ------------- | -------- | --- | ------------- | -------- |
* | blob | vbyte int | blob | ... | vbyte int | blob |
- *
+ * <p>
* blob layout:
* | blob length | blob bytes |
* | ----------- | ---------- |
* | vbyte int | byte[] |
- *
- *
+ * <p>
+ * <p>
* Getting a value first picks the appropriate bucket, finds its offset in the underlying buffer, then scans the bucket
* values to seek to the correct position of the value within the bucket in order to reconstruct it using the prefix
* length.
- *
+ * <p>
* Finding the index of a value involves binary searching the first values of each bucket to find the correct bucket,
* then a linear scan within the bucket to find the matching value (or negative insertion point -1 for values that
* are not present).
- *
+ * <p>
* The value iterator reads an entire bucket at a time, reconstructing the values into an array to iterate within the
* bucket before moving onto the next bucket as the iterator is consumed.
- *
+ * <p>
* This class is not thread-safe since during operation modifies positions of a shared buffer.
*/
public final class FrontCodedIndexed implements Indexed<ByteBuffer>
@@ -77,7 +80,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
{
final ByteBuffer orderedBuffer = buffer.asReadOnlyBuffer().order(ordering);
final byte version = orderedBuffer.get();
- Preconditions.checkArgument(version == 0, "only V0 exists, encountered " + version);
+ Preconditions.checkArgument(version == 0 || version == 1, "only V0 and V1 exist, encountered " + version);
final int bucketSize = Byte.toUnsignedInt(orderedBuffer.get());
final boolean hasNull = NullHandling.IS_NULL_BYTE == orderedBuffer.get();
final int numValues = VByte.readInt(orderedBuffer);
@@ -93,7 +96,8 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
bucketSize,
numValues,
hasNull,
- offsetsPosition
+ offsetsPosition,
+ version
);
}
@@ -101,6 +105,8 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
private final int adjustedNumValues;
private final int adjustIndex;
private final int bucketSize;
+ private final int[] unwindPrefixLength;
+ private final int[] unwindBufferPosition;
private final int numBuckets;
private final int div;
private final int rem;
@@ -109,13 +115,19 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
private final boolean hasNull;
private final int lastBucketNumValues;
+ private final GetBucketValue getBucketValueFn;
+ private final ReadBucket readBucketFn;
+ private final FindInBucket findInBucketFn;
+
+ @SuppressFBWarnings(value = "NP_STORE_INTO_NONNULL_FIELD", justification = "V0 does not use unwindPrefixLength or unwindBufferPosition")
private FrontCodedIndexed(
ByteBuffer buffer,
ByteOrder order,
int bucketSize,
int numValues,
boolean hasNull,
- int offsetsPosition
+ int offsetsPosition,
+ byte version
)
{
if (Integer.bitCount(bucketSize) != 1) {
@@ -133,6 +145,23 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
this.lastBucketNumValues = (numValues & rem) == 0 ? bucketSize : numValues & rem;
this.offsetsPosition = offsetsPosition;
this.bucketsPosition = offsetsPosition + ((numBuckets - 1) * Integer.BYTES);
+ if (version == 0) {
+ // version zero, all prefixes are computed against the first value in the bucket
+ this.getBucketValueFn = FrontCodedIndexed::getFromBucketV0;
+ this.readBucketFn = FrontCodedIndexed::readBucketV0;
+ this.findInBucketFn = this::findValueInBucketV0;
+ //noinspection DataFlowIssue
+ this.unwindPrefixLength = null;
+ //noinspection DataFlowIssue
+ this.unwindBufferPosition = null;
+ } else {
+ // version one uses 'incremental' buckets, where the prefix is computed against the previous value
+ this.unwindPrefixLength = new int[bucketSize];
+ this.unwindBufferPosition = new int[bucketSize];
+ this.getBucketValueFn = this::getFromBucketV1;
+ this.readBucketFn = this::readBucketV1;
+ this.findInBucketFn = this::findValueInBucketV1;
+ }
}
@Override
@@ -158,7 +187,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
final int bucketIndex = adjustedIndex & rem;
final int offset = getBucketOffset(bucket);
buffer.position(offset);
- return getFromBucket(buffer, bucketIndex);
+ return getBucketValueFn.get(buffer, bucketIndex);
}
@Override
@@ -216,7 +245,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
// find the value in the bucket (or where it would be if it were present)
buffer.position(firstOffset + firstLength);
- return findValueInBucket(value, currBucketFirstValueIndex, bucketSize, sharedPrefix);
+ return findInBucketFn.find(value, currBucketFirstValueIndex, bucketSize, sharedPrefix);
} else if (comparison < 0) {
minBucketIndex = currentBucket + 1;
} else {
@@ -251,12 +280,12 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
if (comparison > 0) {
// value preceedes bucket, so bail out
- return -(bucketIndexBase + adjustIndex) - 1;
+ return ~(bucketIndexBase + adjustIndex);
}
buffer.position(firstOffset + firstLength);
- return findValueInBucket(value, bucketIndexBase, numValuesInBucket, sharedPrefix);
+ return findInBucketFn.find(value, bucketIndexBase, numValuesInBucket, sharedPrefix);
}
@Override
@@ -274,7 +303,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
}
ByteBuffer copy = buffer.asReadOnlyBuffer().order(buffer.order());
copy.position(bucketsPosition);
- final ByteBuffer[] firstBucket = readBucket(copy, numBuckets > 1 ? bucketSize : lastBucketNumValues);
+ final ByteBuffer[] firstBucket = readBucketFn.readBucket(copy, numBuckets > 1 ? bucketSize : lastBucketNumValues);
// iterator decodes and buffers a bucket at a time, paging through buckets as the iterator is consumed
return new Iterator<ByteBuffer>()
{
@@ -305,7 +334,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
if (bucketNum != currentBucketIndex) {
final int offset = copy.getInt(offsetsPosition + ((bucketNum - 1) * Integer.BYTES));
copy.position(bucketsPosition + offset);
- currentBucket = readBucket(
+ currentBucket = readBucketFn.readBucket(
copy,
bucketNum < (numBuckets - 1) ? bucketSize : lastBucketNumValues
);
@@ -345,7 +374,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
* MUST be prepared before calling, as it expects the length of the first value to have already been read externally,
* and the buffer position to be at the start of the first bucket value. The final buffer position will be the
* 'shared prefix length' of the first value in the bucket and the value to compare.
- *
+ * <p>
* Bytes are compared using {@link StringUtils#compareUtf8UsingJavaStringOrdering(byte, byte)}. Therefore, when the
* values are UTF-8 encoded strings, the ordering is compatible with {@link String#compareTo(String)}.
*/
@@ -372,17 +401,17 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
* Finds a value in a bucket among the fragments. The first value is assumed to have been already compared against
* and be smaller than the value we are looking for. This comparison is the source of the 'shared prefix', which is
* the length which the value has in common with the first value of the bucket.
- *
+ * <p>
* This method uses this shared prefix length to skip more expensive byte by byte full value comparisons when
* possible by comparing the shared prefix length with the prefix length of the fragment. Since the bucket is always
* sorted, prefix lengths shrink as you progress to higher indexes, and we can use this to reason that a fragment
* with a longer prefix length than the shared prefix will always sort before the value we are looking for, and values
* which have a shorter prefix will always be greater than the value we are looking for, so we only need to do a
* full comparison if the prefix length is the same
- *
+ * <p>
* this method modifies the position of {@link #buffer}
*/
- private int findValueInBucket(
+ private int findValueInBucketV0(
ByteBuffer value,
int currBucketFirstValueIndex,
int bucketSize,
@@ -396,6 +425,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
while (++relativePosition < bucketSize) {
prefixLength = VByte.readInt(buffer);
if (prefixLength > sharedPrefix) {
+ // this value shares more in common with the first value, so the value we are looking for comes after
final int skip = VByte.readInt(buffer);
buffer.position(buffer.position() + skip);
insertionPoint++;
@@ -430,20 +460,20 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
}
}
// (-(insertion point) - 1)
- return -(currBucketFirstValueIndex + adjustIndex) + (-(insertionPoint) - 1);
+ return -(currBucketFirstValueIndex + adjustIndex) + (~insertionPoint);
}
/**
* Get a value from a bucket at a relative position.
- *
+ * <p>
* This method modifies the position of the buffer.
*/
- static ByteBuffer getFromBucket(ByteBuffer buffer, int offset)
+ static ByteBuffer getFromBucketV0(ByteBuffer buffer, int offset)
{
int prefixPosition;
if (offset == 0) {
final int length = VByte.readInt(buffer);
- final ByteBuffer firstValue = buffer.asReadOnlyBuffer().order(buffer.order());
+ final ByteBuffer firstValue = buffer.asReadOnlyBuffer();
firstValue.limit(firstValue.position() + length);
return firstValue;
} else {
@@ -485,10 +515,10 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
/**
* Read an entire bucket from a {@link ByteBuffer}, returning an array of reconstructed value bytes.
- *
+ * <p>
* This method modifies the position of the buffer.
*/
- private static ByteBuffer[] readBucket(ByteBuffer bucket, int numValues)
+ private static ByteBuffer[] readBucketV0(ByteBuffer bucket, int numValues)
{
final int length = VByte.readInt(bucket);
final byte[] prefixBytes = new byte[length];
@@ -509,4 +539,177 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
}
return bucketBuffers;
}
+
+ /**
+ * Finds a value in a V1 ('incremental') bucket among the fragments. The first value is assumed to have already been
+ * compared against and be smaller than the value we are looking for. That comparison is the source of the initial
+ * 'shared prefix length', the number of leading bytes the value has in common with the first value of the bucket.
+ * <p>
+ * In V1 buckets each fragment's prefix length is relative to the immediately preceding value, so as the scan advances
+ * the shared prefix length is maintained against the preceding (smaller) value. Comparing it with each fragment's
+ * prefix length lets most values be skipped without a byte by byte comparison:
+ * <ul>
+ * <li>a prefix length longer than the shared prefix length means the bucket value agrees with the preceding value
+ * beyond the point where the preceding value diverges from the value we are looking for, so the bucket value also
+ * sorts before it, and the shared prefix length is unchanged</li>
+ * <li>a prefix length shorter than the shared prefix length means the bucket value diverges from (and is larger
+ * than) the preceding value at a position where the value we are looking for still matches the preceding value, so
+ * the bucket value, and every value after it, sorts after the value we are looking for</li>
+ * <li>an equal prefix length requires comparing the fragment bytes, which also yields the new shared prefix
+ * length</li>
+ * </ul>
+ * Unlike V0 buckets, prefix lengths are not guaranteed to shrink as the position within the bucket increases.
+ * <p>
+ * this method modifies the position of {@link #buffer}
+ *
+ * @param value the value to search for
+ * @param currBucketFirstValueIndex index of the first value of the bucket; {@code adjustIndex} is added to it to
+ * form the returned index
+ * @param bucketSize number of values in the bucket
+ * @param sharedPrefixLength number of leading bytes the value shares with the first value of the bucket
+ * @return the index of the value if found, otherwise (-(insertion point) - 1)
+ */
+ private int findValueInBucketV1(
+ ByteBuffer value,
+ int currBucketFirstValueIndex,
+ int bucketSize,
+ int sharedPrefixLength
+ )
+ {
+ int relativePosition = 0;
+ int prefixLength;
+ // scan the remaining bucket values until we find a match or pass the position the value would occupy
+ int insertionPoint = 1;
+ while (++relativePosition < bucketSize) {
+ prefixLength = VByte.readInt(buffer);
+ if (prefixLength > sharedPrefixLength) {
+ // bucket value shares more in common with the preceding value, so the value we are looking for comes after
+ final int skip = VByte.readInt(buffer);
+ buffer.position(buffer.position() + skip);
+ insertionPoint++;
+ } else if (prefixLength < sharedPrefixLength) {
+ // bucket value prefix is smaller, that means the value we are looking for sorts ahead of it
+ break;
+ } else {
+ // same prefix length as the shared prefix, so compare the fragment bytes to decide
+ final int fragmentLength = VByte.readInt(buffer);
+ final int common = Math.min(fragmentLength, value.remaining() - prefixLength);
+ int fragmentComparison = 0;
+ boolean shortCircuit = false;
+ for (int i = 0; i < common; i++) {
+ fragmentComparison = StringUtils.compareUtf8UsingJavaStringOrdering(
+ buffer.get(buffer.position() + i),
+ value.get(prefixLength + i)
+ );
+ if (fragmentComparison != 0) {
+ // first differing byte, this is the new shared prefix length against this bucket value
+ sharedPrefixLength = prefixLength + i;
+ shortCircuit = true;
+ break;
+ }
+ }
+ if (fragmentComparison == 0) {
+ // all common bytes match, the shorter of the two sorts first
+ fragmentComparison = Integer.compare(prefixLength + fragmentLength, value.remaining());
+ }
+
+ if (fragmentComparison == 0) {
+ return (currBucketFirstValueIndex + adjustIndex) + relativePosition;
+ } else if (fragmentComparison < 0) {
+ // value we are looking for is longer than the current bucket value, continue on
+ if (!shortCircuit) {
+ sharedPrefixLength = prefixLength + common;
+ }
+ buffer.position(buffer.position() + fragmentLength);
+ insertionPoint++;
+ } else {
+ break;
+ }
+ }
+ }
+ // (-(insertion point) - 1)
+ return -(currBucketFirstValueIndex + adjustIndex) + (~insertionPoint);
+ }
+
+ /**
+ * Get a value from a V1 ('incremental') bucket at a relative position. The buffer must be positioned at the start of
+ * the bucket, where the length of the first (whole) value is stored.
+ * <p>
+ * Values stored without a prefix (the first value, or a later value whose prefix length is 0) are returned as a
+ * read-only view of the underlying buffer. Any other value is rebuilt into a new array: its own fragment supplies the
+ * bytes after its prefix, then the preceding values are visited in reverse, each copying the part of its fragment
+ * that the still-missing prefix needs.
+ * <p>
+ * This method modifies the position of the buffer, and writes to {@link #unwindPrefixLength} and
+ * {@link #unwindBufferPosition} as scratch space, so it must not be used concurrently.
+ */
+ private ByteBuffer getFromBucketV1(ByteBuffer buffer, int offset)
+ {
+ // first value is written whole
+ final int length = VByte.readInt(buffer);
+ if (offset == 0) {
+ // return first value directly from underlying buffer since it is stored whole
+ final ByteBuffer value = buffer.asReadOnlyBuffer();
+ value.limit(value.position() + length);
+ return value;
+ }
+ int pos = 0;
+ int prefixLength;
+ int fragmentLength;
+ // record the prefix length and fragment start of every value before the target, so the target's prefix can be
+ // rebuilt by walking back through them; the first value has no prefix and its bytes start here
+ unwindPrefixLength[pos] = 0;
+ unwindBufferPosition[pos] = buffer.position();
+
+ buffer.position(buffer.position() + length);
+ do {
+ prefixLength = VByte.readInt(buffer);
+ if (++pos < offset) {
+ // not there yet, no need to read anything other than the length to skip ahead
+ final int skipLength = VByte.readInt(buffer);
+ unwindPrefixLength[pos] = prefixLength;
+ unwindBufferPosition[pos] = buffer.position();
+ buffer.position(buffer.position() + skipLength);
+ } else {
+ // we've reached our destination
+ fragmentLength = VByte.readInt(buffer);
+ if (prefixLength == 0) {
+ // no prefix, return it directly from the underlying buffer
+ final ByteBuffer value = buffer.asReadOnlyBuffer();
+ value.limit(value.position() + fragmentLength);
+ return value;
+ }
+ break;
+ }
+ } while (true);
+ final int valueLength = prefixLength + fragmentLength;
+ final byte[] valueBytes = new byte[valueLength];
+ // the target's own fragment fills everything after its prefix
+ buffer.get(valueBytes, prefixLength, fragmentLength);
+ // i is the number of leading bytes still missing; each earlier value provides bytes
+ // [its prefix length, i) from its fragment, until nothing is missing
+ for (int i = prefixLength; i > 0;) {
+ // this earlier value shares at least i bytes with its own predecessor, so its fragment starts at or after i and
+ // has nothing we need; keep walking back
+ if (unwindPrefixLength[--pos] >= i) {
+ continue;
+ }
+ buffer.position(unwindBufferPosition[pos]);
+ buffer.get(valueBytes, unwindPrefixLength[pos], i - unwindPrefixLength[pos]);
+ i = unwindPrefixLength[pos];
+ }
+ return ByteBuffer.wrap(valueBytes);
+ }
+
+ /**
+ * Decode every value of a V1 ('incremental') bucket, returning the reconstructed values in order, each backed by
+ * its own byte array. Every value after the first is rebuilt from the leading bytes of the value decoded just before
+ * it, followed by its own fragment.
+ * <p>
+ * This method modifies the position of the buffer.
+ */
+ private ByteBuffer[] readBucketV1(ByteBuffer bucket, int numValues)
+ {
+ final ByteBuffer[] values = new ByteBuffer[numValues];
+ // the first value carries no prefix and is stored in full
+ byte[] previous = new byte[VByte.readInt(bucket)];
+ bucket.get(previous, 0, previous.length);
+ values[0] = ByteBuffer.wrap(previous);
+ for (int i = 1; i < numValues; i++) {
+ // prefix length is read before the fragment length, matching the on-disk order
+ final int sharedLength = VByte.readInt(bucket);
+ final byte[] current = new byte[sharedLength + VByte.readInt(bucket)];
+ System.arraycopy(previous, 0, current, 0, sharedLength);
+ bucket.get(current, sharedLength, current.length - sharedLength);
+ values[i] = ByteBuffer.wrap(current);
+ previous = current;
+ }
+ return values;
+ }
+
+ /**
+ * Reads the value at a relative position within a bucket. Implementations expect the buffer to be positioned at the
+ * start of the bucket and modify its position.
+ */
+ @FunctionalInterface
+ interface GetBucketValue
+ {
+ ByteBuffer get(ByteBuffer buffer, int offset);
+ }
+
+ /**
+ * Decodes the given number of values of a bucket, starting at the current buffer position, into an array of value
+ * buffers. Implementations modify the position of the buffer.
+ */
+ @FunctionalInterface
+ interface ReadBucket
+ {
+ ByteBuffer[] readBucket(ByteBuffer buffer, int numValues);
+ }
+
+ /**
+ * Searches a bucket for a value, returning its index if present, otherwise (-(insertion point) - 1). Implementations
+ * expect the first value of the bucket to have already been compared against the value being searched for, and the
+ * shared buffer to be positioned just past that first value.
+ */
+ @FunctionalInterface
+ interface FindInBucket
+ {
+ int find(ByteBuffer value, int currBucketFirstValueIndex, int bucketSize, int sharedPrefixLength);
+ }
}
diff --git a/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java
index bcbe47db62..c5a26f3a59 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java
@@ -42,7 +42,8 @@ import java.nio.channels.WritableByteChannel;
*
* Front coding is a type of delta encoding for byte arrays, where values are grouped into buckets. The first value of
* the bucket is written entirely, and remaining values are stored as pairs of an integer which indicates how much
- * of the first byte array of the bucket to use as a prefix, followed by the remaining value bytes after the prefix.
+ * of the first byte array of the bucket (or of the preceding value in the bucket, when using 'incremental' buckets)
+ * to use as a prefix, followed by the remaining value bytes after the prefix.
*
* This writer is designed for use with UTF-8 encoded strings that are written in an order compatible with
* {@link String#compareTo(String)}.
@@ -58,6 +59,7 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
private final byte[][] bucketBuffer;
private final ByteBuffer getOffsetBuffer;
private final int div;
+ private final boolean useIncrementalBuckets;
@Nullable
private byte[] prevObject = null;
@@ -71,10 +73,12 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
private boolean isClosed = false;
private boolean hasNulls = false;
+
public FrontCodedIndexedWriter(
SegmentWriteOutMedium segmentWriteOutMedium,
ByteOrder byteOrder,
- int bucketSize
+ int bucketSize,
+ boolean useIncrementalBuckets
)
{
if (Integer.bitCount(bucketSize) != 1 || bucketSize < 1 || bucketSize > 128) {
@@ -87,6 +91,7 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
this.bucketBuffer = new byte[bucketSize][];
this.getOffsetBuffer = ByteBuffer.allocate(Integer.BYTES).order(byteOrder);
this.div = Integer.numberOfTrailingZeros(bucketSize);
+ this.useIncrementalBuckets = useIncrementalBuckets;
}
@Override
@@ -119,7 +124,9 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
int written;
// write the bucket, growing scratch buffer as necessary
do {
- written = writeBucket(scratch, bucketBuffer, bucketSize);
+ written = useIncrementalBuckets
+ ? writeIncrementalBucket(scratch, bucketBuffer, bucketSize)
+ : writeBucket(scratch, bucketBuffer, bucketSize);
if (written < 0) {
growScratch();
}
@@ -163,8 +170,14 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
flush();
}
resetScratch();
- // version 0
- scratch.put((byte) 0);
+
+ if (useIncrementalBuckets) {
+ // version 1 is incremental buckets
+ scratch.put((byte) 1);
+ } else {
+ // version 0 all values are prefixed on first bucket value
+ scratch.put((byte) 0);
+ }
scratch.put((byte) bucketSize);
scratch.put(hasNulls ? NullHandling.IS_NULL_BYTE : NullHandling.IS_NOT_NULL_BYTE);
VByte.writeInt(scratch, numWritten);
@@ -202,14 +215,16 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
startOffset = getBucketOffset(bucket - 1);
}
long endOffset = getBucketOffset(bucket);
- int bucketSize = Ints.checkedCast(endOffset - startOffset);
- if (bucketSize == 0) {
+ int bucketBytesSize = Ints.checkedCast(endOffset - startOffset);
+ if (bucketBytesSize == 0) {
return null;
}
- final ByteBuffer bucketBuffer = ByteBuffer.allocate(bucketSize).order(byteOrder);
+ final ByteBuffer bucketBuffer = ByteBuffer.allocate(bucketBytesSize).order(byteOrder);
valuesOut.readFully(startOffset, bucketBuffer);
bucketBuffer.clear();
- final ByteBuffer valueBuffer = FrontCodedIndexed.getFromBucket(bucketBuffer, relativeIndex);
+ final ByteBuffer valueBuffer = useIncrementalBuckets
+ ? getFromBucketV1(bucketBuffer, relativeIndex, bucketSize)
+ : FrontCodedIndexed.getFromBucketV0(bucketBuffer, relativeIndex);
final byte[] valueBytes = new byte[valueBuffer.limit() - valueBuffer.position()];
valueBuffer.get(valueBytes);
return valueBytes;
@@ -232,7 +247,10 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
resetScratch();
int written;
do {
- written = writeBucket(scratch, bucketBuffer, remainder == 0 ? bucketSize : remainder);
+ int flushSize = remainder == 0 ? bucketSize : remainder;
+ written = useIncrementalBuckets
+ ? writeIncrementalBucket(scratch, bucketBuffer, flushSize)
+ : writeBucket(scratch, bucketBuffer, flushSize);
if (written < 0) {
growScratch();
}
@@ -308,6 +326,57 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
return written;
}
+  /**
+   * Write bucket of values to a {@link ByteBuffer}. The first value is written completely, subsequent values are
+   * written with an integer to indicate how much of the preceding value in the bucket is a prefix of the value,
+   * followed by the remaining bytes of the value.
+   *
+   * Uses {@link VByte} encoded integers to indicate prefix length and value length.
+   *
+   * @param buffer    destination buffer, written starting at its current position
+   * @param values    values of the bucket, expected to be in sorted order
+   * @param numValues number of leading entries of {@code values} to write
+   * @return the number of values written, or a negative number if {@code buffer} did not have enough room, in which
+   *         case the caller is expected to grow the buffer and retry
+   */
+  public static int writeIncrementalBucket(ByteBuffer buffer, byte[][] values, int numValues)
+  {
+    if (numValues <= 0) {
+      return 0;
+    }
+    // the first value in the bucket is written completely as it is
+    byte[] prev = values[0];
+    int rem = writeValue(buffer, prev);
+    if (rem < 0) {
+      // wasn't enough room, bail out
+      return rem;
+    }
+    int written = 1;
+    while (written < numValues) {
+      final byte[] next = values[written];
+      // every other value is split into the length of the prefix it shares with the preceding value and the
+      // remaining suffix bytes. The scan is bounded by both lengths so that a value which is shorter than its
+      // predecessor cannot index past the end of 'next'.
+      final int maxPrefixLength = Math.min(prev.length, next.length);
+      int prefixLength = 0;
+      while (prefixLength < maxPrefixLength
+             && StringUtils.compareUtf8UsingJavaStringOrdering(prev[prefixLength], next[prefixLength]) == 0) {
+        prefixLength++;
+      }
+      final byte[] suffix = new byte[next.length - prefixLength];
+      System.arraycopy(next, prefixLength, suffix, 0, suffix.length);
+      rem = buffer.remaining() - VByte.computeIntSize(prefixLength);
+      if (rem < 0) {
+        // wasn't enough room for the prefix length, bail out
+        return rem;
+      }
+      VByte.writeInt(buffer, prefixLength);
+      rem = writeValue(buffer, suffix);
+      if (rem < 0) {
+        // wasn't enough room for the suffix, bail out
+        return rem;
+      }
+      prev = next;
+      written++;
+    }
+    return written;
+  }
+
/**
* Write a variable length byte[] value to a {@link ByteBuffer}, storing the length as a {@link VByte} encoded
* integer followed by the value itself. Returns the number of bytes written to the buffer. This method returns a
@@ -344,4 +413,71 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
return StringUtils.compareUtf8UsingJavaStringOrdering(b1, b2);
}
+
+
+ /**
+ * Reads the value at index {@code offset} of a version 1 ("incremental") bucket, where each value after the first
+ * is stored as the length of the prefix it shares with the preceding value followed by its remaining bytes.
+ *
+ * Same as {@link FrontCodedIndexed#getFromBucketV1(ByteBuffer, int)}, but allocates fresh prefix-length and
+ * buffer-position arrays on every call instead of re-using them, so it creates more garbage than the instance method.
+ *
+ * Note: passing the unwindPrefixLength and unwindBufferPosition arrays as arguments and having
+ * {@link FrontCodedIndexed#getFromBucketV1(ByteBuffer, int)} call this static method added 5-10ns of overhead
+ * compared to having its own copy of the code, presumably due to the extra method call and arguments. Since the
+ * writer is the only user of this static variant, the code is intentionally duplicated here.
+ *
+ * @param buffer     bucket bytes, positioned at the start of the bucket; this method moves its position
+ * @param offset     index of the requested value within the bucket
+ * @param bucketSize number of values per bucket, used to size the unwind arrays
+ * @return the value bytes between the returned buffer's position and limit: a read-only view of {@code buffer}
+ *         when the value is stored whole, otherwise a newly wrapped array
+ */
+ static ByteBuffer getFromBucketV1(ByteBuffer buffer, int offset, int bucketSize)
+ {
+ // for each value preceding the target: its prefix length and the buffer position where its stored bytes start,
+ // recorded while scanning forward so the shared prefix can be rebuilt by walking backwards afterwards
+ final int[] unwindPrefixLength = new int[bucketSize];
+ final int[] unwindBufferPosition = new int[bucketSize];
+ // first value is written whole
+ final int length = VByte.readInt(buffer);
+ if (offset == 0) {
+ // return first value directly from underlying buffer since it is stored whole
+ final ByteBuffer value = buffer.asReadOnlyBuffer();
+ value.limit(value.position() + length);
+ return value;
+ }
+ int pos = 0;
+ int prefixLength;
+ int fragmentLength;
+ // the first value has no prefix; all of its bytes start at the current position
+ unwindPrefixLength[pos] = 0;
+ unwindBufferPosition[pos] = buffer.position();
+
+ buffer.position(buffer.position() + length);
+ do {
+ prefixLength = VByte.readInt(buffer);
+ if (++pos < offset) {
+ // not there yet, no need to read anything other than the length to skip ahead
+ final int skipLength = VByte.readInt(buffer);
+ unwindPrefixLength[pos] = prefixLength;
+ unwindBufferPosition[pos] = buffer.position();
+ buffer.position(buffer.position() + skipLength);
+ } else {
+ // we've reached our destination
+ fragmentLength = VByte.readInt(buffer);
+ if (prefixLength == 0) {
+ // no prefix, return it directly from the underlying buffer
+ final ByteBuffer value = buffer.asReadOnlyBuffer();
+ value.limit(value.position() + fragmentLength);
+ return value;
+ }
+ break;
+ }
+ } while (true);
+ // the target's own fragment fills the tail of the result; bytes [0, prefixLength) are then copied from earlier
+ // values, walking backwards and taking from each one only the bytes past its own prefix
+ final int valueLength = prefixLength + fragmentLength;
+ final byte[] valueBytes = new byte[valueLength];
+ buffer.get(valueBytes, prefixLength, fragmentLength);
+ for (int i = prefixLength; i > 0;) {
+ // previous value had a larger prefix than or the same as the value we are looking for
+ // skip it since the fragment doesn't have anything we need
+ if (unwindPrefixLength[--pos] >= i) {
+ continue;
+ }
+ // copy bytes [unwindPrefixLength[pos], i) from this value's stored fragment, then continue with its prefix
+ buffer.position(unwindBufferPosition[pos]);
+ buffer.get(valueBytes, unwindPrefixLength[pos], i - unwindPrefixLength[pos]);
+ i = unwindPrefixLength[pos];
+ }
+ return ByteBuffer.wrap(valueBytes);
+ }
}
diff --git a/processing/src/test/java/org/apache/druid/segment/data/FrontCodedIndexedTest.java b/processing/src/test/java/org/apache/druid/segment/data/FrontCodedIndexedTest.java
index c9e5613536..88a4a5d3ac 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/FrontCodedIndexedTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/FrontCodedIndexedTest.java
@@ -43,17 +43,24 @@ import java.util.TreeSet;
@RunWith(Parameterized.class)
public class FrontCodedIndexedTest extends InitializedNullHandlingTest
{
- @Parameterized.Parameters(name = "{0}")
+ @Parameterized.Parameters(name = "byteOrder: {0} useIncrementalBuckets: {1}")
public static Collection<Object[]> constructorFeeder()
{
- return ImmutableList.of(new Object[]{ByteOrder.LITTLE_ENDIAN}, new Object[]{ByteOrder.BIG_ENDIAN});
+ return ImmutableList.of(
+ new Object[]{ByteOrder.LITTLE_ENDIAN, true},
+ new Object[]{ByteOrder.LITTLE_ENDIAN, false},
+ new Object[]{ByteOrder.BIG_ENDIAN, true},
+ new Object[]{ByteOrder.BIG_ENDIAN, false}
+ );
}
private final ByteOrder order;
+ private final boolean useIncrementalBuckets;
- public FrontCodedIndexedTest(ByteOrder byteOrder)
+ public FrontCodedIndexedTest(ByteOrder byteOrder, boolean useIncrementalBuckets)
{
this.order = byteOrder;
+ this.useIncrementalBuckets = useIncrementalBuckets;
}
@Test
@@ -61,7 +68,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
{
ByteBuffer buffer = ByteBuffer.allocate(1 << 12).order(order);
List<String> theList = ImmutableList.of("hello", "helloo", "hellooo", "hellooz", "helloozy");
- fillBuffer(buffer, theList, 4);
+ fillBuffer(buffer, theList, 4, useIncrementalBuckets);
buffer.position(0);
FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read(
@@ -92,13 +99,16 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
{
ByteBuffer buffer = ByteBuffer.allocate(1 << 12).order(order);
List<String> theList = ImmutableList.of("hello", "helloo", "hellooo", "hellooz", "helloozy");
- fillBuffer(buffer, theList, 16);
+ fillBuffer(buffer, theList, 16, useIncrementalBuckets);
FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read(
buffer,
buffer.order()
).get();
+ Assert.assertEquals("hello", StringUtils.fromUtf8(codedUtf8Indexed.get(0)));
Assert.assertEquals("helloo", StringUtils.fromUtf8(codedUtf8Indexed.get(1)));
+ Assert.assertEquals("hellooo", StringUtils.fromUtf8(codedUtf8Indexed.get(2)));
+ Assert.assertEquals("hellooz", StringUtils.fromUtf8(codedUtf8Indexed.get(3)));
Assert.assertEquals("helloozy", StringUtils.fromUtf8(codedUtf8Indexed.get(4)));
Iterator<String> newListIterator = theList.iterator();
@@ -127,7 +137,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
for (int i = 0; i < sizeBase + sizeAdjust; i++) {
values.add(IdUtils.getRandomId() + IdUtils.getRandomId() + IdUtils.getRandomId() + IdUtils.getRandomId());
}
- fillBuffer(buffer, values, bucketSize);
+ fillBuffer(buffer, values, bucketSize, useIncrementalBuckets);
FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read(
buffer,
@@ -163,7 +173,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
for (int i = 0; i < sizeBase + sizeAdjust; i++) {
values.add(IdUtils.getRandomId() + IdUtils.getRandomId() + IdUtils.getRandomId() + IdUtils.getRandomId());
}
- fillBuffer(buffer, values, bucketSize);
+ fillBuffer(buffer, values, bucketSize, useIncrementalBuckets);
FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read(
buffer,
@@ -197,7 +207,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
ByteBuffer buffer = ByteBuffer.allocate(1 << 12).order(order);
List<String> theList = ImmutableList.of("hello", "helloo", "hellooo", "hellooz", "helloozy");
- fillBuffer(buffer, theList, 4);
+ fillBuffer(buffer, theList, 4, useIncrementalBuckets);
FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read(
buffer,
@@ -221,7 +231,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
TreeSet<String> values = new TreeSet<>(GenericIndexed.STRING_STRATEGY);
values.add(null);
values.addAll(theList);
- fillBuffer(buffer, values, 4);
+ fillBuffer(buffer, values, 4, useIncrementalBuckets);
FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read(
buffer,
@@ -244,7 +254,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
// "\uD83D\uDCA9" and "(請參見已被刪除版本)" are a regression test for https://github.com/apache/druid/pull/13364
List<String> theList = ImmutableList.of("Győ-Moson-Sopron", "Győr", "\uD83D\uDCA9", "(請參見已被刪除版本)");
- fillBuffer(buffer, theList, 4);
+ fillBuffer(buffer, theList, 4, useIncrementalBuckets);
buffer.position(0);
FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read(
@@ -260,7 +270,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
final ByteBuffer nextUtf8 = utf8Iterator.next();
Assert.assertEquals(next, StringUtils.fromUtf8(nextUtf8));
nextUtf8.position(0);
- Assert.assertEquals(next, StringUtils.fromUtf8(codedUtf8Indexed.get(ctr)));
+ Assert.assertEquals("mismatch row " + ctr, next, StringUtils.fromUtf8(codedUtf8Indexed.get(ctr)));
Assert.assertEquals(ctr, codedUtf8Indexed.indexOf(nextUtf8));
ctr++;
}
@@ -272,7 +282,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
{
ByteBuffer buffer = ByteBuffer.allocate(1 << 12).order(order);
List<String> theList = Collections.singletonList(null);
- fillBuffer(buffer, theList, 4);
+ fillBuffer(buffer, theList, 4, useIncrementalBuckets);
buffer.position(0);
FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read(
@@ -315,7 +325,7 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
values.add(IdUtils.getRandomId() + IdUtils.getRandomId() + IdUtils.getRandomId() + IdUtils.getRandomId());
}
for (int bucketSize : bucketSizes) {
- fillBuffer(buffer, values, bucketSize);
+ fillBuffer(buffer, values, bucketSize, useIncrementalBuckets);
FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read(
buffer,
buffer.order()
@@ -352,7 +362,8 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
() -> new FrontCodedIndexedWriter(
medium,
ByteOrder.nativeOrder(),
- 0
+ 0,
+ useIncrementalBuckets
)
);
@@ -361,7 +372,8 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
() -> new FrontCodedIndexedWriter(
medium,
ByteOrder.nativeOrder(),
- 15
+ 15,
+ useIncrementalBuckets
)
);
@@ -370,20 +382,23 @@ public class FrontCodedIndexedTest extends InitializedNullHandlingTest
() -> new FrontCodedIndexedWriter(
medium,
ByteOrder.nativeOrder(),
- 256
+ 256,
+ useIncrementalBuckets
)
);
}
- private static long fillBuffer(ByteBuffer buffer, Iterable<String> sortedIterable, int bucketSize) throws IOException
+ private static long fillBuffer(ByteBuffer buffer, Iterable<String> sortedIterable, int bucketSize, boolean useIncrementalBuckets) throws IOException
{
Iterator<String> sortedStrings = sortedIterable.iterator();
buffer.position(0);
OnHeapMemorySegmentWriteOutMedium medium = new OnHeapMemorySegmentWriteOutMedium();
- FrontCodedIndexedWriter writer = new FrontCodedIndexedWriter(
+ DictionaryWriter<byte[]> writer;
+ writer = new FrontCodedIndexedWriter(
medium,
buffer.order(),
- bucketSize
+ bucketSize,
+ useIncrementalBuckets
);
writer.open();
int index = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org