You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2022/10/26 01:05:43 UTC

[druid] branch master updated: add support for 'front coded' string dictionaries for smaller string columns (#12277)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 77e4246598 add support for 'front coded' string dictionaries for smaller string columns (#12277)
77e4246598 is described below

commit 77e4246598428ae501096b10e6b4d6438206e297
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Tue Oct 25 18:05:38 2022 -0700

    add support for 'front coded' string dictionaries for smaller string columns (#12277)
    
    * add FrontCodedIndexed for delta string encoding
    
    * now for actual segments
    
    * fix indexOf
    
    * fixes and thread safety
    
    * add bucket size 4, which seems generally better
    
    * fixes
    
    * fixes maybe
    
    * update indexes to latest interfaces
    
    * utf8 support
    
    * adjust
    
    * oops
    
    * oops
    
    * refactor, better, faster
    
    * more test
    
    * fixes
    
    * revert
    
    * adjustments
    
    * fix prefixing
    
    * more chill
    
    * sql nested benchmark too
    
    * refactor
    
    * more comments and javadocs
    
    * better get
    
    * remove base class
    
    * fix
    
    * hot rod
    
    * adjust comments
    
    * faster still
    
    * minor adjustments
    
    * spatial index support
    
    * spotbugs
    
    * add isSorted to Indexed to strengthen indexOf contract if set, improve javadocs, add docs
    
    * fix docs
    
    * push into constructor
    
    * use base buffer instead of copy
    
    * oops
---
 ...tionaryEncodedStringIndexSupplierBenchmark.java |   9 +-
 .../benchmark/FrontCodedIndexedBenchmark.java      | 285 ++++++++++
 .../apache/druid/benchmark/query/SqlBenchmark.java |  51 +-
 .../benchmark/query/SqlNestedDataBenchmark.java    |  25 +-
 .../apache/druid/java/util/common/StringUtils.java |  20 +-
 .../java/org/apache/druid/segment/data/VByte.java  | 131 +++++
 .../org/apache/druid/segment/data/VByteTest.java   |  75 +++
 docs/ingestion/ingestion-spec.md                   |  33 +-
 .../query/dimension/ListFilteredDimensionSpec.java |  12 +-
 .../segment/DictionaryEncodedColumnMerger.java     |  14 +-
 .../java/org/apache/druid/segment/IndexSpec.java   |  69 ++-
 .../druid/segment/NestedDataColumnMerger.java      |   5 +-
 .../druid/segment/StringDimensionMergerV9.java     |  12 +
 ...xedStringDictionaryEncodedStringValueIndex.java |  69 +++
 .../column/IndexedStringDruidPredicateIndex.java   | 107 ++++
 .../IndexedUtf8LexicographicalRangeIndex.java      | 213 ++++++++
 .../segment/column/IndexedUtf8ValueSetIndex.java   | 251 +++++++++
 .../segment/column/StringEncodingStrategies.java   | 126 +++++
 .../segment/column/StringEncodingStrategy.java     | 148 +++++
 .../StringFrontCodedDictionaryEncodedColumn.java   | 598 +++++++++++++++++++++
 .../apache/druid/segment/data/CachingIndexed.java  |   6 +
 .../druid/segment/data/DictionaryWriter.java       |  37 ++
 .../data/EncodedStringDictionaryWriter.java        |  90 ++++
 .../apache/druid/segment/data/FixedIndexed.java    |   6 +
 .../druid/segment/data/FrontCodedIndexed.java      | 504 +++++++++++++++++
 .../segment/data/FrontCodedIndexedWriter.java      | 347 ++++++++++++
 .../apache/druid/segment/data/GenericIndexed.java  |  17 +
 .../druid/segment/data/GenericIndexedWriter.java   |  15 +-
 .../org/apache/druid/segment/data/Indexed.java     |  22 +-
 .../nested/CompressedNestedDataComplexColumn.java  |  10 +-
 .../segment/nested/GlobalDimensionDictionary.java  |   3 +-
 .../segment/nested/NestedDataColumnSerializer.java |  12 +-
 .../segment/nested/NestedDataColumnSupplier.java   |  38 +-
 .../NestedFieldLiteralColumnIndexSupplier.java     |  33 +-
 .../NestedFieldLiteralDictionaryEncodedColumn.java |  23 +-
 .../serde/DictionaryEncodedColumnPartSerde.java    | 115 +++-
 .../DictionaryEncodedStringIndexSupplier.java      | 526 +-----------------
 .../serde/StringFrontCodedColumnIndexSupplier.java | 123 +++++
 ...gFrontCodedDictionaryEncodedColumnSupplier.java |  61 +++
 .../apache/druid/query/QueryRunnerTestHelper.java  |   4 +-
 .../segment/CustomSegmentizerFactoryTest.java      |   8 +-
 .../org/apache/druid/segment/IndexSpecTest.java    |   2 +-
 .../java/org/apache/druid/segment/TestIndex.java   |  27 +-
 .../druid/segment/data/FrontCodedIndexedTest.java  | 332 ++++++++++++
 .../druid/segment/filter/BaseFilterTest.java       |  46 +-
 .../druid/segment/filter/SpatialFilterTest.java    |  18 +
 .../druid/segment/generator/SegmentGenerator.java  |  26 +-
 .../NestedFieldLiteralColumnIndexSupplierTest.java |  89 +--
 website/.spelling                                  |   3 +
 49 files changed, 4099 insertions(+), 697 deletions(-)

diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/DictionaryEncodedStringIndexSupplierBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/DictionaryEncodedStringIndexSupplierBenchmark.java
index acacce00e5..282b25e198 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/DictionaryEncodedStringIndexSupplierBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/DictionaryEncodedStringIndexSupplierBenchmark.java
@@ -27,6 +27,7 @@ import org.apache.druid.collections.bitmap.RoaringBitmapFactory;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.column.BitmapColumnIndex;
+import org.apache.druid.segment.column.IndexedUtf8ValueSetIndex;
 import org.apache.druid.segment.column.StringValueSetIndex;
 import org.apache.druid.segment.data.BitmapSerdeFactory;
 import org.apache.druid.segment.data.GenericIndexed;
@@ -71,7 +72,7 @@ public class DictionaryEncodedStringIndexSupplierBenchmark
   public static class BenchmarkState
   {
     @Nullable
-    private DictionaryEncodedStringIndexSupplier.GenericIndexedDictionaryEncodedStringValueSetIndex stringValueSetIndex;
+    private IndexedUtf8ValueSetIndex<?> stringValueSetIndex;
     private final TreeSet<ByteBuffer> values = new TreeSet<>();
     private static final int START_INT = 10_000_000;
 
@@ -114,11 +115,9 @@ public class DictionaryEncodedStringIndexSupplierBenchmark
                          .iterator(),
           serdeFactory.getObjectStrategy()
       );
-      DictionaryEncodedStringIndexSupplier dictionaryEncodedStringIndexSupplier =
+      DictionaryEncodedStringIndexSupplier indexSupplier =
           new DictionaryEncodedStringIndexSupplier(bitmapFactory, dictionary, dictionaryUtf8, bitmaps, null);
-      stringValueSetIndex =
-          (DictionaryEncodedStringIndexSupplier.GenericIndexedDictionaryEncodedStringValueSetIndex)
-              dictionaryEncodedStringIndexSupplier.as(StringValueSetIndex.class);
+      stringValueSetIndex = (IndexedUtf8ValueSetIndex<?>) indexSupplier.as(StringValueSetIndex.class);
       List<Integer> filterValues = new ArrayList<>();
       List<Integer> nonFilterValues = new ArrayList<>();
       for (int i = 0; i < dictionarySize; i++) {
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/FrontCodedIndexedBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/FrontCodedIndexedBenchmark.java
new file mode 100644
index 0000000000..2dba1ba5c0
--- /dev/null
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/FrontCodedIndexedBenchmark.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.benchmark.compression.EncodingSizeProfiler;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.StringEncodingStrategies;
+import org.apache.druid.segment.data.FrontCodedIndexed;
+import org.apache.druid.segment.data.FrontCodedIndexedWriter;
+import org.apache.druid.segment.data.GenericIndexed;
+import org.apache.druid.segment.data.GenericIndexedWriter;
+import org.apache.druid.segment.data.Indexed;
+import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.TreeSet;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@OperationsPerInvocation(GenericIndexedBenchmark.ITERATIONS)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@Fork(1)
+@State(Scope.Benchmark)
+public class FrontCodedIndexedBenchmark
+{
+  static {
+    NullHandling.initializeForTests();
+  }
+
+  @Param({"10000", "100000"})
+  public int numElements;
+
+  @Param({"16"})
+  public int width;
+
+  @Param({"generic", "front-coded-4", "front-coded-16"})
+  public String indexType;
+
+  @Param({"10000"})
+  public int numOperations;
+
+  private File fileFrontCoded;
+  private File fileGeneric;
+  private File smooshDirFrontCoded;
+  private File smooshDirGeneric;
+  private GenericIndexed<ByteBuffer> genericIndexed;
+  private FrontCodedIndexed frontCodedIndexed;
+
+  private Indexed<ByteBuffer> indexed;
+
+  private String[] values;
+  private int[] iterationIndexes;
+  private String[] elementsToSearch;
+
+  private int written = 0;
+
+  @Setup(Level.Trial)
+  public void createIndex() throws IOException
+  {
+    values = new String[numElements];
+    TreeSet<String> set = new TreeSet<>(ColumnType.STRING.getStrategy());
+    while (set.size() < numElements) {
+      set.add(getRandomId(width));
+    }
+
+    Iterator<String> iterator = set.iterator();
+
+    GenericIndexedWriter<String> genericIndexedWriter = new GenericIndexedWriter<>(
+        new OffHeapMemorySegmentWriteOutMedium(),
+        "genericIndexedBenchmark",
+        GenericIndexed.STRING_STRATEGY
+    );
+    genericIndexedWriter.open();
+
+    FrontCodedIndexedWriter frontCodedIndexedWriter = new FrontCodedIndexedWriter(
+        new OnHeapMemorySegmentWriteOutMedium(),
+        ByteOrder.nativeOrder(),
+        "front-coded-4".equals(indexType) ? 4 : 16
+    );
+    frontCodedIndexedWriter.open();
+
+    int count = 0;
+    while (iterator.hasNext()) {
+      final String next = iterator.next();
+      values[count++] = next;
+      frontCodedIndexedWriter.write(StringUtils.toUtf8Nullable(next));
+      genericIndexedWriter.write(next);
+    }
+    smooshDirFrontCoded = FileUtils.createTempDir();
+    fileFrontCoded = File.createTempFile("frontCodedIndexedBenchmark", "meta");
+    smooshDirGeneric = FileUtils.createTempDir();
+    fileGeneric = File.createTempFile("genericIndexedBenchmark", "meta");
+
+    EncodingSizeProfiler.encodedSize = (int) ("generic".equals(indexType)
+                                              ? genericIndexedWriter.getSerializedSize()
+                                              : frontCodedIndexedWriter.getSerializedSize());
+    try (
+        FileChannel fileChannelFrontCoded = FileChannel.open(
+            fileFrontCoded.toPath(),
+            StandardOpenOption.CREATE, StandardOpenOption.WRITE
+        );
+        FileSmoosher fileSmoosherFrontCoded = new FileSmoosher(smooshDirFrontCoded);
+        FileChannel fileChannelGeneric = FileChannel.open(
+            fileGeneric.toPath(),
+            StandardOpenOption.CREATE, StandardOpenOption.WRITE
+        );
+        FileSmoosher fileSmoosherGeneric = new FileSmoosher(smooshDirGeneric)
+    ) {
+      frontCodedIndexedWriter.writeTo(fileChannelFrontCoded, fileSmoosherFrontCoded);
+      genericIndexedWriter.writeTo(fileChannelGeneric, fileSmoosherGeneric);
+    }
+
+    FileChannel fileChannelGeneric = FileChannel.open(fileGeneric.toPath());
+    MappedByteBuffer byteBufferGeneric = fileChannelGeneric.map(FileChannel.MapMode.READ_ONLY, 0, fileGeneric.length());
+    FileChannel fileChannelFrontCoded = FileChannel.open(fileFrontCoded.toPath());
+    MappedByteBuffer byteBufferFrontCoded = fileChannelFrontCoded.map(
+        FileChannel.MapMode.READ_ONLY,
+        0,
+        fileFrontCoded.length()
+    );
+
+    genericIndexed = GenericIndexed.read(
+        byteBufferGeneric,
+        GenericIndexed.BYTE_BUFFER_STRATEGY,
+        SmooshedFileMapper.load(smooshDirGeneric)
+    );
+    frontCodedIndexed = FrontCodedIndexed.read(
+        byteBufferFrontCoded.order(ByteOrder.nativeOrder()),
+        ByteOrder.nativeOrder()
+    ).get();
+
+    // sanity test: both encodings must return the same value for every dictionary id
+    for (int i = 0; i < numElements; i++) {
+      final String expected = StringUtils.fromUtf8Nullable(genericIndexed.get(i));
+      final String actual = StringUtils.fromUtf8Nullable(frontCodedIndexed.get(i));
+      Preconditions.checkArgument(
+          Objects.equals(expected, actual),
+          "elements not equal: " + i + " " + expected + " " + actual
+      );
+    }
+
+    Iterator<ByteBuffer> genericIterator = genericIndexed.iterator();
+    Iterator<ByteBuffer> frontCodedIterator = frontCodedIndexed.iterator();
+    Iterator<String> frontCodedStringIterator =
+        new StringEncodingStrategies.Utf8ToStringIndexed(frontCodedIndexed).iterator();
+
+    int counter = 0;
+    while (genericIterator.hasNext() && frontCodedIterator.hasNext() && frontCodedStringIterator.hasNext()) {
+      final String expected = StringUtils.fromUtf8Nullable(genericIterator.next());
+      final String actual = StringUtils.fromUtf8Nullable(frontCodedIterator.next());
+      final String actual2 = frontCodedStringIterator.next();
+      Preconditions.checkArgument(
+          Objects.equals(expected, actual),
+          "elements not equal: " + counter + " " + expected + " " + actual
+      );
+      Preconditions.checkArgument(
+          Objects.equals(expected, actual2),
+          "elements not equal: " + counter + " " + expected + " " + actual2
+      );
+      counter++;
+    }
+    Preconditions.checkArgument(counter == numElements);
+    Preconditions.checkArgument(genericIterator.hasNext() == frontCodedIterator.hasNext());
+    Preconditions.checkArgument(genericIterator.hasNext() == frontCodedStringIterator.hasNext());
+
+    elementsToSearch = new String[numOperations];
+    for (int i = 0; i < numOperations; i++) {
+      elementsToSearch[i] = values[ThreadLocalRandom.current().nextInt(numElements)];
+    }
+    iterationIndexes = new int[numOperations];
+    for (int i = 0; i < numOperations; i++) {
+      iterationIndexes[i] = ThreadLocalRandom.current().nextInt(numElements);
+    }
+    if ("generic".equals(indexType)) {
+      indexed = genericIndexed.singleThreaded();
+    } else {
+      indexed = frontCodedIndexed;
+    }
+  }
+
+  @Benchmark
+  public void get(Blackhole bh)
+  {
+    for (int i : iterationIndexes) {
+      bh.consume(indexed.get(i));
+    }
+  }
+
+  @Benchmark
+  public int indexOf()
+  {
+    int r = 0;
+    for (String elementToSearch : elementsToSearch) {
+      r ^= indexed.indexOf(StringUtils.toUtf8ByteBuffer(elementToSearch));
+    }
+    return r;
+  }
+
+  @Benchmark
+  public void iterator(Blackhole blackhole)
+  {
+    final Iterator<ByteBuffer> iterator = indexed.iterator();
+    while (iterator.hasNext()) {
+      final ByteBuffer buffer = iterator.next();
+      if (buffer == null) {
+        blackhole.consume(null);
+      } else {
+        blackhole.consume(StringUtils.fromUtf8(buffer));
+      }
+    }
+  }
+
+
+  // Builds a random id of 'width' characters, each in 'a'..'p' (4 bits taken from a fresh random int per character).
+  private static String getRandomId(int width)
+  {
+    final StringBuilder suffix = new StringBuilder(Math.max(0, width));
+    for (int i = 0; i < width; ++i) {
+      suffix.append((char) ('a' + ((ThreadLocalRandom.current().nextInt() >>> (i * 4)) & 0x0F)));
+    }
+    return suffix.toString();
+  }
+
+  public static void main(String[] args) throws RunnerException
+  {
+    Options opt = new OptionsBuilder()
+        .include(FrontCodedIndexedBenchmark.class.getSimpleName())
+        .addProfiler(EncodingSizeProfiler.class)
+        .build();
+
+    new Runner(opt).run();
+  }
+}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java
index 09b2081822..4214f00f7a 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java
@@ -34,9 +34,11 @@ import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.aggregation.datasketches.hll.sql.HllSketchApproxCountDistinctSqlAggregator;
 import org.apache.druid.query.aggregation.datasketches.quantiles.sql.DoublesSketchApproxQuantileSqlAggregator;
 import org.apache.druid.query.aggregation.datasketches.quantiles.sql.DoublesSketchObjectSqlAggregator;
+import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.QueryableIndexStorageAdapter;
+import org.apache.druid.segment.column.StringEncodingStrategy;
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
 import org.apache.druid.segment.generator.SegmentGenerator;
@@ -75,7 +77,6 @@ import org.openjdk.jmh.annotations.Warmup;
 import org.openjdk.jmh.infra.Blackhole;
 
 import javax.annotation.Nullable;
-
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -87,8 +88,8 @@ import java.util.concurrent.TimeUnit;
  */
 @State(Scope.Benchmark)
 @Fork(value = 1)
-@Warmup(iterations = 5)
-@Measurement(iterations = 15)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
 public class SqlBenchmark
 {
   static {
@@ -391,16 +392,30 @@ public class SqlBenchmark
       // 20: GroupBy, doubles sketches
       "SELECT dimZipf, APPROX_QUANTILE_DS(sumFloatNormal, 0.5), DS_QUANTILES_SKETCH(maxLongUniform) "
       + "FROM foo "
-      + "GROUP BY 1"
+      + "GROUP BY 1",
+
+      // 21, 22: stringy stuff
+      "SELECT dimSequential, dimZipf, SUM(sumLongSequential) FROM foo WHERE dimUniform NOT LIKE '%3' GROUP BY 1, 2",
+      "SELECT dimZipf, SUM(sumLongSequential) FROM foo WHERE dimSequential = '311' GROUP BY 1 ORDER BY 1",
+      // 23: full scan
+      "SELECT * FROM foo",
+      "SELECT * FROM foo WHERE dimSequential IN ('1', '2', '3', '4', '5', '10', '11', '20', '21', '23', '40', '50', '64', '70', '100')",
+      "SELECT * FROM foo WHERE dimSequential > '10' AND dimSequential < '8500'",
+      "SELECT dimSequential, dimZipf, SUM(sumLongSequential) FROM foo WHERE dimSequential IN ('1', '2', '3', '4', '5', '10', '11', '20', '21', '23', '40', '50', '64', '70', '100') GROUP BY 1, 2",
+      "SELECT dimSequential, dimZipf, SUM(sumLongSequential) FROM foo WHERE dimSequential > '10' AND dimSequential < '8500' GROUP BY 1, 2"
+
+
   );
 
   @Param({"5000000"})
   private int rowsPerSegment;
 
-  @Param({"force"})
+  @Param({"false", "force"})
   private String vectorize;
+  @Param({"none", "front-coded-4", "front-coded-16"})
+  private String stringEncoding;
 
-  @Param({"0", "10", "18"})
+  @Param({"4", "5", "6", "7", "8", "10", "11", "12", "19", "21", "22", "23", "26", "27"})
   private String query;
 
   @Param({STORAGE_MMAP, STORAGE_FRAME_ROW, STORAGE_FRAME_COLUMNAR})
@@ -428,7 +443,29 @@ public class SqlBenchmark
 
     final SegmentGenerator segmentGenerator = closer.register(new SegmentGenerator());
     log.info("Starting benchmark setup using cacheDir[%s], rows[%,d].", segmentGenerator.getCacheDir(), rowsPerSegment);
-    final QueryableIndex index = segmentGenerator.generate(dataSegment, schemaInfo, Granularities.NONE, rowsPerSegment);
+    StringEncodingStrategy encodingStrategy;
+    if (stringEncoding.startsWith("front-coded")) {
+      String[] split = stringEncoding.split("-");
+      int bucketSize = Integer.parseInt(split[2]);
+      encodingStrategy = new StringEncodingStrategy.FrontCoded(bucketSize);
+    } else {
+      encodingStrategy = new StringEncodingStrategy.Utf8();
+    }
+    final QueryableIndex index = segmentGenerator.generate(
+        dataSegment,
+        schemaInfo,
+        new IndexSpec(
+            null,
+            null,
+            encodingStrategy,
+            null,
+            null,
+            null,
+            null
+        ),
+        Granularities.NONE,
+        rowsPerSegment
+    );
 
     final QueryRunnerFactoryConglomerate conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(closer);
 
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java
index ed7ad8f214..358c0bb196 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlNestedDataBenchmark.java
@@ -34,8 +34,10 @@ import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.NestedDataDimensionSchema;
 import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.column.StringEncodingStrategy;
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
 import org.apache.druid.segment.generator.SegmentGenerator;
@@ -71,7 +73,6 @@ import org.openjdk.jmh.annotations.Warmup;
 import org.openjdk.jmh.infra.Blackhole;
 
 import javax.annotation.Nullable;
-
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -183,6 +184,9 @@ public class SqlNestedDataBenchmark
   })
   private String vectorize;
 
+  @Param({"none", "front-coded-4", "front-coded-16"})
+  private String stringEncoding;
+
   @Param({
       "0",
       "1",
@@ -256,11 +260,30 @@ public class SqlNestedDataBenchmark
                                               .add(new NestedDataDimensionSchema("nested"))
                                               .build();
     DimensionsSpec dimsSpec = new DimensionsSpec(dims);
+
+
+    StringEncodingStrategy encodingStrategy;
+    if (stringEncoding.startsWith("front-coded")) {
+      String[] split = stringEncoding.split("-");
+      int bucketSize = Integer.parseInt(split[2]);
+      encodingStrategy = new StringEncodingStrategy.FrontCoded(bucketSize);
+    } else {
+      encodingStrategy = new StringEncodingStrategy.Utf8();
+    }
     final QueryableIndex index = segmentGenerator.generate(
         dataSegment,
         schemaInfo,
         dimsSpec,
         transformSpec,
+        new IndexSpec(
+            null,
+            null,
+            encodingStrategy,
+            null,
+            null,
+            null,
+            null
+        ),
         Granularities.NONE,
         rowsPerSegment
     );
diff --git a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
index fd74e48d9a..bd17f42c40 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
@@ -100,14 +100,32 @@ public class StringUtils
   }
 
   /**
-   * Decodes a UTF-8 string from the remaining bytes of a buffer.
+   * Decodes a UTF-8 string from the remaining bytes of a non-null buffer.
    * Advances the position of the buffer by {@link ByteBuffer#remaining()}.
+   *
+   * Use {@link #fromUtf8Nullable(ByteBuffer)} if the buffer might be null.
    */
   public static String fromUtf8(final ByteBuffer buffer)
   {
     return StringUtils.fromUtf8(buffer, buffer.remaining());
   }
 
+  /**
+   * Decodes a UTF-8 string from the remaining bytes of a buffer that may be null.
+   * Advances the position of a non-null buffer by {@link ByteBuffer#remaining()}.
+   *
+   * If the buffer is null, this method returns null. If the buffer will never be null, use {@link #fromUtf8(ByteBuffer)}
+   * instead.
+   */
+  @Nullable
+  public static String fromUtf8Nullable(@Nullable final ByteBuffer buffer)
+  {
+    if (buffer == null) {
+      return null;
+    }
+    return StringUtils.fromUtf8(buffer, buffer.remaining());
+  }
+
   /**
    * Converts a string to a UTF-8 byte array.
    *
diff --git a/core/src/main/java/org/apache/druid/segment/data/VByte.java b/core/src/main/java/org/apache/druid/segment/data/VByte.java
new file mode 100644
index 0000000000..749382cc00
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/segment/data/VByte.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import java.nio.ByteBuffer;
+
+public class VByte
+{
+  /**
+   * Read a variable byte (vbyte) encoded integer from a {@link ByteBuffer} at the current position. Moves the buffer
+   * ahead by 1 to 5 bytes depending on how many bytes were required to encode the integer value.
+   *
+   * vbyte encoding stores values in the low 7 bits of a byte and reserves the high bit for the 'continuation'. If 0,
+   * one or more additional bytes must be read to complete the value, and a 1 indicates the terminal byte. Because of
+   * this, it can only store non-negative values, and larger integers can take up to 5 bytes.
+   *
+   * Implementation based on:
+   * https://github.com/lemire/JavaFastPFOR/blob/master/src/main/java/me/lemire/integercompression/VariableByte.java
+   * Note: 7-bit groups are read least-significant first; a fifth byte is always treated as terminal (high bit ignored).
+   */
+  public static int readInt(ByteBuffer buffer)
+  {
+    byte b;
+    int v = (b = buffer.get()) & 0x7F;
+    if (b < 0) {
+      return v;
+    }
+    v = (((b = buffer.get()) & 0x7F) << 7) | v;
+    if (b < 0) {
+      return v;
+    }
+    v = (((b = buffer.get()) & 0x7F) << 14) | v;
+    if (b < 0) {
+      return v;
+    }
+    v = (((b = buffer.get()) & 0x7F) << 21) | v;
+    if (b < 0) {
+      return v;
+    }
+    v = ((buffer.get() & 0x7F) << 28) | v;
+    return v;
+  }
+
+  /**
+   * Write a variable byte (vbyte) encoded integer to a {@link ByteBuffer} at the current position, advancing the buffer
+   * position by the number of bytes required to represent the integer, between 1 and 5 bytes.
+   *
+   * vbyte encoding stores values in the low 7 bits of a byte and reserves the high bit for the 'continuation'. If 0,
+   * one or more additional bytes must be read to complete the value, and a 1 indicates the terminal byte. Because of
+   * this, it can only store non-negative values, and larger integers can take up to 5 bytes.
+   *
+   * Implementation based on:
+   * https://github.com/lemire/JavaFastPFOR/blob/master/src/main/java/me/lemire/integercompression/VariableByte.java
+   * Returns the number of bytes written, computed as the difference in buffer position before and after the write.
+   */
+  public static int writeInt(ByteBuffer buffer, int val)
+  {
+    final int pos = buffer.position();
+    if (val < (1 << 7)) {
+      buffer.put((byte) (val | (1 << 7)));
+    } else if (val < (1 << 14)) {
+      buffer.put((byte) extract7bits(0, val));
+      buffer.put((byte) (extract7bitsmaskless(1, (val)) | (1 << 7)));
+    } else if (val < (1 << 21)) {
+      buffer.put((byte) extract7bits(0, val));
+      buffer.put((byte) extract7bits(1, val));
+      buffer.put((byte) (extract7bitsmaskless(2, (val)) | (1 << 7)));
+    } else if (val < (1 << 28)) {
+      buffer.put((byte) extract7bits(0, val));
+      buffer.put((byte) extract7bits(1, val));
+      buffer.put((byte) extract7bits(2, val));
+      buffer.put((byte) (extract7bitsmaskless(3, (val)) | (1 << 7)));
+    } else {
+      buffer.put((byte) extract7bits(0, val));
+      buffer.put((byte) extract7bits(1, val));
+      buffer.put((byte) extract7bits(2, val));
+      buffer.put((byte) extract7bits(3, val));
+      buffer.put((byte) (extract7bitsmaskless(4, (val)) | (1 << 7)));
+    }
+    return buffer.position() - pos;
+  }
+
+  /**
+   * Compute number of bytes required to represent variable byte encoded integer.
+   *
+   * vbyte encoding stores values in the low 7 bits of a byte and reserves the high bit for the 'continuation'. If 0,
+   * one or more additional bytes must be read to complete the value, and a 1 indicates the terminal byte. Because of
+   * this, it can only store non-negative values, and larger integers can take up to 5 bytes.
+   */
+  public static int computeIntSize(int val)
+  {
+    if (val < (1 << 7)) {
+      return 1;
+    } else if (val < (1 << 14)) {
+      return 2;
+    } else if (val < (1 << 21)) {
+      return 3;
+    } else if (val < (1 << 28)) {
+      return 4;
+    } else {
+      return 5;
+    }
+  }
+
+  private static byte extract7bits(int i, int val)
+  {
+    return (byte) ((val >> (7 * i)) & ((1 << 7) - 1));
+  }
+
+  private static byte extract7bitsmaskless(int i, int val)
+  {
+    return (byte) ((val >> (7 * i)));
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/segment/data/VByteTest.java b/core/src/test/java/org/apache/druid/segment/data/VByteTest.java
new file mode 100644
index 0000000000..7dde8e3fca
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/segment/data/VByteTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Collection;
+
+@RunWith(Parameterized.class)
+public class VByteTest
+{
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<Object[]> constructorFeeder()
+  {
+    return ImmutableList.of(new Object[]{ByteOrder.LITTLE_ENDIAN}, new Object[]{ByteOrder.BIG_ENDIAN});
+  }
+
+  private final ByteOrder order;
+
+  public VByteTest(ByteOrder byteOrder)
+  {
+    this.order = byteOrder;
+  }
+
+  @Test
+  public void testVbyte()
+  {
+    ByteBuffer buffer = ByteBuffer.allocate(24).order(order);
+    roundTrip(buffer, 0, 0, 1);
+    roundTrip(buffer, 0, 4, 1);
+    roundTrip(buffer, 0, 224, 2);
+    roundTrip(buffer, 0, 1024, 2);
+    roundTrip(buffer, 0, (1 << 14) - 1, 2); // largest 2 byte value; parens needed since '-' binds tighter than '<<'
+    roundTrip(buffer, 0, 1 << 14, 3); // smallest 3 byte value
+    roundTrip(buffer, 0, 1 << 16, 3);
+    roundTrip(buffer, 0, 1 << 25, 4);
+    roundTrip(buffer, 0, (1 << 28) - 1, 4); // largest 4 byte value; parens needed since '-' binds tighter than '<<'
+    roundTrip(buffer, 0, 1 << 28, 5); // smallest 5 byte value
+    roundTrip(buffer, 0, Integer.MAX_VALUE, 5);
+  }
+
+  private static void roundTrip(ByteBuffer buffer, int position, int value, int expectedSize)
+  {
+    Assert.assertEquals(expectedSize, VByte.computeIntSize(value));
+    buffer.position(position);
+    VByte.writeInt(buffer, value);
+    Assert.assertEquals(expectedSize, buffer.position() - position);
+    buffer.position(position);
+    Assert.assertEquals(value, VByte.readInt(buffer));
+    Assert.assertEquals(expectedSize, buffer.position() - position);
+  }
+}
diff --git a/docs/ingestion/ingestion-spec.md b/docs/ingestion/ingestion-spec.md
index acfe73c49f..058858292f 100644
--- a/docs/ingestion/ingestion-spec.md
+++ b/docs/ingestion/ingestion-spec.md
@@ -469,16 +469,43 @@ is:
 |indexSpecForIntermediatePersists|Defines segment storage format options to use at indexing time for intermediate persisted temporary segments.|See [`indexSpec`](#indexspec) for more information.|
 |Other properties|Each ingestion method has its own list of additional tuning properties. See the documentation for each method for a full list: [Kafka indexing service](../development/extensions-core/kafka-supervisor-reference.md#tuningconfig), [Kinesis indexing service](../development/extensions-core/kinesis-ingestion.md#tuningconfig), [Native batch](native-batch.md#tuningconfig), and [Hadoop-based](hadoop.md#tuningconfig).||
 
-#### `indexSpec`
+### `indexSpec`
 
 The `indexSpec` object can include the following properties:
 
 |Field|Description|Default|
 |-----|-----------|-------|
 |bitmap|Compression format for bitmap indexes. Should be a JSON object with `type` set to `roaring` or `concise`. For type `roaring`, the boolean property `compressRunOnSerialization` (defaults to true) controls whether or not run-length encoding will be used when it is determined to be more space-efficient.|`{"type": "roaring"}`|
-|dimensionCompression|Compression format for dimension columns. Options are `lz4`, `lzf`, or `uncompressed`.|`lz4`|
-|metricCompression|Compression format for primitive type metric columns. Options are `lz4`, `lzf`, `uncompressed`, or `none` (which is more efficient than `uncompressed`, but not supported by older versions of Druid).|`lz4`|
+|dimensionCompression|Compression format for dimension columns. Options are `lz4`, `lzf`, `zstd`, or `uncompressed`.|`lz4`|
+|stringDictionaryEncoding|Encoding format for string typed column value dictionaries.|`{"type":"utf8"}`|
+|metricCompression|Compression format for primitive type metric columns. Options are `lz4`, `lzf`, `zstd`, `uncompressed`, or `none` (which is more efficient than `uncompressed`, but not supported by older versions of Druid).|`lz4`|
 |longEncoding|Encoding format for long-typed columns. Applies regardless of whether they are dimensions or metrics. Options are `auto` or `longs`. `auto` encodes the values using offset or lookup table depending on column cardinality, and store them with variable size. `longs` stores the value as-is with 8 bytes each.|`longs`|
+|jsonCompression|Compression format to use for nested column raw data. Options are `lz4`, `lzf`, `zstd`, or `uncompressed`.|`lz4`|
+
+
+#### String Dictionary Encoding
+
+##### UTF8
+By default, `STRING` typed columns store their values as uncompressed UTF-8 encoded bytes.
+
+|Field|Description|Default|
+|-----|-----------|-------|
+|type|Must be `"utf8"`.|n/a|
+
+##### Front Coding
+`STRING` columns can be stored using an incremental encoding strategy called front coding.
+In the Druid implementation of front coding, the column values are first divided into buckets,
+and the first value in each bucket is stored as is. The remaining values in the bucket are stored
+using a number representing a prefix length and the remaining suffix bytes.
+This technique prevents the prefix portion of the values in each bucket from being duplicated.
+The values are still UTF-8 encoded, but front coding can often result in much smaller segments at very little
+performance cost. Segments created with this encoding are not compatible with Druid versions older than 25.0.0.
+
+|Field|Description|Default|
+|-----|-----------|-------|
+|type|Must be `"frontCoded"`.|n/a|
+|bucketSize|The number of values to place in a bucket to perform delta encoding. Must be a power of 2, with a maximum of 128. Larger buckets allow columns with a high degree of overlap to produce smaller segments at a slight cost to read and search performance which scales with bucket size.|4|
+
 
 Beyond these properties, each ingestion method has its own specific tuning properties. See the documentation for each
 [ingestion method](./index.md#ingestion-methods) for details.
diff --git a/processing/src/main/java/org/apache/druid/query/dimension/ListFilteredDimensionSpec.java b/processing/src/main/java/org/apache/druid/query/dimension/ListFilteredDimensionSpec.java
index 145d3dffcd..2adad9b662 100644
--- a/processing/src/main/java/org/apache/druid/query/dimension/ListFilteredDimensionSpec.java
+++ b/processing/src/main/java/org/apache/druid/query/dimension/ListFilteredDimensionSpec.java
@@ -28,7 +28,6 @@ import org.apache.druid.query.filter.DimFilterUtils;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.IdLookup;
 import org.apache.druid.segment.IdMapping;
-import org.apache.druid.segment.data.Indexed;
 
 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
@@ -90,7 +89,7 @@ public class ListFilteredDimensionSpec extends BaseFilteredDimensionSpec
       Set<String> values,
       int cardinality,
       @Nullable IdLookup idLookup,
-      Indexed.IndexedGetter<String> fn
+      IndexedGetter<String> fn
   )
   {
     final IdMapping.Builder builder = IdMapping.Builder.ofCardinality(values.size());
@@ -114,7 +113,7 @@ public class ListFilteredDimensionSpec extends BaseFilteredDimensionSpec
   public static IdMapping buildDenyListIdMapping(
       Set<String> values,
       int cardinality,
-      Indexed.IndexedGetter<String> fn
+      IndexedGetter<String> fn
   )
   {
     final IdMapping.Builder builder = IdMapping.Builder.ofCardinality(cardinality);
@@ -217,4 +216,11 @@ public class ListFilteredDimensionSpec extends BaseFilteredDimensionSpec
            ", isWhitelist=" + isWhitelist +
            '}';
   }
+
+  @FunctionalInterface
+  public interface IndexedGetter<T>
+  {
+    @Nullable
+    T get(int id);
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java b/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
index e74050f741..bcd2ce018b 100644
--- a/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
+++ b/processing/src/main/java/org/apache/druid/segment/DictionaryEncodedColumnMerger.java
@@ -42,6 +42,7 @@ import org.apache.druid.segment.data.ColumnarIntsSerializer;
 import org.apache.druid.segment.data.ColumnarMultiIntsSerializer;
 import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
 import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.data.DictionaryWriter;
 import org.apache.druid.segment.data.GenericIndexedWriter;
 import org.apache.druid.segment.data.Indexed;
 import org.apache.druid.segment.data.IndexedInts;
@@ -91,8 +92,10 @@ public abstract class DictionaryEncodedColumnMerger<T extends Comparable<T>> imp
   protected DictionaryMergingIterator<T> dictionaryMergeIterator;
   @Nullable
   protected ColumnarIntsSerializer encodedValueSerializer;
+
   @Nullable
-  protected GenericIndexedWriter<T> dictionaryWriter;
+  protected DictionaryWriter<T> dictionaryWriter;
+
   @Nullable
   protected T firstDictionaryValue;
 
@@ -145,7 +148,7 @@ public abstract class DictionaryEncodedColumnMerger<T extends Comparable<T>> imp
       Indexed<T> dimValues = closer.register(adapters.get(i).getDimValueLookup(dimensionName));
       if (dimValues != null && !allNull(dimValues)) {
         dimHasValues = true;
-        hasNull |= dimValues.indexOf(null) >= 0;
+        hasNull = hasNull || dimValues.indexOf(null) >= 0;
         dimValueLookups[i] = dimValueLookup = dimValues;
         numMergeIndex++;
       } else {
@@ -169,7 +172,7 @@ public abstract class DictionaryEncodedColumnMerger<T extends Comparable<T>> imp
     }
 
     String dictFilename = StringUtils.format("%s.dim_values", dimensionName);
-    dictionaryWriter = new GenericIndexedWriter<>(segmentWriteOutMedium, dictFilename, getObjectStrategy());
+    dictionaryWriter = makeDictionaryWriter(dictFilename);
     firstDictionaryValue = null;
     dictionarySize = 0;
     dictionaryWriter.open();
@@ -384,7 +387,10 @@ public abstract class DictionaryEncodedColumnMerger<T extends Comparable<T>> imp
     }
   }
 
-
+  protected DictionaryWriter<T> makeDictionaryWriter(String fileName)
+  {
+    return new GenericIndexedWriter<>(segmentWriteOutMedium, fileName, getObjectStrategy());
+  }
 
   @Nullable
   protected ExtendedIndexesMerger getExtendedIndexesMerger()
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexSpec.java b/processing/src/main/java/org/apache/druid/segment/IndexSpec.java
index 852ea94233..4358444126 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexSpec.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexSpec.java
@@ -25,8 +25,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
+import org.apache.druid.segment.column.StringEncodingStrategy;
 import org.apache.druid.segment.data.BitmapSerde;
 import org.apache.druid.segment.data.BitmapSerdeFactory;
 import org.apache.druid.segment.data.CompressionFactory;
@@ -34,10 +33,8 @@ import org.apache.druid.segment.data.CompressionStrategy;
 import org.apache.druid.segment.loading.SegmentizerFactory;
 
 import javax.annotation.Nullable;
-import java.util.Arrays;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 
 /**
  * IndexSpec defines segment storage format options to be used at indexing time,
@@ -47,24 +44,9 @@ import java.util.Set;
  */
 public class IndexSpec
 {
-  public static final CompressionStrategy DEFAULT_METRIC_COMPRESSION = CompressionStrategy.DEFAULT_COMPRESSION_STRATEGY;
-  public static final CompressionStrategy DEFAULT_DIMENSION_COMPRESSION = CompressionStrategy.DEFAULT_COMPRESSION_STRATEGY;
-  public static final CompressionFactory.LongEncodingStrategy DEFAULT_LONG_ENCODING = CompressionFactory.DEFAULT_LONG_ENCODING_STRATEGY;
-
-  private static final Set<CompressionStrategy> METRIC_COMPRESSION = Sets.newHashSet(
-      Arrays.asList(CompressionStrategy.values())
-  );
-
-  private static final Set<CompressionStrategy> DIMENSION_COMPRESSION = Sets.newHashSet(
-      Arrays.asList(CompressionStrategy.noNoneValues())
-  );
-
-  private static final Set<CompressionFactory.LongEncodingStrategy> LONG_ENCODING_NAMES = Sets.newHashSet(
-      Arrays.asList(CompressionFactory.LongEncodingStrategy.values())
-  );
-
   private final BitmapSerdeFactory bitmapSerdeFactory;
   private final CompressionStrategy dimensionCompression;
+  private final StringEncodingStrategy stringDictionaryEncoding;
   private final CompressionStrategy metricCompression;
   private final CompressionFactory.LongEncodingStrategy longEncoding;
 
@@ -79,7 +61,7 @@ public class IndexSpec
    */
   public IndexSpec()
   {
-    this(null, null, null, null, null, null);
+    this(null, null, null, null, null, null, null);
   }
 
   @VisibleForTesting
@@ -90,7 +72,7 @@ public class IndexSpec
       @Nullable CompressionFactory.LongEncodingStrategy longEncoding
   )
   {
-    this(bitmapSerdeFactory, dimensionCompression, metricCompression, longEncoding, null, null);
+    this(bitmapSerdeFactory, dimensionCompression, null, metricCompression, longEncoding, null, null);
   }
 
   @VisibleForTesting
@@ -102,7 +84,7 @@ public class IndexSpec
       @Nullable SegmentizerFactory segmentLoader
   )
   {
-    this(bitmapSerdeFactory, dimensionCompression, metricCompression, longEncoding, null, segmentLoader);
+    this(bitmapSerdeFactory, dimensionCompression, null, metricCompression, longEncoding, null, segmentLoader);
   }
 
   /**
@@ -116,37 +98,44 @@ public class IndexSpec
    * @param dimensionCompression compression format for dimension columns, null to use the default.
    *                             Defaults to {@link CompressionStrategy#DEFAULT_COMPRESSION_STRATEGY}
    *
+   * @param stringDictionaryEncoding encoding strategy for string dictionaries of dictionary encoded string columns
+   *
    * @param metricCompression compression format for primitive type metric columns, null to use the default.
    *                          Defaults to {@link CompressionStrategy#DEFAULT_COMPRESSION_STRATEGY}
    *
    * @param longEncoding encoding strategy for metric and dimension columns with type long, null to use the default.
    *                     Defaults to {@link CompressionFactory#DEFAULT_LONG_ENCODING_STRATEGY}
+   *
+   * @param segmentLoader specify a {@link SegmentizerFactory} which will be written to 'factory.json' and used to load
+   *                      the written segment
    */
   @JsonCreator
   public IndexSpec(
       @JsonProperty("bitmap") @Nullable BitmapSerdeFactory bitmapSerdeFactory,
       @JsonProperty("dimensionCompression") @Nullable CompressionStrategy dimensionCompression,
+      @JsonProperty("stringDictionaryEncoding") @Nullable StringEncodingStrategy stringDictionaryEncoding,
       @JsonProperty("metricCompression") @Nullable CompressionStrategy metricCompression,
       @JsonProperty("longEncoding") @Nullable CompressionFactory.LongEncodingStrategy longEncoding,
       @JsonProperty("jsonCompression") @Nullable CompressionStrategy jsonCompression,
       @JsonProperty("segmentLoader") @Nullable SegmentizerFactory segmentLoader
   )
   {
-    Preconditions.checkArgument(dimensionCompression == null || DIMENSION_COMPRESSION.contains(dimensionCompression),
-                                "Unknown compression type[%s]", dimensionCompression);
-
-    Preconditions.checkArgument(metricCompression == null || METRIC_COMPRESSION.contains(metricCompression),
-                                "Unknown compression type[%s]", metricCompression);
-
-    Preconditions.checkArgument(longEncoding == null || LONG_ENCODING_NAMES.contains(longEncoding),
-                                "Unknown long encoding type[%s]", longEncoding);
-
     this.bitmapSerdeFactory = bitmapSerdeFactory != null
                               ? bitmapSerdeFactory
                               : new BitmapSerde.DefaultBitmapSerdeFactory();
-    this.dimensionCompression = dimensionCompression == null ? DEFAULT_DIMENSION_COMPRESSION : dimensionCompression;
-    this.metricCompression = metricCompression == null ? DEFAULT_METRIC_COMPRESSION : metricCompression;
-    this.longEncoding = longEncoding == null ? DEFAULT_LONG_ENCODING : longEncoding;
+    this.dimensionCompression = dimensionCompression == null
+                                ? CompressionStrategy.DEFAULT_COMPRESSION_STRATEGY
+                                : dimensionCompression;
+    this.stringDictionaryEncoding = stringDictionaryEncoding == null
+                                    ? StringEncodingStrategy.DEFAULT
+                                    : stringDictionaryEncoding;
+
+    this.metricCompression = metricCompression == null
+                             ? CompressionStrategy.DEFAULT_COMPRESSION_STRATEGY
+                             : metricCompression;
+    this.longEncoding = longEncoding == null
+                        ? CompressionFactory.DEFAULT_LONG_ENCODING_STRATEGY
+                        : longEncoding;
     this.jsonCompression = jsonCompression;
     this.segmentLoader = segmentLoader;
   }
@@ -163,6 +152,12 @@ public class IndexSpec
     return dimensionCompression;
   }
 
+  @JsonProperty
+  public StringEncodingStrategy getStringDictionaryEncoding()
+  {
+    return stringDictionaryEncoding;
+  }
+
   @JsonProperty
   public CompressionStrategy getMetricCompression()
   {
@@ -210,6 +205,7 @@ public class IndexSpec
     IndexSpec indexSpec = (IndexSpec) o;
     return Objects.equals(bitmapSerdeFactory, indexSpec.bitmapSerdeFactory) &&
            dimensionCompression == indexSpec.dimensionCompression &&
+           Objects.equals(stringDictionaryEncoding, indexSpec.stringDictionaryEncoding) &&
            metricCompression == indexSpec.metricCompression &&
            longEncoding == indexSpec.longEncoding &&
            Objects.equals(jsonCompression, indexSpec.jsonCompression) &&
@@ -219,7 +215,7 @@ public class IndexSpec
   @Override
   public int hashCode()
   {
-    return Objects.hash(bitmapSerdeFactory, dimensionCompression, metricCompression, longEncoding, jsonCompression, segmentLoader);
+    return Objects.hash(bitmapSerdeFactory, dimensionCompression, stringDictionaryEncoding, metricCompression, longEncoding, jsonCompression, segmentLoader);
   }
 
   @Override
@@ -228,6 +224,7 @@ public class IndexSpec
     return "IndexSpec{" +
            "bitmapSerdeFactory=" + bitmapSerdeFactory +
            ", dimensionCompression=" + dimensionCompression +
+           ", stringDictionaryEncoding=" + stringDictionaryEncoding +
            ", metricCompression=" + metricCompression +
            ", longEncoding=" + longEncoding +
            ", jsonCompression=" + jsonCompression +
diff --git a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java
index d0452c57a3..1b0d0f54d9 100644
--- a/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java
+++ b/processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java
@@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.column.BaseColumn;
 import org.apache.druid.segment.column.ColumnDescriptor;
 import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.StringEncodingStrategies;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.data.Indexed;
 import org.apache.druid.segment.incremental.IncrementalIndex;
@@ -200,7 +201,7 @@ public class NestedDataColumnMerger implements DimensionMergerV9
   )
   {
     @SuppressWarnings("unchecked")
-    CompressedNestedDataComplexColumn column = (CompressedNestedDataComplexColumn) col;
+    CompressedNestedDataComplexColumn<?> column = (CompressedNestedDataComplexColumn) col;
     closer.register(column);
     for (int i = 0; i < column.getFields().size(); i++) {
       String fieldPath = column.getFields().get(i);
@@ -213,7 +214,7 @@ public class NestedDataColumnMerger implements DimensionMergerV9
       });
     }
     return new GlobalDictionarySortedCollector(
-        column.getStringDictionary(),
+        new StringEncodingStrategies.Utf8ToStringIndexed(column.getStringDictionary()),
         column.getLongDictionary(),
         column.getDoubleDictionary()
     );
diff --git a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java b/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
index e4e0fbdf9b..109423af58 100644
--- a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
+++ b/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
@@ -32,10 +32,12 @@ import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ColumnDescriptor;
+import org.apache.druid.segment.column.StringEncodingStrategies;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.data.BitmapSerdeFactory;
 import org.apache.druid.segment.data.ByteBufferWriter;
 import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.data.DictionaryWriter;
 import org.apache.druid.segment.data.GenericIndexed;
 import org.apache.druid.segment.data.ImmutableRTreeObjectStrategy;
 import org.apache.druid.segment.data.Indexed;
@@ -97,6 +99,16 @@ public class StringDimensionMergerV9 extends DictionaryEncodedColumnMerger<Strin
     return NullHandling.emptyToNullIfNeeded(value);
   }
 
+  @Override
+  protected DictionaryWriter<String> makeDictionaryWriter(String fileName)
+  {
+    return StringEncodingStrategies.getStringDictionaryWriter(
+        indexSpec.getStringDictionaryEncoding(),
+        segmentWriteOutMedium,
+        fileName
+    );
+  }
+
   @Nullable
   @Override
   protected ExtendedIndexesMerger getExtendedIndexesMerger()
diff --git a/processing/src/main/java/org/apache/druid/segment/column/IndexedStringDictionaryEncodedStringValueIndex.java b/processing/src/main/java/org/apache/druid/segment/column/IndexedStringDictionaryEncodedStringValueIndex.java
new file mode 100644
index 0000000000..49ef97ea6c
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/column/IndexedStringDictionaryEncodedStringValueIndex.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.column;
+
+import org.apache.druid.collections.bitmap.BitmapFactory;
+import org.apache.druid.collections.bitmap.ImmutableBitmap;
+import org.apache.druid.segment.data.Indexed;
+
+import javax.annotation.Nullable;
+
+public final class IndexedStringDictionaryEncodedStringValueIndex<TDictionary extends Indexed<String>>
+    implements DictionaryEncodedStringValueIndex
+{
+  private final BitmapFactory bitmapFactory;
+  private final TDictionary dictionary;
+  private final Indexed<ImmutableBitmap> bitmaps;
+
+  public IndexedStringDictionaryEncodedStringValueIndex(
+      BitmapFactory bitmapFactory,
+      TDictionary dictionary,
+      Indexed<ImmutableBitmap> bitmaps
+  )
+  {
+    this.bitmapFactory = bitmapFactory;
+    this.dictionary = dictionary;
+    this.bitmaps = bitmaps;
+  }
+
+  @Override
+  public int getCardinality()
+  {
+    return dictionary.size();
+  }
+
+  @Nullable
+  @Override
+  public String getValue(int index)
+  {
+    return dictionary.get(index);
+  }
+
+  @Override
+  public ImmutableBitmap getBitmap(int idx)
+  {
+    if (idx < 0) {
+      return bitmapFactory.makeEmptyImmutableBitmap();
+    }
+
+    final ImmutableBitmap bitmap = bitmaps.get(idx);
+    return bitmap == null ? bitmapFactory.makeEmptyImmutableBitmap() : bitmap;
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/column/IndexedStringDruidPredicateIndex.java b/processing/src/main/java/org/apache/druid/segment/column/IndexedStringDruidPredicateIndex.java
new file mode 100644
index 0000000000..c6c8941e52
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/column/IndexedStringDruidPredicateIndex.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.column;
+
+import com.google.common.base.Predicate;
+import org.apache.druid.collections.bitmap.BitmapFactory;
+import org.apache.druid.collections.bitmap.ImmutableBitmap;
+import org.apache.druid.query.filter.DruidPredicateFactory;
+import org.apache.druid.segment.data.Indexed;
+
+import javax.annotation.Nullable;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+public final class IndexedStringDruidPredicateIndex<TDictionary extends Indexed<String>> implements DruidPredicateIndex
+{
+  private final BitmapFactory bitmapFactory;
+  private final TDictionary dictionary;
+  private final Indexed<ImmutableBitmap> bitmaps;
+
+  public IndexedStringDruidPredicateIndex(
+      BitmapFactory bitmapFactory,
+      TDictionary dictionary,
+      Indexed<ImmutableBitmap> bitmaps
+  )
+  {
+    this.bitmapFactory = bitmapFactory;
+    this.dictionary = dictionary;
+    this.bitmaps = bitmaps;
+  }
+
+  @Override
+  public BitmapColumnIndex forPredicate(DruidPredicateFactory matcherFactory)
+  {
+    return new SimpleImmutableBitmapIterableIndex()
+    {
+      @Override
+      public Iterable<ImmutableBitmap> getBitmapIterable()
+      {
+        return () -> new Iterator<ImmutableBitmap>()
+        {
+          final Predicate<String> stringPredicate = matcherFactory.makeStringPredicate();
+          final Iterator<String> iterator = dictionary.iterator();
+          @Nullable
+          String next = null;
+          boolean nextSet = false;
+
+          @Override
+          public boolean hasNext()
+          {
+            if (!nextSet) {
+              findNext();
+            }
+            return nextSet;
+          }
+
+          @Override
+          public ImmutableBitmap next()
+          {
+            if (!nextSet) {
+              findNext();
+              if (!nextSet) {
+                throw new NoSuchElementException();
+              }
+            }
+            nextSet = false;
+            final int idx = dictionary.indexOf(next);
+            if (idx < 0) {
+              return bitmapFactory.makeEmptyImmutableBitmap();
+            }
+
+            final ImmutableBitmap bitmap = bitmaps.get(idx);
+            return bitmap == null ? bitmapFactory.makeEmptyImmutableBitmap() : bitmap;
+          }
+
+          private void findNext()
+          {
+            while (!nextSet && iterator.hasNext()) {
+              String nextValue = iterator.next();
+              nextSet = stringPredicate.apply(nextValue);
+              if (nextSet) {
+                next = nextValue;
+              }
+            }
+          }
+        };
+      }
+    };
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/column/IndexedUtf8LexicographicalRangeIndex.java b/processing/src/main/java/org/apache/druid/segment/column/IndexedUtf8LexicographicalRangeIndex.java
new file mode 100644
index 0000000000..7b1086ba8a
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/column/IndexedUtf8LexicographicalRangeIndex.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.column;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import it.unimi.dsi.fastutil.ints.IntIntImmutablePair;
+import it.unimi.dsi.fastutil.ints.IntIntPair;
+import it.unimi.dsi.fastutil.ints.IntIterator;
+import org.apache.druid.collections.bitmap.BitmapFactory;
+import org.apache.druid.collections.bitmap.ImmutableBitmap;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.IntListUtils;
+import org.apache.druid.segment.data.Indexed;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * {@link LexicographicalRangeIndex} for a column whose values are stored as UTF-8 bytes in a sorted dictionary, with
+ * one bitmap per dictionary entry. A range is first resolved to a half-open span of dictionary ids, and the bitmaps
+ * of those ids are then produced lazily.
+ */
+public final class IndexedUtf8LexicographicalRangeIndex<TDictionary extends Indexed<ByteBuffer>>
+    implements LexicographicalRangeIndex
+{
+  private final BitmapFactory bitmapFactory;
+  private final TDictionary dictionary;
+  private final Indexed<ImmutableBitmap> bitmaps;
+  private final boolean hasNull;
+
+  /**
+   * @param bitmapFactory used to create empty bitmaps for ids that have no stored bitmap
+   * @param dictionary    value dictionary; rejected unless {@link Indexed#isSorted()} is true
+   * @param bitmaps       bitmaps, addressed by dictionary id
+   * @param hasNull       whether dictionary id 0 is the null value, which is skipped when resolving ranges
+   */
+  public IndexedUtf8LexicographicalRangeIndex(
+      BitmapFactory bitmapFactory,
+      TDictionary dictionary,
+      Indexed<ImmutableBitmap> bitmaps,
+      boolean hasNull
+  )
+  {
+    Preconditions.checkArgument(dictionary.isSorted(), "Dictionary must be sorted");
+    this.bitmapFactory = bitmapFactory;
+    this.dictionary = dictionary;
+    this.bitmaps = bitmaps;
+    this.hasNull = hasNull;
+  }
+
+  /**
+   * Returns an index that yields the bitmap of every dictionary entry inside the given range. A null bound means the
+   * range is open on that side; a strict bound excludes the bound value itself.
+   */
+  @Override
+  public BitmapColumnIndex forRange(
+      @Nullable String startValue,
+      boolean startStrict,
+      @Nullable String endValue,
+      boolean endStrict
+  )
+  {
+    return new SimpleImmutableBitmapIterableIndex()
+    {
+      @Override
+      public Iterable<ImmutableBitmap> getBitmapIterable()
+      {
+        // the range is resolved once; every iterator created from the Iterable walks the same id span
+        final IntIntPair range = getRange(startValue, startStrict, endValue, endStrict);
+        final int start = range.leftInt(), end = range.rightInt();
+        return () -> new Iterator<ImmutableBitmap>()
+        {
+          final IntIterator rangeIterator = IntListUtils.fromTo(start, end).iterator();
+
+          @Override
+          public boolean hasNext()
+          {
+            return rangeIterator.hasNext();
+          }
+
+          @Override
+          public ImmutableBitmap next()
+          {
+            return getBitmap(rangeIterator.nextInt());
+          }
+        };
+      }
+    };
+  }
+
+  /**
+   * Same as {@link #forRange(String, boolean, String, boolean)}, but only dictionary entries whose decoded string is
+   * accepted by {@code matcher} contribute a bitmap.
+   */
+  @Override
+  public BitmapColumnIndex forRange(
+      @Nullable String startValue,
+      boolean startStrict,
+      @Nullable String endValue,
+      boolean endStrict,
+      Predicate<String> matcher
+  )
+  {
+    return new SimpleImmutableBitmapIterableIndex()
+    {
+      @Override
+      public Iterable<ImmutableBitmap> getBitmapIterable()
+      {
+        final IntIntPair range = getRange(startValue, startStrict, endValue, endStrict);
+        final int start = range.leftInt(), end = range.rightInt();
+        return () -> new Iterator<ImmutableBitmap>()
+        {
+          // next dictionary id to test against the matcher
+          int currIndex = start;
+          // id of the next matching entry, or -1 once the range is exhausted
+          int found;
+
+          {
+            found = findNext();
+          }
+
+          /**
+           * Returns the id of the next entry in [currIndex, end) accepted by the matcher and moves currIndex past it,
+           * or returns -1 if there is none.
+           */
+          private int findNext()
+          {
+            while (currIndex < end && !applyMatcher(dictionary.get(currIndex))) {
+              currIndex++;
+            }
+
+            if (currIndex < end) {
+              return currIndex++;
+            } else {
+              return -1;
+            }
+          }
+
+          @Override
+          public boolean hasNext()
+          {
+            return found != -1;
+          }
+
+          @Override
+          public ImmutableBitmap next()
+          {
+            int cur = found;
+
+            if (cur == -1) {
+              throw new NoSuchElementException();
+            }
+
+            found = findNext();
+            return getBitmap(cur);
+          }
+        };
+      }
+
+      private boolean applyMatcher(@Nullable final ByteBuffer valueUtf8)
+      {
+        if (valueUtf8 == null) {
+          return matcher.apply(null);
+        } else {
+          // Duplicate buffer, because StringUtils.fromUtf8 advances the position, and we do not want to do that.
+          return matcher.apply(StringUtils.fromUtf8(valueUtf8.duplicate()));
+        }
+      }
+    };
+  }
+
+  /**
+   * Resolves the bounds to a half-open span of dictionary ids, returned as (start inclusive, end exclusive). The end
+   * is never smaller than the start, so an inverted range resolves to an empty span.
+   *
+   * Lookups rely on {@code dictionary.indexOf} returning {@code -(insertion point) - 1} for values that are not
+   * present, which is the contract of a sorted {@link Indexed}.
+   */
+  private IntIntPair getRange(
+      @Nullable String startValue,
+      boolean startStrict,
+      @Nullable String endValue,
+      boolean endStrict
+  )
+  {
+    // id of the first non-null dictionary entry
+    final int firstValue = hasNull ? 1 : 0;
+    int startIndex, endIndex;
+    if (startValue == null) {
+      startIndex = firstValue;
+    } else {
+      final String startValueToUse = NullHandling.emptyToNullIfNeeded(startValue);
+      final int found = dictionary.indexOf(StringUtils.toUtf8ByteBuffer(startValueToUse));
+      if (found >= firstValue) {
+        startIndex = startStrict ? found + 1 : found;
+      } else if (found >= 0) {
+        // The bound matched an entry below firstValue, i.e. the null entry at id 0. This is an exact match, not an
+        // encoded insertion point, so decoding it as -(found + 1) would produce -1 and pull ids -1 and 0 into the
+        // range. Every non-null value sorts after the null entry, so the range starts at the first non-null id.
+        startIndex = firstValue;
+      } else {
+        startIndex = -(found + 1);
+      }
+    }
+
+    if (endValue == null) {
+      endIndex = dictionary.size();
+    } else {
+      final String endValueToUse = NullHandling.emptyToNullIfNeeded(endValue);
+      final int found = dictionary.indexOf(StringUtils.toUtf8ByteBuffer(endValueToUse));
+      if (found >= firstValue) {
+        endIndex = endStrict ? found : found + 1;
+      } else {
+        // For a bound that matched the null entry this evaluates to -1, which the clamp below turns into an empty
+        // span; for a missing bound it is the insertion point.
+        endIndex = -(found + 1);
+      }
+    }
+
+    endIndex = Math.max(startIndex, endIndex);
+    return new IntIntImmutablePair(startIndex, endIndex);
+  }
+
+  /**
+   * Returns the bitmap stored for the dictionary id, or an empty bitmap if the id is negative or no bitmap is stored.
+   */
+  private ImmutableBitmap getBitmap(int idx)
+  {
+    if (idx < 0) {
+      return bitmapFactory.makeEmptyImmutableBitmap();
+    }
+
+    final ImmutableBitmap bitmap = bitmaps.get(idx);
+    return bitmap == null ? bitmapFactory.makeEmptyImmutableBitmap() : bitmap;
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/column/IndexedUtf8ValueSetIndex.java b/processing/src/main/java/org/apache/druid/segment/column/IndexedUtf8ValueSetIndex.java
new file mode 100644
index 0000000000..5680a1400f
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/column/IndexedUtf8ValueSetIndex.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.column;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import org.apache.druid.collections.bitmap.BitmapFactory;
+import org.apache.druid.collections.bitmap.ImmutableBitmap;
+import org.apache.druid.java.util.common.ByteBufferUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.BitmapResultFactory;
+import org.apache.druid.segment.data.Indexed;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.SortedSet;
+
+/**
+ * Value-set index over a sorted dictionary of UTF-8 encoded values with one bitmap per dictionary entry. Supports
+ * looking up a single value as well as a sorted set of values, given either as strings or as UTF-8 buffers.
+ */
+public final class IndexedUtf8ValueSetIndex<TDictionary extends Indexed<ByteBuffer>>
+    implements StringValueSetIndex, Utf8ValueSetIndex
+{
+  // This determines the cut-off point to switch the merging algorithm from doing binary-search per element in the value
+  // set to doing a sorted merge algorithm between value set and dictionary. The ratio here represents the ratio b/w
+  // the number of elements in value set and the number of elements in the dictionary. The number has been derived
+  // using benchmark in https://github.com/apache/druid/pull/13133. If the ratio is higher than the threshold, we use
+  // sorted merge instead of binary-search based algorithm.
+  private static final double SORTED_MERGE_RATIO_THRESHOLD = 0.12D;
+  // Minimum number of values in a UTF-8 value set before forSortedValuesUtf8 bothers to trim values that sort before
+  // the first dictionary entry.
+  private static final int SIZE_WORTH_CHECKING_MIN = 8;
+  // Ordering used by the sorted merge to compare a requested value with a dictionary entry.
+  private static final Comparator<ByteBuffer> COMPARATOR = ByteBufferUtils.unsignedComparator();
+
+  private final BitmapFactory bitmapFactory;
+  private final TDictionary dictionary;
+  private final Indexed<ImmutableBitmap> bitmaps;
+
+  /**
+   * @param bitmapFactory used to create empty bitmaps for values that are not in the dictionary
+   * @param dictionary    value dictionary; rejected unless {@link Indexed#isSorted()} is true
+   * @param bitmaps       bitmaps, addressed by dictionary id
+   */
+  public IndexedUtf8ValueSetIndex(
+      BitmapFactory bitmapFactory,
+      TDictionary dictionary,
+      Indexed<ImmutableBitmap> bitmaps
+  )
+  {
+    Preconditions.checkArgument(dictionary.isSorted(), "Dictionary must be sorted");
+    this.bitmapFactory = bitmapFactory;
+    this.dictionary = dictionary;
+    this.bitmaps = bitmaps;
+  }
+
+  /**
+   * Returns an index for a single value. The dictionary lookup is deferred until selectivity or the bitmap result is
+   * requested, and is repeated on each such request.
+   */
+  @Override
+  public BitmapColumnIndex forValue(@Nullable String value)
+  {
+    return new SimpleBitmapColumnIndex()
+    {
+      @Override
+      public double estimateSelectivity(int totalRows)
+      {
+        // fraction of rows in the value's bitmap, capped at 1
+        return Math.min(1, (double) getBitmapForValue().size() / totalRows);
+      }
+
+      @Override
+      public <T> T computeBitmapResult(BitmapResultFactory<T> bitmapResultFactory)
+      {
+
+        return bitmapResultFactory.wrapDimensionValue(getBitmapForValue());
+      }
+
+      // Encodes the value as UTF-8 (null stays null) and fetches the bitmap of the matching dictionary entry.
+      private ImmutableBitmap getBitmapForValue()
+      {
+        final ByteBuffer valueUtf8 = value == null ? null : ByteBuffer.wrap(StringUtils.toUtf8(value));
+        final int idx = dictionary.indexOf(valueUtf8);
+        return getBitmap(idx);
+      }
+    };
+  }
+
+  /**
+   * Returns an index for a sorted set of strings. Values are encoded to UTF-8 lazily while iterating; null values
+   * are passed through as null.
+   */
+  @Override
+  public BitmapColumnIndex forSortedValues(SortedSet<String> values)
+  {
+    return getBitmapColumnIndexForSortedIterableUtf8(
+        Iterables.transform(
+            values,
+            input -> input != null ? ByteBuffer.wrap(StringUtils.toUtf8(input)) : null
+        ),
+        values.size()
+    );
+  }
+
+  /**
+   * Returns an index for a sorted set of UTF-8 encoded values. For sets with at least
+   * {@link #SIZE_WORTH_CHECKING_MIN} elements, only the tail of the set starting at the first dictionary entry is
+   * looked up, since smaller values cannot be in the dictionary.
+   */
+  @Override
+  public BitmapColumnIndex forSortedValuesUtf8(SortedSet<ByteBuffer> valuesUtf8)
+  {
+    final SortedSet<ByteBuffer> tailSet;
+
+    if (valuesUtf8.size() >= SIZE_WORTH_CHECKING_MIN) {
+      // NOTE(review): the trim assumes the set is ordered consistently with the dictionary - confirm against callers
+      final ByteBuffer minValueInColumn = dictionary.get(0);
+      tailSet = valuesUtf8.tailSet(minValueInColumn);
+    } else {
+      tailSet = valuesUtf8;
+    }
+
+    return getBitmapColumnIndexForSortedIterableUtf8(tailSet, tailSet.size());
+  }
+
+  /**
+   * Returns the bitmap stored for the dictionary id, or an empty bitmap if the id is negative or no bitmap is stored.
+   */
+  private ImmutableBitmap getBitmap(int idx)
+  {
+    if (idx < 0) {
+      return bitmapFactory.makeEmptyImmutableBitmap();
+    }
+
+    final ImmutableBitmap bitmap = bitmaps.get(idx);
+    return bitmap == null ? bitmapFactory.makeEmptyImmutableBitmap() : bitmap;
+  }
+
+  /**
+   * Helper used by {@link #forSortedValues} and {@link #forSortedValuesUtf8}.
+   *
+   * Picks between a sorted merge of values and dictionary (when {@code size} exceeds
+   * {@link #SORTED_MERGE_RATIO_THRESHOLD} times the dictionary size) and a dictionary lookup per value. Either way
+   * the result yields bitmaps only for values that are present in the dictionary.
+   *
+   * @param valuesUtf8 values to look up, in sorted order
+   * @param size       number of elements in {@code valuesUtf8}
+   */
+  private BitmapColumnIndex getBitmapColumnIndexForSortedIterableUtf8(Iterable<ByteBuffer> valuesUtf8, int size)
+  {
+    // for large number of in-filter values in comparison to the dictionary size, use the sorted merge algorithm.
+    if (size > SORTED_MERGE_RATIO_THRESHOLD * dictionary.size()) {
+      return new SimpleImmutableBitmapIterableIndex()
+      {
+        @Override
+        public Iterable<ImmutableBitmap> getBitmapIterable()
+        {
+          return () -> new Iterator<ImmutableBitmap>()
+          {
+            final PeekingIterator<ByteBuffer> valuesIterator = Iterators.peekingIterator(valuesUtf8.iterator());
+            final PeekingIterator<ByteBuffer> dictionaryIterator = Iterators.peekingIterator(dictionary.iterator());
+            // dictionary id of the pending match, or -1 if no match is pending
+            int next = -1;
+            // dictionary id of the entry currently at the head of dictionaryIterator
+            int idx = 0;
+
+            @Override
+            public boolean hasNext()
+            {
+              if (next < 0) {
+                findNext();
+              }
+              return next >= 0;
+            }
+
+            @Override
+            public ImmutableBitmap next()
+            {
+              if (next < 0) {
+                findNext();
+                if (next < 0) {
+                  throw new NoSuchElementException();
+                }
+              }
+              final int swap = next;
+              next = -1;
+              return getBitmap(swap);
+            }
+
+            // Advances whichever side is behind until both heads are equal or either side is exhausted.
+            private void findNext()
+            {
+              while (next < 0 && valuesIterator.hasNext() && dictionaryIterator.hasNext()) {
+                final ByteBuffer nextValue = valuesIterator.peek();
+                final ByteBuffer nextDictionaryKey = dictionaryIterator.peek();
+                final int comparison = COMPARATOR.compare(nextValue, nextDictionaryKey);
+                if (comparison == 0) {
+                  // match: consume the value only; the dictionary head is passed on a later call
+                  next = idx;
+                  valuesIterator.next();
+                  break;
+                } else if (comparison < 0) {
+                  // value is not in the dictionary, skip it
+                  valuesIterator.next();
+                } else {
+                  dictionaryIterator.next();
+                  idx++;
+                }
+              }
+            }
+          };
+        }
+      };
+    }
+
+    // if the size of in-filter values is less than the threshold percentage of dictionary size, then use binary search
+    // based lookup per value. The algorithm works well for smaller number of values.
+    return new SimpleImmutableBitmapIterableIndex()
+    {
+      @Override
+      public Iterable<ImmutableBitmap> getBitmapIterable()
+      {
+        return () -> new Iterator<ImmutableBitmap>()
+        {
+          final int dictionarySize = dictionary.size();
+          final Iterator<ByteBuffer> iterator = valuesUtf8.iterator();
+          // dictionary id of the pending match; negative if no match is pending
+          int next = -1;
+
+          @Override
+          public boolean hasNext()
+          {
+            if (next < 0) {
+              findNext();
+            }
+            return next >= 0;
+          }
+
+          @Override
+          public ImmutableBitmap next()
+          {
+            if (next < 0) {
+              findNext();
+              if (next < 0) {
+                throw new NoSuchElementException();
+              }
+            }
+            final int swap = next;
+            next = -1;
+            return getBitmap(swap);
+          }
+
+          // Looks up values one at a time until one is found in the dictionary or the values run out.
+          private void findNext()
+          {
+            while (next < 0 && iterator.hasNext()) {
+              ByteBuffer nextValue = iterator.next();
+              next = dictionary.indexOf(nextValue);
+
+              if (next == -dictionarySize - 1) {
+                // nextValue is past the end of the dictionary so we can break early
+                // Note: we can rely on indexOf returning (-(insertion point) - 1), because of the earlier check
+                // for Indexed.isSorted(), which guarantees this behavior
+                break;
+              }
+            }
+          }
+        };
+      }
+    };
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategies.java b/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategies.java
new file mode 100644
index 0000000000..2427658b66
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategies.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.column;
+
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.data.DictionaryWriter;
+import org.apache.druid.segment.data.EncodedStringDictionaryWriter;
+import org.apache.druid.segment.data.FrontCodedIndexedWriter;
+import org.apache.druid.segment.data.GenericIndexed;
+import org.apache.druid.segment.data.GenericIndexedWriter;
+import org.apache.druid.segment.data.Indexed;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+/**
+ * Helpers for writing and reading string dictionaries according to a {@link StringEncodingStrategy}.
+ */
+public class StringEncodingStrategies
+{
+  /**
+   * Creates the dictionary writer matching the encoding strategy.
+   *
+   * @param encodingStrategy strategy that selects the dictionary format
+   * @param writeoutMedium   medium handed to the underlying writer
+   * @param fileName         file name handed to the writer of the legacy utf8 format; not used for front coding
+   *
+   * @throws ISE if the strategy type is neither utf8 nor front coded
+   */
+  public static DictionaryWriter<String> getStringDictionaryWriter(
+      StringEncodingStrategy encodingStrategy,
+      SegmentWriteOutMedium writeoutMedium,
+      String fileName
+  )
+  {
+    // write plain utf8 in the legacy format, where generic indexed was written directly
+    if (StringEncodingStrategy.UTF8.equals(encodingStrategy.getType())) {
+      return new GenericIndexedWriter<>(writeoutMedium, fileName, GenericIndexed.STRING_STRATEGY);
+    } else {
+      // otherwise, we wrap in an EncodedStringDictionaryWriter so that we write a small header that includes
+      // a version byte that should hopefully never conflict with a GenericIndexed version, along with a byte
+      // from StringEncodingStrategy.getId to indicate which encoding strategy is used for the dictionary before
+      // writing the dictionary itself
+      DictionaryWriter<byte[]> writer;
+      if (StringEncodingStrategy.FRONT_CODED.equals(encodingStrategy.getType())) {
+        writer = new FrontCodedIndexedWriter(
+            writeoutMedium,
+            IndexIO.BYTE_ORDER,
+            ((StringEncodingStrategy.FrontCoded) encodingStrategy).getBucketSize()
+        );
+      } else {
+        throw new ISE("Unknown encoding strategy: %s", encodingStrategy.getType());
+      }
+      return new EncodedStringDictionaryWriter(writer, encodingStrategy);
+    }
+  }
+
+  /**
+   * Adapter to convert an {@link Indexed} of {@link ByteBuffer} with utf8 encoded bytes into an {@link Indexed} of
+   * {@link String} to be friendly to consumers. Every operation is forwarded to the delegate, converting between
+   * strings and utf8 buffers on the way in and out.
+   */
+  public static final class Utf8ToStringIndexed implements Indexed<String>
+  {
+    private final Indexed<ByteBuffer> delegate;
+
+    public Utf8ToStringIndexed(Indexed<ByteBuffer> delegate)
+    {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public int size()
+    {
+      return delegate.size();
+    }
+
+    @Nullable
+    @Override
+    public String get(int index)
+    {
+      return StringUtils.fromUtf8Nullable(delegate.get(index));
+    }
+
+    @Override
+    public int indexOf(@Nullable String value)
+    {
+      return delegate.indexOf(StringUtils.toUtf8ByteBuffer(value));
+    }
+
+    /**
+     * Reports the sortedness of the delegate. {@link #indexOf} is answered by the delegate, so the stronger
+     * {@code indexOf} contract of a sorted {@link Indexed} holds for this adapter exactly when it holds for the
+     * delegate; without this override, consumers that require a sorted dictionary would not see the delegate's
+     * answer.
+     */
+    @Override
+    public boolean isSorted()
+    {
+      return delegate.isSorted();
+    }
+
+    @Override
+    public Iterator<String> iterator()
+    {
+      final Iterator<ByteBuffer> delegateIterator = delegate.iterator();
+      return new Iterator<String>()
+      {
+        @Override
+        public boolean hasNext()
+        {
+          return delegateIterator.hasNext();
+        }
+
+        @Override
+        public String next()
+        {
+          return StringUtils.fromUtf8Nullable(delegateIterator.next());
+        }
+      };
+    }
+
+    @Override
+    public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+    {
+      inspector.visit("delegateIndex", delegate);
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategy.java b/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategy.java
new file mode 100644
index 0000000000..fedabb206a
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/column/StringEncodingStrategy.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.column;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.java.util.common.ISE;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * Describes how the string value dictionary of a column is encoded. Implementations are selected in JSON through
+ * the "type" property, and each one also exposes a single byte id through {@link #getId()}.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes({
+    @JsonSubTypes.Type(value = StringEncodingStrategy.Utf8.class, name = StringEncodingStrategy.UTF8),
+    @JsonSubTypes.Type(value = StringEncodingStrategy.FrontCoded.class, name = StringEncodingStrategy.FRONT_CODED)
+})
+public interface StringEncodingStrategy
+{
+  Utf8 DEFAULT = new Utf8();
+  String UTF8 = "utf8";
+  String FRONT_CODED = "frontCoded";
+
+  byte UTF8_ID = 0x00;
+  byte FRONT_CODED_ID = 0x01;
+
+  /**
+   * Name of the strategy, one of {@link #UTF8} or {@link #FRONT_CODED} for the implementations in this file.
+   */
+  String getType();
+
+  /**
+   * Single byte identifier of the strategy, one of {@link #UTF8_ID} or {@link #FRONT_CODED_ID} for the
+   * implementations in this file.
+   */
+  byte getId();
+
+  /**
+   * Plain utf8 strategy. It has no configuration, so all instances are equal to each other.
+   */
+  class Utf8 implements StringEncodingStrategy
+  {
+    @Override
+    public String getType()
+    {
+      return UTF8;
+    }
+
+    @Override
+    public byte getId()
+    {
+      return UTF8_ID;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      return o != null && getClass() == o.getClass();
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hashCode(UTF8);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "Utf8{}";
+    }
+  }
+
+  /**
+   * Front coded strategy, configured by a bucket size which must be a positive power of two.
+   */
+  class FrontCoded implements StringEncodingStrategy
+  {
+    public static final int DEFAULT_BUCKET_SIZE = 4;
+
+    @JsonProperty
+    public int getBucketSize()
+    {
+      return bucketSize;
+    }
+
+    @JsonProperty
+    private final int bucketSize;
+
+    /**
+     * @param bucketSize bucket size to use, or null to use {@link #DEFAULT_BUCKET_SIZE}
+     *
+     * @throws ISE if the bucket size is not a positive power of two
+     */
+    @JsonCreator
+    public FrontCoded(
+        @JsonProperty("bucketSize") @Nullable Integer bucketSize
+    )
+    {
+      this.bucketSize = bucketSize == null ? DEFAULT_BUCKET_SIZE : bucketSize;
+      // A single set bit is not sufficient on its own: Integer.MIN_VALUE has exactly one bit set (the sign bit) but
+      // is negative, so the value must also be positive.
+      if (this.bucketSize <= 0 || Integer.bitCount(this.bucketSize) != 1) {
+        throw new ISE("bucketSize must be a power of two but was[%,d]", bucketSize);
+      }
+    }
+
+    @Override
+    public String getType()
+    {
+      return FRONT_CODED;
+    }
+
+    @Override
+    public byte getId()
+    {
+      return FRONT_CODED_ID;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      FrontCoded that = (FrontCoded) o;
+      return bucketSize == that.bucketSize;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(bucketSize);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "FrontCoded{" +
+             "bucketSize=" + bucketSize +
+             '}';
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/column/StringFrontCodedDictionaryEncodedColumn.java b/processing/src/main/java/org/apache/druid/segment/column/StringFrontCodedDictionaryEncodedColumn.java
new file mode 100644
index 0000000000..ba06f9d481
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/column/StringFrontCodedDictionaryEncodedColumn.java
@@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.column;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.extraction.ExtractionFn;
+import org.apache.druid.query.filter.ValueMatcher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.AbstractDimensionSelector;
+import org.apache.druid.segment.DimensionSelectorUtils;
+import org.apache.druid.segment.IdLookup;
+import org.apache.druid.segment.data.ColumnarInts;
+import org.apache.druid.segment.data.ColumnarMultiInts;
+import org.apache.druid.segment.data.FrontCodedIndexed;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.data.ReadableOffset;
+import org.apache.druid.segment.data.SingleIndexedInt;
+import org.apache.druid.segment.filter.BooleanValueMatcher;
+import org.apache.druid.segment.historical.HistoricalDimensionSelector;
+import org.apache.druid.segment.historical.SingleValueHistoricalDimensionSelector;
+import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.ReadableVectorInspector;
+import org.apache.druid.segment.vector.ReadableVectorOffset;
+import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+import org.apache.druid.utils.CloseableUtils;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+/**
+ * {@link DictionaryEncodedColumn} for a string column which uses a {@link FrontCodedIndexed} to store its value
+ * dictionary, which 'delta encodes' strings (instead of {@link org.apache.druid.segment.data.GenericIndexed} like
+ * {@link StringDictionaryEncodedColumn}).
+ *
+ * This class is otherwise nearly identical to {@link StringDictionaryEncodedColumn} other than the dictionary
+ * difference.
+ */
+public class StringFrontCodedDictionaryEncodedColumn implements DictionaryEncodedColumn<String>
+{
+  @Nullable
+  private final ColumnarInts column;
+  @Nullable
+  private final ColumnarMultiInts multiValueColumn;
+  private final FrontCodedIndexed utf8Dictionary;
+
+  /**
+   * @param singleValueColumn dictionary ids per row for a single-valued column; when null the column is treated as
+   *                          multi-valued
+   * @param multiValueColumn  dictionary ids per row for a multi-valued column
+   * @param utf8Dictionary    front coded dictionary holding the utf8 encoded values
+   */
+  public StringFrontCodedDictionaryEncodedColumn(
+      @Nullable ColumnarInts singleValueColumn,
+      @Nullable ColumnarMultiInts multiValueColumn,
+      FrontCodedIndexed utf8Dictionary
+  )
+  {
+    this.utf8Dictionary = utf8Dictionary;
+    this.multiValueColumn = multiValueColumn;
+    this.column = singleValueColumn;
+  }
+
+  /**
+   * Number of rows, taken from whichever of the two row containers backs this column.
+   */
+  @Override
+  public int length()
+  {
+    if (hasMultipleValues()) {
+      return multiValueColumn.size();
+    }
+    return column.size();
+  }
+
+  /**
+   * A column is multi-valued exactly when no single-value container was supplied.
+   */
+  @Override
+  public boolean hasMultipleValues()
+  {
+    final boolean singleValued = column != null;
+    return !singleValued;
+  }
+
+  /**
+   * Reads the dictionary id stored for the row in the single-value container.
+   */
+  @Override
+  public int getSingleValueRow(int rowNum)
+  {
+    final int dictionaryId = column.get(rowNum);
+    return dictionaryId;
+  }
+
+  /**
+   * Reads the dictionary ids stored for the row in the multi-value container.
+   */
+  @Override
+  public IndexedInts getMultiValueRow(int rowNum)
+  {
+    final IndexedInts dictionaryIds = multiValueColumn.get(rowNum);
+    return dictionaryIds;
+  }
+
+  /**
+   * Decodes the dictionary entry for the id into a string, or returns null if the entry is null.
+   */
+  @Override
+  @Nullable
+  public String lookupName(int id)
+  {
+    final ByteBuffer valueUtf8 = utf8Dictionary.get(id);
+    return valueUtf8 == null ? null : StringUtils.fromUtf8(valueUtf8);
+  }
+
+  /**
+   * Encodes the name as utf8 and returns whatever the dictionary's indexOf reports for it.
+   */
+  @Override
+  public int lookupId(String name)
+  {
+    final ByteBuffer nameUtf8 = StringUtils.toUtf8ByteBuffer(name);
+    return utf8Dictionary.indexOf(nameUtf8);
+  }
+
+  /**
+   * Number of entries in the value dictionary.
+   */
+  @Override
+  public int getCardinality()
+  {
+    final int dictionarySize = utf8Dictionary.size();
+    return dictionarySize;
+  }
+
+  @Override
+  public HistoricalDimensionSelector makeDimensionSelector(
+      final ReadableOffset offset,
+      @Nullable final ExtractionFn extractionFn
+  )
+  {
+    abstract class QueryableDimensionSelector extends AbstractDimensionSelector
+        implements HistoricalDimensionSelector, IdLookup
+    {
+      @Override
+      public int getValueCardinality()
+      {
+        /*
+         This is technically wrong if
+         extractionFn != null && (extractionFn.getExtractionType() != ExtractionFn.ExtractionType.ONE_TO_ONE ||
+                                    !extractionFn.preservesOrdering())
+         However current behavior allows some GroupBy-V1 queries to work that wouldn't work otherwise and doesn't
+         cause any problems due to special handling of extractionFn everywhere.
+         See https://github.com/apache/druid/pull/8433
+         */
+        return getCardinality();
+      }
+
+      @Override
+      public String lookupName(int id)
+      {
+        final String value = StringFrontCodedDictionaryEncodedColumn.this.lookupName(id);
+        return extractionFn == null ? value : extractionFn.apply(value);
+      }
+
+      @Nullable
+      @Override
+      public ByteBuffer lookupNameUtf8(int id)
+      {
+        return utf8Dictionary.get(id);
+      }
+
+      @Override
+      public boolean supportsLookupNameUtf8()
+      {
+        return true;
+      }
+
+      @Override
+      public boolean nameLookupPossibleInAdvance()
+      {
+        return true;
+      }
+
+      @Nullable
+      @Override
+      public IdLookup idLookup()
+      {
+        return extractionFn == null ? this : null;
+      }
+
+      @Override
+      public int lookupId(String name)
+      {
+        if (extractionFn != null) {
+          throw new UnsupportedOperationException("cannot perform lookup when applying an extraction function");
+        }
+        return StringFrontCodedDictionaryEncodedColumn.this.lookupId(name);
+      }
+    }
+
+    if (hasMultipleValues()) {
+      /**
+       * Selector used when the column has multiple values per row; rows are read from {@code multiValueColumn}.
+       */
+      class MultiValueDimensionSelector extends QueryableDimensionSelector
+      {
+        /**
+         * Row at the position currently reported by {@code offset}.
+         */
+        @Override
+        public IndexedInts getRow()
+        {
+          final int currentRow = offset.getOffset();
+          return multiValueColumn.get(currentRow);
+        }
+
+        /**
+         * Row at an explicitly supplied row number.
+         */
+        @Override
+        public IndexedInts getRow(int rowNum)
+        {
+          return multiValueColumn.get(rowNum);
+        }
+
+        @Override
+        public ValueMatcher makeValueMatcher(@Nullable String value)
+        {
+          // no id-based shortcut here, hand off to the generic matcher factory
+          return DimensionSelectorUtils.makeValueMatcherGeneric(this, value);
+        }
+
+        @Override
+        public ValueMatcher makeValueMatcher(Predicate<String> predicate)
+        {
+          // no id-based shortcut here, hand off to the generic matcher factory
+          return DimensionSelectorUtils.makeValueMatcherGeneric(this, predicate);
+        }
+
+        @Nullable
+        @Override
+        public Object getObject()
+        {
+          return defaultGetObject();
+        }
+
+        @Override
+        public Class classOfObject()
+        {
+          return Object.class;
+        }
+
+        @Override
+        public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+        {
+          inspector.visit("multiValueColumn", multiValueColumn);
+          inspector.visit("offset", offset);
+          inspector.visit("extractionFn", extractionFn);
+        }
+      }
+      return new MultiValueDimensionSelector();
+    } else {
+      /**
+       * Selector used when the column has a single value per row; dictionary ids are read from {@code column}.
+       */
+      class SingleValueQueryableDimensionSelector extends QueryableDimensionSelector
+          implements SingleValueHistoricalDimensionSelector
+      {
+        // single holder object, overwritten and handed back by every getRow call
+        private final SingleIndexedInt row = new SingleIndexedInt();
+
+        @Override
+        public IndexedInts getRow()
+        {
+          final int currentId = getRowValue();
+          row.setValue(currentId);
+          return row;
+        }
+
+        /**
+         * Dictionary id at the position currently reported by {@code offset}.
+         */
+        public int getRowValue()
+        {
+          final int currentRow = offset.getOffset();
+          return column.get(currentRow);
+        }
+
+        @Override
+        public IndexedInts getRow(int rowNum)
+        {
+          final int idAtRow = getRowValue(rowNum);
+          row.setValue(idAtRow);
+          return row;
+        }
+
+        @Override
+        public int getRowValue(int rowNum)
+        {
+          return column.get(rowNum);
+        }
+
+        /**
+         * Without an extraction function the value is resolved to a dictionary id once, and rows are matched by
+         * comparing ids; a value missing from the dictionary yields a matcher that never matches. With an
+         * extraction function the predicate based matcher is used instead.
+         */
+        @Override
+        public ValueMatcher makeValueMatcher(final @Nullable String value)
+        {
+          if (extractionFn != null) {
+            // Employ caching BitSet optimization
+            return makeValueMatcher(Predicates.equalTo(value));
+          }
+
+          final int targetId = super.lookupId(value);
+          if (targetId < 0) {
+            return BooleanValueMatcher.of(false);
+          }
+
+          return new ValueMatcher()
+          {
+            @Override
+            public boolean matches()
+            {
+              return getRowValue() == targetId;
+            }
+
+            @Override
+            public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+            {
+              inspector.visit("column", StringFrontCodedDictionaryEncodedColumn.this);
+            }
+          };
+        }
+
+        /**
+         * Builds a matcher which evaluates the predicate at most once per dictionary id, remembering the outcome
+         * in a pair of bit sets.
+         */
+        @Override
+        public ValueMatcher makeValueMatcher(final Predicate<String> predicate)
+        {
+          final int cardinality = getCardinality();
+          final BitSet checkedIds = new BitSet(cardinality);
+          final BitSet matchingIds = new BitSet(cardinality);
+
+          // Lazy matcher; only check an id if matches() is called.
+          return new ValueMatcher()
+          {
+            @Override
+            public boolean matches()
+            {
+              final int rowId = getRowValue();
+
+              if (checkedIds.get(rowId)) {
+                return matchingIds.get(rowId);
+              }
+
+              // first encounter of this id: evaluate the predicate and remember the answer
+              final boolean result = predicate.apply(lookupName(rowId));
+              checkedIds.set(rowId);
+              matchingIds.set(rowId, result);
+              return result;
+            }
+
+            @Override
+            public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+            {
+              inspector.visit("column", StringFrontCodedDictionaryEncodedColumn.this);
+            }
+          };
+        }
+
+        @Override
+        public Object getObject()
+        {
+          final int currentId = getRowValue();
+          return super.lookupName(currentId);
+        }
+
+        @Override
+        public Class classOfObject()
+        {
+          return String.class;
+        }
+
+        @Override
+        public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+        {
+          inspector.visit("column", column);
+          inspector.visit("offset", offset);
+          inspector.visit("extractionFn", extractionFn);
+        }
+      }
+      return new SingleValueQueryableDimensionSelector();
+    }
+  }
+
+  /**
+   * Creates a vector selector over the single-value column which reads dictionary ids for the rows of the current
+   * vector of {@code offset}. The ids are cached per offset id, so repeated calls for the same vector do not re-read
+   * the column.
+   */
+  @Override
+  public SingleValueDimensionVectorSelector makeSingleValueDimensionVectorSelector(final ReadableVectorOffset offset)
+  {
+    class QueryableSingleValueDimensionVectorSelector implements SingleValueDimensionVectorSelector, IdLookup
+    {
+      private final int[] vector = new int[offset.getMaxVectorSize()];
+      // id of the offset for which 'vector' was last filled
+      private int id = ReadableVectorInspector.NULL_ID;
+
+      @Override
+      public int[] getRowVector()
+      {
+        if (id != offset.getId()) {
+          // the offset moved since the last read, refill the vector
+          if (offset.isContiguous()) {
+            column.get(vector, offset.getStartOffset(), offset.getCurrentVectorSize());
+          } else {
+            column.get(vector, offset.getOffsets(), offset.getCurrentVectorSize());
+          }
+          id = offset.getId();
+        }
+        return vector;
+      }
+
+      @Override
+      public int getValueCardinality()
+      {
+        return getCardinality();
+      }
+
+      @Nullable
+      @Override
+      public String lookupName(final int dictionaryId)
+      {
+        return StringFrontCodedDictionaryEncodedColumn.this.lookupName(dictionaryId);
+      }
+
+      @Nullable
+      @Override
+      public ByteBuffer lookupNameUtf8(int dictionaryId)
+      {
+        return utf8Dictionary.get(dictionaryId);
+      }
+
+      @Override
+      public boolean supportsLookupNameUtf8()
+      {
+        return true;
+      }
+
+      @Override
+      public boolean nameLookupPossibleInAdvance()
+      {
+        return true;
+      }
+
+      @Nullable
+      @Override
+      public IdLookup idLookup()
+      {
+        return this;
+      }
+
+      @Override
+      public int lookupId(@Nullable final String name)
+      {
+        return StringFrontCodedDictionaryEncodedColumn.this.lookupId(name);
+      }
+
+      @Override
+      public int getCurrentVectorSize()
+      {
+        return offset.getCurrentVectorSize();
+      }
+
+      @Override
+      public int getMaxVectorSize()
+      {
+        return offset.getMaxVectorSize();
+      }
+    }
+
+    return new QueryableSingleValueDimensionVectorSelector();
+  }
+
+  /**
+   * Creates a vector selector over the multi-value column which reads one {@link IndexedInts} per row of the current
+   * vector of {@code offset}. The rows are cached per offset id, so repeated calls for the same vector do not re-read
+   * the column.
+   */
+  @Override
+  public MultiValueDimensionVectorSelector makeMultiValueDimensionVectorSelector(final ReadableVectorOffset offset)
+  {
+    class QueryableMultiValueDimensionVectorSelector implements MultiValueDimensionVectorSelector, IdLookup
+    {
+      private final IndexedInts[] vector = new IndexedInts[offset.getMaxVectorSize()];
+      // id of the offset for which 'vector' was last filled
+      private int id = ReadableVectorInspector.NULL_ID;
+
+      @Override
+      public IndexedInts[] getRowVector()
+      {
+        if (id != offset.getId()) {
+          // Must use getUnshared in both branches, otherwise all elements in the vector could be the same shared
+          // object.
+          if (offset.isContiguous()) {
+            final int firstRow = offset.getStartOffset();
+            final int rowCount = offset.getCurrentVectorSize();
+            for (int i = 0; i < rowCount; i++) {
+              vector[i] = multiValueColumn.getUnshared(i + firstRow);
+            }
+          } else {
+            final int[] rowNumbers = offset.getOffsets();
+            final int rowCount = offset.getCurrentVectorSize();
+            for (int i = 0; i < rowCount; i++) {
+              vector[i] = multiValueColumn.getUnshared(rowNumbers[i]);
+            }
+          }
+          id = offset.getId();
+        }
+        return vector;
+      }
+
+      @Override
+      public int getValueCardinality()
+      {
+        return getCardinality();
+      }
+
+      @Nullable
+      @Override
+      public String lookupName(final int dictionaryId)
+      {
+        return StringFrontCodedDictionaryEncodedColumn.this.lookupName(dictionaryId);
+      }
+
+      @Nullable
+      @Override
+      public ByteBuffer lookupNameUtf8(int dictionaryId)
+      {
+        return utf8Dictionary.get(dictionaryId);
+      }
+
+      @Override
+      public boolean supportsLookupNameUtf8()
+      {
+        return true;
+      }
+
+      @Override
+      public boolean nameLookupPossibleInAdvance()
+      {
+        return true;
+      }
+
+      @Nullable
+      @Override
+      public IdLookup idLookup()
+      {
+        return this;
+      }
+
+      @Override
+      public int lookupId(@Nullable final String name)
+      {
+        return StringFrontCodedDictionaryEncodedColumn.this.lookupId(name);
+      }
+
+      @Override
+      public int getCurrentVectorSize()
+      {
+        return offset.getCurrentVectorSize();
+      }
+
+      @Override
+      public int getMaxVectorSize()
+      {
+        return offset.getMaxVectorSize();
+      }
+    }
+
+    return new QueryableMultiValueDimensionVectorSelector();
+  }
+
+  /**
+   * Creates a vector selector which materializes the string value of every row of the current vector of
+   * {@code offset}. Only single-value columns are supported.
+   *
+   * @throws UnsupportedOperationException if the column has multiple values
+   */
+  @Override
+  public VectorObjectSelector makeVectorObjectSelector(ReadableVectorOffset offset)
+  {
+    if (hasMultipleValues()) {
+      throw new UnsupportedOperationException("Multivalue string object selector not implemented yet");
+    }
+
+    class DictionaryEncodedStringSingleValueVectorObjectSelector implements VectorObjectSelector
+    {
+      // scratch space for the dictionary ids of the current vector
+      private final int[] vector = new int[offset.getMaxVectorSize()];
+      private final String[] strings = new String[offset.getMaxVectorSize()];
+      // id of the offset for which 'strings' was last filled
+      private int id = ReadableVectorInspector.NULL_ID;
+
+      @Override
+      public Object[] getObjectVector()
+      {
+        if (id != offset.getId()) {
+          if (offset.isContiguous()) {
+            column.get(vector, offset.getStartOffset(), offset.getCurrentVectorSize());
+          } else {
+            column.get(vector, offset.getOffsets(), offset.getCurrentVectorSize());
+          }
+          // translate every id that was just read into its dictionary value
+          final int rowCount = offset.getCurrentVectorSize();
+          for (int i = 0; i < rowCount; i++) {
+            strings[i] = lookupName(vector[i]);
+          }
+          id = offset.getId();
+        }
+        return strings;
+      }
+
+      @Override
+      public int getMaxVectorSize()
+      {
+        return offset.getMaxVectorSize();
+      }
+
+      @Override
+      public int getCurrentVectorSize()
+      {
+        return offset.getCurrentVectorSize();
+      }
+    }
+
+    return new DictionaryEncodedStringSingleValueVectorObjectSelector();
+  }
+
+  /**
+   * Closes both the single-value and the multi-value id columns through {@code CloseableUtils.closeAll}.
+   * NOTE(review): only one of the two columns is presumably non-null for a given instance; confirm that
+   * {@code closeAll} tolerates a null argument.
+   */
+  @Override
+  public void close() throws IOException
+  {
+    CloseableUtils.closeAll(column, multiValueColumn);
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/data/CachingIndexed.java b/processing/src/main/java/org/apache/druid/segment/data/CachingIndexed.java
index ddf645f4f0..4c4823550a 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/CachingIndexed.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/CachingIndexed.java
@@ -88,6 +88,12 @@ public class CachingIndexed<T> implements CloseableIndexed<T>
     return delegate.indexOf(value);
   }
 
+  /**
+   * Reports whatever the wrapped {@code delegate} reports about being sorted.
+   */
+  @Override
+  public boolean isSorted()
+  {
+    return delegate.isSorted();
+  }
+
   @Override
   public Iterator<T> iterator()
   {
diff --git a/processing/src/main/java/org/apache/druid/segment/data/DictionaryWriter.java b/processing/src/main/java/org/apache/druid/segment/data/DictionaryWriter.java
new file mode 100644
index 0000000000..a9cd5892ff
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/data/DictionaryWriter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import org.apache.druid.segment.serde.Serializer;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+/**
+ * A {@link Serializer} for a dictionary of values of type {@code T}: values are handed to {@link #write} one at a
+ * time and can be read back by dictionary id through {@link #get}.
+ */
+public interface DictionaryWriter<T> extends Serializer
+{
+  /**
+   * Whether the dictionary produced by this writer is sorted.
+   * NOTE(review): presumably the write-side counterpart of {@code Indexed#isSorted()}; confirm.
+   */
+  boolean isSorted();
+
+  /**
+   * Prepares the writer for use.
+   * NOTE(review): presumably must be called before the first {@link #write}; confirm against implementations.
+   *
+   * @throws IOException if the writer cannot be opened
+   */
+  void open() throws IOException;
+
+  /**
+   * Adds a value to the dictionary.
+   *
+   * @param objectToWrite the value to add, may be null
+   * @throws IOException if the value cannot be written
+   */
+  void write(@Nullable T objectToWrite) throws IOException;
+
+  /**
+   * Reads back a previously written value.
+   *
+   * @param dictId dictionary id of the value (presumably its position in write order; verify against implementations)
+   * @return the value for {@code dictId}, may be null
+   * @throws IOException if the value cannot be read
+   */
+  @Nullable
+  T get(int dictId) throws IOException;
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/data/EncodedStringDictionaryWriter.java b/processing/src/main/java/org/apache/druid/segment/data/EncodedStringDictionaryWriter.java
new file mode 100644
index 0000000000..0ea3c809d0
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/data/EncodedStringDictionaryWriter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.column.StringEncodingStrategy;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * {@link DictionaryWriter} for strings which converts every value to UTF-8 bytes and hands it to a delegate
+ * {@code DictionaryWriter<byte[]>}. When serialized, the output of the delegate is preceded by two header bytes:
+ * {@link #VERSION} followed by the id of the {@link StringEncodingStrategy}.
+ */
+public class EncodedStringDictionaryWriter implements DictionaryWriter<String>
+{
+  public static final byte VERSION = Byte.MAX_VALUE; // hopefully GenericIndexed never makes a version this high...
+
+  private final StringEncodingStrategy encodingStrategy;
+  private final DictionaryWriter<byte[]> delegate;
+
+  public EncodedStringDictionaryWriter(
+      DictionaryWriter<byte[]> delegate,
+      StringEncodingStrategy encodingStrategy
+  )
+  {
+    this.encodingStrategy = encodingStrategy;
+    this.delegate = delegate;
+  }
+
+  @Override
+  public boolean isSorted()
+  {
+    return delegate.isSorted();
+  }
+
+  @Override
+  public void open() throws IOException
+  {
+    delegate.open();
+  }
+
+  /**
+   * Passes the value through {@code NullHandling.emptyToNullIfNeeded}, converts it to UTF-8 (null stays null) and
+   * writes the bytes to the delegate.
+   */
+  @Override
+  public void write(@Nullable String objectToWrite) throws IOException
+  {
+    final String normalized = NullHandling.emptyToNullIfNeeded(objectToWrite);
+    final byte[] utf8 = StringUtils.toUtf8Nullable(normalized);
+    delegate.write(utf8);
+  }
+
+  /**
+   * Reads the bytes for {@code dictId} from the delegate and decodes them as UTF-8; null bytes yield null.
+   */
+  @Nullable
+  @Override
+  public String get(int dictId) throws IOException
+  {
+    final byte[] utf8 = delegate.get(dictId);
+    return utf8 == null ? null : StringUtils.fromUtf8(utf8);
+  }
+
+  /**
+   * Size of the delegate plus the two header bytes emitted by {@link #writeTo}.
+   */
+  @Override
+  public long getSerializedSize() throws IOException
+  {
+    final long delegateSize = delegate.getSerializedSize();
+    return 2 + delegateSize;
+  }
+
+  /**
+   * Writes the version byte, then the encoding strategy id byte, then the delegate.
+   */
+  @Override
+  public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
+  {
+    final byte[] versionHeader = {VERSION};
+    channel.write(ByteBuffer.wrap(versionHeader));
+    final byte[] strategyHeader = {encodingStrategy.getId()};
+    channel.write(ByteBuffer.wrap(strategyHeader));
+    delegate.writeTo(channel, smoosher);
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/data/FixedIndexed.java b/processing/src/main/java/org/apache/druid/segment/data/FixedIndexed.java
index 10a29dff91..1dced49151 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/FixedIndexed.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/FixedIndexed.java
@@ -149,6 +149,12 @@ public class FixedIndexed<T> implements Indexed<T>
     return -(minIndex + 1);
   }
 
+  /**
+   * Returns the value of the {@code isSorted} field of this instance.
+   */
+  @Override
+  public boolean isSorted()
+  {
+    return isSorted;
+  }
+
   @Override
   public Iterator<T> iterator()
   {
diff --git a/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java b/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java
new file mode 100644
index 0000000000..890a797af7
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexed.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * {@link Indexed} specialized for storing variable-width binary values (such as utf8 encoded strings), which must be
+ * sorted and unique, using 'front coding'. Front coding is a type of delta encoding for byte arrays, where sorted
+ * values are grouped into buckets. The first value of the bucket is written entirely, and remaining values are stored
+ * as a pair of an integer which indicates how much of the first byte array of the bucket to use as a prefix, followed
+ * by the remaining bytes after the prefix to complete the value.
+ *
+ * front coded indexed layout:
+ * | version | bucket size | has null? | number of values | size of "offsets" + "buckets" | "offsets" | "buckets" |
+ * | ------- | ----------- | --------- | ---------------- | ----------------------------- | --------- | --------- |
+ * |    byte |        byte |      byte |        vbyte int |                     vbyte int |     int[] |  bucket[] |
+ *
+ * "offsets" are the ending offsets of each bucket stored in order, stored as plain integers for easy random access.
+ *
+ * bucket layout:
+ * | first value | prefix length | fragment | ... | prefix length | fragment |
+ * | ----------- | ------------- | -------- | --- | ------------- | -------- |
+ * |        blob |     vbyte int |     blob | ... |     vbyte int |     blob |
+ *
+ * blob layout:
+ * | blob length | blob bytes |
+ * | ----------- | ---------- |
+ * |   vbyte int |     byte[] |
+ *
+ *
+ * Getting a value first picks the appropriate bucket, finds its offset in the underlying buffer, then scans the bucket
+ * values to seek to the correct position of the value within the bucket in order to reconstruct it using the prefix
+ * length.
+ *
+ * Finding the index of a value involves binary searching the first values of each bucket to find the correct bucket,
+ * then a linear scan within the bucket to find the matching value (or negative insertion point -1 for values that
+ * are not present).
+ *
+ * The value iterator reads an entire bucket at a time, reconstructing the values into an array to iterate within the
+ * bucket before moving onto the next bucket as the iterator is consumed.
+ *
+ */
+public final class FrontCodedIndexed implements Indexed<ByteBuffer>
+{
+  /**
+   * Parses the header (version, bucket size, null marker, number of values, size of the offsets and buckets
+   * sections) starting at the current position of {@code buffer}, then moves the position of {@code buffer} to the
+   * end of the serialized data. The returned supplier creates a new {@link FrontCodedIndexed} over {@code buffer} on
+   * every call.
+   *
+   * @throws IllegalArgumentException if the version byte is not 0
+   */
+  public static Supplier<FrontCodedIndexed> read(ByteBuffer buffer, ByteOrder ordering)
+  {
+    final ByteBuffer header = buffer.asReadOnlyBuffer().order(ordering);
+    final byte version = header.get();
+    Preconditions.checkArgument(version == 0, "only V0 exists, encountered " + version);
+    final int bucketSize = header.get();
+    final byte nullMarker = header.get();
+    final boolean hasNull = NullHandling.IS_NULL_BYTE == nullMarker;
+    final int numValues = VByte.readInt(header);
+    // combined size of the offsets and buckets sections
+    final int payloadSize = VByte.readInt(header);
+    final int offsetsPosition = header.position();
+    // leave the buffer of the caller positioned at the end of the data
+    buffer.position(offsetsPosition + payloadSize);
+
+    return new Supplier<FrontCodedIndexed>()
+    {
+      @Override
+      public FrontCodedIndexed get()
+      {
+        return new FrontCodedIndexed(
+            buffer,
+            ordering,
+            bucketSize,
+            numValues,
+            hasNull,
+            offsetsPosition
+        );
+      }
+    };
+  }
+
+  // read-only view of the data in the requested byte order; get() and indexOf() move its position
+  private final ByteBuffer buffer;
+  // number of values including the null value, if there is one
+  private final int adjustedNumValues;
+  // 1 if there is a null value (it takes index 0 but is not stored in any bucket), otherwise 0
+  private final int adjustIndex;
+  // number of values per bucket, always a power of two
+  private final int bucketSize;
+  // number of buckets needed to hold the non-null values
+  private final int numBuckets;
+  // log2 of bucketSize, so that 'index >> div' is the bucket number of an index
+  private final int div;
+  // bucketSize - 1, so that 'index & rem' is the position of an index within its bucket
+  private final int rem;
+  // position in the buffer of the first bucket ending offset
+  private final int offsetsPosition;
+  // position in the buffer of the first bucket, located after (numBuckets - 1) int offsets
+  private final int bucketsPosition;
+  private final boolean hasNull;
+  // number of values in the final bucket, which may be only partially filled
+  private final int lastBucketNumValues;
+
+  private FrontCodedIndexed(
+      ByteBuffer buffer,
+      ByteOrder order,
+      int bucketSize,
+      int numValues,
+      boolean hasNull,
+      int offsetsPosition
+  )
+  {
+    if (Integer.bitCount(bucketSize) != 1) {
+      throw new ISE("bucketSize must be a power of two but was[%,d]", bucketSize);
+    }
+    this.buffer = buffer.asReadOnlyBuffer().order(order);
+    this.bucketSize = bucketSize;
+    this.hasNull = hasNull;
+
+    this.numBuckets = (int) Math.ceil((double) numValues / (double) bucketSize);
+    this.adjustIndex = hasNull ? 1 : 0;
+    this.adjustedNumValues = numValues + adjustIndex;
+    this.div = Integer.numberOfTrailingZeros(bucketSize);
+    this.rem = bucketSize - 1;
+    this.lastBucketNumValues = (numValues & rem) == 0 ? bucketSize : numValues & rem;
+    this.offsetsPosition = offsetsPosition;
+    this.bucketsPosition = offsetsPosition + ((numBuckets - 1) * Integer.BYTES);
+  }
+
+  /**
+   * Number of values, including the null value if there is one.
+   */
+  @Override
+  public int size()
+  {
+    return adjustedNumValues;
+  }
+
+  /**
+   * Returns the value stored at {@code index}, or null for index 0 if the dictionary has a null value. This method
+   * moves the position of {@link #buffer}.
+   *
+   * @throws IndexOutOfBoundsException if {@code index} is negative or not smaller than {@link #size()}
+   */
+  @Nullable
+  @Override
+  public ByteBuffer get(int index)
+  {
+    // without this check a negative index would be masked into a valid looking bucket position and silently decode
+    // the wrong value, and a too large index would read bytes that are not part of any bucket
+    if (index < 0 || index >= adjustedNumValues) {
+      throw new IndexOutOfBoundsException("Index[" + index + "] out of bounds for size[" + adjustedNumValues + "]");
+    }
+
+    if (hasNull && index == 0) {
+      return null;
+    }
+
+    // due to vbyte encoding, the null value is not actually stored in the bucket (no negative values), so we adjust
+    // the index
+    final int adjustedIndex = index - adjustIndex;
+    // find the bucket which contains the value with maths
+    final int bucket = adjustedIndex >> div;
+    final int bucketIndex = adjustedIndex & rem;
+    final int offset = getBucketOffset(bucket);
+    buffer.position(offset);
+    return getFromBucket(buffer, bucketIndex);
+  }
+
+  /**
+   * Finds the index of {@code value}, or {@code -(insertion point) - 1} if the value is not present. A null value is
+   * at index 0 if the dictionary has a null value, otherwise -1 is returned for it. This method moves the position
+   * of {@link #buffer}.
+   */
+  @Override
+  public int indexOf(@Nullable ByteBuffer value)
+  {
+    // performs binary search using the first values of each bucket to locate the appropriate bucket, and then does
+    // a linear scan to find the value within the bucket
+    if (value == null) {
+      return hasNull ? 0 : -1;
+    }
+
+    if (numBuckets == 0) {
+      // there are no buckets to search (the dictionary is empty or only contains null), so any non-null value would
+      // be inserted right after the null value, if there is one. without this check the code below would try to
+      // decode a bucket that does not exist
+      return -adjustIndex - 1;
+    }
+
+    int minBucketIndex = 0;
+    int maxBucketIndex = numBuckets - 1;
+    while (minBucketIndex < maxBucketIndex) {
+      int currentBucket = (minBucketIndex + maxBucketIndex) >>> 1;
+      int currBucketFirstValueIndex = currentBucket * bucketSize;
+
+      // compare against first value in "current" bucket
+      final int offset = getBucketOffset(currentBucket);
+      buffer.position(offset);
+      final int firstLength = VByte.readInt(buffer);
+      final int firstOffset = buffer.position();
+      int comparison = compareBucketFirstValue(buffer, firstLength, value);
+      // save the length of the shared prefix with the first value of the bucket and the value to match so we
+      // can use it later to skip over all values in the bucket that share a longer prefix with the first value
+      // (the bucket is sorted, so the prefix length gets smaller as values increase)
+      final int sharedPrefix = buffer.position() - firstOffset;
+      if (comparison == 0) {
+        if (firstLength == value.remaining()) {
+          // it turns out that the first value in current bucket is what we are looking for, short circuit
+          return currBucketFirstValueIndex + adjustIndex;
+        } else {
+          comparison = Integer.compare(firstLength, value.remaining());
+        }
+      }
+
+      // we also compare against the adjacent bucket to determine if the value is actually in this bucket or
+      // if we need to keep searching buckets
+      final int nextOffset = getBucketOffset(currentBucket + 1);
+      buffer.position(nextOffset);
+      final int nextLength = VByte.readInt(buffer);
+      int comparisonNext = compareBucketFirstValue(buffer, nextLength, value);
+      if (comparisonNext == 0) {
+        if (nextLength == value.remaining()) {
+          // it turns out that the first value in next bucket is what we are looking for, go ahead and short circuit
+          // for that as well, even though we weren't going to scan that bucket on this iteration...
+          return (currBucketFirstValueIndex + adjustIndex) + bucketSize;
+        } else {
+          comparisonNext = Integer.compare(nextLength, value.remaining());
+        }
+      }
+
+      if (comparison < 0 && comparisonNext > 0) {
+        // this is exactly the right bucket
+        // find the value in the bucket (or where it would be if it were present)
+        buffer.position(firstOffset + firstLength);
+
+        return findValueInBucket(value, currBucketFirstValueIndex, bucketSize, sharedPrefix);
+      } else if (comparison < 0) {
+        minBucketIndex = currentBucket + 1;
+      } else {
+        maxBucketIndex = currentBucket - 1;
+      }
+    }
+
+    // this is where we ended up, try to find the value in the bucket
+    final int bucketIndexBase = minBucketIndex * bucketSize;
+    final int numValuesInBucket;
+    if (minBucketIndex == numBuckets - 1) {
+      numValuesInBucket = lastBucketNumValues;
+    } else {
+      numValuesInBucket = bucketSize;
+    }
+    final int offset = getBucketOffset(minBucketIndex);
+
+    // like we did in the loop, except comparison being smaller the first value here is a short circuit
+    buffer.position(offset);
+    final int firstLength = VByte.readInt(buffer);
+    final int firstOffset = buffer.position();
+    int comparison = compareBucketFirstValue(buffer, firstLength, value);
+    final int sharedPrefix = buffer.position() - firstOffset;
+    if (comparison == 0) {
+      if (firstLength == value.remaining()) {
+        // it turns out that the first value in current bucket is what we are looking for, short circuit
+        return bucketIndexBase + adjustIndex;
+      } else {
+        comparison = Integer.compare(firstLength, value.remaining());
+      }
+    }
+
+    if (comparison > 0) {
+      // value precedes bucket, so bail out
+      return -(bucketIndexBase + adjustIndex) - 1;
+    }
+
+    buffer.position(firstOffset + firstLength);
+
+    return findValueInBucket(value, bucketIndexBase, numValuesInBucket, sharedPrefix);
+  }
+
+  /**
+   * Always {@code true}.
+   */
+  @Override
+  public boolean isSorted()
+  {
+    // FrontCodedIndexed only supports sorted values
+    return true;
+  }
+
+  /**
+   * Iterates over all values in index order, starting with null if the dictionary has a null value. The iterator
+   * works on its own copy of the buffer, and decodes a whole bucket at a time when it reaches the first value of
+   * that bucket. Buckets are only read on demand, so a dictionary without any buckets (empty, or only containing
+   * null) can be iterated without touching bucket data that does not exist.
+   */
+  @Override
+  public Iterator<ByteBuffer> iterator()
+  {
+    final ByteBuffer copy = buffer.asReadOnlyBuffer().order(buffer.order());
+    // iterator decodes and buffers a bucket at a time, paging through buckets as the iterator is consumed
+    return new Iterator<ByteBuffer>()
+    {
+      private int currIndex = 0;
+      // -1 means that no bucket has been decoded yet
+      private int currentBucketIndex = -1;
+      private ByteBuffer[] currentBucket = null;
+
+      @Override
+      public boolean hasNext()
+      {
+        return currIndex < adjustedNumValues;
+      }
+
+      @Override
+      public ByteBuffer next()
+      {
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+        // null is handled special
+        if (hasNull && currIndex == 0) {
+          currIndex++;
+          return null;
+        }
+        final int adjustedCurrIndex = currIndex - adjustIndex;
+        final int bucketNum = adjustedCurrIndex >> div;
+        // load next bucket if needed
+        if (bucketNum != currentBucketIndex) {
+          // only ending offsets are stored, so the first bucket starts at 0 and every other bucket starts at the
+          // ending offset of the bucket before it
+          final int offset = bucketNum > 0 ? copy.getInt(offsetsPosition + ((bucketNum - 1) * Integer.BYTES)) : 0;
+          copy.position(bucketsPosition + offset);
+          currentBucket = readBucket(
+              copy,
+              bucketNum < (numBuckets - 1) ? bucketSize : lastBucketNumValues
+          );
+          currentBucketIndex = bucketNum;
+        }
+        final int bucketPosition = adjustedCurrIndex & rem;
+        currIndex++;
+        return currentBucket[bucketPosition];
+      }
+
+      @Override
+      public void remove()
+      {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  /**
+   * Reports the buffer, the null flag and the bucket size to the inspector.
+   */
+  @Override
+  public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+  {
+    inspector.visit("buffer", buffer);
+    inspector.visit("hasNulls", hasNull);
+    inspector.visit("bucketSize", bucketSize);
+  }
+
+  private int getBucketOffset(int bucket)
+  {
+    // get offset of that bucket in the value buffer, subtract 1 to get the starting position because we only store the
+    // ending offset, so look at the ending offset of the previous bucket, or 0 if this is the first bucket
+    return bucketsPosition + (bucket > 0 ? buffer.getInt(offsetsPosition + ((bucket - 1) * Integer.BYTES)) : 0);
+  }
+
+
+  /**
+   * Performs an unsigned byte comparison of the first value in a bucket with the specified value, looking only at
+   * the first {@code min(length, value.remaining())} bytes. A result of 0 therefore only means that one of the two
+   * is a prefix of the other, and callers must compare the lengths to break the tie.
+   *
+   * This method MUST be prepared before calling: the length of the first value must already have been read
+   * externally, and the position of {@code bucketBuffer} must be at the first byte of the first bucket value. On
+   * return, the position of {@code bucketBuffer} has advanced from that starting position by the 'shared prefix
+   * length', which is the number of leading bytes the first value of the bucket has in common with the value to
+   * compare.
+   *
+   * Bytes of {@code value} are read with absolute gets starting at index 0, while its length is taken from
+   * {@code value.remaining()}. NOTE(review): this appears to assume {@code value.position() == 0}; confirm against
+   * callers.
+   */
+  private static int compareBucketFirstValue(ByteBuffer bucketBuffer, int length, ByteBuffer value)
+  {
+    final int startOffset = bucketBuffer.position();
+    final int commonLength = Math.min(length, value.remaining());
+    // save the length of the shared prefix with the first value of the bucket and the value to match so we
+    // can use it later to skip over all values in the bucket that share a longer prefix with the first value
+    // (the bucket is sorted, so the prefix length gets smaller as values increase)
+    int sharedPrefix;
+    int comparison = 0;
+    for (sharedPrefix = 0; sharedPrefix < commonLength; sharedPrefix++) {
+      comparison = unsignedByteCompare(bucketBuffer.get(), value.get(sharedPrefix));
+      if (comparison != 0) {
+        // the relative get above already moved past the mismatching byte, so step back onto it
+        bucketBuffer.position(startOffset + sharedPrefix);
+        break;
+      }
+    }
+    return comparison;
+  }
+
+  /**
+   * Scans the front-coded entries which follow the first value of a bucket, looking for {@code value}. The first
+   * value is assumed to have already been compared against {@code value} and found to be smaller; that comparison
+   * is the source of {@code sharedPrefix}, the number of leading bytes {@code value} has in common with the first
+   * value of the bucket.
+   *
+   * The shared prefix length lets most entries be classified without looking at their bytes. The bucket is sorted,
+   * so an entry whose prefix length is longer than {@code sharedPrefix} sorts before {@code value} and is skipped,
+   * while an entry whose prefix length is shorter sorts after {@code value} and ends the scan. Only entries whose
+   * prefix length is equal to {@code sharedPrefix} need their fragment bytes compared against the remainder of
+   * {@code value}.
+   *
+   * {@code value} is only read with absolute gets starting at index 0. This method modifies the position of
+   * {@link #buffer}, which is expected to be positioned at the start of the second entry of the bucket.
+   *
+   * @return the index of the value (shifted by {@code adjustIndex}) if it is present in the bucket, otherwise
+   * (-(insertion point) - 1)
+   */
+  private int findValueInBucket(
+      ByteBuffer value,
+      int currBucketFirstValueIndex,
+      int bucketSize,
+      int sharedPrefix
+  )
+  {
+    final int bucketStartIndex = currBucketFirstValueIndex + adjustIndex;
+    final int valueLength = value.remaining();
+    // the first value of the bucket is already known to be smaller, so the earliest insertion point is 1
+    int insertionPoint = 1;
+    for (int relativePosition = 1; relativePosition < bucketSize; relativePosition++) {
+      final int prefixLength = VByte.readInt(buffer);
+      if (prefixLength < sharedPrefix) {
+        // shares less with the first value than the value we are looking for does, so it sorts after it
+        break;
+      }
+      final int fragmentLength = VByte.readInt(buffer);
+      final int fragmentStart = buffer.position();
+      if (prefixLength == sharedPrefix) {
+        // same prefix, the fragment must be compared against what remains of the value after the prefix
+        final int common = Math.min(fragmentLength, valueLength - prefixLength);
+        int fragmentComparison = 0;
+        for (int i = 0; i < common && fragmentComparison == 0; i++) {
+          fragmentComparison = unsignedByteCompare(buffer.get(fragmentStart + i), value.get(prefixLength + i));
+        }
+        if (fragmentComparison == 0) {
+          // all common bytes match, the shorter of the two sorts first
+          fragmentComparison = Integer.compare(prefixLength + fragmentLength, valueLength);
+        }
+        if (fragmentComparison == 0) {
+          return bucketStartIndex + relativePosition;
+        }
+        if (fragmentComparison > 0) {
+          break;
+        }
+      }
+      // this entry sorts before the value: step over its fragment and move the insertion point past it
+      buffer.position(fragmentStart + fragmentLength);
+      insertionPoint++;
+    }
+    // (-(insertion point) - 1)
+    return -bucketStartIndex + (-(insertionPoint) - 1);
+  }
+
+  /**
+   * Get a value from a bucket at a relative position. {@code buffer} must be positioned at the start of the bucket,
+   * that is on the {@link VByte} encoded length of the first value.
+   *
+   * For {@code offset} 0 the result is a read-only view over the bytes of the first value that shares the content of
+   * {@code buffer}. For any other offset the value is rebuilt into a newly allocated heap buffer from the leading
+   * bytes of the first value followed by the fragment stored for that entry.
+   *
+   * This method modifies the position of the buffer.
+   */
+  static ByteBuffer getFromBucket(ByteBuffer buffer, int offset)
+  {
+    final int firstLength = VByte.readInt(buffer);
+    if (offset == 0) {
+      // the first value is stored whole, so a bounded view of the underlying bytes is all that is needed
+      final ByteBuffer firstValue = buffer.asReadOnlyBuffer().order(buffer.order());
+      firstValue.limit(firstValue.position() + firstLength);
+      return firstValue;
+    }
+    // remember where the first value starts, its leading bytes are the prefix of every other value in the bucket
+    final int firstValueStart = buffer.position();
+    buffer.position(firstValueStart + firstLength);
+    // step over the entries in front of the one we want, only their lengths need to be read
+    for (int entry = 1; entry < offset; entry++) {
+      VByte.readInt(buffer);
+      final int skipLength = VByte.readInt(buffer);
+      buffer.position(buffer.position() + skipLength);
+    }
+    // we've reached our destination
+    final int prefixLength = VByte.readInt(buffer);
+    final int fragmentLength = VByte.readInt(buffer);
+    final int fragmentStart = buffer.position();
+    final ByteBuffer value = ByteBuffer.allocate(prefixLength + fragmentLength);
+    for (int i = 0; i < prefixLength; i++) {
+      value.put(buffer.get(firstValueStart + i));
+    }
+    for (int i = 0; i < fragmentLength; i++) {
+      value.put(buffer.get(fragmentStart + i));
+    }
+    value.flip();
+    return value;
+  }
+
+
+  /**
+   * Read an entire bucket from a {@link ByteBuffer}, returning an array of reconstructed value bytes. The buffer
+   * must be positioned at the start of the bucket; element 0 of the result is the first value, which is stored
+   * whole, and every later element is rebuilt from a prefix of the first value plus its own fragment.
+   *
+   * This method modifies the position of the buffer.
+   */
+  private static ByteBuffer[] readBucket(ByteBuffer bucket, int numValues)
+  {
+    final int firstLength = VByte.readInt(bucket);
+    final byte[] firstValue = new byte[firstLength];
+    bucket.get(firstValue, 0, firstLength);
+    final ByteBuffer[] values = new ByteBuffer[numValues];
+    values[0] = ByteBuffer.wrap(firstValue);
+    for (int i = 1; i < numValues; i++) {
+      final int prefixLength = VByte.readInt(bucket);
+      final int fragmentLength = VByte.readInt(bucket);
+      // assemble prefix and fragment side by side in the array that backs the returned buffer
+      final byte[] valueBytes = new byte[prefixLength + fragmentLength];
+      System.arraycopy(firstValue, 0, valueBytes, 0, prefixLength);
+      bucket.get(valueBytes, prefixLength, fragmentLength);
+      values[i] = ByteBuffer.wrap(valueBytes);
+    }
+    return values;
+  }
+
+  /**
+   * Compares two bytes as unsigned values in the range 0 to 255.
+   *
+   * @return the difference of the unsigned values: negative if {@code b1} is smaller, zero if both are equal,
+   * positive if {@code b1} is larger
+   */
+  public static int unsignedByteCompare(byte b1, byte b2)
+  {
+    final int left = Byte.toUnsignedInt(b1);
+    final int right = Byte.toUnsignedInt(b2);
+    return left - right;
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java
new file mode 100644
index 0000000000..e295e7abb0
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/data/FrontCodedIndexedWriter.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.primitives.Ints;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.io.Channels;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.WriteOutBytes;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+
+
+/**
+ * {@link DictionaryWriter} for a {@link FrontCodedIndexed}, written to a {@link SegmentWriteOutMedium}. Values MUST
+ * be added to this dictionary writer in sorted order, which is enforced.
+ *
+ * Front coding is a type of delta encoding for byte arrays, where values are grouped into buckets. The first value of
+ * the bucket is written entirely, and remaining values are stored as pairs of an integer which indicates how much
+ * of the first byte array of the bucket to use as a prefix, followed by the remaining value bytes after the prefix.
+ *
+ * This is valid to use for any values which can be compared byte by byte with unsigned comparison. Otherwise, this
+ * is not the collection for you.
+ *
+ * The serialized form produced by {@link #writeTo} is: a version byte (0), the bucket size as a single byte, a byte
+ * indicating if a null value is present, the {@link VByte} encoded number of non-null values, the {@link VByte}
+ * encoded combined size of the offsets and buckets, the end offset of every bucket except the last one (one int
+ * each), and finally the buckets themselves.
+ *
+ * @see FrontCodedIndexed for additional details.
+ */
+public class FrontCodedIndexedWriter implements DictionaryWriter<byte[]>
+{
+  // the scratch buffer is never grown beyond (1 << MAX_LOG_BUFFER_SIZE) bytes
+  private static final int MAX_LOG_BUFFER_SIZE = 26;
+  private final SegmentWriteOutMedium segmentWriteOutMedium;
+  private final int bucketSize;
+  private final ByteOrder byteOrder;
+  // values of the bucket which is currently being filled; value number n lives in slot (n % bucketSize)
+  private final byte[][] bucketBuffer;
+  // reusable buffer to read a single bucket offset back out of headerOut
+  private final ByteBuffer getOffsetBuffer;
+  // log2(bucketSize), so (index >> div) is the number of the bucket which holds a value
+  private final int div;
+
+  @Nullable
+  private byte[] prevObject = null;
+  // end offsets (into valuesOut) of the buckets written by write()
+  @Nullable
+  private WriteOutBytes headerOut = null;
+  // the front coded buckets
+  @Nullable
+  private WriteOutBytes valuesOut = null;
+  // number of non-null values written so far
+  private int numWritten = 0;
+  private ByteBuffer scratch;
+  private int logScratchSize = 10;
+  private boolean isClosed = false;
+  private boolean hasNulls = false;
+
+  /**
+   * @param segmentWriteOutMedium medium used to allocate the offsets and values outputs in {@link #open()}
+   * @param byteOrder             byte order of the scratch buffer, and so of the bucket offsets which are written
+   * @param bucketSize            number of values per bucket, must be a power of two
+   */
+  public FrontCodedIndexedWriter(
+      SegmentWriteOutMedium segmentWriteOutMedium,
+      ByteOrder byteOrder,
+      int bucketSize
+  )
+  {
+    if (Integer.bitCount(bucketSize) != 1) {
+      throw new ISE("bucketSize must be a power of two but was[%,d]", bucketSize);
+    }
+    this.segmentWriteOutMedium = segmentWriteOutMedium;
+    this.scratch = ByteBuffer.allocate(1 << logScratchSize).order(byteOrder);
+    this.bucketSize = bucketSize;
+    this.byteOrder = byteOrder;
+    this.bucketBuffer = new byte[bucketSize][];
+    this.getOffsetBuffer = ByteBuffer.allocate(Integer.BYTES).order(byteOrder);
+    this.div = Integer.numberOfTrailingZeros(bucketSize);
+  }
+
+  @Override
+  public void open() throws IOException
+  {
+    headerOut = segmentWriteOutMedium.makeWriteOutBytes();
+    valuesOut = segmentWriteOutMedium.makeWriteOutBytes();
+  }
+
+  /**
+   * Add the next value. Values must arrive in strictly increasing unsigned byte order, else an {@link ISE} is
+   * thrown. A null value is not stored in any bucket, it is only recorded as a flag. Whenever the value to add would
+   * start a new bucket, the bucket which was just completed is written out along with its end offset.
+   */
+  @Override
+  public void write(@Nullable byte[] value) throws IOException
+  {
+    if (prevObject != null && unsignedCompare(prevObject, value) >= 0) {
+      throw new ISE(
+          "Values must be sorted and unique. Element [%s] with value [%s] is before or equivalent to [%s]",
+          numWritten,
+          value == null ? null : StringUtils.fromUtf8(value),
+          StringUtils.fromUtf8(prevObject)
+      );
+    }
+
+    if (value == null) {
+      hasNulls = true;
+      return;
+    }
+
+    // if the bucket buffer is full, write the bucket
+    if (numWritten > 0 && (numWritten % bucketSize) == 0) {
+      writeBucketToValues(bucketSize);
+
+      resetScratch();
+      // write end offset for current value, failing loudly instead of storing a wrapped offset if the values have
+      // outgrown what an int can address
+      scratch.putInt(Ints.checkedCast(valuesOut.size()));
+      scratch.flip();
+      Channels.writeFully(headerOut, scratch);
+    }
+
+    bucketBuffer[numWritten % bucketSize] = value;
+
+    ++numWritten;
+    prevObject = value;
+  }
+
+
+  /**
+   * Size in bytes of what {@link #writeTo} will write. Writes out the final, possibly partial, bucket if that has
+   * not been done yet.
+   */
+  @Override
+  public long getSerializedSize() throws IOException
+  {
+    if (!isClosed) {
+      flush();
+    }
+    int headerAndValues = Ints.checkedCast(headerOut.size() + valuesOut.size());
+    // version, bucket size, and null flag are one byte each
+    return Byte.BYTES +
+           Byte.BYTES +
+           Byte.BYTES +
+           VByte.computeIntSize(numWritten) +
+           VByte.computeIntSize(headerAndValues) +
+           headerAndValues;
+  }
+
+  /**
+   * Write the serialized dictionary to the channel. Writes out the final, possibly partial, bucket first if that has
+   * not been done yet.
+   */
+  @Override
+  public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
+  {
+    if (!isClosed) {
+      flush();
+    }
+    resetScratch();
+    // version 0
+    scratch.put((byte) 0);
+    // NOTE(review): the bucket size is narrowed to a single byte here but the constructor accepts any power of
+    // two - confirm how the reader interprets bucket sizes of 128 and above
+    scratch.put((byte) bucketSize);
+    scratch.put(hasNulls ? NullHandling.IS_NULL_BYTE : NullHandling.IS_NOT_NULL_BYTE);
+    VByte.writeInt(scratch, numWritten);
+    VByte.writeInt(scratch, Ints.checkedCast(headerOut.size() + valuesOut.size()));
+    scratch.flip();
+    Channels.writeFully(channel, scratch);
+    headerOut.writeTo(channel);
+    valuesOut.writeTo(channel);
+  }
+
+  @Override
+  public boolean isSorted()
+  {
+    return true;
+  }
+
+  /**
+   * Read back a value which was previously written. If a null was written it occupies index 0 and all other values
+   * are shifted up by one. The most recently written values are served from memory, older ones are decoded from the
+   * bucket which has already been written out.
+   */
+  @Nullable
+  @Override
+  public byte[] get(int index) throws IOException
+  {
+    if (index == 0 && hasNulls) {
+      return null;
+    }
+    final int adjustedIndex = hasNulls ? index - 1 : index;
+    final int relativeIndex = adjustedIndex % bucketSize;
+    // check for current page: the last 'bucketSize' values which were written are still held in the bucket buffer
+    if (adjustedIndex >= numWritten - bucketSize) {
+      return bucketBuffer[relativeIndex];
+    }
+    final int bucket = adjustedIndex >> div;
+    // a bucket starts where the previous bucket ends, and the first bucket starts at 0
+    final long startOffset = bucket == 0 ? 0 : getBucketOffset(bucket - 1);
+    final long endOffset = getBucketOffset(bucket);
+    final int bucketLength = Ints.checkedCast(endOffset - startOffset);
+    if (bucketLength == 0) {
+      return null;
+    }
+    final ByteBuffer serializedBucket = ByteBuffer.allocate(bucketLength).order(byteOrder);
+    valuesOut.readFully(startOffset, serializedBucket);
+    serializedBucket.clear();
+    final ByteBuffer valueBuffer = FrontCodedIndexed.getFromBucket(serializedBucket, relativeIndex);
+    final byte[] valueBytes = new byte[valueBuffer.limit() - valueBuffer.position()];
+    valueBuffer.get(valueBytes);
+    return valueBytes;
+  }
+
+  /**
+   * Read the end offset of a bucket which has already been written out.
+   */
+  private long getBucketOffset(int index) throws IOException
+  {
+    getOffsetBuffer.clear();
+    headerOut.readFully(index * (long) Integer.BYTES, getOffsetBuffer);
+    return getOffsetBuffer.getInt(0);
+  }
+
+  /**
+   * Write out the final bucket, which might be only partially filled. No end offset is written for this bucket. If
+   * no non-null values were ever written there is no bucket to write, so nothing is written at all.
+   */
+  private void flush() throws IOException
+  {
+    if (numWritten > 0) {
+      final int remainder = numWritten % bucketSize;
+      writeBucketToValues(remainder == 0 ? bucketSize : remainder);
+      resetScratch();
+    }
+    isClosed = true;
+  }
+
+  /**
+   * Front code the first {@code numValues} values of the bucket buffer and append them to the values output,
+   * growing the scratch buffer as necessary.
+   */
+  private void writeBucketToValues(int numValues) throws IOException
+  {
+    resetScratch();
+    // a negative result means the bucket did not fit; growing swaps in a fresh, larger buffer to try again with
+    while (writeBucket(scratch, bucketBuffer, numValues) < 0) {
+      growScratch();
+    }
+    scratch.flip();
+    Channels.writeFully(valuesOut, scratch);
+  }
+
+  private void resetScratch()
+  {
+    scratch.position(0);
+    scratch.limit(scratch.capacity());
+  }
+
+  /**
+   * Replace the scratch buffer with an empty one of twice the size, up to the maximum allowed size.
+   */
+  private void growScratch()
+  {
+    if (logScratchSize < MAX_LOG_BUFFER_SIZE) {
+      this.scratch = ByteBuffer.allocate(1 << ++logScratchSize).order(byteOrder);
+    } else {
+      throw new IllegalStateException("scratch buffer to big to write buckets");
+    }
+  }
+
+  /**
+   * Write bucket of values to a {@link ByteBuffer}. The first value is written completely, subsequent values are
+   * written with an integer to indicate how much of the first value in the bucket is a prefix of the value, followed
+   * by the remaining bytes of the value.
+   *
+   * Uses {@link VByte} encoded integers to indicate prefix length and value length.
+   *
+   * @return the number of values written, or a negative number if the buffer did not have enough room, in which case
+   * the buffer might contain a partially written bucket
+   */
+  public static int writeBucket(ByteBuffer buffer, byte[][] values, int numValues)
+  {
+    int written = 0;
+    byte[] first = null;
+    while (written < numValues) {
+      byte[] next = values[written];
+      if (written == 0) {
+        first = next;
+        // the first value in the bucket is written completely as it is
+        int rem = writeValue(buffer, first);
+        // wasn't enough room, bail out
+        if (rem < 0) {
+          return rem;
+        }
+      } else {
+        // all other values must be partitioned into a prefix length and suffix bytes. the scan is bounded by both
+        // lengths so that a value shorter than the first value cannot be read past its end
+        final int maxPrefixLength = Math.min(first.length, next.length);
+        int prefixLength = 0;
+        while (prefixLength < maxPrefixLength && first[prefixLength] == next[prefixLength]) {
+          prefixLength++;
+        }
+        // the suffix is whatever remains of the value after the bytes it shares with the first value
+        final byte[] suffix = new byte[next.length - prefixLength];
+        System.arraycopy(next, prefixLength, suffix, 0, suffix.length);
+        int rem = buffer.remaining() - VByte.computeIntSize(prefixLength);
+        // wasn't enough room, bail out
+        if (rem < 0) {
+          return rem;
+        }
+        VByte.writeInt(buffer, prefixLength);
+        rem = writeValue(buffer, suffix);
+        // wasn't enough room, bail out
+        if (rem < 0) {
+          return rem;
+        }
+      }
+      written++;
+    }
+    return written;
+  }
+
+  /**
+   * Write a variable length byte[] value to a {@link ByteBuffer}, storing the length as a {@link VByte} encoded
+   * integer followed by the value itself. Returns the number of bytes written to the buffer. This method returns a
+   * negative value if there is no room available in the buffer, so that it can be grown if needed.
+   */
+  public static int writeValue(ByteBuffer buffer, byte[] bytes)
+  {
+    final int remaining = buffer.remaining() - VByte.computeIntSize(bytes.length) - bytes.length;
+    if (remaining < 0) {
+      return remaining;
+    }
+    final int pos = buffer.position();
+    VByte.writeInt(buffer, bytes.length);
+    buffer.put(bytes, 0, bytes.length);
+    return buffer.position() - pos;
+  }
+
+  /**
+   * Compare two byte arrays byte by byte as unsigned values, with a shorter array sorting before any longer array it
+   * is a prefix of. Null sorts before everything other than another null.
+   */
+  public static int unsignedCompare(
+      @Nullable final byte[] b1,
+      @Nullable final byte[] b2
+  )
+  {
+    if (b1 == null) {
+      return b2 == null ? 0 : -1;
+    }
+
+    if (b2 == null) {
+      return 1;
+    }
+    final int commonLength = Math.min(b1.length, b2.length);
+
+    for (int i = 0; i < commonLength; i++) {
+      final int cmp = FrontCodedIndexed.unsignedByteCompare(b1[i], b2[i]);
+      if (cmp != 0) {
+        return cmp;
+      }
+    }
+
+    return Integer.compare(b1.length, b2.length);
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
index 4fd6417f0f..c66868f761 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexed.java
@@ -76,6 +76,11 @@ import java.util.Iterator;
  * Header file name is identified as: StringUtils.format("%s_header", columnName)
  * value files are identified as: StringUtils.format("%s_value_%d", columnName, fileNumber)
  * number of value files == numElements/numberOfElementsPerValueFile
+ *
+ * The version {@link EncodedStringDictionaryWriter#VERSION} is reserved and must never be specified as the
+ * {@link GenericIndexed} version byte, else it will interfere with string column deserialization.
+ *
+ * @see GenericIndexedWriter
  */
 public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
 {
@@ -373,6 +378,12 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
     return -(minIndex + 1);
   }
 
+  /**
+   * Reports the values as sorted exactly when {@code allowReverseLookup} is set.
+   */
+  @Override
+  public boolean isSorted()
+  {
+    return allowReverseLookup;
+  }
+
   @Override
   public Iterator<T> iterator()
   {
@@ -562,6 +573,12 @@ public class GenericIndexed<T> implements CloseableIndexed<T>, Serializer
       return -(minIndex + 1);
     }
 
+    /**
+     * Reports the values as sorted exactly when {@code allowReverseLookup} is set.
+     */
+    @Override
+    public boolean isSorted()
+    {
+      return allowReverseLookup;
+    }
+
     @Override
     public Iterator<T> iterator()
     {
diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
index 449ed791f3..d62a7e7824 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java
@@ -31,7 +31,6 @@ import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
 import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
 import org.apache.druid.segment.serde.MetaSerdeHelper;
-import org.apache.druid.segment.serde.Serializer;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 import org.apache.druid.segment.writeout.WriteOutBytes;
 
@@ -46,8 +45,11 @@ import java.nio.channels.WritableByteChannel;
 
 /**
  * Streams arrays of objects out in the binary format described by {@link GenericIndexed}
+ *
+ * The version {@link EncodedStringDictionaryWriter#VERSION} is reserved and must never be specified as the
+ * {@link GenericIndexed} version byte, else it will interfere with string column deserialization.
  */
-public class GenericIndexedWriter<T> implements Serializer
+public class GenericIndexedWriter<T> implements DictionaryWriter<T>
 {
   private static final int PAGE_SIZE = 4096;
 
@@ -213,6 +215,7 @@ public class GenericIndexedWriter<T> implements Serializer
     }
   }
 
+  @Override
   public void open() throws IOException
   {
     headerOut = segmentWriteOutMedium.makeWriteOutBytes();
@@ -224,12 +227,19 @@ public class GenericIndexedWriter<T> implements Serializer
     objectsSorted = false;
   }
 
+  /**
+   * Reports the current value of the {@code objectsSorted} flag.
+   */
+  @Override
+  public boolean isSorted()
+  {
+    return objectsSorted;
+  }
+
   @VisibleForTesting
   void setIntMaxForCasting(final int intMaxForCasting)
   {
     this.intMaxForCasting = intMaxForCasting;
   }
 
+  @Override
   public void write(@Nullable T objectToWrite) throws IOException
   {
     if (objectsSorted && prevObject != null && strategy.compare(prevObject, objectToWrite) >= 0) {
@@ -271,6 +281,7 @@ public class GenericIndexedWriter<T> implements Serializer
   }
 
   @Nullable
+  @Override
   public T get(int index) throws IOException
   {
     long startOffset;
diff --git a/processing/src/main/java/org/apache/druid/segment/data/Indexed.java b/processing/src/main/java/org/apache/druid/segment/data/Indexed.java
index b99fc79266..528160fc53 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/Indexed.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/Indexed.java
@@ -36,27 +36,35 @@ import javax.annotation.Nullable;
 public interface Indexed<T> extends Iterable<T>, HotLoopCallee
 {
 
+  /**
+   * Number of elements in the value set
+   */
   int size();
 
+  /**
+   * Get the value at specified position
+   */
   @CalledFromHotLoop
   @Nullable
   T get(int index);
 
   /**
    * Returns the index of "value" in this Indexed object, or a negative number if the value is not present.
-   * The negative number is not guaranteed to be any particular number. Subclasses may tighten this contract
-   * (GenericIndexed does this).
+   * The negative number is not guaranteed to be any particular number unless {@link #isSorted()} returns true, in
+   * which case it will be a negative number equal to (-(insertion point) - 1), in the manner of Arrays.binarySearch.
    *
    * @param value value to search for
    *
-   * @return index of value, or a negative number
+   * @return index of value, or a negative number (equal to (-(insertion point) - 1) if {@link #isSorted()})
    */
   int indexOf(@Nullable T value);
 
-  @FunctionalInterface
-  interface IndexedGetter<T>
+  /**
+   * Indicates if this value set is sorted, the implication being that the contract of {@link #indexOf} is strengthened
+   * to return a negative number equal to (-(insertion point) - 1) when the value is not present in the set.
+   */
+  default boolean isSorted()
   {
-    @Nullable
-    T get(int id);
+    return false;
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java b/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java
index c5b2c5de66..2b118a7138 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java
@@ -51,6 +51,7 @@ import org.apache.druid.segment.data.CompressedVariableSizedBlobColumn;
 import org.apache.druid.segment.data.CompressedVariableSizedBlobColumnSupplier;
 import org.apache.druid.segment.data.FixedIndexed;
 import org.apache.druid.segment.data.GenericIndexed;
+import org.apache.druid.segment.data.Indexed;
 import org.apache.druid.segment.data.ObjectStrategy;
 import org.apache.druid.segment.data.ReadableOffset;
 import org.apache.druid.segment.data.VSizeColumnarInts;
@@ -74,7 +75,8 @@ import java.util.concurrent.ConcurrentHashMap;
  * Implementation of {@link NestedDataComplexColumn} which uses a {@link CompressedVariableSizedBlobColumn} for the
  * 'raw' {@link StructuredData} values and provides selectors for nested 'literal' field columns.
  */
-public final class CompressedNestedDataComplexColumn extends NestedDataComplexColumn
+public final class CompressedNestedDataComplexColumn<TStringDictionary extends Indexed<ByteBuffer>>
+    extends NestedDataComplexColumn
 {
   private final NestedDataColumnMetadata metadata;
   private final Closer closer;
@@ -85,7 +87,7 @@ public final class CompressedNestedDataComplexColumn extends NestedDataComplexCo
   private final GenericIndexed<String> fields;
   private final NestedLiteralTypeInfo fieldInfo;
 
-  private final GenericIndexed<String> stringDictionary;
+  private final TStringDictionary stringDictionary;
   private final FixedIndexed<Long> longDictionary;
   private final FixedIndexed<Double> doubleDictionary;
   private final SmooshedFileMapper fileMapper;
@@ -101,7 +103,7 @@ public final class CompressedNestedDataComplexColumn extends NestedDataComplexCo
       ImmutableBitmap nullValues,
       GenericIndexed<String> fields,
       NestedLiteralTypeInfo fieldInfo,
-      GenericIndexed<String> stringDictionary,
+      TStringDictionary stringDictionary,
       FixedIndexed<Long> longDictionary,
       FixedIndexed<Double> doubleDictionary,
       SmooshedFileMapper fileMapper
@@ -129,7 +131,7 @@ public final class CompressedNestedDataComplexColumn extends NestedDataComplexCo
     return fieldInfo;
   }
 
-  public GenericIndexed<String> getStringDictionary()
+  public TStringDictionary getStringDictionary()
   {
     return stringDictionary;
   }
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDimensionDictionary.java b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDimensionDictionary.java
index fb17424735..4f22577b41 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/GlobalDimensionDictionary.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/GlobalDimensionDictionary.java
@@ -25,6 +25,7 @@ import org.apache.druid.segment.ComparatorSortedDimensionDictionary;
 import org.apache.druid.segment.DimensionDictionary;
 import org.apache.druid.segment.NestedDataColumnIndexer;
 import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.data.GenericIndexed;
 import org.apache.druid.segment.data.Indexed;
 import org.apache.druid.segment.data.IndexedIterable;
 
@@ -44,7 +45,7 @@ public class GlobalDimensionDictionary
 
   public GlobalDimensionDictionary()
   {
-    this.stringDictionary = new ComparatorDimensionDictionary<String>(ColumnType.STRING.getNullableStrategy()) {
+    this.stringDictionary = new ComparatorDimensionDictionary<String>(GenericIndexed.STRING_STRATEGY) {
       @Override
       public long estimateSizeOfValue(String value)
       {
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
index 521740aada..cd0a6ce61f 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java
@@ -36,6 +36,7 @@ import org.apache.druid.segment.IndexMerger;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.ProgressIndicator;
 import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.StringEncodingStrategies;
 import org.apache.druid.segment.column.TypeStrategies;
 import org.apache.druid.segment.column.TypeStrategy;
 import org.apache.druid.segment.column.Types;
@@ -43,6 +44,7 @@ import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.data.ByteBufferWriter;
 import org.apache.druid.segment.data.CompressedVariableSizedBlobColumnSerializer;
 import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.data.DictionaryWriter;
 import org.apache.druid.segment.data.FixedIndexedWriter;
 import org.apache.druid.segment.data.GenericIndexed;
 import org.apache.druid.segment.data.GenericIndexedWriter;
@@ -99,7 +101,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
   private SortedMap<String, NestedLiteralTypeInfo.MutableTypeSet> fields;
   private GenericIndexedWriter<String> fieldsWriter;
   private NestedLiteralTypeInfo.Writer fieldsInfoWriter;
-  private GenericIndexedWriter<String> dictionaryWriter;
+  private DictionaryWriter<String> dictionaryWriter;
   private FixedIndexedWriter<Long> longDictionaryWriter;
   private FixedIndexedWriter<Double> doubleDictionaryWriter;
   private CompressedVariableSizedBlobColumnSerializer rawWriter;
@@ -133,7 +135,11 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
     fieldsInfoWriter = new NestedLiteralTypeInfo.Writer(segmentWriteOutMedium);
     fieldsInfoWriter.open();
 
-    dictionaryWriter = new GenericIndexedWriter<>(segmentWriteOutMedium, name, GenericIndexed.STRING_STRATEGY);
+    dictionaryWriter = StringEncodingStrategies.getStringDictionaryWriter(
+        indexSpec.getStringDictionaryEncoding(),
+        segmentWriteOutMedium,
+        name
+    );
     dictionaryWriter.open();
 
     longDictionaryWriter = new FixedIndexedWriter<>(
@@ -310,7 +316,7 @@ public class NestedDataColumnSerializer implements GenericColumnSerializer<Struc
   ) throws IOException
   {
     Preconditions.checkState(closedForWrite, "Not closed yet!");
-
+    Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not sorted?!?");
     // version 3
     channel.write(ByteBuffer.wrap(new byte[]{0x03}));
     channel.write(ByteBuffer.wrap(metadataBytes));
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSupplier.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSupplier.java
index bd73fd5c88..031e45eed8 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSupplier.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSupplier.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
 import org.apache.druid.segment.IndexMerger;
@@ -30,8 +31,11 @@ import org.apache.druid.segment.column.ColumnBuilder;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.ComplexColumn;
+import org.apache.druid.segment.column.StringEncodingStrategy;
 import org.apache.druid.segment.data.CompressedVariableSizedBlobColumnSupplier;
+import org.apache.druid.segment.data.EncodedStringDictionaryWriter;
 import org.apache.druid.segment.data.FixedIndexed;
+import org.apache.druid.segment.data.FrontCodedIndexed;
 import org.apache.druid.segment.data.GenericIndexed;
 
 import java.io.IOException;
@@ -44,7 +48,8 @@ public class NestedDataColumnSupplier implements Supplier<ComplexColumn>
   private final ImmutableBitmap nullValues;
   private final GenericIndexed<String> fields;
   private final NestedLiteralTypeInfo fieldInfo;
-  private final GenericIndexed<String> dictionary;
+  private final GenericIndexed<ByteBuffer> dictionary;
+  private final Supplier<FrontCodedIndexed> frontCodedDictionary;
   private final FixedIndexed<Long> longDictionary;
   private final FixedIndexed<Double> doubleDictionary;
   private final ColumnConfig columnConfig;
@@ -74,7 +79,32 @@ public class NestedDataColumnSupplier implements Supplier<ComplexColumn>
             mapper,
             NestedDataColumnSerializer.STRING_DICTIONARY_FILE_NAME
         );
-        dictionary = GenericIndexed.read(stringDictionaryBuffer, GenericIndexed.STRING_STRATEGY, mapper);
+
+        final int dictionaryStartPosition = stringDictionaryBuffer.position();
+        final byte dictionaryVersion = stringDictionaryBuffer.get();
+
+        if (dictionaryVersion == EncodedStringDictionaryWriter.VERSION) {
+          final byte encodingId = stringDictionaryBuffer.get();
+          if (encodingId == StringEncodingStrategy.FRONT_CODED_ID) {
+            frontCodedDictionary = FrontCodedIndexed.read(stringDictionaryBuffer, metadata.getByteOrder());
+            dictionary = null;
+          } else if (encodingId == StringEncodingStrategy.UTF8_ID) {
+            // this cannot happen naturally right now since generic indexed is written in the 'legacy' format, but
+            // this provides backwards compatibility should we switch at some point in the future to always
+            // writing dictionaryVersion
+            dictionary = GenericIndexed.read(stringDictionaryBuffer, GenericIndexed.BYTE_BUFFER_STRATEGY, mapper);
+            frontCodedDictionary = null;
+          } else {
+            throw new ISE("impossible, unknown encoding strategy id: %s", encodingId);
+          }
+        } else {
+          // legacy format that only supports plain utf8 encoding stored in GenericIndexed and the byte we are reading
+          // as dictionaryVersion is actually also the GenericIndexed version, so we reset start position so the
+          // GenericIndexed version can be correctly read
+          stringDictionaryBuffer.position(dictionaryStartPosition);
+          dictionary = GenericIndexed.read(stringDictionaryBuffer, GenericIndexed.BYTE_BUFFER_STRATEGY, mapper);
+          frontCodedDictionary = null;
+        }
         final ByteBuffer longDictionaryBuffer = loadInternalFile(
             mapper,
             NestedDataColumnSerializer.LONG_DICTIONARY_FILE_NAME
@@ -126,14 +156,14 @@ public class NestedDataColumnSupplier implements Supplier<ComplexColumn>
   @Override
   public ComplexColumn get()
   {
-    return new CompressedNestedDataComplexColumn(
+    return new CompressedNestedDataComplexColumn<>(
         metadata,
         columnConfig,
         compressedRawColumnSupplier,
         nullValues,
         fields,
         fieldInfo,
-        dictionary,
+        frontCodedDictionary == null ? dictionary : frontCodedDictionary.get(),
         longDictionary,
         doubleDictionary,
         fileMapper
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedFieldLiteralColumnIndexSupplier.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedFieldLiteralColumnIndexSupplier.java
index d3030c66f2..ba042856ed 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/NestedFieldLiteralColumnIndexSupplier.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedFieldLiteralColumnIndexSupplier.java
@@ -37,6 +37,7 @@ import org.apache.druid.collections.bitmap.BitmapFactory;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.common.guava.GuavaUtils;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.BitmapResultFactory;
 import org.apache.druid.query.filter.DruidDoublePredicate;
 import org.apache.druid.query.filter.DruidLongPredicate;
@@ -60,6 +61,7 @@ import org.apache.druid.segment.data.GenericIndexed;
 import org.apache.druid.segment.data.Indexed;
 
 import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.SortedSet;
@@ -68,14 +70,15 @@ import java.util.SortedSet;
  * Supplies indexes for nested field columns {@link NestedFieldLiteralDictionaryEncodedColumn} of
  * {@link NestedDataComplexColumn}.
  */
-public class NestedFieldLiteralColumnIndexSupplier implements ColumnIndexSupplier
+public class NestedFieldLiteralColumnIndexSupplier<TStringDictionary extends Indexed<ByteBuffer>>
+    implements ColumnIndexSupplier
 {
   @Nullable
   private final ColumnType singleType;
   private final BitmapFactory bitmapFactory;
   private final GenericIndexed<ImmutableBitmap> bitmaps;
   private final FixedIndexed<Integer> dictionary;
-  private final GenericIndexed<String> globalDictionary;
+  private final TStringDictionary globalDictionary;
   private final FixedIndexed<Long> globalLongDictionary;
   private final FixedIndexed<Double> globalDoubleDictionary;
 
@@ -87,7 +90,7 @@ public class NestedFieldLiteralColumnIndexSupplier implements ColumnIndexSupplie
       BitmapFactory bitmapFactory,
       GenericIndexed<ImmutableBitmap> bitmaps,
       FixedIndexed<Integer> dictionary,
-      GenericIndexed<String> globalDictionary,
+      TStringDictionary globalDictionary,
       FixedIndexed<Long> globalLongDictionary,
       FixedIndexed<Double> globalDoubleDictionary
   )
@@ -288,7 +291,7 @@ public class NestedFieldLiteralColumnIndexSupplier implements ColumnIndexSupplie
     {
       int globalIndex = dictionary.get(index);
       if (globalIndex < adjustLongId) {
-        return globalDictionary.get(globalIndex);
+        return StringUtils.fromUtf8Nullable(globalDictionary.get(globalIndex));
       } else if (globalIndex < adjustDoubleId) {
         return String.valueOf(globalLongDictionary.get(globalIndex - adjustLongId));
       } else {
@@ -314,7 +317,7 @@ public class NestedFieldLiteralColumnIndexSupplier implements ColumnIndexSupplie
         public double estimateSelectivity(int totalRows)
         {
           return (double) getBitmap(
-              dictionary.indexOf(globalDictionary.indexOf(value))
+              dictionary.indexOf(globalDictionary.indexOf(StringUtils.toUtf8ByteBuffer(value)))
           ).size() / totalRows;
         }
 
@@ -323,7 +326,7 @@ public class NestedFieldLiteralColumnIndexSupplier implements ColumnIndexSupplie
         {
           return bitmapResultFactory.wrapDimensionValue(
               getBitmap(
-                  dictionary.indexOf(globalDictionary.indexOf(value))
+                  dictionary.indexOf(globalDictionary.indexOf(StringUtils.toUtf8ByteBuffer(value)))
               )
           );
         }
@@ -370,7 +373,7 @@ public class NestedFieldLiteralColumnIndexSupplier implements ColumnIndexSupplie
             {
               while (next < 0 && iterator.hasNext()) {
                 String nextValue = iterator.next();
-                next = dictionary.indexOf(globalDictionary.indexOf(nextValue));
+                next = dictionary.indexOf(globalDictionary.indexOf(StringUtils.toUtf8ByteBuffer(nextValue)));
               }
             }
           };
@@ -390,9 +393,9 @@ public class NestedFieldLiteralColumnIndexSupplier implements ColumnIndexSupplie
     )
     {
       return makeRangeIndex(
-          NullHandling.emptyToNullIfNeeded(startValue),
+          StringUtils.toUtf8ByteBuffer(NullHandling.emptyToNullIfNeeded(startValue)),
           startStrict,
-          NullHandling.emptyToNullIfNeeded(endValue),
+          StringUtils.toUtf8ByteBuffer(NullHandling.emptyToNullIfNeeded(endValue)),
           endStrict,
           globalDictionary,
           0
@@ -414,9 +417,9 @@ public class NestedFieldLiteralColumnIndexSupplier implements ColumnIndexSupplie
         public Iterable<ImmutableBitmap> getBitmapIterable()
         {
           final IntIntPair range = getLocalRangeFromDictionary(
-              startValue,
+              StringUtils.toUtf8ByteBuffer(startValue),
               startStrict,
-              endValue,
+              StringUtils.toUtf8ByteBuffer(endValue),
               endStrict,
               globalDictionary,
               0
@@ -433,7 +436,7 @@ public class NestedFieldLiteralColumnIndexSupplier implements ColumnIndexSupplie
 
             private int findNext()
             {
-              while (currIndex < end && !matcher.apply(globalDictionary.get(dictionary.get(currIndex)))) {
+              while (currIndex < end && !matcher.apply(StringUtils.fromUtf8Nullable(globalDictionary.get(dictionary.get(currIndex))))) {
                 currIndex++;
               }
 
@@ -515,7 +518,7 @@ public class NestedFieldLiteralColumnIndexSupplier implements ColumnIndexSupplie
             {
               while (!nextSet && iterator.hasNext()) {
                 Integer nextValue = iterator.next();
-                nextSet = stringPredicate.apply(globalDictionary.get(nextValue));
+                nextSet = stringPredicate.apply(StringUtils.fromUtf8Nullable(globalDictionary.get(nextValue)));
                 if (nextSet) {
                   next = index;
                 }
@@ -904,7 +907,7 @@ public class NestedFieldLiteralColumnIndexSupplier implements ColumnIndexSupplie
       }
 
       // multi-type, return all that match
-      int globalId = globalDictionary.indexOf(value);
+      int globalId = globalDictionary.indexOf(StringUtils.toUtf8ByteBuffer(value));
       int localId = dictionary.indexOf(globalId);
       if (localId >= 0) {
         intList.add(localId);
@@ -1065,7 +1068,7 @@ public class NestedFieldLiteralColumnIndexSupplier implements ColumnIndexSupplie
                 } else if (nextValue >= adjustLongId) {
                   nextSet = longPredicate.applyLong(globalLongDictionary.get(nextValue - adjustLongId));
                 } else {
-                  nextSet = stringPredicate.apply(globalDictionary.get(nextValue));
+                  nextSet = stringPredicate.apply(StringUtils.fromUtf8Nullable(globalDictionary.get(nextValue)));
                 }
                 if (nextSet) {
                   next = index;
diff --git a/processing/src/main/java/org/apache/druid/segment/nested/NestedFieldLiteralDictionaryEncodedColumn.java b/processing/src/main/java/org/apache/druid/segment/nested/NestedFieldLiteralDictionaryEncodedColumn.java
index b00ca96e8f..d0fc6cb3fb 100644
--- a/processing/src/main/java/org/apache/druid/segment/nested/NestedFieldLiteralDictionaryEncodedColumn.java
+++ b/processing/src/main/java/org/apache/druid/segment/nested/NestedFieldLiteralDictionaryEncodedColumn.java
@@ -27,6 +27,7 @@ import com.google.common.primitives.Floats;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.common.guava.GuavaUtils;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.query.extraction.ExtractionFn;
 import org.apache.druid.query.filter.ValueMatcher;
@@ -45,7 +46,7 @@ import org.apache.druid.segment.data.ColumnarDoubles;
 import org.apache.druid.segment.data.ColumnarInts;
 import org.apache.druid.segment.data.ColumnarLongs;
 import org.apache.druid.segment.data.FixedIndexed;
-import org.apache.druid.segment.data.GenericIndexed;
+import org.apache.druid.segment.data.Indexed;
 import org.apache.druid.segment.data.IndexedInts;
 import org.apache.druid.segment.data.ReadableOffset;
 import org.apache.druid.segment.data.SingleIndexedInt;
@@ -65,9 +66,11 @@ import org.roaringbitmap.PeekableIntIterator;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.BitSet;
 
-public class NestedFieldLiteralDictionaryEncodedColumn implements DictionaryEncodedColumn<String>
+public class NestedFieldLiteralDictionaryEncodedColumn<TStringDictionary extends Indexed<ByteBuffer>>
+    implements DictionaryEncodedColumn<String>
 {
   private final NestedLiteralTypeInfo.TypeSet types;
   @Nullable
@@ -75,7 +78,7 @@ public class NestedFieldLiteralDictionaryEncodedColumn implements DictionaryEnco
   private final ColumnarLongs longsColumn;
   private final ColumnarDoubles doublesColumn;
   private final ColumnarInts column;
-  private final GenericIndexed<String> globalDictionary;
+  private final TStringDictionary globalDictionary;
   private final FixedIndexed<Long> globalLongDictionary;
   private final FixedIndexed<Double> globalDoubleDictionary;
   private final FixedIndexed<Integer> dictionary;
@@ -89,7 +92,7 @@ public class NestedFieldLiteralDictionaryEncodedColumn implements DictionaryEnco
       ColumnarLongs longsColumn,
       ColumnarDoubles doublesColumn,
       ColumnarInts column,
-      GenericIndexed<String> globalDictionary,
+      TStringDictionary globalDictionary,
       FixedIndexed<Long> globalLongDictionary,
       FixedIndexed<Double> globalDoubleDictionary,
       FixedIndexed<Integer> dictionary,
@@ -140,7 +143,7 @@ public class NestedFieldLiteralDictionaryEncodedColumn implements DictionaryEnco
   {
     final int globalId = dictionary.get(id);
     if (globalId < globalDictionary.size()) {
-      return globalDictionary.get(globalId);
+      return StringUtils.fromUtf8Nullable(globalDictionary.get(globalId));
     } else if (globalId < adjustLongId + globalLongDictionary.size()) {
       return String.valueOf(globalLongDictionary.get(globalId - adjustLongId));
     } else {
@@ -173,10 +176,10 @@ public class NestedFieldLiteralDictionaryEncodedColumn implements DictionaryEnco
         case DOUBLE:
           return globalDoubleDictionary.indexOf(Doubles.tryParse(val));
         default:
-          return globalDictionary.indexOf(val);
+          return globalDictionary.indexOf(StringUtils.toUtf8ByteBuffer(val));
       }
     } else {
-      int candidate = globalDictionary.indexOf(val);
+      int candidate = globalDictionary.indexOf(StringUtils.toUtf8ByteBuffer(val));
       if (candidate < 0) {
         candidate = globalLongDictionary.indexOf(GuavaUtils.tryParseLong(val));
       }
@@ -222,7 +225,7 @@ public class NestedFieldLiteralDictionaryEncodedColumn implements DictionaryEnco
           return 0f;
         } else if (globalId < adjustLongId) {
           // try to convert string to float
-          Float f = Floats.tryParse(globalDictionary.get(globalId));
+          Float f = Floats.tryParse(StringUtils.fromUtf8(globalDictionary.get(globalId)));
           return f == null ? 0f : f;
         } else if (globalId < adjustDoubleId) {
           return globalLongDictionary.get(globalId - adjustLongId).floatValue();
@@ -242,7 +245,7 @@ public class NestedFieldLiteralDictionaryEncodedColumn implements DictionaryEnco
           return 0.0;
         } else if (globalId < adjustLongId) {
           // try to convert string to double
-          Double d = Doubles.tryParse(globalDictionary.get(globalId));
+          Double d = Doubles.tryParse(StringUtils.fromUtf8(globalDictionary.get(globalId)));
           return d == null ? 0.0 : d;
         } else if (globalId < adjustDoubleId) {
           return globalLongDictionary.get(globalId - adjustLongId).doubleValue();
@@ -262,7 +265,7 @@ public class NestedFieldLiteralDictionaryEncodedColumn implements DictionaryEnco
           return 0L;
         } else if (globalId < adjustLongId) {
           // try to convert string to long
-          Long l = GuavaUtils.tryParseLong(globalDictionary.get(globalId));
+          Long l = GuavaUtils.tryParseLong(StringUtils.fromUtf8(globalDictionary.get(globalId)));
           return l == null ? 0L : l;
         } else if (globalId < adjustDoubleId) {
           return globalLongDictionary.get(globalId - adjustLongId);
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java
index 2358372426..ef130f5428 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedColumnPartSerde.java
@@ -22,14 +22,17 @@ package org.apache.druid.segment.serde;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
 import com.google.common.primitives.Ints;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.collections.spatial.ImmutableRTree;
 import org.apache.druid.io.Channels;
 import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
 import org.apache.druid.segment.column.ColumnBuilder;
 import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.column.StringEncodingStrategy;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.data.BitmapSerde;
 import org.apache.druid.segment.data.BitmapSerdeFactory;
@@ -39,6 +42,9 @@ import org.apache.druid.segment.data.ColumnarIntsSerializer;
 import org.apache.druid.segment.data.ColumnarMultiInts;
 import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplier;
 import org.apache.druid.segment.data.CompressedVSizeColumnarMultiIntsSupplier;
+import org.apache.druid.segment.data.DictionaryWriter;
+import org.apache.druid.segment.data.EncodedStringDictionaryWriter;
+import org.apache.druid.segment.data.FrontCodedIndexed;
 import org.apache.druid.segment.data.GenericIndexed;
 import org.apache.druid.segment.data.GenericIndexedWriter;
 import org.apache.druid.segment.data.ImmutableRTreeObjectStrategy;
@@ -148,7 +154,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
     @Nullable
     private VERSION version = null;
     @Nullable
-    private GenericIndexedWriter<String> dictionaryWriter = null;
+    private DictionaryWriter<String> dictionaryWriter = null;
     @Nullable
     private ColumnarIntsSerializer valueWriter = null;
     @Nullable
@@ -160,7 +166,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
     @Nullable
     private ByteOrder byteOrder = null;
 
-    public SerializerBuilder withDictionary(GenericIndexedWriter<String> dictionaryWriter)
+    public SerializerBuilder withDictionary(DictionaryWriter<String> dictionaryWriter)
     {
       this.dictionaryWriter = dictionaryWriter;
       return this;
@@ -305,6 +311,39 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
 
         builder.setType(ValueType.STRING);
 
+        final int dictionaryStartPosition = buffer.position();
+        final byte dictionaryVersion = buffer.get();
+
+        if (dictionaryVersion == EncodedStringDictionaryWriter.VERSION) {
+          final byte encodingId = buffer.get();
+          if (encodingId == StringEncodingStrategy.FRONT_CODED_ID) {
+            readFrontCodedColumn(buffer, builder, rVersion, rFlags, hasMultipleValues);
+          } else if (encodingId == StringEncodingStrategy.UTF8_ID) {
+            // this cannot happen naturally right now since generic indexed is written in the 'legacy' format, but
+            // this provides backwards compatibility should we switch at some point in the future to always
+            // writing dictionaryVersion
+            readGenericIndexedColumn(buffer, builder, columnConfig, rVersion, rFlags, hasMultipleValues);
+          } else {
+            throw new ISE("impossible, unknown encoding strategy id: %s", encodingId);
+          }
+        } else {
+          // legacy format that only supports plain utf8 encoding stored in GenericIndexed and the byte we are reading
+          // as dictionaryVersion is actually also the GenericIndexed version, so we reset start position so the
+          // GenericIndexed version can be correctly read
+          buffer.position(dictionaryStartPosition);
+          readGenericIndexedColumn(buffer, builder, columnConfig, rVersion, rFlags, hasMultipleValues);
+        }
+      }
+
+      private void readGenericIndexedColumn(
+          ByteBuffer buffer,
+          ColumnBuilder builder,
+          ColumnConfig columnConfig,
+          VERSION rVersion,
+          int rFlags,
+          boolean hasMultipleValues
+      )
+      {
         // Duplicate the first buffer since we are reading the dictionary twice.
         final GenericIndexed<String> rDictionary = GenericIndexed.read(
             buffer.duplicate(),
@@ -339,10 +378,9 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
             columnConfig.columnCacheSizeBytes()
         );
 
-        builder
-            .setHasMultipleValues(hasMultipleValues)
-            .setHasNulls(firstDictionaryEntry == null)
-            .setDictionaryEncodedColumnSupplier(dictionaryEncodedColumnSupplier);
+        builder.setHasMultipleValues(hasMultipleValues)
+               .setHasNulls(firstDictionaryEntry == null)
+               .setDictionaryEncodedColumnSupplier(dictionaryEncodedColumnSupplier);
 
         GenericIndexed<ImmutableBitmap> rBitmaps = null;
         ImmutableRTree rSpatialIndex = null;
@@ -375,6 +413,71 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
         }
       }
 
+      private void readFrontCodedColumn(
+          ByteBuffer buffer,
+          ColumnBuilder builder,
+          VERSION rVersion,
+          int rFlags,
+          boolean hasMultipleValues
+      )
+      {
+        final Supplier<FrontCodedIndexed> rUtf8Dictionary = FrontCodedIndexed.read(
+            buffer,
+            byteOrder
+        );
+
+        final WritableSupplier<ColumnarInts> rSingleValuedColumn;
+        final WritableSupplier<ColumnarMultiInts> rMultiValuedColumn;
+
+        if (hasMultipleValues) {
+          rMultiValuedColumn = readMultiValuedColumn(rVersion, buffer, rFlags);
+          rSingleValuedColumn = null;
+        } else {
+          rSingleValuedColumn = readSingleValuedColumn(rVersion, buffer);
+          rMultiValuedColumn = null;
+        }
+
+        final boolean hasNulls = rUtf8Dictionary.get().get(0) == null;
+
+        StringFrontCodedDictionaryEncodedColumnSupplier dictionaryEncodedColumnSupplier =
+            new StringFrontCodedDictionaryEncodedColumnSupplier(
+                rUtf8Dictionary,
+                rSingleValuedColumn,
+                rMultiValuedColumn
+            );
+        builder.setHasMultipleValues(hasMultipleValues)
+               .setHasNulls(hasNulls)
+               .setDictionaryEncodedColumnSupplier(dictionaryEncodedColumnSupplier);
+
+        GenericIndexed<ImmutableBitmap> rBitmaps = null;
+        ImmutableRTree rSpatialIndex = null;
+        if (!Feature.NO_BITMAP_INDEX.isSet(rFlags)) {
+          rBitmaps = GenericIndexed.read(
+              buffer,
+              bitmapSerdeFactory.getObjectStrategy(),
+              builder.getFileMapper()
+          );
+        }
+
+        if (buffer.hasRemaining()) {
+          rSpatialIndex = new ImmutableRTreeObjectStrategy(
+              bitmapSerdeFactory.getBitmapFactory()
+          ).fromByteBufferWithSize(buffer);
+        }
+
+        if (rBitmaps != null || rSpatialIndex != null) {
+          builder.setIndexSupplier(
+              new StringFrontCodedColumnIndexSupplier(
+                  bitmapSerdeFactory.getBitmapFactory(),
+                  rUtf8Dictionary,
+                  rBitmaps,
+                  rSpatialIndex
+              ),
+              rBitmaps != null,
+              rSpatialIndex != null
+          );
+        }
+      }
 
       private WritableSupplier<ColumnarInts> readSingleValuedColumn(VERSION version, ByteBuffer buffer)
       {
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedStringIndexSupplier.java b/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedStringIndexSupplier.java
index 5c1d2bc08b..ae2a1e1482 100644
--- a/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedStringIndexSupplier.java
+++ b/processing/src/main/java/org/apache/druid/segment/serde/DictionaryEncodedStringIndexSupplier.java
@@ -19,31 +19,22 @@
 
 package org.apache.druid.segment.serde;
 
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.PeekingIterator;
-import it.unimi.dsi.fastutil.ints.IntIntImmutablePair;
-import it.unimi.dsi.fastutil.ints.IntIntPair;
-import it.unimi.dsi.fastutil.ints.IntIterator;
 import org.apache.druid.collections.bitmap.BitmapFactory;
 import org.apache.druid.collections.bitmap.ImmutableBitmap;
 import org.apache.druid.collections.spatial.ImmutableRTree;
 import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.query.BitmapResultFactory;
-import org.apache.druid.query.filter.DruidPredicateFactory;
-import org.apache.druid.segment.IntListUtils;
 import org.apache.druid.segment.column.BitmapColumnIndex;
 import org.apache.druid.segment.column.ColumnIndexSupplier;
 import org.apache.druid.segment.column.DictionaryEncodedStringValueIndex;
 import org.apache.druid.segment.column.DictionaryEncodedValueIndex;
 import org.apache.druid.segment.column.DruidPredicateIndex;
+import org.apache.druid.segment.column.IndexedStringDictionaryEncodedStringValueIndex;
+import org.apache.druid.segment.column.IndexedStringDruidPredicateIndex;
+import org.apache.druid.segment.column.IndexedUtf8LexicographicalRangeIndex;
+import org.apache.druid.segment.column.IndexedUtf8ValueSetIndex;
 import org.apache.druid.segment.column.LexicographicalRangeIndex;
 import org.apache.druid.segment.column.NullValueIndex;
-import org.apache.druid.segment.column.SimpleBitmapColumnIndex;
 import org.apache.druid.segment.column.SimpleImmutableBitmapIndex;
-import org.apache.druid.segment.column.SimpleImmutableBitmapIterableIndex;
 import org.apache.druid.segment.column.SpatialIndex;
 import org.apache.druid.segment.column.StringValueSetIndex;
 import org.apache.druid.segment.column.Utf8ValueSetIndex;
@@ -52,9 +43,6 @@ import org.apache.druid.segment.data.Indexed;
 
 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.SortedSet;
 
 public class DictionaryEncodedStringIndexSupplier implements ColumnIndexSupplier
 {
@@ -87,6 +75,9 @@ public class DictionaryEncodedStringIndexSupplier implements ColumnIndexSupplier
   public <T> T as(Class<T> clazz)
   {
     if (bitmaps != null) {
+      final Indexed<String> singleThreadedStrings = dictionary.singleThreaded();
+      final Indexed<ByteBuffer> singleThreadedUtf8 = dictionaryUtf8.singleThreaded();
+      final Indexed<ImmutableBitmap> singleThreadedBitmaps = bitmaps.singleThreaded();
       if (clazz.equals(NullValueIndex.class)) {
         final BitmapColumnIndex nullIndex;
         if (NullHandling.isNullOrEquivalent(dictionary.get(0))) {
@@ -96,20 +87,24 @@ public class DictionaryEncodedStringIndexSupplier implements ColumnIndexSupplier
         }
         return (T) (NullValueIndex) () -> nullIndex;
       } else if (clazz.equals(StringValueSetIndex.class)) {
-        return (T) new GenericIndexedDictionaryEncodedStringValueSetIndex(bitmapFactory, dictionaryUtf8, bitmaps);
+        return (T) new IndexedUtf8ValueSetIndex<>(bitmapFactory, singleThreadedUtf8, singleThreadedBitmaps);
       } else if (clazz.equals(Utf8ValueSetIndex.class)) {
-        return (T) new GenericIndexedDictionaryEncodedStringValueSetIndex(bitmapFactory, dictionaryUtf8, bitmaps);
+        return (T) new IndexedUtf8ValueSetIndex<>(bitmapFactory, singleThreadedUtf8, singleThreadedBitmaps);
       } else if (clazz.equals(DruidPredicateIndex.class)) {
-        return (T) new GenericIndexedDictionaryEncodedStringDruidPredicateIndex(bitmapFactory, dictionary, bitmaps);
+        return (T) new IndexedStringDruidPredicateIndex<>(bitmapFactory, singleThreadedStrings, singleThreadedBitmaps);
       } else if (clazz.equals(LexicographicalRangeIndex.class)) {
-        return (T) new GenericIndexedDictionaryEncodedColumnLexicographicalRangeIndex(
+        return (T) new IndexedUtf8LexicographicalRangeIndex<>(
             bitmapFactory,
-            dictionaryUtf8,
-            bitmaps,
+            singleThreadedUtf8,
+            singleThreadedBitmaps,
             NullHandling.isNullOrEquivalent(dictionary.get(0))
         );
       } else if (clazz.equals(DictionaryEncodedStringValueIndex.class) || clazz.equals(DictionaryEncodedValueIndex.class)) {
-        return (T) new GenericIndexedDictionaryEncodedStringValueIndex(bitmapFactory, dictionary, bitmaps);
+        return (T) new IndexedStringDictionaryEncodedStringValueIndex<>(
+            bitmapFactory,
+            singleThreadedStrings,
+            bitmaps
+        );
       }
     }
     if (indexedTree != null && clazz.equals(SpatialIndex.class)) {
@@ -117,489 +112,4 @@ public class DictionaryEncodedStringIndexSupplier implements ColumnIndexSupplier
     }
     return null;
   }
-
-  private abstract static class BaseGenericIndexedDictionaryEncodedIndex<T>
-  {
-    protected final BitmapFactory bitmapFactory;
-    protected final Indexed<T> dictionary;
-    protected final Indexed<ImmutableBitmap> bitmaps;
-
-    protected BaseGenericIndexedDictionaryEncodedIndex(
-        BitmapFactory bitmapFactory,
-        GenericIndexed<T> dictionary,
-        GenericIndexed<ImmutableBitmap> bitmaps
-    )
-    {
-      this.bitmapFactory = bitmapFactory;
-      this.dictionary = dictionary.singleThreaded();
-      this.bitmaps = bitmaps.singleThreaded();
-    }
-
-    public ImmutableBitmap getBitmap(int idx)
-    {
-      if (idx < 0) {
-        return bitmapFactory.makeEmptyImmutableBitmap();
-      }
-
-      final ImmutableBitmap bitmap = bitmaps.get(idx);
-      return bitmap == null ? bitmapFactory.makeEmptyImmutableBitmap() : bitmap;
-    }
-  }
-
-  public static final class GenericIndexedDictionaryEncodedStringValueIndex
-      extends BaseGenericIndexedDictionaryEncodedIndex<String> implements DictionaryEncodedStringValueIndex
-  {
-    public GenericIndexedDictionaryEncodedStringValueIndex(
-        BitmapFactory bitmapFactory,
-        GenericIndexed<String> dictionary,
-        GenericIndexed<ImmutableBitmap> bitmaps
-    )
-    {
-      super(bitmapFactory, dictionary, bitmaps);
-    }
-
-    @Override
-    public int getCardinality()
-    {
-      return dictionary.size();
-    }
-
-    @Nullable
-    @Override
-    public String getValue(int index)
-    {
-      return dictionary.get(index);
-    }
-
-  }
-
-  public static final class GenericIndexedDictionaryEncodedStringValueSetIndex
-      extends BaseGenericIndexedDictionaryEncodedIndex<ByteBuffer> implements StringValueSetIndex, Utf8ValueSetIndex
-  {
-    private static final int SIZE_WORTH_CHECKING_MIN = 8;
-    // This determines the cut-off point to swtich the merging algorithm from doing binary-search per element in the value
-    // set to doing a sorted merge algorithm between value set and dictionary. The ratio here represents the ratio b/w
-    // the number of elements in value set and the number of elements in the dictionary. The number has been derived
-    // using benchmark in https://github.com/apache/druid/pull/13133. If the ratio is higher than the threshold, we use
-    // sorted merge instead of binary-search based algorithm.
-    private static final double SORTED_MERGE_RATIO_THRESHOLD = 0.12D;
-    private final GenericIndexed<ByteBuffer> genericIndexedDictionary;
-
-    public GenericIndexedDictionaryEncodedStringValueSetIndex(
-        BitmapFactory bitmapFactory,
-        GenericIndexed<ByteBuffer> dictionary,
-        GenericIndexed<ImmutableBitmap> bitmaps
-    )
-    {
-      super(bitmapFactory, dictionary, bitmaps);
-      this.genericIndexedDictionary = dictionary;
-    }
-
-    @Override
-    public BitmapColumnIndex forValue(@Nullable String value)
-    {
-      return new SimpleBitmapColumnIndex()
-      {
-        @Override
-        public double estimateSelectivity(int totalRows)
-        {
-          return Math.min(1, (double) getBitmapForValue().size() / totalRows);
-        }
-
-        @Override
-        public <T> T computeBitmapResult(BitmapResultFactory<T> bitmapResultFactory)
-        {
-
-          return bitmapResultFactory.wrapDimensionValue(getBitmapForValue());
-        }
-
-        private ImmutableBitmap getBitmapForValue()
-        {
-          final ByteBuffer valueUtf8 = value == null ? null : ByteBuffer.wrap(StringUtils.toUtf8(value));
-          final int idx = dictionary.indexOf(valueUtf8);
-          return getBitmap(idx);
-        }
-      };
-    }
-
-    @Override
-    public BitmapColumnIndex forSortedValues(SortedSet<String> values)
-    {
-      return getBitmapColumnIndexForSortedIterableUtf8(
-          Iterables.transform(
-              values,
-              input -> ByteBuffer.wrap(StringUtils.toUtf8(input))
-          ),
-          values.size()
-      );
-    }
-
-    @Override
-    public BitmapColumnIndex forSortedValuesUtf8(SortedSet<ByteBuffer> valuesUtf8)
-    {
-      final SortedSet<ByteBuffer> tailSet;
-
-      if (valuesUtf8.size() >= SIZE_WORTH_CHECKING_MIN) {
-        final ByteBuffer minValueInColumn = dictionary.get(0);
-        tailSet = valuesUtf8.tailSet(minValueInColumn);
-      } else {
-        tailSet = valuesUtf8;
-      }
-
-      return getBitmapColumnIndexForSortedIterableUtf8(tailSet, tailSet.size());
-    }
-
-    /**
-     * Helper used by {@link #forSortedValues} and {@link #forSortedValuesUtf8}.
-     */
-    private BitmapColumnIndex getBitmapColumnIndexForSortedIterableUtf8(Iterable<ByteBuffer> valuesUtf8, int size)
-    {
-      // for large number of in-filter values in comparison to the dictionary size, use the sorted merge algorithm.
-      if (size > SORTED_MERGE_RATIO_THRESHOLD * dictionary.size()) {
-        return new SimpleImmutableBitmapIterableIndex()
-        {
-          @Override
-          public Iterable<ImmutableBitmap> getBitmapIterable()
-          {
-            return () -> new Iterator<ImmutableBitmap>()
-            {
-              final PeekingIterator<ByteBuffer> valuesIterator = Iterators.peekingIterator(valuesUtf8.iterator());
-              final PeekingIterator<ByteBuffer> dictionaryIterator =
-                  Iterators.peekingIterator(genericIndexedDictionary.iterator());
-              int next = -1;
-              int idx = 0;
-
-              @Override
-              public boolean hasNext()
-              {
-                if (next < 0) {
-                  findNext();
-                }
-                return next >= 0;
-              }
-
-              @Override
-              public ImmutableBitmap next()
-              {
-                if (next < 0) {
-                  findNext();
-                  if (next < 0) {
-                    throw new NoSuchElementException();
-                  }
-                }
-                final int swap = next;
-                next = -1;
-                return getBitmap(swap);
-              }
-
-              private void findNext()
-              {
-                while (next < 0 && valuesIterator.hasNext() && dictionaryIterator.hasNext()) {
-                  ByteBuffer nextValue = valuesIterator.peek();
-                  ByteBuffer nextDictionaryKey = dictionaryIterator.peek();
-                  int comparison = GenericIndexed.BYTE_BUFFER_STRATEGY.compare(nextValue, nextDictionaryKey);
-                  if (comparison == 0) {
-                    next = idx;
-                    valuesIterator.next();
-                    break;
-                  } else if (comparison < 0) {
-                    valuesIterator.next();
-                  } else {
-                    dictionaryIterator.next();
-                    idx++;
-                  }
-                }
-              }
-            };
-          }
-        };
-      }
-
-      // if the size of in-filter values is less than the threshold percentage of dictionary size, then use binary search
-      // based lookup per value. The algorithm works well for smaller number of values.
-      return new SimpleImmutableBitmapIterableIndex()
-      {
-        @Override
-        public Iterable<ImmutableBitmap> getBitmapIterable()
-        {
-          return () -> new Iterator<ImmutableBitmap>()
-          {
-            final PeekingIterator<ByteBuffer> valuesIterator = Iterators.peekingIterator(valuesUtf8.iterator());
-            final int dictionarySize = dictionary.size();
-            int next = -1;
-
-            @Override
-            public boolean hasNext()
-            {
-              if (next < 0) {
-                findNext();
-              }
-              return next >= 0;
-            }
-
-            @Override
-            public ImmutableBitmap next()
-            {
-              if (next < 0) {
-                findNext();
-                if (next < 0) {
-                  throw new NoSuchElementException();
-                }
-              }
-              final int swap = next;
-              next = -1;
-              return getBitmap(swap);
-            }
-
-            private void findNext()
-            {
-              while (next < 0 && valuesIterator.hasNext()) {
-                ByteBuffer nextValue = valuesIterator.next();
-                next = dictionary.indexOf(nextValue);
-
-                if (next == -dictionarySize - 1) {
-                  // nextValue is past the end of the dictionary.
-                  // Note: we can rely on indexOf returning (-(insertion point) - 1), even though Indexed doesn't
-                  // guarantee it, because "dictionary" comes from GenericIndexed singleThreaded().
-                  break;
-                }
-              }
-            }
-          };
-        }
-      };
-    }
-  }
-
-  public static final class GenericIndexedDictionaryEncodedStringDruidPredicateIndex
-      extends BaseGenericIndexedDictionaryEncodedIndex<String> implements DruidPredicateIndex
-  {
-    public GenericIndexedDictionaryEncodedStringDruidPredicateIndex(
-        BitmapFactory bitmapFactory,
-        GenericIndexed<String> dictionary,
-        GenericIndexed<ImmutableBitmap> bitmaps
-    )
-    {
-      super(bitmapFactory, dictionary, bitmaps);
-    }
-
-    @Override
-    public BitmapColumnIndex forPredicate(DruidPredicateFactory matcherFactory)
-    {
-      return new SimpleImmutableBitmapIterableIndex()
-      {
-        @Override
-        public Iterable<ImmutableBitmap> getBitmapIterable()
-        {
-          return () -> new Iterator<ImmutableBitmap>()
-          {
-            final Predicate<String> stringPredicate = matcherFactory.makeStringPredicate();
-            final Iterator<String> iterator = dictionary.iterator();
-            @Nullable
-            String next = null;
-            boolean nextSet = false;
-
-            @Override
-            public boolean hasNext()
-            {
-              if (!nextSet) {
-                findNext();
-              }
-              return nextSet;
-            }
-
-            @Override
-            public ImmutableBitmap next()
-            {
-              if (!nextSet) {
-                findNext();
-                if (!nextSet) {
-                  throw new NoSuchElementException();
-                }
-              }
-              nextSet = false;
-              final int idx = dictionary.indexOf(next);
-              if (idx < 0) {
-                return bitmapFactory.makeEmptyImmutableBitmap();
-              }
-
-              final ImmutableBitmap bitmap = bitmaps.get(idx);
-              return bitmap == null ? bitmapFactory.makeEmptyImmutableBitmap() : bitmap;
-            }
-
-            private void findNext()
-            {
-              while (!nextSet && iterator.hasNext()) {
-                String nextValue = iterator.next();
-                nextSet = stringPredicate.apply(nextValue);
-                if (nextSet) {
-                  next = nextValue;
-                }
-              }
-            }
-          };
-        }
-      };
-    }
-  }
-
-  public static final class GenericIndexedDictionaryEncodedColumnLexicographicalRangeIndex
-      extends BaseGenericIndexedDictionaryEncodedIndex<ByteBuffer> implements LexicographicalRangeIndex
-  {
-    private final boolean hasNull;
-
-    public GenericIndexedDictionaryEncodedColumnLexicographicalRangeIndex(
-        BitmapFactory bitmapFactory,
-        GenericIndexed<ByteBuffer> dictionary,
-        GenericIndexed<ImmutableBitmap> bitmaps,
-        boolean hasNull
-    )
-    {
-      super(bitmapFactory, dictionary, bitmaps);
-      this.hasNull = hasNull;
-    }
-
-    @Override
-    public BitmapColumnIndex forRange(
-        @Nullable String startValue,
-        boolean startStrict,
-        @Nullable String endValue,
-        boolean endStrict
-    )
-    {
-      return new SimpleImmutableBitmapIterableIndex()
-      {
-        @Override
-        public Iterable<ImmutableBitmap> getBitmapIterable()
-        {
-          final IntIntPair range = getRange(startValue, startStrict, endValue, endStrict);
-          final int start = range.leftInt(), end = range.rightInt();
-          return () -> new Iterator<ImmutableBitmap>()
-          {
-            final IntIterator rangeIterator = IntListUtils.fromTo(start, end).iterator();
-
-            @Override
-            public boolean hasNext()
-            {
-              return rangeIterator.hasNext();
-            }
-
-            @Override
-            public ImmutableBitmap next()
-            {
-              return getBitmap(rangeIterator.nextInt());
-            }
-          };
-        }
-      };
-    }
-
-    @Override
-    public BitmapColumnIndex forRange(
-        @Nullable String startValue,
-        boolean startStrict,
-        @Nullable String endValue,
-        boolean endStrict,
-        Predicate<String> matcher
-    )
-    {
-      return new SimpleImmutableBitmapIterableIndex()
-      {
-        @Override
-        public Iterable<ImmutableBitmap> getBitmapIterable()
-        {
-          final IntIntPair range = getRange(startValue, startStrict, endValue, endStrict);
-          final int start = range.leftInt(), end = range.rightInt();
-          return () -> new Iterator<ImmutableBitmap>()
-          {
-            int currIndex = start;
-            int found;
-
-            {
-              found = findNext();
-            }
-
-            private int findNext()
-            {
-              while (currIndex < end && !applyMatcher(dictionary.get(currIndex))) {
-                currIndex++;
-              }
-
-              if (currIndex < end) {
-                return currIndex++;
-              } else {
-                return -1;
-              }
-            }
-
-            @Override
-            public boolean hasNext()
-            {
-              return found != -1;
-            }
-
-            @Override
-            public ImmutableBitmap next()
-            {
-              int cur = found;
-
-              if (cur == -1) {
-                throw new NoSuchElementException();
-              }
-
-              found = findNext();
-              return getBitmap(cur);
-            }
-          };
-        }
-
-        private boolean applyMatcher(@Nullable final ByteBuffer valueUtf8)
-        {
-          if (valueUtf8 == null) {
-            return matcher.apply(null);
-          } else {
-            // Duplicate buffer, because StringUtils.fromUtf8 advances the position, and we do not want to do that.
-            return matcher.apply(StringUtils.fromUtf8(valueUtf8.duplicate()));
-          }
-        }
-      };
-    }
-
-    private IntIntPair getRange(
-        @Nullable String startValue,
-        boolean startStrict,
-        @Nullable String endValue,
-        boolean endStrict
-    )
-    {
-      final int firstValue = hasNull ? 1 : 0;
-      int startIndex, endIndex;
-      if (startValue == null) {
-        startIndex = firstValue;
-      } else {
-        final String startValueToUse = NullHandling.emptyToNullIfNeeded(startValue);
-        final int found = dictionary.indexOf(StringUtils.toUtf8ByteBuffer(startValueToUse));
-        if (found >= firstValue) {
-          startIndex = startStrict ? found + 1 : found;
-        } else {
-          startIndex = -(found + 1);
-        }
-      }
-
-      if (endValue == null) {
-        endIndex = dictionary.size();
-      } else {
-        final String endValueToUse = NullHandling.emptyToNullIfNeeded(endValue);
-        final int found = dictionary.indexOf(StringUtils.toUtf8ByteBuffer(endValueToUse));
-        if (found >= firstValue) {
-          endIndex = endStrict ? found : found + 1;
-        } else {
-          endIndex = -(found + 1);
-        }
-      }
-
-      endIndex = Math.max(startIndex, endIndex);
-      return new IntIntImmutablePair(startIndex, endIndex);
-    }
-  }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/StringFrontCodedColumnIndexSupplier.java b/processing/src/main/java/org/apache/druid/segment/serde/StringFrontCodedColumnIndexSupplier.java
new file mode 100644
index 0000000000..894ddb55ff
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/StringFrontCodedColumnIndexSupplier.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde;
+
+import com.google.common.base.Supplier;
+import org.apache.druid.collections.bitmap.BitmapFactory;
+import org.apache.druid.collections.bitmap.ImmutableBitmap;
+import org.apache.druid.collections.spatial.ImmutableRTree;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.segment.column.BitmapColumnIndex;
+import org.apache.druid.segment.column.ColumnIndexSupplier;
+import org.apache.druid.segment.column.DictionaryEncodedStringValueIndex;
+import org.apache.druid.segment.column.DictionaryEncodedValueIndex;
+import org.apache.druid.segment.column.DruidPredicateIndex;
+import org.apache.druid.segment.column.IndexedStringDictionaryEncodedStringValueIndex;
+import org.apache.druid.segment.column.IndexedStringDruidPredicateIndex;
+import org.apache.druid.segment.column.IndexedUtf8LexicographicalRangeIndex;
+import org.apache.druid.segment.column.IndexedUtf8ValueSetIndex;
+import org.apache.druid.segment.column.LexicographicalRangeIndex;
+import org.apache.druid.segment.column.NullValueIndex;
+import org.apache.druid.segment.column.SimpleImmutableBitmapIndex;
+import org.apache.druid.segment.column.SpatialIndex;
+import org.apache.druid.segment.column.StringEncodingStrategies;
+import org.apache.druid.segment.column.StringValueSetIndex;
+import org.apache.druid.segment.column.Utf8ValueSetIndex;
+import org.apache.druid.segment.data.FrontCodedIndexed;
+import org.apache.druid.segment.data.GenericIndexed;
+import org.apache.druid.segment.data.Indexed;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link ColumnIndexSupplier} for string columns whose value dictionary is a {@link FrontCodedIndexed} of UTF-8
+ * encoded values. The dictionary is obtained from {@link #utf8Dictionary} each time an index is requested; a
+ * {@link String} view of it is produced by wrapping it in a {@link StringEncodingStrategies.Utf8ToStringIndexed}.
+ */
+public class StringFrontCodedColumnIndexSupplier implements ColumnIndexSupplier
+{
+  private final BitmapFactory bitmapFactory;
+  private final Supplier<StringEncodingStrategies.Utf8ToStringIndexed> dictionary;
+  private final Supplier<FrontCodedIndexed> utf8Dictionary;
+
+  @Nullable
+  private final GenericIndexed<ImmutableBitmap> bitmaps;
+
+  @Nullable
+  private final ImmutableRTree indexedTree;
+
+  /**
+   * @param bitmapFactory  factory used to create empty bitmaps
+   * @param utf8Dictionary supplier of the front-coded UTF-8 value dictionary
+   * @param bitmaps        per-dictionary-id bitmaps, or null if the column has no bitmap indexes
+   * @param indexedTree    spatial index, or null if the column has none
+   */
+  public StringFrontCodedColumnIndexSupplier(
+      BitmapFactory bitmapFactory,
+      Supplier<FrontCodedIndexed> utf8Dictionary,
+      @Nullable GenericIndexed<ImmutableBitmap> bitmaps,
+      @Nullable ImmutableRTree indexedTree
+  )
+  {
+    this.bitmapFactory = bitmapFactory;
+    this.bitmaps = bitmaps;
+    this.utf8Dictionary = utf8Dictionary;
+    this.dictionary = () -> new StringEncodingStrategies.Utf8ToStringIndexed(this.utf8Dictionary.get());
+    this.indexedTree = indexedTree;
+  }
+
+  /**
+   * Returns an index of the requested type, or null if this column cannot provide it. Bitmap-backed indexes are
+   * only available when {@link #bitmaps} is non-null; {@link SpatialIndex} is only available when
+   * {@link #indexedTree} is non-null.
+   */
+  @Nullable
+  @Override
+  public <T> T as(Class<T> clazz)
+  {
+    if (bitmaps != null) {
+      final Indexed<ImmutableBitmap> singleThreadedBitmaps = bitmaps.singleThreaded();
+      if (clazz.equals(NullValueIndex.class)) {
+        final BitmapColumnIndex nullIndex;
+        final StringEncodingStrategies.Utf8ToStringIndexed stringDictionary = dictionary.get();
+        // the null (or null-equivalent) value, when present, is checked for at dictionary id 0
+        if (NullHandling.isNullOrEquivalent(stringDictionary.get(0))) {
+          nullIndex = new SimpleImmutableBitmapIndex(bitmaps.get(0));
+        } else {
+          nullIndex = new SimpleImmutableBitmapIndex(bitmapFactory.makeEmptyImmutableBitmap());
+        }
+        return (T) (NullValueIndex) () -> nullIndex;
+      } else if (clazz.equals(StringValueSetIndex.class) || clazz.equals(Utf8ValueSetIndex.class)) {
+        // the same index type serves both the String and the UTF-8 value set contracts, matching what
+        // DictionaryEncodedStringIndexSupplier provides for non-front-coded columns
+        return (T) new IndexedUtf8ValueSetIndex<>(
+            bitmapFactory,
+            utf8Dictionary.get(),
+            singleThreadedBitmaps
+        );
+      } else if (clazz.equals(DruidPredicateIndex.class)) {
+        return (T) new IndexedStringDruidPredicateIndex<>(
+            bitmapFactory,
+            dictionary.get(),
+            singleThreadedBitmaps
+        );
+      } else if (clazz.equals(LexicographicalRangeIndex.class)) {
+        final FrontCodedIndexed dict = utf8Dictionary.get();
+        return (T) new IndexedUtf8LexicographicalRangeIndex<>(
+            bitmapFactory,
+            dict,
+            singleThreadedBitmaps,
+            dict.get(0) == null
+        );
+      } else if (clazz.equals(DictionaryEncodedStringValueIndex.class)
+                 || clazz.equals(DictionaryEncodedValueIndex.class)) {
+        return (T) new IndexedStringDictionaryEncodedStringValueIndex<>(
+            bitmapFactory,
+            dictionary.get(),
+            bitmaps
+        );
+      }
+    }
+    if (indexedTree != null && clazz.equals(SpatialIndex.class)) {
+      return (T) (SpatialIndex) () -> indexedTree;
+    }
+    return null;
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/StringFrontCodedDictionaryEncodedColumnSupplier.java b/processing/src/main/java/org/apache/druid/segment/serde/StringFrontCodedDictionaryEncodedColumnSupplier.java
new file mode 100644
index 0000000000..622b554d3d
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/StringFrontCodedDictionaryEncodedColumnSupplier.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde;
+
+import com.google.common.base.Supplier;
+import org.apache.druid.segment.column.DictionaryEncodedColumn;
+import org.apache.druid.segment.column.StringFrontCodedDictionaryEncodedColumn;
+import org.apache.druid.segment.data.ColumnarInts;
+import org.apache.druid.segment.data.ColumnarMultiInts;
+import org.apache.druid.segment.data.FrontCodedIndexed;
+
+import javax.annotation.Nullable;
+
+/**
+ * Counterpart of {@link DictionaryEncodedColumnSupplier} that produces a
+ * {@link StringFrontCodedDictionaryEncodedColumn} rather than the traditional
+ * {@link org.apache.druid.segment.column.StringDictionaryEncodedColumn}. Either of the single valued or multi
+ * valued column suppliers may be null, in which case null is handed to the column in its place.
+ */
+public class StringFrontCodedDictionaryEncodedColumnSupplier implements Supplier<DictionaryEncodedColumn<?>>
+{
+  private final Supplier<FrontCodedIndexed> utf8Dictionary;
+  @Nullable
+  private final Supplier<ColumnarInts> singleValuedColumn;
+  @Nullable
+  private final Supplier<ColumnarMultiInts> multiValuedColumn;
+
+  /**
+   * @param utf8Dictionary     supplier of the front-coded UTF-8 value dictionary
+   * @param singleValuedColumn supplier of the single valued column, or null
+   * @param multiValuedColumn  supplier of the multi valued column, or null
+   */
+  public StringFrontCodedDictionaryEncodedColumnSupplier(
+      Supplier<FrontCodedIndexed> utf8Dictionary,
+      @Nullable Supplier<ColumnarInts> singleValuedColumn,
+      @Nullable Supplier<ColumnarMultiInts> multiValuedColumn
+  )
+  {
+    this.utf8Dictionary = utf8Dictionary;
+    this.singleValuedColumn = singleValuedColumn;
+    this.multiValuedColumn = multiValuedColumn;
+  }
+
+  /**
+   * Builds a new column from whatever the underlying suppliers currently provide.
+   */
+  @Override
+  public DictionaryEncodedColumn<?> get()
+  {
+    // resolve the suppliers in the same order the constructor arguments are declared
+    final ColumnarInts singleValued;
+    if (singleValuedColumn == null) {
+      singleValued = null;
+    } else {
+      singleValued = singleValuedColumn.get();
+    }
+
+    final ColumnarMultiInts multiValued;
+    if (multiValuedColumn == null) {
+      multiValued = null;
+    } else {
+      multiValued = multiValuedColumn.get();
+    }
+
+    final FrontCodedIndexed frontCodedDictionary = utf8Dictionary.get();
+    return new StringFrontCodedDictionaryEncodedColumn(singleValued, multiValued, frontCodedDictionary);
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java
index 968df8f7e3..bf0e8d4c44 100644
--- a/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java
+++ b/processing/src/test/java/org/apache/druid/query/QueryRunnerTestHelper.java
@@ -362,6 +362,7 @@ public class QueryRunnerTestHelper
     final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex();
     final QueryableIndex noRollupMMappedTestIndex = TestIndex.getNoRollupMMappedTestIndex();
     final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex();
+    final QueryableIndex frontCodedMappedTestIndex = TestIndex.getFrontCodedMMappedTestIndex();
     return ImmutableList.of(
         makeQueryRunner(factory, new IncrementalIndexSegment(rtIndex, SEGMENT_ID), ("rtIndex")),
         makeQueryRunner(factory, new IncrementalIndexSegment(noRollupRtIndex, SEGMENT_ID), "noRollupRtIndex"),
@@ -371,7 +372,8 @@ public class QueryRunnerTestHelper
             new QueryableIndexSegment(noRollupMMappedTestIndex, SEGMENT_ID),
             "noRollupMMappedTestIndex"
         ),
-        makeQueryRunner(factory, new QueryableIndexSegment(mergedRealtimeIndex, SEGMENT_ID), "mergedRealtimeIndex")
+        makeQueryRunner(factory, new QueryableIndexSegment(mergedRealtimeIndex, SEGMENT_ID), "mergedRealtimeIndex"),
+        makeQueryRunner(factory, new QueryableIndexSegment(frontCodedMappedTestIndex, SEGMENT_ID), "frontCodedMMappedTestIndex")
     );
   }
 
diff --git a/processing/src/test/java/org/apache/druid/segment/CustomSegmentizerFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/CustomSegmentizerFactoryTest.java
index 9728f230f0..8e452c4431 100644
--- a/processing/src/test/java/org/apache/druid/segment/CustomSegmentizerFactoryTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/CustomSegmentizerFactoryTest.java
@@ -82,13 +82,7 @@ public class CustomSegmentizerFactoryTest extends InitializedNullHandlingTest
         data,
         Intervals.of("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z"),
         segment,
-        new IndexSpec(
-            null,
-            null,
-            null,
-            null,
-            null
-        ),
+        new IndexSpec(),
         null
     );
 
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexSpecTest.java b/processing/src/test/java/org/apache/druid/segment/IndexSpecTest.java
index 6e481b3b00..b2263867ea 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexSpecTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexSpecTest.java
@@ -40,7 +40,7 @@ public class IndexSpecTest
     final ObjectMapper objectMapper = new DefaultObjectMapper();
     final String json =
         "{ \"bitmap\" : { \"type\" : \"roaring\" }, \"dimensionCompression\" : \"lz4\", \"metricCompression\" : \"lzf\""
-        + ", \"longEncoding\" : \"auto\" }";
+        + ", \"longEncoding\" : \"auto\", \"stringDictionaryEncoding\":{\"type\":\"frontCoded\", \"bucketSize\":16}}";
 
     final IndexSpec spec = objectMapper.readValue(json, IndexSpec.class);
     Assert.assertEquals(new RoaringBitmapSerdeFactory(null), spec.getBitmapSerdeFactory());
diff --git a/processing/src/test/java/org/apache/druid/segment/TestIndex.java b/processing/src/test/java/org/apache/druid/segment/TestIndex.java
index 187cc28bff..a42982dc94 100644
--- a/processing/src/test/java/org/apache/druid/segment/TestIndex.java
+++ b/processing/src/test/java/org/apache/druid/segment/TestIndex.java
@@ -48,6 +48,7 @@ import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFact
 import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
 import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.StringEncodingStrategy;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
@@ -210,6 +211,20 @@ public class TestIndex
       throw new RuntimeException(e);
     }
   });
+  /**
+   * Memoized supplier of the realtime test index persisted and loaded through
+   * {@link #persistRealtimeAndLoadMMapped(IncrementalIndex, IndexSpec)} with an {@link IndexSpec} whose string
+   * encoding strategy is {@link StringEncodingStrategy.FrontCoded} (constructed with 4, presumably the bucket
+   * size); all other {@link IndexSpec} arguments are null.
+   */
+  private static Supplier<QueryableIndex> frontCodedMmappedIndex = Suppliers.memoize(
+      () -> persistRealtimeAndLoadMMapped(
+          realtimeIndex.get(),
+          new IndexSpec(
+              null,
+              null,
+              new StringEncodingStrategy.FrontCoded(4),
+              null,
+              null,
+              null,
+              null
+          )
+      )
+  );
 
   public static IncrementalIndex getIncrementalTestIndex()
   {
@@ -246,6 +261,11 @@ public class TestIndex
     return mergedRealtime.get();
   }
 
+  /**
+   * Returns the test index supplied by {@code frontCodedMmappedIndex}, which is built with a front-coded string
+   * encoding strategy and memoized after first use.
+   */
+  public static QueryableIndex getFrontCodedMMappedTestIndex()
+  {
+    return frontCodedMmappedIndex.get();
+  }
+
   public static IncrementalIndex makeRealtimeIndex(final String resourceFilename)
   {
     return makeRealtimeIndex(resourceFilename, true);
@@ -366,6 +386,11 @@ public class TestIndex
   }
 
+  /**
+   * Convenience overload of {@link #persistRealtimeAndLoadMMapped(IncrementalIndex, IndexSpec)} that uses
+   * {@code INDEX_SPEC}.
+   */
   public static QueryableIndex persistRealtimeAndLoadMMapped(IncrementalIndex index)
+  {
+    return persistRealtimeAndLoadMMapped(index, INDEX_SPEC);
+  }
+
+  public static QueryableIndex persistRealtimeAndLoadMMapped(IncrementalIndex index, IndexSpec indexSpec)
   {
     try {
       File someTmpFile = File.createTempFile("billy", "yay");
@@ -373,7 +398,7 @@ public class TestIndex
       FileUtils.mkdirp(someTmpFile);
       someTmpFile.deleteOnExit();
 
-      INDEX_MERGER.persist(index, someTmpFile, INDEX_SPEC, null);
+      INDEX_MERGER.persist(index, someTmpFile, indexSpec, null);
       return INDEX_IO.loadIndex(someTmpFile);
     }
     catch (IOException e) {
diff --git a/processing/src/test/java/org/apache/druid/segment/data/FrontCodedIndexedTest.java b/processing/src/test/java/org/apache/druid/segment/data/FrontCodedIndexedTest.java
new file mode 100644
index 0000000000..ece2ad99be
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/data/FrontCodedIndexedTest.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeSet;
+
+@RunWith(Parameterized.class)
+public class FrontCodedIndexedTest extends InitializedNullHandlingTest
+{
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<Object[]> constructorFeeder() // one parameter set per byte order
+  {
+    return ImmutableList.of(new Object[]{ByteOrder.LITTLE_ENDIAN}, new Object[]{ByteOrder.BIG_ENDIAN});
+  }
+
+  private final ByteOrder order;
+
+  public FrontCodedIndexedTest(ByteOrder byteOrder) // byteOrder: one of the values from constructorFeeder()
+  {
+    this.order = byteOrder; // applied to every buffer the tests allocate
+  }
+
+  @Test
+  public void testFrontCodedIndexed() throws IOException
+  {
+    // round trip: five sorted values sharing the prefix "hello", written with bucket size 4
+    final List<String> expected = ImmutableList.of("hello", "helloo", "hellooo", "hellooz", "helloozy");
+    final ByteBuffer buffer = ByteBuffer.allocate(1 << 12).order(order);
+    fillBuffer(buffer, expected, 4);
+    buffer.position(0);
+    final FrontCodedIndexed indexed = FrontCodedIndexed.read(buffer, buffer.order()).get();
+
+    // spot check lookups by index
+    Assert.assertEquals("helloo", StringUtils.fromUtf8(indexed.get(1)));
+    Assert.assertEquals("helloozy", StringUtils.fromUtf8(indexed.get(4)));
+
+    // walk the input and the decoded values in lock step: iterator(), get(i) and indexOf(value) must agree
+    final Iterator<ByteBuffer> actualValues = indexed.iterator();
+    final Iterator<String> expectedValues = expected.iterator();
+    for (int i = 0; expectedValues.hasNext() && actualValues.hasNext(); i++) {
+      final String expectedValue = expectedValues.next();
+      final ByteBuffer actualUtf8 = actualValues.next();
+      Assert.assertEquals(expectedValue, StringUtils.fromUtf8(actualUtf8));
+      // reset the position before handing the same buffer to indexOf (presumably fromUtf8 advanced it)
+      actualUtf8.position(0);
+      Assert.assertEquals(expectedValue, StringUtils.fromUtf8(indexed.get(i)));
+      Assert.assertEquals(i, indexed.indexOf(actualUtf8));
+    }
+    // both iterators must be exhausted at the same time, i.e. neither side has extra elements
+    Assert.assertEquals(expectedValues.hasNext(), actualValues.hasNext());
+  }
+
+
+  @Test
+  public void testFrontCodedIndexedSingleBucket() throws IOException
+  {
+    // bucket size 16 is larger than the five values written, so presumably they all share one bucket
+    final List<String> expected = ImmutableList.of("hello", "helloo", "hellooo", "hellooz", "helloozy");
+    final ByteBuffer buffer = ByteBuffer.allocate(1 << 12).order(order);
+    fillBuffer(buffer, expected, 16);
+    final FrontCodedIndexed indexed = FrontCodedIndexed.read(buffer, buffer.order()).get();
+
+    // spot check lookups by index
+    Assert.assertEquals("helloo", StringUtils.fromUtf8(indexed.get(1)));
+    Assert.assertEquals("helloozy", StringUtils.fromUtf8(indexed.get(4)));
+
+    // walk the input and the decoded values in lock step: iterator(), get(i) and indexOf(value) must agree
+    final Iterator<String> expectedValues = expected.iterator();
+    final Iterator<ByteBuffer> actualValues = indexed.iterator();
+    for (int i = 0; actualValues.hasNext() && expectedValues.hasNext(); i++) {
+      final String expectedValue = expectedValues.next();
+      final ByteBuffer actualUtf8 = actualValues.next();
+      Assert.assertEquals(expectedValue, StringUtils.fromUtf8(actualUtf8));
+      // reset the position before handing the same buffer to indexOf (presumably fromUtf8 advanced it)
+      actualUtf8.position(0);
+      Assert.assertEquals(expectedValue, StringUtils.fromUtf8(indexed.get(i)));
+      Assert.assertEquals(i, indexed.indexOf(actualUtf8));
+    }
+    // both iterators must be exhausted at the same time, i.e. neither side has extra elements
+    Assert.assertEquals(expectedValues.hasNext(), actualValues.hasNext());
+  }
+
+  @Test
+  public void testFrontCodedIndexedBigger() throws IOException
+  {
+    final int sizeBase = 10000;
+    final int bucketSize = 16;
+    final ByteBuffer buffer = ByteBuffer.allocate(1 << 24).order(order);
+    // one round per value count from sizeBase to sizeBase + bucketSize - 1, reusing the same buffer
+    for (int sizeAdjust = 0; sizeAdjust < bucketSize; sizeAdjust++) {
+      // the TreeSet hands fillBuffer its values in GenericIndexed.STRING_STRATEGY order
+      final TreeSet<String> values = new TreeSet<>(GenericIndexed.STRING_STRATEGY);
+      for (int i = 0; i < sizeBase + sizeAdjust; i++) {
+        values.add(IdUtils.getRandomId() + IdUtils.getRandomId() + IdUtils.getRandomId() + IdUtils.getRandomId());
+      }
+      fillBuffer(buffer, values, bucketSize);
+      FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read(buffer, buffer.order()).get();
+
+      Iterator<String> newListIterator = values.iterator();
+      Iterator<ByteBuffer> utf8Iterator = codedUtf8Indexed.iterator();
+      int ctr = 0;
+      // iterator(), get(i) and indexOf(value) must all agree with the sorted input
+      while (utf8Iterator.hasNext() && newListIterator.hasNext()) {
+        final String next = newListIterator.next();
+        final ByteBuffer nextUtf8 = utf8Iterator.next();
+        Assert.assertEquals(next, StringUtils.fromUtf8(nextUtf8));
+        nextUtf8.position(0);
+        Assert.assertEquals(next, StringUtils.fromUtf8(codedUtf8Indexed.get(ctr)));
+        Assert.assertEquals(ctr, codedUtf8Indexed.indexOf(nextUtf8));
+        ctr++;
+      }
+      Assert.assertEquals(newListIterator.hasNext(), utf8Iterator.hasNext());
+      // expected value first, so that a failure reports expected and actual the right way round
+      Assert.assertEquals(sizeBase + sizeAdjust, ctr);
+    }
+  }
+
+  @Test
+  public void testFrontCodedIndexedBiggerWithNulls() throws IOException
+  {
+    final int sizeBase = 10000;
+    final int bucketSize = 16;
+    final ByteBuffer buffer = ByteBuffer.allocate(1 << 24).order(order);
+    // one round per value count from sizeBase + 1 to sizeBase + bucketSize (null included), reusing the buffer
+    for (int sizeAdjust = 0; sizeAdjust < bucketSize; sizeAdjust++) {
+      TreeSet<String> values = new TreeSet<>(GenericIndexed.STRING_STRATEGY);
+      values.add(null);
+      for (int i = 0; i < sizeBase + sizeAdjust; i++) {
+        values.add(IdUtils.getRandomId() + IdUtils.getRandomId() + IdUtils.getRandomId() + IdUtils.getRandomId());
+      }
+      // NOTE(review): written with bucket size 4 although bucketSize (16) bounds the loop -- confirm intent
+      fillBuffer(buffer, values, 4);
+      FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read(buffer, buffer.order()).get();
+
+      Iterator<String> newListIterator = values.iterator();
+      Iterator<ByteBuffer> utf8Iterator = codedUtf8Indexed.iterator();
+      int ctr = 0;
+      while (utf8Iterator.hasNext() && newListIterator.hasNext()) {
+        final String next = newListIterator.next();
+        final ByteBuffer nextUtf8 = utf8Iterator.next();
+        if (next == null) {
+          // a null input value must be returned as null
+          Assert.assertNull(nextUtf8);
+        } else {
+          Assert.assertEquals(next, StringUtils.fromUtf8(nextUtf8));
+          nextUtf8.position(0);
+          Assert.assertEquals(next, StringUtils.fromUtf8(codedUtf8Indexed.get(ctr)));
+        }
+        Assert.assertEquals(ctr, codedUtf8Indexed.indexOf(nextUtf8));
+        ctr++;
+      }
+      Assert.assertEquals(newListIterator.hasNext(), utf8Iterator.hasNext());
+      // expected value first, so that a failure reports expected and actual the right way round
+      Assert.assertEquals(sizeBase + sizeAdjust + 1, ctr);
+    }
+  }
+
+  @Test
+  public void testFrontCodedIndexedIndexOf() throws IOException
+  {
+    // stored values must resolve to their position; the negative results expected for missing values
+    // follow the -(insertion point) - 1 pattern
+    final List<String> values = ImmutableList.of("hello", "helloo", "hellooo", "hellooz", "helloozy");
+    final ByteBuffer buffer = ByteBuffer.allocate(1 << 12).order(order);
+    fillBuffer(buffer, values, 4);
+    final FrontCodedIndexed indexed = FrontCodedIndexed.read(buffer, buffer.order()).get();
+
+    // "a" sorts before every stored value: insertion point 0
+    Assert.assertEquals(-1, indexed.indexOf(StringUtils.toUtf8ByteBuffer("a")));
+    Assert.assertEquals(0, indexed.indexOf(StringUtils.toUtf8ByteBuffer("hello")));
+    Assert.assertEquals(1, indexed.indexOf(StringUtils.toUtf8ByteBuffer("helloo")));
+    Assert.assertEquals(-3, indexed.indexOf(StringUtils.toUtf8ByteBuffer("helloob")));
+    Assert.assertEquals(4, indexed.indexOf(StringUtils.toUtf8ByteBuffer("helloozy")));
+    // both probes sort after the last stored value: insertion point 5, the number of values
+    Assert.assertEquals(-6, indexed.indexOf(StringUtils.toUtf8ByteBuffer("helloozz")));
+    Assert.assertEquals(-6, indexed.indexOf(StringUtils.toUtf8ByteBuffer("wat")));
+  }
+
+
+  @Test
+  public void testFrontCodedIndexedIndexOfWithNull() throws IOException
+  {
+    // five "hello..." values plus a null entry; the test expects null at position 0, which moves every
+    // other expected result one step further out
+    final TreeSet<String> values = new TreeSet<>(GenericIndexed.STRING_STRATEGY);
+    values.add(null);
+    values.addAll(ImmutableList.of("hello", "helloo", "hellooo", "hellooz", "helloozy"));
+    final ByteBuffer buffer = ByteBuffer.allocate(1 << 12).order(order);
+    fillBuffer(buffer, values, 4);
+    final FrontCodedIndexed indexed = FrontCodedIndexed.read(buffer, buffer.order()).get();
+
+    // null itself is a stored value
+    Assert.assertEquals(0, indexed.indexOf(StringUtils.toUtf8ByteBuffer(null)));
+    Assert.assertEquals(-2, indexed.indexOf(StringUtils.toUtf8ByteBuffer("a")));
+    Assert.assertEquals(1, indexed.indexOf(StringUtils.toUtf8ByteBuffer("hello")));
+    Assert.assertEquals(2, indexed.indexOf(StringUtils.toUtf8ByteBuffer("helloo")));
+    Assert.assertEquals(-4, indexed.indexOf(StringUtils.toUtf8ByteBuffer("helloob")));
+    Assert.assertEquals(5, indexed.indexOf(StringUtils.toUtf8ByteBuffer("helloozy")));
+    // both probes sort after the last stored value: insertion point 6, the number of values
+    Assert.assertEquals(-7, indexed.indexOf(StringUtils.toUtf8ByteBuffer("helloozz")));
+    Assert.assertEquals(-7, indexed.indexOf(StringUtils.toUtf8ByteBuffer("wat")));
+  }
+
+  @Test
+  public void testFrontCodedIndexedUnicodes() throws IOException
+  {
+    // both values contain the non-ASCII character 'ő' and share the prefix "Győ"; they must survive the
+    // write/read round trip exactly like the plain ASCII values in the other tests
+    final List<String> expected = ImmutableList.of("Győ-Moson-Sopron", "Győr");
+    final ByteBuffer buffer = ByteBuffer.allocate(1 << 12).order(order);
+    fillBuffer(buffer, expected, 4);
+    buffer.position(0);
+    final FrontCodedIndexed indexed = FrontCodedIndexed.read(buffer, buffer.order()).get();
+
+    // walk the input and the decoded values in lock step: iterator(), get(i) and indexOf(value) must agree
+    final Iterator<ByteBuffer> actualValues = indexed.iterator();
+    final Iterator<String> expectedValues = expected.iterator();
+    for (int i = 0; expectedValues.hasNext() && actualValues.hasNext(); i++) {
+      final String expectedValue = expectedValues.next();
+      final ByteBuffer actualUtf8 = actualValues.next();
+      Assert.assertEquals(expectedValue, StringUtils.fromUtf8(actualUtf8));
+      // reset the position before handing the same buffer to indexOf (presumably fromUtf8 advanced it)
+      actualUtf8.position(0);
+      Assert.assertEquals(expectedValue, StringUtils.fromUtf8(indexed.get(i)));
+      Assert.assertEquals(i, indexed.indexOf(actualUtf8));
+    }
+
+    // both iterators must be exhausted at the same time, i.e. neither side has extra elements
+    Assert.assertEquals(expectedValues.hasNext(), actualValues.hasNext());
+  }
+
+  private static long fillBuffer(ByteBuffer buffer, Iterable<String> sortedIterable, int bucketSize) throws IOException
+  {
+    // Serializes sortedIterable through a FrontCodedIndexedWriter into buffer starting at position 0, checking
+    // the writer's own get(int) along the way; returns the serialized size and leaves buffer at position 0.
+    buffer.position(0);
+    final FrontCodedIndexedWriter writer = new FrontCodedIndexedWriter(
+        new OnHeapMemorySegmentWriteOutMedium(),
+        buffer.order(),
+        bucketSize
+    );
+    writer.open();
+    // first pass: every value must be readable back from the writer right after it has been written
+    int row = 0;
+    for (String value : sortedIterable) {
+      final byte[] utf8 = StringUtils.toUtf8Nullable(value);
+      writer.write(utf8);
+      if (utf8 != null) {
+        Assert.assertArrayEquals(utf8, writer.get(row));
+      } else {
+        Assert.assertNull(writer.get(row));
+      }
+      row++;
+    }
+
+    // check 'get' again so that we aren't always reading from current page
+    row = 0;
+    for (String value : sortedIterable) {
+      final byte[] utf8 = StringUtils.toUtf8Nullable(value);
+      if (utf8 != null) {
+        Assert.assertArrayEquals("row " + row, utf8, writer.get(row));
+      } else {
+        Assert.assertNull("row " + row, writer.get(row));
+      }
+      row++;
+    }
+
+    // minimal channel that copies whatever the writer emits straight into the destination buffer
+    final WritableByteChannel sink = new WritableByteChannel()
+    {
+      @Override
+      public int write(ByteBuffer src)
+      {
+        final int written = src.remaining();
+        buffer.put(src);
+        return written;
+      }
+
+      @Override
+      public boolean isOpen()
+      {
+        return true;
+      }
+
+      @Override
+      public void close()
+      {
+      }
+    };
+    final long expectedSize = writer.getSerializedSize();
+    buffer.position(0);
+    writer.writeTo(sink, null);
+    // the number of bytes actually written must match the size the writer reported beforehand
+    Assert.assertEquals(expectedSize, buffer.position());
+    buffer.position(0);
+    return expectedSize;
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/filter/BaseFilterTest.java b/processing/src/test/java/org/apache/druid/segment/filter/BaseFilterTest.java
index eb639023f1..ac998023cb 100644
--- a/processing/src/test/java/org/apache/druid/segment/filter/BaseFilterTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/filter/BaseFilterTest.java
@@ -81,6 +81,7 @@ import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.BitmapColumnIndex;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.StringEncodingStrategy;
 import org.apache.druid.segment.data.BitmapSerdeFactory;
 import org.apache.druid.segment.data.ConciseBitmapSerdeFactory;
 import org.apache.druid.segment.data.IndexedInts;
@@ -347,6 +348,10 @@ public abstract class BaseFilterTest extends InitializedNullHandlingTest
             })
             .build();
 
+    StringEncodingStrategy[] stringEncoding = new StringEncodingStrategy[]{
+        new StringEncodingStrategy.Utf8(),
+        new StringEncodingStrategy.FrontCoded(4)
+    };
     for (Map.Entry<String, BitmapSerdeFactory> bitmapSerdeFactoryEntry : bitmapSerdeFactories.entrySet()) {
       for (Map.Entry<String, SegmentWriteOutMediumFactory> segmentWriteOutMediumFactoryEntry :
           segmentWriteOutMediumFactories.entrySet()) {
@@ -354,20 +359,33 @@ public abstract class BaseFilterTest extends InitializedNullHandlingTest
             finishers.entrySet()) {
           for (boolean cnf : ImmutableList.of(false, true)) {
             for (boolean optimize : ImmutableList.of(false, true)) {
-              final String testName = StringUtils.format(
-                  "bitmaps[%s], indexMerger[%s], finisher[%s], cnf[%s], optimize[%s]",
-                  bitmapSerdeFactoryEntry.getKey(),
-                  segmentWriteOutMediumFactoryEntry.getKey(),
-                  finisherEntry.getKey(),
-                  cnf,
-                  optimize
-              );
-              final IndexBuilder indexBuilder = IndexBuilder
-                  .create()
-                  .schema(DEFAULT_INDEX_SCHEMA)
-                  .indexSpec(new IndexSpec(bitmapSerdeFactoryEntry.getValue(), null, null, null))
-                  .segmentWriteOutMediumFactory(segmentWriteOutMediumFactoryEntry.getValue());
-              constructors.add(new Object[]{testName, indexBuilder, finisherEntry.getValue(), cnf, optimize});
+              for (StringEncodingStrategy encodingStrategy : stringEncoding) {
+                final String testName = StringUtils.format(
+                    "bitmaps[%s], indexMerger[%s], finisher[%s], cnf[%s], optimize[%s], stringDictionaryEncoding[%s]",
+                    bitmapSerdeFactoryEntry.getKey(),
+                    segmentWriteOutMediumFactoryEntry.getKey(),
+                    finisherEntry.getKey(),
+                    cnf,
+                    optimize,
+                    encodingStrategy.getType()
+                );
+                final IndexBuilder indexBuilder = IndexBuilder
+                    .create()
+                    .schema(DEFAULT_INDEX_SCHEMA)
+                    .indexSpec(
+                        new IndexSpec(
+                            bitmapSerdeFactoryEntry.getValue(),
+                            null,
+                            encodingStrategy,
+                            null,
+                            null,
+                            null,
+                            null
+                        )
+                    )
+                    .segmentWriteOutMediumFactory(segmentWriteOutMediumFactoryEntry.getValue());
+                constructors.add(new Object[]{testName, indexBuilder, finisherEntry.getValue(), cnf, optimize});
+              }
             }
           }
         }
diff --git a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java
index c3fb24bb68..0f2954b9c4 100644
--- a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java
@@ -54,6 +54,7 @@ import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.StringEncodingStrategy;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
@@ -95,9 +96,20 @@ public class SpatialFilterTest extends InitializedNullHandlingTest
   public static Collection<?> constructorFeeder() throws IOException
   {
     final IndexSpec indexSpec = new IndexSpec();
+    final IndexSpec frontCodedIndexSpec = new IndexSpec(
+        null,
+        null,
+        new StringEncodingStrategy.FrontCoded(4),
+        null,
+        null,
+        null,
+        null
+    );
     final IncrementalIndex rtIndex = makeIncrementalIndex();
     final QueryableIndex mMappedTestIndex = makeQueryableIndex(indexSpec);
     final QueryableIndex mergedRealtimeIndex = makeMergedQueryableIndex(indexSpec);
+    final QueryableIndex mMappedTestIndexFrontCoded = makeQueryableIndex(frontCodedIndexSpec);
+    final QueryableIndex mergedRealtimeIndexFrontCoded = makeMergedQueryableIndex(frontCodedIndexSpec);
     return Arrays.asList(
         new Object[][]{
             {
@@ -108,6 +120,12 @@ public class SpatialFilterTest extends InitializedNullHandlingTest
             },
             {
                 new QueryableIndexSegment(mergedRealtimeIndex, null)
+            },
+            {
+                new QueryableIndexSegment(mMappedTestIndexFrontCoded, null)
+            },
+            {
+                new QueryableIndexSegment(mergedRealtimeIndexFrontCoded, null)
             }
         }
     );
diff --git a/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java b/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java
index 1856325dae..9f3851bd64 100644
--- a/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java
+++ b/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java
@@ -37,7 +37,6 @@ import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.TestHelper;
-import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.serde.ComplexMetrics;
@@ -113,7 +112,18 @@ public class SegmentGenerator implements Closeable
       final int numRows
   )
   {
-    return generate(dataSegment, schemaInfo, schemaInfo.getDimensionsSpec(), TransformSpec.NONE, granularity, numRows);
+    return generate(dataSegment, schemaInfo, schemaInfo.getDimensionsSpec(), TransformSpec.NONE, new IndexSpec(), granularity, numRows);
+  }
+
+  public QueryableIndex generate( // overload taking a caller-supplied IndexSpec
+      final DataSegment dataSegment,
+      final GeneratorSchemaInfo schemaInfo,
+      final IndexSpec indexSpec,
+      final Granularity granularity,
+      final int numRows
+  )
+  { // delegates with the schema's own DimensionsSpec and TransformSpec.NONE
+    return generate(dataSegment, schemaInfo, schemaInfo.getDimensionsSpec(), TransformSpec.NONE, indexSpec, granularity, numRows);
   }
 
   public QueryableIndex generate(
@@ -121,6 +131,7 @@ public class SegmentGenerator implements Closeable
       final GeneratorSchemaInfo schemaInfo,
       final DimensionsSpec dimensionsSpec,
       final TransformSpec transformSpec,
+      final IndexSpec indexSpec,
       final Granularity queryGranularity,
       final int numRows
   )
@@ -135,6 +146,7 @@ public class SegmentGenerator implements Closeable
                                    .putString(schemaInfo.toString(), StandardCharsets.UTF_8)
                                    .putString(dimensionsSpec.toString(), StandardCharsets.UTF_8)
                                    .putString(queryGranularity.toString(), StandardCharsets.UTF_8)
+                                   .putString(indexSpec.toString(), StandardCharsets.UTF_8)
                                    .putInt(numRows)
                                    .hash()
                                    .toString();
@@ -186,7 +198,7 @@ public class SegmentGenerator implements Closeable
       }
 
       if (rows.size() % MAX_ROWS_IN_MEMORY == 0) {
-        indexes.add(makeIndex(dataSegment.getId(), dataHash, indexes.size(), rows, indexSchema));
+        indexes.add(makeIndex(dataSegment.getId(), dataHash, indexes.size(), rows, indexSchema, indexSpec));
         rows.clear();
       }
     }
@@ -194,7 +206,7 @@ public class SegmentGenerator implements Closeable
     log.info("%,d/%,d rows generated for[%s].", numRows, numRows, dataSegment);
 
     if (rows.size() > 0) {
-      indexes.add(makeIndex(dataSegment.getId(), dataHash, indexes.size(), rows, indexSchema));
+      indexes.add(makeIndex(dataSegment.getId(), dataHash, indexes.size(), rows, indexSchema, indexSpec));
       rows.clear();
     }
 
@@ -204,8 +216,6 @@ public class SegmentGenerator implements Closeable
       throw new ISE("No rows to index?");
     } else {
       try {
-        final IndexSpec indexSpec = new IndexSpec(new RoaringBitmapSerdeFactory(true), null, null, null);
-
         retVal = TestHelper
             .getTestIndexIO()
             .loadIndex(
@@ -305,12 +315,14 @@ public class SegmentGenerator implements Closeable
       final String dataHash,
       final int indexNumber,
       final List<InputRow> rows,
-      final IncrementalIndexSchema indexSchema
+      final IncrementalIndexSchema indexSchema,
+      final IndexSpec indexSpec
   )
   {
     return IndexBuilder
         .create()
         .schema(indexSchema)
+        .indexSpec(indexSpec)
         .tmpDir(new File(getSegmentDir(identifier, dataHash), String.valueOf(indexNumber)))
         .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
         .rows(rows)
diff --git a/processing/src/test/java/org/apache/druid/segment/nested/NestedFieldLiteralColumnIndexSupplierTest.java b/processing/src/test/java/org/apache/druid/segment/nested/NestedFieldLiteralColumnIndexSupplierTest.java
index 850baf4412..f77ecf0906 100644
--- a/processing/src/test/java/org/apache/druid/segment/nested/NestedFieldLiteralColumnIndexSupplierTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/nested/NestedFieldLiteralColumnIndexSupplierTest.java
@@ -41,6 +41,7 @@ import org.apache.druid.segment.data.FixedIndexed;
 import org.apache.druid.segment.data.FixedIndexedWriter;
 import org.apache.druid.segment.data.GenericIndexed;
 import org.apache.druid.segment.data.GenericIndexedWriter;
+import org.apache.druid.segment.data.Indexed;
 import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
 import org.apache.druid.segment.serde.Serializer;
 import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;
@@ -62,7 +63,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   BitmapResultFactory<ImmutableBitmap> bitmapResultFactory = new DefaultBitmapResultFactory(
       roaringFactory.getBitmapFactory()
   );
-  GenericIndexed<String> globalStrings;
+  Indexed<ByteBuffer> globalStrings;
   FixedIndexed<Long> globalLongs;
   FixedIndexed<Double> globalDoubles;
 
@@ -123,7 +124,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
     doubleWriter.write(9.9);
     writeToBuffer(doubleBuffer, doubleWriter);
 
-    globalStrings = GenericIndexed.read(stringBuffer, GenericIndexed.STRING_STRATEGY);
+    globalStrings = GenericIndexed.read(stringBuffer, GenericIndexed.BYTE_BUFFER_STRATEGY);
     globalLongs = FixedIndexed.read(longBuffer, TypeStrategies.LONG, ByteOrder.nativeOrder(), Long.BYTES);
     globalDoubles = FixedIndexed.read(doubleBuffer, TypeStrategies.DOUBLE, ByteOrder.nativeOrder(), Double.BYTES);
   }
@@ -131,7 +132,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testSingleTypeStringColumnValueIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeSingleTypeStringSupplier();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeSingleTypeStringSupplier();
 
     NullValueIndex nullIndex = indexSupplier.as(NullValueIndex.class);
     Assert.assertNotNull(nullIndex);
@@ -150,7 +151,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testSingleTypeStringColumnValueSetIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeSingleTypeStringSupplier();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeSingleTypeStringSupplier();
 
     StringValueSetIndex valueSetIndex = indexSupplier.as(StringValueSetIndex.class);
     Assert.assertNotNull(valueSetIndex);
@@ -183,7 +184,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testSingleTypeStringColumnRangeIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeSingleTypeStringSupplier();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeSingleTypeStringSupplier();
 
     LexicographicalRangeIndex rangeIndex = indexSupplier.as(LexicographicalRangeIndex.class);
     Assert.assertNotNull(rangeIndex);
@@ -254,7 +255,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testSingleTypeStringColumnRangeIndexWithPredicate() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeSingleTypeStringSupplier();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeSingleTypeStringSupplier();
 
     LexicographicalRangeIndex rangeIndex = indexSupplier.as(LexicographicalRangeIndex.class);
     Assert.assertNotNull(rangeIndex);
@@ -322,7 +323,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testSingleTypeStringColumnPredicateIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeSingleTypeStringSupplier();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeSingleTypeStringSupplier();
 
     DruidPredicateIndex predicateIndex = indexSupplier.as(DruidPredicateIndex.class);
     Assert.assertNotNull(predicateIndex);
@@ -345,7 +346,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testSingleTypeStringColumnWithNullValueIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeSingleTypeStringWithNullsSupplier();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeSingleTypeStringWithNullsSupplier();
 
     NullValueIndex nullIndex = indexSupplier.as(NullValueIndex.class);
     Assert.assertNotNull(nullIndex);
@@ -364,7 +365,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testSingleTypeStringColumnWithNullValueSetIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeSingleTypeStringWithNullsSupplier();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeSingleTypeStringWithNullsSupplier();
 
     StringValueSetIndex valueSetIndex = indexSupplier.as(StringValueSetIndex.class);
     Assert.assertNotNull(valueSetIndex);
@@ -397,7 +398,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testSingleValueStringWithNullRangeIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeSingleTypeStringWithNullsSupplier();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeSingleTypeStringWithNullsSupplier();
 
     LexicographicalRangeIndex rangeIndex = indexSupplier.as(LexicographicalRangeIndex.class);
     Assert.assertNotNull(rangeIndex);
@@ -469,7 +470,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testSingleValueStringWithNullPredicateIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeSingleTypeStringWithNullsSupplier();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeSingleTypeStringWithNullsSupplier();
 
     DruidPredicateIndex predicateIndex = indexSupplier.as(DruidPredicateIndex.class);
     Assert.assertNotNull(predicateIndex);
@@ -492,7 +493,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testSingleTypeLongColumnValueSetIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeSingleTypeLongSupplier();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeSingleTypeLongSupplier();
 
     StringValueSetIndex valueSetIndex = indexSupplier.as(StringValueSetIndex.class);
     Assert.assertNotNull(valueSetIndex);
@@ -518,7 +519,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testSingleTypeLongColumnRangeIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeSingleTypeLongSupplier();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeSingleTypeLongSupplier();
 
     NumericRangeIndex rangeIndex = indexSupplier.as(NumericRangeIndex.class);
     Assert.assertNotNull(rangeIndex);
@@ -548,7 +549,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testSingleTypeLongColumnPredicateIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeSingleTypeLongSupplier();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeSingleTypeLongSupplier();
 
     DruidPredicateIndex predicateIndex = indexSupplier.as(DruidPredicateIndex.class);
     Assert.assertNotNull(predicateIndex);
@@ -571,7 +572,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testSingleTypeLongColumnWithNullValueIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeSingleTypeLongSupplierWithNull();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeSingleTypeLongSupplierWithNull();
 
     NullValueIndex nullIndex = indexSupplier.as(NullValueIndex.class);
     Assert.assertNotNull(nullIndex);
@@ -590,7 +591,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testSingleTypeLongColumnWithNullValueSetIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeSingleTypeLongSupplierWithNull();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeSingleTypeLongSupplierWithNull();
 
     StringValueSetIndex valueSetIndex = indexSupplier.as(StringValueSetIndex.class);
     Assert.assertNotNull(valueSetIndex);
@@ -616,7 +617,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testSingleValueLongWithNullRangeIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeSingleTypeLongSupplierWithNull();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeSingleTypeLongSupplierWithNull();
 
     NumericRangeIndex rangeIndex = indexSupplier.as(NumericRangeIndex.class);
     Assert.assertNotNull(rangeIndex);
@@ -646,7 +647,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testSingleValueLongWithNullPredicateIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeSingleTypeLongSupplierWithNull();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeSingleTypeLongSupplierWithNull();
 
     DruidPredicateIndex predicateIndex = indexSupplier.as(DruidPredicateIndex.class);
     Assert.assertNotNull(predicateIndex);
@@ -669,7 +670,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testSingleTypeDoubleColumnValueSetIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeSingleTypeDoubleSupplier();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeSingleTypeDoubleSupplier();
 
     StringValueSetIndex valueSetIndex = indexSupplier.as(StringValueSetIndex.class);
     Assert.assertNotNull(valueSetIndex);
@@ -695,7 +696,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testSingleTypeDoubleColumnRangeIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeSingleTypeDoubleSupplier();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeSingleTypeDoubleSupplier();
 
     NumericRangeIndex rangeIndex = indexSupplier.as(NumericRangeIndex.class);
     Assert.assertNotNull(rangeIndex);
@@ -739,7 +740,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testSingleTypeDoubleColumnPredicateIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeSingleTypeDoubleSupplier();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeSingleTypeDoubleSupplier();
 
     DruidPredicateIndex predicateIndex = indexSupplier.as(DruidPredicateIndex.class);
     Assert.assertNotNull(predicateIndex);
@@ -762,7 +763,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testSingleTypeDoubleColumnWithNullValueIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeSingleTypeDoubleSupplierWithNull();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeSingleTypeDoubleSupplierWithNull();
 
     NullValueIndex nullIndex = indexSupplier.as(NullValueIndex.class);
     Assert.assertNotNull(nullIndex);
@@ -781,7 +782,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testSingleTypeDoubleColumnWithNullValueSetIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeSingleTypeDoubleSupplierWithNull();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeSingleTypeDoubleSupplierWithNull();
 
     StringValueSetIndex valueSetIndex = indexSupplier.as(StringValueSetIndex.class);
     Assert.assertNotNull(valueSetIndex);
@@ -807,7 +808,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testSingleValueDoubleWithNullRangeIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeSingleTypeDoubleSupplierWithNull();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeSingleTypeDoubleSupplierWithNull();
 
     NumericRangeIndex rangeIndex = indexSupplier.as(NumericRangeIndex.class);
     Assert.assertNotNull(rangeIndex);
@@ -837,7 +838,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testSingleValueDoubleWithNullPredicateIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeSingleTypeDoubleSupplierWithNull();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeSingleTypeDoubleSupplierWithNull();
 
     DruidPredicateIndex predicateIndex = indexSupplier.as(DruidPredicateIndex.class);
     Assert.assertNotNull(predicateIndex);
@@ -860,7 +861,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testVariantNullValueIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeVariantSupplierWithNull();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeVariantSupplierWithNull();
 
     NullValueIndex nullIndex = indexSupplier.as(NullValueIndex.class);
     Assert.assertNotNull(nullIndex);
@@ -879,7 +880,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testVariantValueSetIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeVariantSupplierWithNull();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeVariantSupplierWithNull();
 
     StringValueSetIndex valueSetIndex = indexSupplier.as(StringValueSetIndex.class);
     Assert.assertNotNull(valueSetIndex);
@@ -917,7 +918,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testVariantRangeIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeVariantSupplierWithNull();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeVariantSupplierWithNull();
 
     LexicographicalRangeIndex rangeIndex = indexSupplier.as(LexicographicalRangeIndex.class);
     Assert.assertNull(rangeIndex);
@@ -929,7 +930,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testVariantPredicateIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeVariantSupplierWithNull();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeVariantSupplierWithNull();
 
     DruidPredicateIndex predicateIndex = indexSupplier.as(DruidPredicateIndex.class);
     Assert.assertNotNull(predicateIndex);
@@ -952,7 +953,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
   @Test
   public void testDictionaryEncodedStringValueIndex() throws IOException
   {
-    NestedFieldLiteralColumnIndexSupplier indexSupplier = makeVariantSupplierWithNull();
+    NestedFieldLiteralColumnIndexSupplier<?> indexSupplier = makeVariantSupplierWithNull();
 
     DictionaryEncodedStringValueIndex lowLevelIndex = indexSupplier.as(DictionaryEncodedStringValueIndex.class);
     Assert.assertNotNull(lowLevelIndex);
@@ -971,7 +972,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
     Assert.assertEquals("9.9", lowLevelIndex.getValue(6));
   }
 
-  private NestedFieldLiteralColumnIndexSupplier makeSingleTypeStringSupplier() throws IOException
+  private NestedFieldLiteralColumnIndexSupplier<?> makeSingleTypeStringSupplier() throws IOException
   {
     ByteBuffer localDictionaryBuffer = ByteBuffer.allocate(1 << 12).order(ByteOrder.nativeOrder());
     ByteBuffer bitmapsBuffer = ByteBuffer.allocate(1 << 12);
@@ -1029,7 +1030,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
 
     GenericIndexed<ImmutableBitmap> bitmaps = GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy());
 
-    return new NestedFieldLiteralColumnIndexSupplier(
+    return new NestedFieldLiteralColumnIndexSupplier<>(
         new NestedLiteralTypeInfo.TypeSet(
             new NestedLiteralTypeInfo.MutableTypeSet().add(ColumnType.STRING).getByteValue()
         ),
@@ -1042,7 +1043,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
     );
   }
 
-  private NestedFieldLiteralColumnIndexSupplier makeSingleTypeStringWithNullsSupplier() throws IOException
+  private NestedFieldLiteralColumnIndexSupplier<?> makeSingleTypeStringWithNullsSupplier() throws IOException
   {
     ByteBuffer localDictionaryBuffer = ByteBuffer.allocate(1 << 12).order(ByteOrder.nativeOrder());
     ByteBuffer bitmapsBuffer = ByteBuffer.allocate(1 << 12);
@@ -1103,7 +1104,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
 
     GenericIndexed<ImmutableBitmap> bitmaps = GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy());
 
-    return new NestedFieldLiteralColumnIndexSupplier(
+    return new NestedFieldLiteralColumnIndexSupplier<>(
         new NestedLiteralTypeInfo.TypeSet(
             new NestedLiteralTypeInfo.MutableTypeSet().add(ColumnType.STRING).getByteValue()
         ),
@@ -1116,7 +1117,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
     );
   }
 
-  private NestedFieldLiteralColumnIndexSupplier makeSingleTypeLongSupplier() throws IOException
+  private NestedFieldLiteralColumnIndexSupplier<?> makeSingleTypeLongSupplier() throws IOException
   {
     ByteBuffer localDictionaryBuffer = ByteBuffer.allocate(1 << 12).order(ByteOrder.nativeOrder());
     ByteBuffer bitmapsBuffer = ByteBuffer.allocate(1 << 12);
@@ -1174,7 +1175,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
 
     GenericIndexed<ImmutableBitmap> bitmaps = GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy());
 
-    return new NestedFieldLiteralColumnIndexSupplier(
+    return new NestedFieldLiteralColumnIndexSupplier<>(
         new NestedLiteralTypeInfo.TypeSet(
             new NestedLiteralTypeInfo.MutableTypeSet().add(ColumnType.LONG).getByteValue()
         ),
@@ -1187,7 +1188,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
     );
   }
 
-  private NestedFieldLiteralColumnIndexSupplier makeSingleTypeLongSupplierWithNull() throws IOException
+  private NestedFieldLiteralColumnIndexSupplier<?> makeSingleTypeLongSupplierWithNull() throws IOException
   {
     ByteBuffer localDictionaryBuffer = ByteBuffer.allocate(1 << 12).order(ByteOrder.nativeOrder());
     ByteBuffer bitmapsBuffer = ByteBuffer.allocate(1 << 12);
@@ -1249,7 +1250,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
 
     GenericIndexed<ImmutableBitmap> bitmaps = GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy());
 
-    return new NestedFieldLiteralColumnIndexSupplier(
+    return new NestedFieldLiteralColumnIndexSupplier<>(
         new NestedLiteralTypeInfo.TypeSet(
             new NestedLiteralTypeInfo.MutableTypeSet().add(ColumnType.LONG).getByteValue()
         ),
@@ -1262,7 +1263,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
     );
   }
 
-  private NestedFieldLiteralColumnIndexSupplier makeSingleTypeDoubleSupplier() throws IOException
+  private NestedFieldLiteralColumnIndexSupplier<?> makeSingleTypeDoubleSupplier() throws IOException
   {
     ByteBuffer localDictionaryBuffer = ByteBuffer.allocate(1 << 12).order(ByteOrder.nativeOrder());
     ByteBuffer bitmapsBuffer = ByteBuffer.allocate(1 << 12);
@@ -1320,7 +1321,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
 
     GenericIndexed<ImmutableBitmap> bitmaps = GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy());
 
-    return new NestedFieldLiteralColumnIndexSupplier(
+    return new NestedFieldLiteralColumnIndexSupplier<>(
         new NestedLiteralTypeInfo.TypeSet(
             new NestedLiteralTypeInfo.MutableTypeSet().add(ColumnType.DOUBLE).getByteValue()
         ),
@@ -1333,7 +1334,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
     );
   }
 
-  private NestedFieldLiteralColumnIndexSupplier makeSingleTypeDoubleSupplierWithNull() throws IOException
+  private NestedFieldLiteralColumnIndexSupplier<?> makeSingleTypeDoubleSupplierWithNull() throws IOException
   {
     ByteBuffer localDictionaryBuffer = ByteBuffer.allocate(1 << 12).order(ByteOrder.nativeOrder());
     ByteBuffer bitmapsBuffer = ByteBuffer.allocate(1 << 12);
@@ -1395,7 +1396,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
 
     GenericIndexed<ImmutableBitmap> bitmaps = GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy());
 
-    return new NestedFieldLiteralColumnIndexSupplier(
+    return new NestedFieldLiteralColumnIndexSupplier<>(
         new NestedLiteralTypeInfo.TypeSet(
             new NestedLiteralTypeInfo.MutableTypeSet().add(ColumnType.DOUBLE).getByteValue()
         ),
@@ -1408,7 +1409,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
     );
   }
 
-  private NestedFieldLiteralColumnIndexSupplier makeVariantSupplierWithNull() throws IOException
+  private NestedFieldLiteralColumnIndexSupplier<?> makeVariantSupplierWithNull() throws IOException
   {
     ByteBuffer localDictionaryBuffer = ByteBuffer.allocate(1 << 12).order(ByteOrder.nativeOrder());
     ByteBuffer bitmapsBuffer = ByteBuffer.allocate(1 << 12);
@@ -1478,7 +1479,7 @@ public class NestedFieldLiteralColumnIndexSupplierTest extends InitializedNullHa
 
     GenericIndexed<ImmutableBitmap> bitmaps = GenericIndexed.read(bitmapsBuffer, roaringFactory.getObjectStrategy());
 
-    return new NestedFieldLiteralColumnIndexSupplier(
+    return new NestedFieldLiteralColumnIndexSupplier<>(
         new NestedLiteralTypeInfo.TypeSet(
             new NestedLiteralTypeInfo.MutableTypeSet().add(ColumnType.STRING)
                                                       .add(ColumnType.LONG)
diff --git a/website/.spelling b/website/.spelling
index afa6c881b2..bbabc3160e 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -238,6 +238,7 @@ blobstore
 boolean
 breakpoint
 broadcasted
+bucketSize
 checksums
 classpath
 clickstream
@@ -317,6 +318,7 @@ isAllowList
 jackson-jq
 javadoc
 joinable
+jsonCompression
 json_keys
 json_object
 json_paths
@@ -447,6 +449,7 @@ ssl
 sslmode
 stdout
 storages
+stringDictionaryEncoding
 stringified
 subarray
 subnet


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org