You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2023/08/04 17:52:42 UTC

[druid] branch master updated: refactor front-coded into static classes instead of using functional interfaces (#14572)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e5661a394c refactor front-coded into static classes instead of using functional interfaces (#14572)
e5661a394c is described below

commit e5661a394cf3ba26ca6adc1cb6d09b048ea123bd
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Fri Aug 4 10:52:36 2023 -0700

    refactor front-coded into static classes instead of using functional interfaces (#14572)
    
    * refactor front-coded into static classes instead of using functional interfaces
    
    * shared v0 static method instead of copy
---
 .../benchmark/FrontCodedIndexedBenchmark.java      |  18 +-
 .../druid/segment/data/FrontCodedIndexed.java      | 668 ++++++++++-----------
 .../segment/data/FrontCodedIndexedWriter.java      |  14 +-
 3 files changed, 344 insertions(+), 356 deletions(-)

diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/FrontCodedIndexedBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/FrontCodedIndexedBenchmark.java
index fe0e717ab1..d09d501a05 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/FrontCodedIndexedBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/FrontCodedIndexedBenchmark.java
@@ -88,10 +88,10 @@ public class FrontCodedIndexedBenchmark
 
   @Param({
       "generic",
-      "front-coded-4",
-      "front-coded-16",
-      "front-coded-incremental-buckets-4",
-      "front-coded-incremental-buckets-16"
+      "front-coded-v0-4",
+      "front-coded-v0-16",
+      "front-coded-v1-4",
+      "front-coded-v1-16"
   })
   public String indexType;
 
@@ -138,7 +138,7 @@ public class FrontCodedIndexedBenchmark
     FrontCodedIndexedWriter frontCodedIndexedWriter = new FrontCodedIndexedWriter(
         new OnHeapMemorySegmentWriteOutMedium(),
         ByteOrder.nativeOrder(),
-        "front-coded-4".equals(indexType) ? 4 : 16,
+        "front-coded-v0-4".equals(indexType) ? 4 : 16,
         FrontCodedIndexed.V0
     );
     frontCodedIndexedWriter.open();
@@ -146,7 +146,7 @@ public class FrontCodedIndexedBenchmark
     FrontCodedIndexedWriter frontCodedIndexedWriterIncrementalBuckets = new FrontCodedIndexedWriter(
         new OnHeapMemorySegmentWriteOutMedium(),
         ByteOrder.nativeOrder(),
-        "front-coded-incremental-buckets-4".equals(indexType) ? 4 : 16,
+        "front-coded-v1-4".equals(indexType) ? 4 : 16,
         FrontCodedIndexed.V1
     );
     frontCodedIndexedWriterIncrementalBuckets.open();
@@ -166,11 +166,11 @@ public class FrontCodedIndexedBenchmark
     fileGeneric = File.createTempFile("genericIndexedBenchmark", "meta");
 
     smooshDirFrontCodedIncrementalBuckets = FileUtils.createTempDir();
-    fileFrontCodedIncrementalBuckets = File.createTempFile("frontCodedIndexedBenchmarkIncrementalBuckets", "meta");
+    fileFrontCodedIncrementalBuckets = File.createTempFile("frontCodedIndexedBenchmarkv1Buckets", "meta");
 
     EncodingSizeProfiler.encodedSize = (int) ("generic".equals(indexType)
                                               ? genericIndexedWriter.getSerializedSize()
-                                              : indexType.startsWith("front-coded-incremental-buckets")
+                                              : indexType.startsWith("front-coded-v1")
                                                 ? frontCodedIndexedWriterIncrementalBuckets.getSerializedSize()
                                                 : frontCodedIndexedWriter.getSerializedSize());
     try (
@@ -286,7 +286,7 @@ public class FrontCodedIndexedBenchmark
     }
     if ("generic".equals(indexType)) {
       indexed = genericIndexed.singleThreaded();
-    } else if (indexType.startsWith("front-coded-incremental-buckets")) {
+    } else if (indexType.startsWith("front-coded-v1")) {
       indexed = frontCodedIndexedIncrementalBuckets;
     } else {
       indexed = frontCodedIndexed;
diff --git a/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java b/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java
index ebbf13a91b..f3f1457c50 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java
@@ -21,7 +21,6 @@ package org.apache.druid.segment.data;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
-import org.apache.druid.annotations.SuppressFBWarnings;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
@@ -75,7 +74,7 @@ import java.util.NoSuchElementException;
  * <p>
  * This class is not thread-safe since during operation modifies positions of a shared buffer.
  */
-public final class FrontCodedIndexed implements Indexed<ByteBuffer>
+public abstract class FrontCodedIndexed implements Indexed<ByteBuffer>
 {
   public static final byte V0 = 0;
   public static final byte V1 = 1;
@@ -94,6 +93,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
     }
     return version;
   }
+
   public static Supplier<FrontCodedIndexed> read(ByteBuffer buffer, ByteOrder ordering)
   {
     final ByteBuffer orderedBuffer = buffer.asReadOnlyBuffer().order(ordering);
@@ -108,44 +108,46 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
     // move position to end of buffer
     buffer.position(offsetsPosition + size);
 
-    return () -> new FrontCodedIndexed(
-        buffer,
-        ordering,
-        bucketSize,
-        numValues,
-        hasNull,
-        offsetsPosition,
-        version
-    );
+    if (version == V0) {
+      return () -> new FrontCodedV0(
+          buffer,
+          ordering,
+          bucketSize,
+          numValues,
+          hasNull,
+          offsetsPosition
+      );
+    } else {
+      return () -> new FrontCodedV1(
+          buffer,
+          ordering,
+          bucketSize,
+          numValues,
+          hasNull,
+          offsetsPosition
+      );
+    }
   }
 
-  private final ByteBuffer buffer;
-  private final int adjustedNumValues;
-  private final int adjustIndex;
-  private final int bucketSize;
-  private final int[] unwindPrefixLength;
-  private final int[] unwindBufferPosition;
-  private final int numBuckets;
-  private final int div;
-  private final int rem;
-  private final int offsetsPosition;
-  private final int bucketsPosition;
-  private final boolean hasNull;
-  private final int lastBucketNumValues;
-
-  private final GetBucketValue getBucketValueFn;
-  private final ReadBucket readBucketFn;
-  private final FindInBucket findInBucketFn;
-
-  @SuppressFBWarnings(value = "NP_STORE_INTO_NONNULL_FIELD", justification = "V0 does not use unwindPrefixLength or unwindBufferPosition")
+  protected final ByteBuffer buffer;
+  protected final int adjustedNumValues;
+  protected final int adjustIndex;
+  protected final int bucketSize;
+  protected final int numBuckets;
+  protected final int div;
+  protected final int rem;
+  protected final int offsetsPosition;
+  protected final int bucketsPosition;
+  protected final boolean hasNull;
+  protected final int lastBucketNumValues;
+
   private FrontCodedIndexed(
       ByteBuffer buffer,
       ByteOrder order,
       int bucketSize,
       int numValues,
       boolean hasNull,
-      int offsetsPosition,
-      byte version
+      int offsetsPosition
   )
   {
     if (Integer.bitCount(bucketSize) != 1) {
@@ -163,25 +165,38 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
     this.lastBucketNumValues = (numValues & rem) == 0 ? bucketSize : numValues & rem;
     this.offsetsPosition = offsetsPosition;
     this.bucketsPosition = offsetsPosition + ((numBuckets - 1) * Integer.BYTES);
-    if (version == 0) {
-      // version zero, all prefixes are computed against the first value in the bucket
-      this.getBucketValueFn = FrontCodedIndexed::getFromBucketV0;
-      this.readBucketFn = FrontCodedIndexed::readBucketV0;
-      this.findInBucketFn = this::findValueInBucketV0;
-      //noinspection DataFlowIssue
-      this.unwindPrefixLength = null;
-      //noinspection DataFlowIssue
-      this.unwindBufferPosition = null;
-    } else {
-      // version one uses 'incremental' buckets, where the prefix is computed against the previous value
-      this.unwindPrefixLength = new int[bucketSize];
-      this.unwindBufferPosition = new int[bucketSize];
-      this.getBucketValueFn = this::getFromBucketV1;
-      this.readBucketFn = this::readBucketV1;
-      this.findInBucketFn = this::findValueInBucketV1;
-    }
   }
 
+  /**
+   * Get a value from a bucket at a relative position.
+   * <p>
+   * This method modifies the position of the buffer.
+   */
+  abstract ByteBuffer getFromBucket(ByteBuffer buffer, int offset);
+
+  /**
+   * Read an entire bucket from a {@link ByteBuffer}, returning an array of reconstructed value bytes.
+   * <p>
+   * This method modifies the position of the buffer.
+   */
+  abstract ByteBuffer[] readBucket(ByteBuffer buffer, int numValues);
+
+  /**
+   * Finds a value in a bucket among the fragments. The first value is assumed to have been already compared against
+   * and be smaller than the value we are looking for. This comparison is the source of the 'shared prefix', which is
+   * the length the value has in common with the first (V0) or immediately preceding (V1) value of the bucket.
+   * <p>
+   * This method uses this shared prefix length to skip more expensive byte by byte full value comparisons when
+   * possible by comparing the shared prefix length with the prefix length of the fragment. Since the bucket is always
+   * sorted, prefix lengths shrink as you progress to higher indexes, and we can use this to reason that a fragment
+   * with a longer prefix length than the shared prefix will always sort before the value we are looking for, and values
+   * which have a shorter prefix will always be greater than the value we are looking for, so we only need to do a
+   * full comparison if the prefix length is the same.
+   * <p>
+   * This method modifies the position of {@link #buffer}.
+   */
+  abstract int findInBucket(ByteBuffer value, int currBucketFirstValueIndex, int bucketSize, int sharedPrefixLength);
+  
   @Override
   public int size()
   {
@@ -206,7 +221,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
     final int bucketIndex = adjustedIndex & rem;
     final int offset = getBucketOffset(bucket);
     buffer.position(offset);
-    return getBucketValueFn.get(buffer, bucketIndex);
+    return getFromBucket(buffer, bucketIndex);
   }
 
   @Override
@@ -268,7 +283,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
         // find the value in the bucket (or where it would be if it were present)
         buffer.position(firstOffset + firstLength);
 
-        return findInBucketFn.find(value, currBucketFirstValueIndex, bucketSize, sharedPrefix);
+        return findInBucket(value, currBucketFirstValueIndex, bucketSize, sharedPrefix);
       } else if (comparison < 0) {
         minBucketIndex = currentBucket + 1;
       } else {
@@ -308,7 +323,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
 
     buffer.position(firstOffset + firstLength);
 
-    return findInBucketFn.find(value, bucketIndexBase, numValuesInBucket, sharedPrefix);
+    return findInBucket(value, bucketIndexBase, numValuesInBucket, sharedPrefix);
   }
 
   @Override
@@ -329,7 +344,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
     }
     ByteBuffer copy = buffer.asReadOnlyBuffer().order(buffer.order());
     copy.position(bucketsPosition);
-    final ByteBuffer[] firstBucket = readBucketFn.readBucket(copy, numBuckets > 1 ? bucketSize : lastBucketNumValues);
+    final ByteBuffer[] firstBucket = readBucket(copy, numBuckets > 1 ? bucketSize : lastBucketNumValues);
     // iterator decodes and buffers a bucket at a time, paging through buckets as the iterator is consumed
     return new Iterator<ByteBuffer>()
     {
@@ -360,7 +375,7 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
         if (bucketNum != currentBucketIndex) {
           final int offset = copy.getInt(offsetsPosition + ((bucketNum - 1) * Integer.BYTES));
           copy.position(bucketsPosition + offset);
-          currentBucket = readBucketFn.readBucket(
+          currentBucket = readBucket(
               copy,
               bucketNum < (numBuckets - 1) ? bucketSize : lastBucketNumValues
           );
@@ -394,7 +409,6 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
     return bucketsPosition + (bucket > 0 ? buffer.getInt(offsetsPosition + ((bucket - 1) * Integer.BYTES)) : 0);
   }
 
-
   /**
    * Performs byte-by-byte comparison of the first value in a bucket with the specified value. Note that this method
    * MUST be prepared before calling, as it expects the length of the first value to have already been read externally,
@@ -423,319 +437,293 @@ public final class FrontCodedIndexed implements Indexed<ByteBuffer>
     return comparison;
   }
 
-  /**
-   * Finds a value in a bucket among the fragments. The first value is assumed to have been already compared against
-   * and be smaller than the value we are looking for. This comparison is the source of the 'shared prefix', which is
-   * the length which the value has in common with the first value of the bucket.
-   * <p>
-   * This method uses this shared prefix length to skip more expensive byte by byte full value comparisons when
-   * possible by comparing the shared prefix length with the prefix length of the fragment. Since the bucket is always
-   * sorted, prefix lengths shrink as you progress to higher indexes, and we can use this to reason that a fragment
-   * with a longer prefix length than the shared prefix will always sort before the value we are looking for, and values
-   * which have a shorter prefix will always be greater than the value we are looking for, so we only need to do a
-   * full comparison if the prefix length is the same
-   * <p>
-   * this method modifies the position of {@link #buffer}
-   */
-  private int findValueInBucketV0(
-      ByteBuffer value,
-      int currBucketFirstValueIndex,
-      int bucketSize,
-      int sharedPrefix
-  )
+  public static final class FrontCodedV0 extends FrontCodedIndexed
   {
-    int relativePosition = 0;
-    int prefixLength;
-    // scan through bucket values until we find match or compare numValues
-    int insertionPoint = 1;
-    while (++relativePosition < bucketSize) {
-      prefixLength = VByte.readInt(buffer);
-      if (prefixLength > sharedPrefix) {
-        // this value shares more in common with the first value, so the value we are looking for comes after
-        final int skip = VByte.readInt(buffer);
-        buffer.position(buffer.position() + skip);
-        insertionPoint++;
-      } else if (prefixLength < sharedPrefix) {
-        // prefix is smaller, that means this value sorts ahead of it
-        break;
-      } else {
-        final int fragmentLength = VByte.readInt(buffer);
-        final int common = Math.min(fragmentLength, value.remaining() - prefixLength);
-        int fragmentComparison = 0;
-        for (int i = 0; i < common; i++) {
-          fragmentComparison = StringUtils.compareUtf8UsingJavaStringOrdering(
-              buffer.get(buffer.position() + i),
-              value.get(prefixLength + i)
-          );
-          if (fragmentComparison != 0) {
-            break;
-          }
-        }
-        if (fragmentComparison == 0) {
-          fragmentComparison = Integer.compare(prefixLength + fragmentLength, value.remaining());
-        }
+    private FrontCodedV0(
+        ByteBuffer buffer,
+        ByteOrder order,
+        int bucketSize,
+        int numValues,
+        boolean hasNull,
+        int offsetsPosition
+    )
+    {
+      super(buffer, order, bucketSize, numValues, hasNull, offsetsPosition);
+    }
 
-        if (fragmentComparison == 0) {
-          return (currBucketFirstValueIndex + adjustIndex) + relativePosition;
-        } else if (fragmentComparison < 0) {
-          buffer.position(buffer.position() + fragmentLength);
-          insertionPoint++;
+    @Override
+    ByteBuffer getFromBucket(ByteBuffer buffer, int offset)
+    {
+      return getValueFromBucket(buffer, offset);
+    }
+
+    public static ByteBuffer getValueFromBucket(ByteBuffer buffer, int offset)
+    {
+      int prefixPosition;
+      if (offset == 0) {
+        final int length = VByte.readInt(buffer);
+        final ByteBuffer firstValue = buffer.asReadOnlyBuffer();
+        firstValue.limit(firstValue.position() + length);
+        return firstValue;
+      } else {
+        final int firstLength = VByte.readInt(buffer);
+        prefixPosition = buffer.position();
+        buffer.position(buffer.position() + firstLength);
+      }
+      int pos = 0;
+      int prefixLength;
+      int fragmentLength;
+      int fragmentPosition;
+      // scan through bucket values until we reach offset
+      do {
+        prefixLength = VByte.readInt(buffer);
+        if (++pos < offset) {
+          // not there yet, no need to read anything other than the length to skip ahead
+          final int skipLength = VByte.readInt(buffer);
+          buffer.position(buffer.position() + skipLength);
         } else {
+          // we've reached our destination
+          fragmentLength = VByte.readInt(buffer);
+          fragmentPosition = buffer.position();
           break;
         }
+      } while (true);
+      final int valueLength = prefixLength + fragmentLength;
+      ByteBuffer value = ByteBuffer.allocate(valueLength);
+      for (int i = 0; i < valueLength; i++) {
+        if (i < prefixLength) {
+          value.put(buffer.get(prefixPosition + i));
+        } else {
+          value.put(buffer.get(fragmentPosition + i - prefixLength));
+        }
       }
+      value.flip();
+      return value;
     }
-    // (-(insertion point) - 1)
-    return -(currBucketFirstValueIndex + adjustIndex) + (~insertionPoint);
-  }
 
-  /**
-   * Get a value from a bucket at a relative position.
-   * <p>
-   * This method modifies the position of the buffer.
-   */
-  static ByteBuffer getFromBucketV0(ByteBuffer buffer, int offset)
-  {
-    int prefixPosition;
-    if (offset == 0) {
+    @Override
+    ByteBuffer[] readBucket(ByteBuffer buffer, int numValues)
+    {
       final int length = VByte.readInt(buffer);
-      final ByteBuffer firstValue = buffer.asReadOnlyBuffer();
-      firstValue.limit(firstValue.position() + length);
-      return firstValue;
-    } else {
-      final int firstLength = VByte.readInt(buffer);
-      prefixPosition = buffer.position();
-      buffer.position(buffer.position() + firstLength);
-    }
-    int pos = 0;
-    int prefixLength;
-    int fragmentLength;
-    int fragmentPosition;
-    // scan through bucket values until we reach offset
-    do {
-      prefixLength = VByte.readInt(buffer);
-      if (++pos < offset) {
-        // not there yet, no need to read anything other than the length to skip ahead
-        final int skipLength = VByte.readInt(buffer);
-        buffer.position(buffer.position() + skipLength);
-      } else {
-        // we've reached our destination
-        fragmentLength = VByte.readInt(buffer);
-        fragmentPosition = buffer.position();
-        break;
-      }
-    } while (true);
-    final int valueLength = prefixLength + fragmentLength;
-    ByteBuffer value = ByteBuffer.allocate(valueLength);
-    for (int i = 0; i < valueLength; i++) {
-      if (i < prefixLength) {
-        value.put(buffer.get(prefixPosition + i));
-      } else {
-        value.put(buffer.get(fragmentPosition + i - prefixLength));
+      final byte[] prefixBytes = new byte[length];
+      buffer.get(prefixBytes, 0, length);
+      final ByteBuffer[] bucketBuffers = new ByteBuffer[numValues];
+      bucketBuffers[0] = ByteBuffer.wrap(prefixBytes);
+      int pos = 1;
+      while (pos < numValues) {
+        final int prefixLength = VByte.readInt(buffer);
+        final int fragmentLength = VByte.readInt(buffer);
+        final byte[] fragment = new byte[fragmentLength];
+        buffer.get(fragment, 0, fragmentLength);
+        final ByteBuffer value = ByteBuffer.allocate(prefixLength + fragmentLength);
+        value.put(prefixBytes, 0, prefixLength);
+        value.put(fragment);
+        value.flip();
+        bucketBuffers[pos++] = value;
       }
+      return bucketBuffers;
     }
-    value.flip();
-    return value;
-  }
 
-
-  /**
-   * Read an entire bucket from a {@link ByteBuffer}, returning an array of reconstructed value bytes.
-   * <p>
-   * This method modifies the position of the buffer.
-   */
-  private static ByteBuffer[] readBucketV0(ByteBuffer bucket, int numValues)
-  {
-    final int length = VByte.readInt(bucket);
-    final byte[] prefixBytes = new byte[length];
-    bucket.get(prefixBytes, 0, length);
-    final ByteBuffer[] bucketBuffers = new ByteBuffer[numValues];
-    bucketBuffers[0] = ByteBuffer.wrap(prefixBytes);
-    int pos = 1;
-    while (pos < numValues) {
-      final int prefixLength = VByte.readInt(bucket);
-      final int fragmentLength = VByte.readInt(bucket);
-      final byte[] fragment = new byte[fragmentLength];
-      bucket.get(fragment, 0, fragmentLength);
-      final ByteBuffer value = ByteBuffer.allocate(prefixLength + fragmentLength);
-      value.put(prefixBytes, 0, prefixLength);
-      value.put(fragment);
-      value.flip();
-      bucketBuffers[pos++] = value;
-    }
-    return bucketBuffers;
-  }
-
-  /**
-   * Finds a value in a bucket among the fragments. The first value is assumed to have been already compared against
-   * and be smaller than the value we are looking for. This comparison is the source of the 'shared prefix', which is
-   * the length which the value has in common with the previous value of the bucket.
-   * <p>
-   * This method uses this shared prefix length to skip more expensive byte by byte full value comparisons when
-   * possible by comparing the shared prefix length with the prefix length of the fragment. Since the bucket is always
-   * sorted, prefix lengths shrink as you progress to higher indexes, and we can use this to reason that a fragment
-   * with a longer prefix length than the shared prefix will always sort before the value we are looking for, and values
-   * which have a shorter prefix will always be greater than the value we are looking for, so we only need to do a
-   * full comparison if the prefix length is the same
-   * <p>
-   * this method modifies the position of {@link #buffer}
-   */
-  private int findValueInBucketV1(
-      ByteBuffer value,
-      int currBucketFirstValueIndex,
-      int bucketSize,
-      int sharedPrefixLength
-  )
-  {
-    int relativePosition = 0;
-    int prefixLength;
-    // scan through bucket values until we find match or compare numValues
-    int insertionPoint = 1;
-    while (++relativePosition < bucketSize) {
-      prefixLength = VByte.readInt(buffer);
-      if (prefixLength > sharedPrefixLength) {
-        // bucket value shares more in common with the preceding value, so the value we are looking for comes after
-        final int skip = VByte.readInt(buffer);
-        buffer.position(buffer.position() + skip);
-        insertionPoint++;
-      } else if (prefixLength < sharedPrefixLength) {
-        // bucket value prefix is smaller, that means the value we are looking for sorts ahead of it
-        break;
-      } else {
-        // value has the same shared prefix, so compare additional values to find
-        final int fragmentLength = VByte.readInt(buffer);
-        final int common = Math.min(fragmentLength, value.remaining() - prefixLength);
-        int fragmentComparison = 0;
-        boolean shortCircuit = false;
-        for (int i = 0; i < common; i++) {
-          fragmentComparison = StringUtils.compareUtf8UsingJavaStringOrdering(
-              buffer.get(buffer.position() + i),
-              value.get(prefixLength + i)
-          );
-          if (fragmentComparison != 0) {
-            sharedPrefixLength = prefixLength + i;
-            shortCircuit = true;
-            break;
+    @Override
+    int findInBucket(ByteBuffer value, int currBucketFirstValueIndex, int bucketSize, int sharedPrefixLength)
+    {
+      int relativePosition = 0;
+      int prefixLength;
+      // scan through bucket values until we find match or compare numValues
+      int insertionPoint = 1;
+      while (++relativePosition < bucketSize) {
+        prefixLength = VByte.readInt(buffer);
+        if (prefixLength > sharedPrefixLength) {
+          // this value shares more in common with the first value, so the value we are looking for comes after
+          final int skip = VByte.readInt(buffer);
+          buffer.position(buffer.position() + skip);
+          insertionPoint++;
+        } else if (prefixLength < sharedPrefixLength) {
+          // bucket value prefix is smaller, that means the value we are looking for sorts ahead of it
+          break;
+        } else {
+          final int fragmentLength = VByte.readInt(buffer);
+          final int common = Math.min(fragmentLength, value.remaining() - prefixLength);
+          int fragmentComparison = 0;
+          for (int i = 0; i < common; i++) {
+            fragmentComparison = StringUtils.compareUtf8UsingJavaStringOrdering(
+                buffer.get(buffer.position() + i),
+                value.get(prefixLength + i)
+            );
+            if (fragmentComparison != 0) {
+              break;
+            }
+          }
+          if (fragmentComparison == 0) {
+            fragmentComparison = Integer.compare(prefixLength + fragmentLength, value.remaining());
           }
-        }
-        if (fragmentComparison == 0) {
-          fragmentComparison = Integer.compare(prefixLength + fragmentLength, value.remaining());
-        }
 
-        if (fragmentComparison == 0) {
-          return (currBucketFirstValueIndex + adjustIndex) + relativePosition;
-        } else if (fragmentComparison < 0) {
-          // value we are looking for is longer than the current bucket value, continue on
-          if (!shortCircuit) {
-            sharedPrefixLength = prefixLength + common;
+          if (fragmentComparison == 0) {
+            return (currBucketFirstValueIndex + adjustIndex) + relativePosition;
+          } else if (fragmentComparison < 0) {
+            buffer.position(buffer.position() + fragmentLength);
+            insertionPoint++;
+          } else {
+            break;
           }
-          buffer.position(buffer.position() + fragmentLength);
-          insertionPoint++;
-        } else {
-          break;
         }
       }
+      // (-(insertion point) - 1)
+      return -(currBucketFirstValueIndex + adjustIndex) + (~insertionPoint);
     }
-    // (-(insertion point) - 1)
-    return -(currBucketFirstValueIndex + adjustIndex) + (~insertionPoint);
   }
 
-  private ByteBuffer getFromBucketV1(ByteBuffer buffer, int offset)
+  public static final class FrontCodedV1 extends FrontCodedIndexed
   {
-    // first value is written whole
-    final int length = VByte.readInt(buffer);
-    if (offset == 0) {
-      // return first value directly from underlying buffer since it is stored whole
-      final ByteBuffer value = buffer.asReadOnlyBuffer();
-      value.limit(value.position() + length);
-      return value;
+    private final int[] unwindPrefixLength;
+    private final int[] unwindBufferPosition;
+
+    private FrontCodedV1(
+        ByteBuffer buffer,
+        ByteOrder order,
+        int bucketSize,
+        int numValues,
+        boolean hasNull,
+        int offsetsPosition
+    )
+    {
+      super(buffer, order, bucketSize, numValues, hasNull, offsetsPosition);
+      this.unwindPrefixLength = new int[bucketSize];
+      this.unwindBufferPosition = new int[bucketSize];
     }
-    int pos = 0;
-    int prefixLength;
-    int fragmentLength;
-    unwindPrefixLength[pos] = 0;
-    unwindBufferPosition[pos] = buffer.position();
-
-    buffer.position(buffer.position() + length);
-    do {
-      prefixLength = VByte.readInt(buffer);
-      if (++pos < offset) {
-        // not there yet, no need to read anything other than the length to skip ahead
-        final int skipLength = VByte.readInt(buffer);
-        unwindPrefixLength[pos] = prefixLength;
-        unwindBufferPosition[pos] = buffer.position();
-        buffer.position(buffer.position() + skipLength);
-      } else {
-        // we've reached our destination
-        fragmentLength = VByte.readInt(buffer);
-        if (prefixLength == 0) {
-          // no prefix, return it directly from the underlying buffer
-          final ByteBuffer value = buffer.asReadOnlyBuffer();
-          value.limit(value.position() + fragmentLength);
-          return value;
-        }
-        break;
+
+    @Override
+    ByteBuffer getFromBucket(ByteBuffer buffer, int offset)
+    {
+      // first value is written whole
+      final int length = VByte.readInt(buffer);
+      if (offset == 0) {
+        // return first value directly from underlying buffer since it is stored whole
+        final ByteBuffer value = buffer.asReadOnlyBuffer();
+        value.limit(value.position() + length);
+        return value;
       }
-    } while (true);
-    final int valueLength = prefixLength + fragmentLength;
-    final byte[] valueBytes = new byte[valueLength];
-    buffer.get(valueBytes, prefixLength, fragmentLength);
-    for (int i = prefixLength; i > 0;) {
-      // previous value had a larger prefix than or the same as the value we are looking for
-      // skip it since the fragment doesn't have anything we need
-      if (unwindPrefixLength[--pos] >= i) {
-        continue;
+      int pos = 0;
+      int prefixLength;
+      int fragmentLength;
+      unwindPrefixLength[pos] = 0;
+      unwindBufferPosition[pos] = buffer.position();
+
+      buffer.position(buffer.position() + length);
+      do {
+        prefixLength = VByte.readInt(buffer);
+        if (++pos < offset) {
+          // not there yet, no need to read anything other than the length to skip ahead
+          final int skipLength = VByte.readInt(buffer);
+          unwindPrefixLength[pos] = prefixLength;
+          unwindBufferPosition[pos] = buffer.position();
+          buffer.position(buffer.position() + skipLength);
+        } else {
+          // we've reached our destination
+          fragmentLength = VByte.readInt(buffer);
+          if (prefixLength == 0) {
+            // no prefix, return it directly from the underlying buffer
+            final ByteBuffer value = buffer.asReadOnlyBuffer();
+            value.limit(value.position() + fragmentLength);
+            return value;
+          }
+          break;
+        }
+      } while (true);
+      final int valueLength = prefixLength + fragmentLength;
+      final byte[] valueBytes = new byte[valueLength];
+      buffer.get(valueBytes, prefixLength, fragmentLength);
+      for (int i = prefixLength; i > 0;) {
+        // the previous value's prefix length is at least the number of prefix bytes we still need to fill,
+        // so its fragment contributes nothing; skip it
+        if (unwindPrefixLength[--pos] >= i) {
+          continue;
+        }
+        buffer.position(unwindBufferPosition[pos]);
+        buffer.get(valueBytes, unwindPrefixLength[pos], i - unwindPrefixLength[pos]);
+        i = unwindPrefixLength[pos];
       }
-      buffer.position(unwindBufferPosition[pos]);
-      buffer.get(valueBytes, unwindPrefixLength[pos], i - unwindPrefixLength[pos]);
-      i = unwindPrefixLength[pos];
+      return ByteBuffer.wrap(valueBytes);
     }
-    return ByteBuffer.wrap(valueBytes);
-  }
 
-  /**
-   * Read an entire bucket from a {@link ByteBuffer}, returning an array of reconstructed value bytes.
-   * <p>
-   * This method modifies the position of the buffer.
-   */
-  private ByteBuffer[] readBucketV1(ByteBuffer bucket, int numValues)
-  {
-    final ByteBuffer[] bucketBuffers = new ByteBuffer[numValues];
-
-    // first value is written whole
-    final int length = VByte.readInt(bucket);
-    byte[] prefixBytes = new byte[length];
-    bucket.get(prefixBytes, 0, length);
-    bucketBuffers[0] = ByteBuffer.wrap(prefixBytes);
-    int pos = 1;
-    while (pos < numValues) {
-      final int prefixLength = VByte.readInt(bucket);
-      final int fragmentLength = VByte.readInt(bucket);
-      byte[] nextValueBytes = new byte[prefixLength + fragmentLength];
-      System.arraycopy(prefixBytes, 0, nextValueBytes, 0, prefixLength);
-      bucket.get(nextValueBytes, prefixLength, fragmentLength);
-      final ByteBuffer value = ByteBuffer.wrap(nextValueBytes);
-      prefixBytes = nextValueBytes;
-      bucketBuffers[pos++] = value;
-    }
-    return bucketBuffers;
-  }
+    @Override
+    ByteBuffer[] readBucket(ByteBuffer buffer, int numValues)
+    {
+      final ByteBuffer[] bucketBuffers = new ByteBuffer[numValues];
 
-  @FunctionalInterface
-  interface GetBucketValue
-  {
-    ByteBuffer get(ByteBuffer buffer, int offset);
-  }
+      // first value is written whole
+      final int length = VByte.readInt(buffer);
+      byte[] prefixBytes = new byte[length];
+      buffer.get(prefixBytes, 0, length);
+      bucketBuffers[0] = ByteBuffer.wrap(prefixBytes);
+      int pos = 1;
+      while (pos < numValues) {
+        final int prefixLength = VByte.readInt(buffer);
+        final int fragmentLength = VByte.readInt(buffer);
+        byte[] nextValueBytes = new byte[prefixLength + fragmentLength];
+        System.arraycopy(prefixBytes, 0, nextValueBytes, 0, prefixLength);
+        buffer.get(nextValueBytes, prefixLength, fragmentLength);
+        final ByteBuffer value = ByteBuffer.wrap(nextValueBytes);
+        prefixBytes = nextValueBytes;
+        bucketBuffers[pos++] = value;
+      }
+      return bucketBuffers;
+    }
 
-  @FunctionalInterface
-  interface ReadBucket
-  {
-    ByteBuffer[] readBucket(ByteBuffer buffer, int bucketSize);
-  }
+    @Override
+    int findInBucket(ByteBuffer value, int currBucketFirstValueIndex, int bucketSize, int sharedPrefixLength)
+    {
+      int relativePosition = 0;
+      int prefixLength;
+      // scan through bucket values until we find a match or have compared all of the values in the bucket
+      int insertionPoint = 1;
+      while (++relativePosition < bucketSize) {
+        prefixLength = VByte.readInt(buffer);
+        if (prefixLength > sharedPrefixLength) {
+          // bucket value shares more in common with the preceding value, so the value we are looking for comes after
+          final int skip = VByte.readInt(buffer);
+          buffer.position(buffer.position() + skip);
+          insertionPoint++;
+        } else if (prefixLength < sharedPrefixLength) {
+          // bucket value prefix is smaller, that means the value we are looking for sorts ahead of it
+          break;
+        } else {
+          // value has the same shared prefix, so compare the fragment bytes to determine ordering
+          final int fragmentLength = VByte.readInt(buffer);
+          final int common = Math.min(fragmentLength, value.remaining() - prefixLength);
+          int fragmentComparison = 0;
+          boolean shortCircuit = false;
+          for (int i = 0; i < common; i++) {
+            fragmentComparison = StringUtils.compareUtf8UsingJavaStringOrdering(
+                buffer.get(buffer.position() + i),
+                value.get(prefixLength + i)
+            );
+            if (fragmentComparison != 0) {
+              sharedPrefixLength = prefixLength + i;
+              shortCircuit = true;
+              break;
+            }
+          }
+          if (fragmentComparison == 0) {
+            fragmentComparison = Integer.compare(prefixLength + fragmentLength, value.remaining());
+          }
 
-  @FunctionalInterface
-  interface FindInBucket
-  {
-    int find(ByteBuffer value, int currBucketFirstValueIndex, int bucketSize, int sharedPrefixLength);
+          if (fragmentComparison == 0) {
+            return (currBucketFirstValueIndex + adjustIndex) + relativePosition;
+          } else if (fragmentComparison < 0) {
+            // current bucket value sorts before the value we are looking for, continue on
+            if (!shortCircuit) {
+              sharedPrefixLength = prefixLength + common;
+            }
+            buffer.position(buffer.position() + fragmentLength);
+            insertionPoint++;
+          } else {
+            break;
+          }
+        }
+      }
+      // (-(insertion point) - 1)
+      return -(currBucketFirstValueIndex + adjustIndex) + (~insertionPoint);
+    }
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java
index 83b4850f97..efdf7336d1 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java
@@ -217,7 +217,7 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
       bucketBuffer.clear();
       final ByteBuffer valueBuffer = version == FrontCodedIndexed.V1
                                      ? getFromBucketV1(bucketBuffer, relativeIndex, bucketSize)
-                                     : FrontCodedIndexed.getFromBucketV0(bucketBuffer, relativeIndex);
+                                     : FrontCodedIndexed.FrontCodedV0.getValueFromBucket(bucketBuffer, relativeIndex);
       final byte[] valueBytes = new byte[valueBuffer.limit() - valueBuffer.position()];
       valueBuffer.get(valueBytes);
       return valueBytes;
@@ -413,15 +413,15 @@ public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
     return StringUtils.compareUtf8UsingJavaStringOrdering(b1, b2);
   }
 
-
   /**
-   * same as {@link FrontCodedIndexed#getFromBucketV1(ByteBuffer, int)} but without re-using prefixLength and buffer position
-   * arrays so has more overhead/garbage creation than the instance method.
+   * same as {@link FrontCodedIndexed.FrontCodedV1#getFromBucket(ByteBuffer, int)} but
+   * without re-using prefixLength and buffer position arrays so has more overhead/garbage creation than the instance
+   * method.
    *
    * Note: adding the unwindPrefixLength and unwindBufferPosition arrays as arguments and having
-   * {@link FrontCodedIndexed#getFromBucketV1(ByteBuffer, int)} call this static method added 5-10ns of overhead
-   * compared to having its own copy of the code, presumably due to the overhead of an additional method call and extra
-   * arguments.
+   * {@link FrontCodedIndexed.FrontCodedV1#getFromBucket(ByteBuffer, int)} call this static method added 5-10ns of
+   * overhead compared to having its own copy of the code, presumably due to the overhead of an additional method call
+   * and extra arguments.
    *
    * As such, since the writer is the only user of this method, it has been copied here...
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org