You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2023/08/04 17:52:42 UTC
[druid] branch master updated: refactor front-coded into static classes instead of using functional interfaces (#14572)
This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new e5661a394c refactor front-coded into static classes instead of using functional interfaces (#14572)
e5661a394c is described below
commit e5661a394cf3ba26ca6adc1cb6d09b048ea123bd
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Fri Aug 4 10:52:36 2023 -0700
refactor front-coded into static classes instead of using functional interfaces (#14572)
* refactor front-coded into static classes instead of using functional interfaces
* shared v0 static method instead of copy
---
.../benchmark/FrontCodedIndexedBenchmark.java | 18 +-
.../druid/segment/data/FrontCodedIndexed.java | 668 ++++++++++-----------
.../segment/data/FrontCodedIndexedWriter.java | 14 +-
3 files changed, 344 insertions(+), 356 deletions(-)
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/FrontCodedIndexedBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/FrontCodedIndexedBenchmark.java
index fe0e717ab1..d09d501a05 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/FrontCodedIndexedBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/FrontCodedIndexedBenchmark.java
@@ -88,10 +88,10 @@ public class FrontCodedIndexedBenchmark
@Param({
"generic",
- "front-coded-4",
- "front-coded-16",
- "front-coded-incremental-buckets-4",
- "front-coded-incremental-buckets-16"
+ "front-coded-v0-4",
+ "front-coded-v0-16",
+ "front-coded-v1-4",
+ "front-coded-v1-16"
})
public String indexType;
@@ -138,7 +138,7 @@ public class FrontCodedIndexedBenchmark
FrontCodedIndexedWriter frontCodedIndexedWriter = new FrontCodedIndexedWriter(
new OnHeapMemorySegmentWriteOutMedium(),
ByteOrder.nativeOrder(),
- "front-coded-4".equals(indexType) ? 4 : 16,
+ "front-coded-v0-4".equals(indexType) ? 4 : 16,
FrontCodedIndexed.V0
);
frontCodedIndexedWriter.open();
@@ -146,7 +146,7 @@ public class FrontCodedIndexedBenchmark
FrontCodedIndexedWriter frontCodedIndexedWriterIncrementalBuckets = new FrontCodedIndexedWriter(
new OnHeapMemorySegmentWriteOutMedium(),
ByteOrder.nativeOrder(),
- "front-coded-incremental-buckets-4".equals(indexType) ? 4 : 16,
+ "front-coded-v1-4".equals(indexType) ? 4 : 16,
FrontCodedIndexed.V1
);
frontCodedIndexedWriterIncrementalBuckets.open();
@@ -166,11 +166,11 @@ public class FrontCodedIndexedBenchmark
fileGeneric = File.createTempFile("genericIndexedBenchmark", "meta");
smooshDirFrontCodedIncrementalBuckets = FileUtils.createTempDir();
- fileFrontCodedIncrementalBuckets = File.createTempFile("frontCodedIndexedBenchmarkIncrementalBuckets", "meta");
+ fileFrontCodedIncrementalBuckets = File.createTempFile("frontCodedIndexedBenchmarkv1Buckets", "meta");
EncodingSizeProfiler.encodedSize = (int) ("generic".equals(indexType)
? genericIndexedWriter.getSerializedSize()
- : indexType.startsWith("front-coded-incremental-buckets")
+ : indexType.startsWith("front-coded-v1")
? frontCodedIndexedWriterIncrementalBuckets.getSerializedSize()
: frontCodedIndexedWriter.getSerializedSize());
try (
@@ -286,7 +286,7 @@ public class FrontCodedIndexedBenchmark
}
if ("generic".equals(indexType)) {
indexed = genericIndexed.singleThreaded();
- } else if (indexType.startsWith("front-coded-incremental-buckets")) {
+ } else if (indexType.startsWith("front-coded-v1")) {
indexed = frontCodedIndexedIncrementalBuckets;
} else {
indexed = frontCodedIndexed;
diff --git a/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java b/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java
index ebbf13a91b..f3f1457c50 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java
@@ -21,7 +21,6 @@ package org.apache.druid.segment.data;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
-import org.apache.druid.annotations.SuppressFBWarnings;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
@@ -75,7 +74,7 @@ import java.util.NoSuchElementException;
* <p>
* This class is not thread-safe since during operation modifies positions of a shared buffer.
*/
-public final class FrontCodedIndexed implements Indexed<ByteBuffer>
+public abstract class FrontCodedIndexed implements Indexed<ByteBuffer>
{
public static final byte V0 = 0;
public static final byte V1 = 1;
@@ -94,6 +93,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
}
return version;
}
+
public static Supplier<FrontCodedIndexed> read(ByteBuffer buffer, ByteOrder ordering)
{
final ByteBuffer orderedBuffer = buffer.asReadOnlyBuffer().order(ordering);
@@ -108,44 +108,46 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
// move position to end of buffer
buffer.position(offsetsPosition + size);
- return () -> new FrontCodedIndexed(
- buffer,
- ordering,
- bucketSize,
- numValues,
- hasNull,
- offsetsPosition,
- version
- );
+ if (version == V0) {
+ return () -> new FrontCodedV0(
+ buffer,
+ ordering,
+ bucketSize,
+ numValues,
+ hasNull,
+ offsetsPosition
+ );
+ } else {
+ return () -> new FrontCodedV1(
+ buffer,
+ ordering,
+ bucketSize,
+ numValues,
+ hasNull,
+ offsetsPosition
+ );
+ }
}
- private final ByteBuffer buffer;
- private final int adjustedNumValues;
- private final int adjustIndex;
- private final int bucketSize;
- private final int[] unwindPrefixLength;
- private final int[] unwindBufferPosition;
- private final int numBuckets;
- private final int div;
- private final int rem;
- private final int offsetsPosition;
- private final int bucketsPosition;
- private final boolean hasNull;
- private final int lastBucketNumValues;
-
- private final GetBucketValue getBucketValueFn;
- private final ReadBucket readBucketFn;
- private final FindInBucket findInBucketFn;
-
- @SuppressFBWarnings(value = "NP_STORE_INTO_NONNULL_FIELD", justification = "V0 does not use unwindPrefixLength or unwindBufferPosition")
+ protected final ByteBuffer buffer;
+ protected final int adjustedNumValues;
+ protected final int adjustIndex;
+ protected final int bucketSize;
+ protected final int numBuckets;
+ protected final int div;
+ protected final int rem;
+ protected final int offsetsPosition;
+ protected final int bucketsPosition;
+ protected final boolean hasNull;
+ protected final int lastBucketNumValues;
+
private FrontCodedIndexed(
ByteBuffer buffer,
ByteOrder order,
int bucketSize,
int numValues,
boolean hasNull,
- int offsetsPosition,
- byte version
+ int offsetsPosition
)
{
if (Integer.bitCount(bucketSize) != 1) {
@@ -163,25 +165,38 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
this.lastBucketNumValues = (numValues & rem) == 0 ? bucketSize : numValues & rem;
this.offsetsPosition = offsetsPosition;
this.bucketsPosition = offsetsPosition + ((numBuckets - 1) * Integer.BYTES);
- if (version == 0) {
- // version zero, all prefixes are computed against the first value in the bucket
- this.getBucketValueFn = FrontCodedIndexed::getFromBucketV0;
- this.readBucketFn = FrontCodedIndexed::readBucketV0;
- this.findInBucketFn = this::findValueInBucketV0;
- //noinspection DataFlowIssue
- this.unwindPrefixLength = null;
- //noinspection DataFlowIssue
- this.unwindBufferPosition = null;
- } else {
- // version one uses 'incremental' buckets, where the prefix is computed against the previous value
- this.unwindPrefixLength = new int[bucketSize];
- this.unwindBufferPosition = new int[bucketSize];
- this.getBucketValueFn = this::getFromBucketV1;
- this.readBucketFn = this::readBucketV1;
- this.findInBucketFn = this::findValueInBucketV1;
- }
}
+ /**
+ * Get a value from a bucket at a relative position.
+ * <p>
+ * This method modifies the position of the buffer.
+ */
+ abstract ByteBuffer getFromBucket(ByteBuffer buffer, int offset);
+
+ /**
+ * Read an entire bucket from a {@link ByteBuffer}, returning an array of reconstructed value bytes.
+ * <p>
+ * This method modifies the position of the buffer.
+ */
+ abstract ByteBuffer[] readBucket(ByteBuffer buffer, int numValues);
+
+ /**
+ * Finds a value in a bucket among the fragments. The first value is assumed to have been already compared against
+ * and be smaller than the value we are looking for. This comparison is the source of the 'shared prefix', which is
+ * the length the value has in common with the first value (V0) or the previous value (V1) of the bucket.
+ * <p>
+ * This method uses this shared prefix length to skip more expensive byte by byte full value comparisons when
+ * possible by comparing the shared prefix length with the prefix length of the fragment. Since the bucket is always
+ * sorted, prefix lengths shrink as you progress to higher indexes, and we can use this to reason that a fragment
+ * with a longer prefix length than the shared prefix will always sort before the value we are looking for, and values
+ * which have a shorter prefix will always be greater than the value we are looking for, so we only need to do a
+ * full comparison if the prefix length is the same
+ * <p>
+ * this method modifies the position of {@link #buffer}
+ */
+ abstract int findInBucket(ByteBuffer value, int currBucketFirstValueIndex, int bucketSize, int sharedPrefixLength);
+
@Override
public int size()
{
@@ -206,7 +221,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
final int bucketIndex = adjustedIndex & rem;
final int offset = getBucketOffset(bucket);
buffer.position(offset);
- return getBucketValueFn.get(buffer, bucketIndex);
+ return getFromBucket(buffer, bucketIndex);
}
@Override
@@ -268,7 +283,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
// find the value in the bucket (or where it would be if it were present)
buffer.position(firstOffset + firstLength);
- return findInBucketFn.find(value, currBucketFirstValueIndex, bucketSize, sharedPrefix);
+ return findInBucket(value, currBucketFirstValueIndex, bucketSize, sharedPrefix);
} else if (comparison < 0) {
minBucketIndex = currentBucket + 1;
} else {
@@ -308,7 +323,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
buffer.position(firstOffset + firstLength);
- return findInBucketFn.find(value, bucketIndexBase, numValuesInBucket, sharedPrefix);
+ return findInBucket(value, bucketIndexBase, numValuesInBucket, sharedPrefix);
}
@Override
@@ -329,7 +344,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
}
ByteBuffer copy = buffer.asReadOnlyBuffer().order(buffer.order());
copy.position(bucketsPosition);
- final ByteBuffer[] firstBucket = readBucketFn.readBucket(copy, numBuckets > 1 ? bucketSize : lastBucketNumValues);
+ final ByteBuffer[] firstBucket = readBucket(copy, numBuckets > 1 ? bucketSize : lastBucketNumValues);
// iterator decodes and buffers a bucket at a time, paging through buckets as the iterator is consumed
return new Iterator<ByteBuffer>()
{
@@ -360,7 +375,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
if (bucketNum != currentBucketIndex) {
final int offset = copy.getInt(offsetsPosition + ((bucketNum - 1) * Integer.BYTES));
copy.position(bucketsPosition + offset);
- currentBucket = readBucketFn.readBucket(
+ currentBucket = readBucket(
copy,
bucketNum < (numBuckets - 1) ? bucketSize : lastBucketNumValues
);
@@ -394,7 +409,6 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
return bucketsPosition + (bucket > 0 ? buffer.getInt(offsetsPosition + ((bucket - 1) * Integer.BYTES)) : 0);
}
-
/**
* Performs byte-by-byte comparison of the first value in a bucket with the specified value. Note that this method
* MUST be prepared before calling, as it expects the length of the first value to have already been read externally,
@@ -423,319 +437,293 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
return comparison;
}
- /**
- * Finds a value in a bucket among the fragments. The first value is assumed to have been already compared against
- * and be smaller than the value we are looking for. This comparison is the source of the 'shared prefix', which is
- * the length which the value has in common with the first value of the bucket.
- * <p>
- * This method uses this shared prefix length to skip more expensive byte by byte full value comparisons when
- * possible by comparing the shared prefix length with the prefix length of the fragment. Since the bucket is always
- * sorted, prefix lengths shrink as you progress to higher indexes, and we can use this to reason that a fragment
- * with a longer prefix length than the shared prefix will always sort before the value we are looking for, and values
- * which have a shorter prefix will always be greater than the value we are looking for, so we only need to do a
- * full comparison if the prefix length is the same
- * <p>
- * this method modifies the position of {@link #buffer}
- */
- private int findValueInBucketV0(
- ByteBuffer value,
- int currBucketFirstValueIndex,
- int bucketSize,
- int sharedPrefix
- )
+ public static final class FrontCodedV0 extends FrontCodedIndexed
{
- int relativePosition = 0;
- int prefixLength;
- // scan through bucket values until we find match or compare numValues
- int insertionPoint = 1;
- while (++relativePosition < bucketSize) {
- prefixLength = VByte.readInt(buffer);
- if (prefixLength > sharedPrefix) {
- // this value shares more in common with the first value, so the value we are looking for comes after
- final int skip = VByte.readInt(buffer);
- buffer.position(buffer.position() + skip);
- insertionPoint++;
- } else if (prefixLength < sharedPrefix) {
- // prefix is smaller, that means this value sorts ahead of it
- break;
- } else {
- final int fragmentLength = VByte.readInt(buffer);
- final int common = Math.min(fragmentLength, value.remaining() - prefixLength);
- int fragmentComparison = 0;
- for (int i = 0; i < common; i++) {
- fragmentComparison = StringUtils.compareUtf8UsingJavaStringOrdering(
- buffer.get(buffer.position() + i),
- value.get(prefixLength + i)
- );
- if (fragmentComparison != 0) {
- break;
- }
- }
- if (fragmentComparison == 0) {
- fragmentComparison = Integer.compare(prefixLength + fragmentLength, value.remaining());
- }
+ private FrontCodedV0(
+ ByteBuffer buffer,
+ ByteOrder order,
+ int bucketSize,
+ int numValues,
+ boolean hasNull,
+ int offsetsPosition
+ )
+ {
+ super(buffer, order, bucketSize, numValues, hasNull, offsetsPosition);
+ }
- if (fragmentComparison == 0) {
- return (currBucketFirstValueIndex + adjustIndex) + relativePosition;
- } else if (fragmentComparison < 0) {
- buffer.position(buffer.position() + fragmentLength);
- insertionPoint++;
+ @Override
+ ByteBuffer getFromBucket(ByteBuffer buffer, int offset)
+ {
+ return getValueFromBucket(buffer, offset);
+ }
+
+ public static ByteBuffer getValueFromBucket(ByteBuffer buffer, int offset)
+ {
+ int prefixPosition;
+ if (offset == 0) {
+ final int length = VByte.readInt(buffer);
+ final ByteBuffer firstValue = buffer.asReadOnlyBuffer();
+ firstValue.limit(firstValue.position() + length);
+ return firstValue;
+ } else {
+ final int firstLength = VByte.readInt(buffer);
+ prefixPosition = buffer.position();
+ buffer.position(buffer.position() + firstLength);
+ }
+ int pos = 0;
+ int prefixLength;
+ int fragmentLength;
+ int fragmentPosition;
+ // scan through bucket values until we reach offset
+ do {
+ prefixLength = VByte.readInt(buffer);
+ if (++pos < offset) {
+ // not there yet, no need to read anything other than the length to skip ahead
+ final int skipLength = VByte.readInt(buffer);
+ buffer.position(buffer.position() + skipLength);
} else {
+ // we've reached our destination
+ fragmentLength = VByte.readInt(buffer);
+ fragmentPosition = buffer.position();
break;
}
+ } while (true);
+ final int valueLength = prefixLength + fragmentLength;
+ ByteBuffer value = ByteBuffer.allocate(valueLength);
+ for (int i = 0; i < valueLength; i++) {
+ if (i < prefixLength) {
+ value.put(buffer.get(prefixPosition + i));
+ } else {
+ value.put(buffer.get(fragmentPosition + i - prefixLength));
+ }
}
+ value.flip();
+ return value;
}
- // (-(insertion point) - 1)
- return -(currBucketFirstValueIndex + adjustIndex) + (~insertionPoint);
- }
- /**
- * Get a value from a bucket at a relative position.
- * <p>
- * This method modifies the position of the buffer.
- */
- static ByteBuffer getFromBucketV0(ByteBuffer buffer, int offset)
- {
- int prefixPosition;
- if (offset == 0) {
+ @Override
+ ByteBuffer[] readBucket(ByteBuffer buffer, int numValues)
+ {
final int length = VByte.readInt(buffer);
- final ByteBuffer firstValue = buffer.asReadOnlyBuffer();
- firstValue.limit(firstValue.position() + length);
- return firstValue;
- } else {
- final int firstLength = VByte.readInt(buffer);
- prefixPosition = buffer.position();
- buffer.position(buffer.position() + firstLength);
- }
- int pos = 0;
- int prefixLength;
- int fragmentLength;
- int fragmentPosition;
- // scan through bucket values until we reach offset
- do {
- prefixLength = VByte.readInt(buffer);
- if (++pos < offset) {
- // not there yet, no need to read anything other than the length to skip ahead
- final int skipLength = VByte.readInt(buffer);
- buffer.position(buffer.position() + skipLength);
- } else {
- // we've reached our destination
- fragmentLength = VByte.readInt(buffer);
- fragmentPosition = buffer.position();
- break;
- }
- } while (true);
- final int valueLength = prefixLength + fragmentLength;
- ByteBuffer value = ByteBuffer.allocate(valueLength);
- for (int i = 0; i < valueLength; i++) {
- if (i < prefixLength) {
- value.put(buffer.get(prefixPosition + i));
- } else {
- value.put(buffer.get(fragmentPosition + i - prefixLength));
+ final byte[] prefixBytes = new byte[length];
+ buffer.get(prefixBytes, 0, length);
+ final ByteBuffer[] bucketBuffers = new ByteBuffer[numValues];
+ bucketBuffers[0] = ByteBuffer.wrap(prefixBytes);
+ int pos = 1;
+ while (pos < numValues) {
+ final int prefixLength = VByte.readInt(buffer);
+ final int fragmentLength = VByte.readInt(buffer);
+ final byte[] fragment = new byte[fragmentLength];
+ buffer.get(fragment, 0, fragmentLength);
+ final ByteBuffer value = ByteBuffer.allocate(prefixLength + fragmentLength);
+ value.put(prefixBytes, 0, prefixLength);
+ value.put(fragment);
+ value.flip();
+ bucketBuffers[pos++] = value;
}
+ return bucketBuffers;
}
- value.flip();
- return value;
- }
-
- /**
- * Read an entire bucket from a {@link ByteBuffer}, returning an array of reconstructed value bytes.
- * <p>
- * This method modifies the position of the buffer.
- */
- private static ByteBuffer[] readBucketV0(ByteBuffer bucket, int numValues)
- {
- final int length = VByte.readInt(bucket);
- final byte[] prefixBytes = new byte[length];
- bucket.get(prefixBytes, 0, length);
- final ByteBuffer[] bucketBuffers = new ByteBuffer[numValues];
- bucketBuffers[0] = ByteBuffer.wrap(prefixBytes);
- int pos = 1;
- while (pos < numValues) {
- final int prefixLength = VByte.readInt(bucket);
- final int fragmentLength = VByte.readInt(bucket);
- final byte[] fragment = new byte[fragmentLength];
- bucket.get(fragment, 0, fragmentLength);
- final ByteBuffer value = ByteBuffer.allocate(prefixLength + fragmentLength);
- value.put(prefixBytes, 0, prefixLength);
- value.put(fragment);
- value.flip();
- bucketBuffers[pos++] = value;
- }
- return bucketBuffers;
- }
-
- /**
- * Finds a value in a bucket among the fragments. The first value is assumed to have been already compared against
- * and be smaller than the value we are looking for. This comparison is the source of the 'shared prefix', which is
- * the length which the value has in common with the previous value of the bucket.
- * <p>
- * This method uses this shared prefix length to skip more expensive byte by byte full value comparisons when
- * possible by comparing the shared prefix length with the prefix length of the fragment. Since the bucket is always
- * sorted, prefix lengths shrink as you progress to higher indexes, and we can use this to reason that a fragment
- * with a longer prefix length than the shared prefix will always sort before the value we are looking for, and values
- * which have a shorter prefix will always be greater than the value we are looking for, so we only need to do a
- * full comparison if the prefix length is the same
- * <p>
- * this method modifies the position of {@link #buffer}
- */
- private int findValueInBucketV1(
- ByteBuffer value,
- int currBucketFirstValueIndex,
- int bucketSize,
- int sharedPrefixLength
- )
- {
- int relativePosition = 0;
- int prefixLength;
- // scan through bucket values until we find match or compare numValues
- int insertionPoint = 1;
- while (++relativePosition < bucketSize) {
- prefixLength = VByte.readInt(buffer);
- if (prefixLength > sharedPrefixLength) {
- // bucket value shares more in common with the preceding value, so the value we are looking for comes after
- final int skip = VByte.readInt(buffer);
- buffer.position(buffer.position() + skip);
- insertionPoint++;
- } else if (prefixLength < sharedPrefixLength) {
- // bucket value prefix is smaller, that means the value we are looking for sorts ahead of it
- break;
- } else {
- // value has the same shared prefix, so compare additional values to find
- final int fragmentLength = VByte.readInt(buffer);
- final int common = Math.min(fragmentLength, value.remaining() - prefixLength);
- int fragmentComparison = 0;
- boolean shortCircuit = false;
- for (int i = 0; i < common; i++) {
- fragmentComparison = StringUtils.compareUtf8UsingJavaStringOrdering(
- buffer.get(buffer.position() + i),
- value.get(prefixLength + i)
- );
- if (fragmentComparison != 0) {
- sharedPrefixLength = prefixLength + i;
- shortCircuit = true;
- break;
+ @Override
+ int findInBucket(ByteBuffer value, int currBucketFirstValueIndex, int bucketSize, int sharedPrefixLength)
+ {
+ int relativePosition = 0;
+ int prefixLength;
+ // scan through bucket values until we find match or compare numValues
+ int insertionPoint = 1;
+ while (++relativePosition < bucketSize) {
+ prefixLength = VByte.readInt(buffer);
+ if (prefixLength > sharedPrefixLength) {
+ // this value shares more in common with the first value, so the value we are looking for comes after
+ final int skip = VByte.readInt(buffer);
+ buffer.position(buffer.position() + skip);
+ insertionPoint++;
+ } else if (prefixLength < sharedPrefixLength) {
+ // prefix is smaller, that means this value sorts ahead of it
+ break;
+ } else {
+ final int fragmentLength = VByte.readInt(buffer);
+ final int common = Math.min(fragmentLength, value.remaining() - prefixLength);
+ int fragmentComparison = 0;
+ for (int i = 0; i < common; i++) {
+ fragmentComparison = StringUtils.compareUtf8UsingJavaStringOrdering(
+ buffer.get(buffer.position() + i),
+ value.get(prefixLength + i)
+ );
+ if (fragmentComparison != 0) {
+ break;
+ }
+ }
+ if (fragmentComparison == 0) {
+ fragmentComparison = Integer.compare(prefixLength + fragmentLength, value.remaining());
}
- }
- if (fragmentComparison == 0) {
- fragmentComparison = Integer.compare(prefixLength + fragmentLength, value.remaining());
- }
- if (fragmentComparison == 0) {
- return (currBucketFirstValueIndex + adjustIndex) + relativePosition;
- } else if (fragmentComparison < 0) {
- // value we are looking for is longer than the current bucket value, continue on
- if (!shortCircuit) {
- sharedPrefixLength = prefixLength + common;
+ if (fragmentComparison == 0) {
+ return (currBucketFirstValueIndex + adjustIndex) + relativePosition;
+ } else if (fragmentComparison < 0) {
+ buffer.position(buffer.position() + fragmentLength);
+ insertionPoint++;
+ } else {
+ break;
}
- buffer.position(buffer.position() + fragmentLength);
- insertionPoint++;
- } else {
- break;
}
}
+ // (-(insertion point) - 1)
+ return -(currBucketFirstValueIndex + adjustIndex) + (~insertionPoint);
}
- // (-(insertion point) - 1)
- return -(currBucketFirstValueIndex + adjustIndex) + (~insertionPoint);
}
- private ByteBuffer getFromBucketV1(ByteBuffer buffer, int offset)
+ public static final class FrontCodedV1 extends FrontCodedIndexed
{
- // first value is written whole
- final int length = VByte.readInt(buffer);
- if (offset == 0) {
- // return first value directly from underlying buffer since it is stored whole
- final ByteBuffer value = buffer.asReadOnlyBuffer();
- value.limit(value.position() + length);
- return value;
+ private final int[] unwindPrefixLength;
+ private final int[] unwindBufferPosition;
+
+ private FrontCodedV1(
+ ByteBuffer buffer,
+ ByteOrder order,
+ int bucketSize,
+ int numValues,
+ boolean hasNull,
+ int offsetsPosition
+ )
+ {
+ super(buffer, order, bucketSize, numValues, hasNull, offsetsPosition);
+ this.unwindPrefixLength = new int[bucketSize];
+ this.unwindBufferPosition = new int[bucketSize];
}
- int pos = 0;
- int prefixLength;
- int fragmentLength;
- unwindPrefixLength[pos] = 0;
- unwindBufferPosition[pos] = buffer.position();
-
- buffer.position(buffer.position() + length);
- do {
- prefixLength = VByte.readInt(buffer);
- if (++pos < offset) {
- // not there yet, no need to read anything other than the length to skip ahead
- final int skipLength = VByte.readInt(buffer);
- unwindPrefixLength[pos] = prefixLength;
- unwindBufferPosition[pos] = buffer.position();
- buffer.position(buffer.position() + skipLength);
- } else {
- // we've reached our destination
- fragmentLength = VByte.readInt(buffer);
- if (prefixLength == 0) {
- // no prefix, return it directly from the underlying buffer
- final ByteBuffer value = buffer.asReadOnlyBuffer();
- value.limit(value.position() + fragmentLength);
- return value;
- }
- break;
+
+ @Override
+ ByteBuffer getFromBucket(ByteBuffer buffer, int offset)
+ {
+ // first value is written whole
+ final int length = VByte.readInt(buffer);
+ if (offset == 0) {
+ // return first value directly from underlying buffer since it is stored whole
+ final ByteBuffer value = buffer.asReadOnlyBuffer();
+ value.limit(value.position() + length);
+ return value;
}
- } while (true);
- final int valueLength = prefixLength + fragmentLength;
- final byte[] valueBytes = new byte[valueLength];
- buffer.get(valueBytes, prefixLength, fragmentLength);
- for (int i = prefixLength; i > 0;) {
- // previous value had a larger prefix than or the same as the value we are looking for
- // skip it since the fragment doesn't have anything we need
- if (unwindPrefixLength[--pos] >= i) {
- continue;
+ int pos = 0;
+ int prefixLength;
+ int fragmentLength;
+ unwindPrefixLength[pos] = 0;
+ unwindBufferPosition[pos] = buffer.position();
+
+ buffer.position(buffer.position() + length);
+ do {
+ prefixLength = VByte.readInt(buffer);
+ if (++pos < offset) {
+ // not there yet, no need to read anything other than the length to skip ahead
+ final int skipLength = VByte.readInt(buffer);
+ unwindPrefixLength[pos] = prefixLength;
+ unwindBufferPosition[pos] = buffer.position();
+ buffer.position(buffer.position() + skipLength);
+ } else {
+ // we've reached our destination
+ fragmentLength = VByte.readInt(buffer);
+ if (prefixLength == 0) {
+ // no prefix, return it directly from the underlying buffer
+ final ByteBuffer value = buffer.asReadOnlyBuffer();
+ value.limit(value.position() + fragmentLength);
+ return value;
+ }
+ break;
+ }
+ } while (true);
+ final int valueLength = prefixLength + fragmentLength;
+ final byte[] valueBytes = new byte[valueLength];
+ buffer.get(valueBytes, prefixLength, fragmentLength);
+ for (int i = prefixLength; i > 0;) {
+ // previous value had a larger prefix than or the same as the value we are looking for
+ // skip it since the fragment doesn't have anything we need
+ if (unwindPrefixLength[--pos] >= i) {
+ continue;
+ }
+ buffer.position(unwindBufferPosition[pos]);
+ buffer.get(valueBytes, unwindPrefixLength[pos], i - unwindPrefixLength[pos]);
+ i = unwindPrefixLength[pos];
}
- buffer.position(unwindBufferPosition[pos]);
- buffer.get(valueBytes, unwindPrefixLength[pos], i - unwindPrefixLength[pos]);
- i = unwindPrefixLength[pos];
+ return ByteBuffer.wrap(valueBytes);
}
- return ByteBuffer.wrap(valueBytes);
- }
- /**
- * Read an entire bucket from a {@link ByteBuffer}, returning an array of reconstructed value bytes.
- * <p>
- * This method modifies the position of the buffer.
- */
- private ByteBuffer[] readBucketV1(ByteBuffer bucket, int numValues)
- {
- final ByteBuffer[] bucketBuffers = new ByteBuffer[numValues];
-
- // first value is written whole
- final int length = VByte.readInt(bucket);
- byte[] prefixBytes = new byte[length];
- bucket.get(prefixBytes, 0, length);
- bucketBuffers[0] = ByteBuffer.wrap(prefixBytes);
- int pos = 1;
- while (pos < numValues) {
- final int prefixLength = VByte.readInt(bucket);
- final int fragmentLength = VByte.readInt(bucket);
- byte[] nextValueBytes = new byte[prefixLength + fragmentLength];
- System.arraycopy(prefixBytes, 0, nextValueBytes, 0, prefixLength);
- bucket.get(nextValueBytes, prefixLength, fragmentLength);
- final ByteBuffer value = ByteBuffer.wrap(nextValueBytes);
- prefixBytes = nextValueBytes;
- bucketBuffers[pos++] = value;
- }
- return bucketBuffers;
- }
+ @Override
+ ByteBuffer[] readBucket(ByteBuffer buffer, int numValues)
+ {
+ final ByteBuffer[] bucketBuffers = new ByteBuffer[numValues];
- @FunctionalInterface
- interface GetBucketValue
- {
- ByteBuffer get(ByteBuffer buffer, int offset);
- }
+ // first value is written whole
+ final int length = VByte.readInt(buffer);
+ byte[] prefixBytes = new byte[length];
+ buffer.get(prefixBytes, 0, length);
+ bucketBuffers[0] = ByteBuffer.wrap(prefixBytes);
+ int pos = 1;
+ while (pos < numValues) {
+ final int prefixLength = VByte.readInt(buffer);
+ final int fragmentLength = VByte.readInt(buffer);
+ byte[] nextValueBytes = new byte[prefixLength + fragmentLength];
+ System.arraycopy(prefixBytes, 0, nextValueBytes, 0, prefixLength);
+ buffer.get(nextValueBytes, prefixLength, fragmentLength);
+ final ByteBuffer value = ByteBuffer.wrap(nextValueBytes);
+ prefixBytes = nextValueBytes;
+ bucketBuffers[pos++] = value;
+ }
+ return bucketBuffers;
+ }
- @FunctionalInterface
- interface ReadBucket
- {
- ByteBuffer[] readBucket(ByteBuffer buffer, int bucketSize);
- }
+ @Override
+ int findInBucket(ByteBuffer value, int currBucketFirstValueIndex, int bucketSize, int sharedPrefixLength)
+ {
+ int relativePosition = 0;
+ int prefixLength;
+ // scan through bucket values until we find match or compare numValues
+ int insertionPoint = 1;
+ while (++relativePosition < bucketSize) {
+ prefixLength = VByte.readInt(buffer);
+ if (prefixLength > sharedPrefixLength) {
+ // bucket value shares more in common with the preceding value, so the value we are looking for comes after
+ final int skip = VByte.readInt(buffer);
+ buffer.position(buffer.position() + skip);
+ insertionPoint++;
+ } else if (prefixLength < sharedPrefixLength) {
+ // bucket value prefix is smaller, that means the value we are looking for sorts ahead of it
+ break;
+ } else {
+ // value has the same shared prefix, so compare additional values to find
+ final int fragmentLength = VByte.readInt(buffer);
+ final int common = Math.min(fragmentLength, value.remaining() - prefixLength);
+ int fragmentComparison = 0;
+ boolean shortCircuit = false;
+ for (int i = 0; i < common; i++) {
+ fragmentComparison = StringUtils.compareUtf8UsingJavaStringOrdering(
+ buffer.get(buffer.position() + i),
+ value.get(prefixLength + i)
+ );
+ if (fragmentComparison != 0) {
+ sharedPrefixLength = prefixLength + i;
+ shortCircuit = true;
+ break;
+ }
+ }
+ if (fragmentComparison == 0) {
+ fragmentComparison = Integer.compare(prefixLength + fragmentLength, value.remaining());
+ }
- @FunctionalInterface
- interface FindInBucket
- {
- int find(ByteBuffer value, int currBucketFirstValueIndex, int bucketSize, int sharedPrefixLength);
+ if (fragmentComparison == 0) {
+ return (currBucketFirstValueIndex + adjustIndex) + relativePosition;
+ } else if (fragmentComparison < 0) {
+ // value we are looking for is longer than the current bucket value, continue on
+ if (!shortCircuit) {
+ sharedPrefixLength = prefixLength + common;
+ }
+ buffer.position(buffer.position() + fragmentLength);
+ insertionPoint++;
+ } else {
+ break;
+ }
+ }
+ }
+ // (-(insertion point) - 1)
+ return -(currBucketFirstValueIndex + adjustIndex) + (~insertionPoint);
+ }
}
}
diff --git a/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java
index 83b4850f97..efdf7336d1 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java
@@ -217,7 +217,7 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
bucketBuffer.clear();
final ByteBuffer valueBuffer = version == FrontCodedIndexed.V1
? getFromBucketV1(bucketBuffer, relativeIndex, bucketSize)
- : FrontCodedIndexed.getFromBucketV0(bucketBuffer, relativeIndex);
+ : FrontCodedIndexed.FrontCodedV0.getValueFromBucket(bucketBuffer, relativeIndex);
final byte[] valueBytes = new byte[valueBuffer.limit() - valueBuffer.position()];
valueBuffer.get(valueBytes);
return valueBytes;
@@ -413,15 +413,15 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
return StringUtils.compareUtf8UsingJavaStringOrdering(b1, b2);
}
-
/**
- * same as {@link FrontCodedIndexed#getFromBucketV1(ByteBuffer, int)} but without re-using prefixLength and buffer position
- * arrays so has more overhead/garbage creation than the instance method.
+ * same as {@link FrontCodedIndexed.FrontCodedV1#getFromBucket(ByteBuffer, int)} but
+ * without re-using prefixLength and buffer position arrays so has more overhead/garbage creation than the instance
+ * method.
*
* Note: adding the unwindPrefixLength and unwindBufferPosition arrays as arguments and having
- * {@link FrontCodedIndexed#getFromBucketV1(ByteBuffer, int)} call this static method added 5-10ns of overhead
- * compared to having its own copy of the code, presumably due to the overhead of an additional method call and extra
- * arguments.
+ * {@link FrontCodedIndexed.FrontCodedV1#getFromBucket(ByteBuffer, int)} call this static method added 5-10ns of
+ * overhead compared to having its own copy of the code, presumably due to the overhead of an additional method call
+ * and extra arguments.
*
* As such, since the writer is the only user of this method, it has been copied here...
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org