Posted to commits@druid.apache.org by cw...@apache.org on 2020/03/27 19:17:54 UTC

[druid] branch 0.18.0 updated: error on value counter overflow instead of writing sad segments (#9559) (#9572)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch 0.18.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/0.18.0 by this push:
     new 0fcdfb0  error on value counter overflow instead of writing sad segments (#9559) (#9572)
0fcdfb0 is described below

commit 0fcdfb083c7b9dcfd8810db0d36582e0ca948e1a
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Fri Mar 27 12:17:41 2020 -0700

    error on value counter overflow instead of writing sad segments (#9559) (#9572)
---
 .../FloatCompressionBenchmarkFileGenerator.java    |  36 ++--
 .../LongCompressionBenchmarkFileGenerator.java     |  36 ++--
 .../druid/segment/DoubleColumnSerializer.java      |   7 +-
 .../druid/segment/DoubleColumnSerializerV2.java    |   6 +
 .../druid/segment/FloatColumnSerializer.java       |   7 +-
 .../druid/segment/FloatColumnSerializerV2.java     |   6 +
 .../org/apache/druid/segment/IndexMergerV9.java    |   6 +
 .../apache/druid/segment/LongColumnSerializer.java |   7 +-
 .../druid/segment/LongColumnSerializerV2.java      |   6 +
 .../druid/segment/StringDimensionMergerV9.java     |   5 +-
 .../data/BlockLayoutColumnarDoublesSerializer.java |  12 ++
 .../data/BlockLayoutColumnarFloatsSerializer.java  |   6 +
 .../data/BlockLayoutColumnarLongsSerializer.java   |   6 +
 ...r.java => ColumnCapacityExceededException.java} |  24 ++-
 .../segment/data/ColumnarDoublesSerializer.java    |   1 +
 .../data/CompressedColumnarIntsSerializer.java     |   9 +-
 .../data/CompressedColumnarIntsSupplier.java       |  20 ++
 .../CompressedVSizeColumnarIntsSerializer.java     |  10 +
 .../data/CompressedVSizeColumnarIntsSupplier.java  |  29 +++
 .../druid/segment/data/CompressionFactory.java     |  38 +++-
 .../EntireLayoutColumnarDoublesSerializer.java     |  13 +-
 .../data/EntireLayoutColumnarFloatsSerializer.java |   9 +-
 .../data/EntireLayoutColumnarLongsSerializer.java  |   6 +
 .../data/IntermediateColumnarLongsSerializer.java  |   9 +-
 ...CompressedVSizeColumnarMultiIntsSerializer.java |  10 +
 ...V3CompressedVSizeColumnarMultiIntsSupplier.java |  21 ++
 .../data/VSizeColumnarMultiIntsSerializer.java     |  11 +-
 .../data/CompressedColumnarIntsSerializerTest.java | 125 +++++++----
 ...deTest.java => CompressedDoublesSerdeTest.java} | 140 +++++++-----
 .../segment/data/CompressedFloatsSerdeTest.java    |  40 ++++
 .../data/CompressedLongsAutoEncodingSerdeTest.java |   1 +
 .../segment/data/CompressedLongsSerdeTest.java     |  45 ++++
 .../CompressedVSizeColumnarIntsSerializerTest.java |  52 ++++-
 ...ressedVSizeColumnarMultiIntsSerializerTest.java | 237 +++++++++++++++------
 34 files changed, 779 insertions(+), 217 deletions(-)
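
The essence of the patch: Druid's columnar serializers count inserted values in a Java int, which silently wraps to a negative number once it passes Integer.MAX_VALUE (2^31 - 1). Before this change an overflowing counter produced a corrupt ("sad") segment; after it, every serializer re-checks the counter on each increment and fails fast with the new ColumnCapacityExceededException. A minimal, self-contained sketch of the pattern (illustrative names, not the actual Druid classes, which live in org.apache.druid.segment.data):

    // Sketch of the overflow guard added throughout this patch.
    class OverflowGuardedSerializer
    {
      static class ColumnCapacityExceededException extends RuntimeException
      {
        ColumnCapacityExceededException(String columnName)
        {
          super("Too many values to store for " + columnName
                + " column, try reducing maxRowsPerSegment");
        }
      }

      private final String columnName;
      private int numInserted = 0;

      OverflowGuardedSerializer(String columnName)
      {
        this.columnName = columnName;
      }

      void add(long value)
      {
        // ... write the value to the segment write-out medium ...
        ++numInserted;
        // Integer.MAX_VALUE + 1 wraps to Integer.MIN_VALUE, so a negative
        // count means more than 2^31 - 1 values were added to this column.
        if (numInserted < 0) {
          throw new ColumnCapacityExceededException(columnName);
        }
      }
    }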

diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/FloatCompressionBenchmarkFileGenerator.java b/benchmarks/src/test/java/org/apache/druid/benchmark/FloatCompressionBenchmarkFileGenerator.java
index ef3bffc..1570619 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/FloatCompressionBenchmarkFileGenerator.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/FloatCompressionBenchmarkFileGenerator.java
@@ -65,21 +65,26 @@ public class FloatCompressionBenchmarkFileGenerator
       dirPath = args[0];
     }
 
-    BenchmarkColumnSchema enumeratedSchema = BenchmarkColumnSchema.makeEnumerated("", ValueType.FLOAT, true, 1, 0d,
-                                                                                  ImmutableList.of(
-                                                                                      0f,
-                                                                                      1.1f,
-                                                                                      2.2f,
-                                                                                      3.3f,
-                                                                                      4.4f
-                                                                                  ),
-                                                                                  ImmutableList.of(
-                                                                                      0.95,
-                                                                                      0.001,
-                                                                                      0.0189,
-                                                                                      0.03,
-                                                                                      0.0001
-                                                                                  )
+    BenchmarkColumnSchema enumeratedSchema = BenchmarkColumnSchema.makeEnumerated(
+        "",
+        ValueType.FLOAT,
+        true,
+        1,
+        0d,
+        ImmutableList.of(
+            0f,
+            1.1f,
+            2.2f,
+            3.3f,
+            4.4f
+        ),
+        ImmutableList.of(
+            0.95,
+            0.001,
+            0.0189,
+            0.03,
+            0.0001
+        )
     );
     BenchmarkColumnSchema zipfLowSchema = BenchmarkColumnSchema.makeZipf(
         "",
@@ -151,6 +156,7 @@ public class FloatCompressionBenchmarkFileGenerator
         File dataFile = new File(dir, entry.getKey());
 
         ColumnarFloatsSerializer writer = CompressionFactory.getFloatSerializer(
+            "float-benchmark",
             new OffHeapMemorySegmentWriteOutMedium(),
             "float",
             ByteOrder.nativeOrder(),
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/LongCompressionBenchmarkFileGenerator.java b/benchmarks/src/test/java/org/apache/druid/benchmark/LongCompressionBenchmarkFileGenerator.java
index 31c39fe..ead3dd0 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/LongCompressionBenchmarkFileGenerator.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/LongCompressionBenchmarkFileGenerator.java
@@ -66,21 +66,26 @@ public class LongCompressionBenchmarkFileGenerator
       dirPath = args[0];
     }
 
-    BenchmarkColumnSchema enumeratedSchema = BenchmarkColumnSchema.makeEnumerated("", ValueType.LONG, true, 1, 0d,
-                                                                                  ImmutableList.of(
-                                                                                      0,
-                                                                                      1,
-                                                                                      2,
-                                                                                      3,
-                                                                                      4
-                                                                                  ),
-                                                                                  ImmutableList.of(
-                                                                                      0.95,
-                                                                                      0.001,
-                                                                                      0.0189,
-                                                                                      0.03,
-                                                                                      0.0001
-                                                                                  )
+    BenchmarkColumnSchema enumeratedSchema = BenchmarkColumnSchema.makeEnumerated(
+        "",
+        ValueType.LONG,
+        true,
+        1,
+        0d,
+        ImmutableList.of(
+            0,
+            1,
+            2,
+            3,
+            4
+        ),
+        ImmutableList.of(
+            0.95,
+            0.001,
+            0.0189,
+            0.03,
+            0.0001
+        )
     );
     BenchmarkColumnSchema zipfLowSchema = BenchmarkColumnSchema.makeZipf("", ValueType.LONG, true, 1, 0d, -1, 1000, 1d);
     BenchmarkColumnSchema zipfHighSchema = BenchmarkColumnSchema.makeZipf(
@@ -144,6 +149,7 @@ public class LongCompressionBenchmarkFileGenerator
           File dataFile = new File(dir, entry.getKey());
 
           ColumnarLongsSerializer writer = CompressionFactory.getLongSerializer(
+              "long-benchmark",
               new OffHeapMemorySegmentWriteOutMedium(),
               "long",
               ByteOrder.nativeOrder(),
diff --git a/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializer.java
index 67b68ed..cc525cb 100644
--- a/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializer.java
@@ -33,14 +33,16 @@ import java.nio.channels.WritableByteChannel;
 public class DoubleColumnSerializer implements GenericColumnSerializer<Object>
 {
   public static DoubleColumnSerializer create(
+      String columnName,
       SegmentWriteOutMedium segmentWriteOutMedium,
       String filenameBase,
       CompressionStrategy compression
   )
   {
-    return new DoubleColumnSerializer(segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, compression);
+    return new DoubleColumnSerializer(columnName, segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, compression);
   }
 
+  private final String columnName;
   private final SegmentWriteOutMedium segmentWriteOutMedium;
   private final String filenameBase;
   private final ByteOrder byteOrder;
@@ -48,12 +50,14 @@ public class DoubleColumnSerializer implements GenericColumnSerializer<Object>
   private ColumnarDoublesSerializer writer;
 
   private DoubleColumnSerializer(
+      String columnName,
       SegmentWriteOutMedium segmentWriteOutMedium,
       String filenameBase,
       ByteOrder byteOrder,
       CompressionStrategy compression
   )
   {
+    this.columnName = columnName;
     this.segmentWriteOutMedium = segmentWriteOutMedium;
     this.filenameBase = filenameBase;
     this.byteOrder = byteOrder;
@@ -64,6 +68,7 @@ public class DoubleColumnSerializer implements GenericColumnSerializer<Object>
   public void open() throws IOException
   {
     writer = CompressionFactory.getDoubleSerializer(
+        columnName,
         segmentWriteOutMedium,
         StringUtils.format("%s.double_column", filenameBase),
         byteOrder,
diff --git a/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializerV2.java b/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializerV2.java
index 3078a6a..02903eb 100644
--- a/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializerV2.java
+++ b/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializerV2.java
@@ -45,6 +45,7 @@ import java.nio.channels.WritableByteChannel;
 public class DoubleColumnSerializerV2 implements GenericColumnSerializer<Object>
 {
   public static DoubleColumnSerializerV2 create(
+      String columnName,
       SegmentWriteOutMedium segmentWriteOutMedium,
       String filenameBase,
       CompressionStrategy compression,
@@ -52,6 +53,7 @@ public class DoubleColumnSerializerV2 implements GenericColumnSerializer<Object>
   )
   {
     return new DoubleColumnSerializerV2(
+        columnName,
         segmentWriteOutMedium,
         filenameBase,
         IndexIO.BYTE_ORDER,
@@ -60,6 +62,7 @@ public class DoubleColumnSerializerV2 implements GenericColumnSerializer<Object>
     );
   }
 
+  private final String columnName;
   private final SegmentWriteOutMedium segmentWriteOutMedium;
   private final String filenameBase;
   private final ByteOrder byteOrder;
@@ -72,6 +75,7 @@ public class DoubleColumnSerializerV2 implements GenericColumnSerializer<Object>
   private int rowCount = 0;
 
   private DoubleColumnSerializerV2(
+      String columnName,
       SegmentWriteOutMedium segmentWriteOutMedium,
       String filenameBase,
       ByteOrder byteOrder,
@@ -79,6 +83,7 @@ public class DoubleColumnSerializerV2 implements GenericColumnSerializer<Object>
       BitmapSerdeFactory bitmapSerdeFactory
   )
   {
+    this.columnName = columnName;
     this.segmentWriteOutMedium = segmentWriteOutMedium;
     this.filenameBase = filenameBase;
     this.byteOrder = byteOrder;
@@ -90,6 +95,7 @@ public class DoubleColumnSerializerV2 implements GenericColumnSerializer<Object>
   public void open() throws IOException
   {
     writer = CompressionFactory.getDoubleSerializer(
+        columnName,
         segmentWriteOutMedium,
         StringUtils.format("%s.double_column", filenameBase),
         byteOrder,
diff --git a/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializer.java
index 8d413d5..b96d520 100644
--- a/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializer.java
@@ -33,14 +33,16 @@ import java.nio.channels.WritableByteChannel;
 public class FloatColumnSerializer implements GenericColumnSerializer<Object>
 {
   public static FloatColumnSerializer create(
+      String columnName,
       SegmentWriteOutMedium segmentWriteOutMedium,
       String filenameBase,
       CompressionStrategy compression
   )
   {
-    return new FloatColumnSerializer(segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, compression);
+    return new FloatColumnSerializer(columnName, segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, compression);
   }
 
+  private final String columnName;
   private final SegmentWriteOutMedium segmentWriteOutMedium;
   private final String filenameBase;
   private final ByteOrder byteOrder;
@@ -48,12 +50,14 @@ public class FloatColumnSerializer implements GenericColumnSerializer<Object>
   private ColumnarFloatsSerializer writer;
 
   private FloatColumnSerializer(
+      String columnName,
       SegmentWriteOutMedium segmentWriteOutMedium,
       String filenameBase,
       ByteOrder byteOrder,
       CompressionStrategy compression
   )
   {
+    this.columnName = columnName;
     this.segmentWriteOutMedium = segmentWriteOutMedium;
     this.filenameBase = filenameBase;
     this.byteOrder = byteOrder;
@@ -64,6 +68,7 @@ public class FloatColumnSerializer implements GenericColumnSerializer<Object>
   public void open() throws IOException
   {
     writer = CompressionFactory.getFloatSerializer(
+        columnName,
         segmentWriteOutMedium,
         StringUtils.format("%s.float_column", filenameBase),
         byteOrder,
diff --git a/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializerV2.java b/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializerV2.java
index 41e7b68..b5371a0 100644
--- a/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializerV2.java
+++ b/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializerV2.java
@@ -45,6 +45,7 @@ import java.nio.channels.WritableByteChannel;
 public class FloatColumnSerializerV2 implements GenericColumnSerializer<Object>
 {
   public static FloatColumnSerializerV2 create(
+      String columnName,
       SegmentWriteOutMedium segmentWriteOutMedium,
       String filenameBase,
       CompressionStrategy compression,
@@ -52,6 +53,7 @@ public class FloatColumnSerializerV2 implements GenericColumnSerializer<Object>
   )
   {
     return new FloatColumnSerializerV2(
+        columnName,
         segmentWriteOutMedium,
         filenameBase,
         IndexIO.BYTE_ORDER,
@@ -60,6 +62,7 @@ public class FloatColumnSerializerV2 implements GenericColumnSerializer<Object>
     );
   }
 
+  private final String columnName;
   private final SegmentWriteOutMedium segmentWriteOutMedium;
   private final String filenameBase;
   private final ByteOrder byteOrder;
@@ -72,6 +75,7 @@ public class FloatColumnSerializerV2 implements GenericColumnSerializer<Object>
   private int rowCount = 0;
 
   private FloatColumnSerializerV2(
+      String columnName,
       SegmentWriteOutMedium segmentWriteOutMedium,
       String filenameBase,
       ByteOrder byteOrder,
@@ -79,6 +83,7 @@ public class FloatColumnSerializerV2 implements GenericColumnSerializer<Object>
       BitmapSerdeFactory bitmapSerdeFactory
   )
   {
+    this.columnName = columnName;
     this.segmentWriteOutMedium = segmentWriteOutMedium;
     this.filenameBase = filenameBase;
     this.byteOrder = byteOrder;
@@ -90,6 +95,7 @@ public class FloatColumnSerializerV2 implements GenericColumnSerializer<Object>
   public void open() throws IOException
   {
     writer = CompressionFactory.getFloatSerializer(
+        columnName,
         segmentWriteOutMedium,
         StringUtils.format("%s.float_column", filenameBase),
         byteOrder,
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
index 91e202a..e1a92f7 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
@@ -621,6 +621,7 @@ public class IndexMergerV9 implements IndexMerger
     // If using default values for null use LongColumnSerializer to allow rollback to previous versions.
     if (NullHandling.replaceWithDefault()) {
       return LongColumnSerializer.create(
+          columnName,
           segmentWriteOutMedium,
           columnName,
           indexSpec.getMetricCompression(),
@@ -628,6 +629,7 @@ public class IndexMergerV9 implements IndexMerger
       );
     } else {
       return LongColumnSerializerV2.create(
+          columnName,
           segmentWriteOutMedium,
           columnName,
           indexSpec.getMetricCompression(),
@@ -646,12 +648,14 @@ public class IndexMergerV9 implements IndexMerger
     // If using default values for null use DoubleColumnSerializer to allow rollback to previous versions.
     if (NullHandling.replaceWithDefault()) {
       return DoubleColumnSerializer.create(
+          columnName,
           segmentWriteOutMedium,
           columnName,
           indexSpec.getMetricCompression()
       );
     } else {
       return DoubleColumnSerializerV2.create(
+          columnName,
           segmentWriteOutMedium,
           columnName,
           indexSpec.getMetricCompression(),
@@ -669,12 +673,14 @@ public class IndexMergerV9 implements IndexMerger
     // If using default values for null use FloatColumnSerializer to allow rollback to previous versions.
     if (NullHandling.replaceWithDefault()) {
       return FloatColumnSerializer.create(
+          columnName,
           segmentWriteOutMedium,
           columnName,
           indexSpec.getMetricCompression()
       );
     } else {
       return FloatColumnSerializerV2.create(
+          columnName,
           segmentWriteOutMedium,
           columnName,
           indexSpec.getMetricCompression(),
diff --git a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializer.java
index 8f4cc58..1f8d03b 100644
--- a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializer.java
@@ -36,15 +36,17 @@ import java.nio.channels.WritableByteChannel;
 public class LongColumnSerializer implements GenericColumnSerializer<Object>
 {
   public static LongColumnSerializer create(
+      String columnName,
       SegmentWriteOutMedium segmentWriteOutMedium,
       String filenameBase,
       CompressionStrategy compression,
       CompressionFactory.LongEncodingStrategy encoding
   )
   {
-    return new LongColumnSerializer(segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, compression, encoding);
+    return new LongColumnSerializer(columnName, segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, compression, encoding);
   }
 
+  private final String columnName;
   private final SegmentWriteOutMedium segmentWriteOutMedium;
   private final String filenameBase;
   private final ByteOrder byteOrder;
@@ -53,6 +55,7 @@ public class LongColumnSerializer implements GenericColumnSerializer<Object>
   private ColumnarLongsSerializer writer;
 
   private LongColumnSerializer(
+      String columnName,
       SegmentWriteOutMedium segmentWriteOutMedium,
       String filenameBase,
       ByteOrder byteOrder,
@@ -60,6 +63,7 @@ public class LongColumnSerializer implements GenericColumnSerializer<Object>
       CompressionFactory.LongEncodingStrategy encoding
   )
   {
+    this.columnName = columnName;
     this.segmentWriteOutMedium = segmentWriteOutMedium;
     this.filenameBase = filenameBase;
     this.byteOrder = byteOrder;
@@ -71,6 +75,7 @@ public class LongColumnSerializer implements GenericColumnSerializer<Object>
   public void open() throws IOException
   {
     writer = CompressionFactory.getLongSerializer(
+        columnName,
         segmentWriteOutMedium,
         StringUtils.format("%s.long_column", filenameBase),
         byteOrder,
diff --git a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java
index cacac59..364a7af 100644
--- a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java
+++ b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java
@@ -45,6 +45,7 @@ import java.nio.channels.WritableByteChannel;
 public class LongColumnSerializerV2 implements GenericColumnSerializer<Object>
 {
   public static LongColumnSerializerV2 create(
+      String columnName,
       SegmentWriteOutMedium segmentWriteOutMedium,
       String filenameBase,
       CompressionStrategy compression,
@@ -53,6 +54,7 @@ public class LongColumnSerializerV2 implements GenericColumnSerializer<Object>
   )
   {
     return new LongColumnSerializerV2(
+        columnName,
         segmentWriteOutMedium,
         filenameBase,
         IndexIO.BYTE_ORDER,
@@ -62,6 +64,7 @@ public class LongColumnSerializerV2 implements GenericColumnSerializer<Object>
     );
   }
 
+  private final String columnName;
   private final SegmentWriteOutMedium segmentWriteOutMedium;
   private final String filenameBase;
   private final ByteOrder byteOrder;
@@ -75,6 +78,7 @@ public class LongColumnSerializerV2 implements GenericColumnSerializer<Object>
   private int rowCount = 0;
 
   private LongColumnSerializerV2(
+      String columnName,
       SegmentWriteOutMedium segmentWriteOutMedium,
       String filenameBase,
       ByteOrder byteOrder,
@@ -83,6 +87,7 @@ public class LongColumnSerializerV2 implements GenericColumnSerializer<Object>
       BitmapSerdeFactory bitmapSerdeFactory
   )
   {
+    this.columnName = columnName;
     this.segmentWriteOutMedium = segmentWriteOutMedium;
     this.filenameBase = filenameBase;
     this.byteOrder = byteOrder;
@@ -95,6 +100,7 @@ public class LongColumnSerializerV2 implements GenericColumnSerializer<Object>
   public void open() throws IOException
   {
     writer = CompressionFactory.getLongSerializer(
+        columnName,
         segmentWriteOutMedium,
         StringUtils.format("%s.long_column", filenameBase),
         byteOrder,
diff --git a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java b/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
index 2ccc0dd..cff7ef9 100644
--- a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
+++ b/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
@@ -224,17 +224,20 @@ public class StringDimensionMergerV9 implements DimensionMergerV9
     if (capabilities.hasMultipleValues()) {
       if (compressionStrategy != CompressionStrategy.UNCOMPRESSED) {
         encodedValueSerializer = V3CompressedVSizeColumnarMultiIntsSerializer.create(
+            dimensionName,
             segmentWriteOutMedium,
             filenameBase,
             cardinality,
             compressionStrategy
         );
       } else {
-        encodedValueSerializer = new VSizeColumnarMultiIntsSerializer(segmentWriteOutMedium, cardinality);
+        encodedValueSerializer =
+            new VSizeColumnarMultiIntsSerializer(dimensionName, segmentWriteOutMedium, cardinality);
       }
     } else {
       if (compressionStrategy != CompressionStrategy.UNCOMPRESSED) {
         encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
+            dimensionName,
             segmentWriteOutMedium,
             filenameBase,
             cardinality,
diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java
index c2247ac..2b4612e 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java
@@ -43,6 +43,7 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri
       .writeInt(x -> CompressedPools.BUFFER_SIZE / Double.BYTES)
       .writeByte(x -> x.compression.getId());
 
+  private final String columnName;
   private final GenericIndexedWriter<ByteBuffer> flattener;
   private final CompressionStrategy compression;
 
@@ -51,12 +52,14 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri
   private ByteBuffer endBuffer;
 
   BlockLayoutColumnarDoublesSerializer(
+      String columnName,
       SegmentWriteOutMedium segmentWriteOutMedium,
       String filenameBase,
       ByteOrder byteOrder,
       CompressionStrategy compression
   )
   {
+    this.columnName = columnName;
     this.flattener = GenericIndexedWriter.ofCompressedByteBuffers(
         segmentWriteOutMedium,
         filenameBase,
@@ -76,6 +79,12 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri
   }
 
   @Override
+  public int size()
+  {
+    return numInserted;
+  }
+
+  @Override
   public void add(double value) throws IOException
   {
     if (endBuffer == null) {
@@ -89,6 +98,9 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri
 
     endBuffer.putDouble(value);
     ++numInserted;
+    if (numInserted < 0) {
+      throw new ColumnCapacityExceededException(columnName);
+    }
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java
index e247252..94a3ef6 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java
@@ -43,6 +43,7 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerial
       .writeInt(x -> CompressedPools.BUFFER_SIZE / Float.BYTES)
       .writeByte(x -> x.compression.getId());
 
+  private final String columnName;
   private final GenericIndexedWriter<ByteBuffer> flattener;
   private final CompressionStrategy compression;
 
@@ -51,12 +52,14 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerial
   private ByteBuffer endBuffer;
 
   BlockLayoutColumnarFloatsSerializer(
+      String columnName,
       SegmentWriteOutMedium segmentWriteOutMedium,
       String filenameBase,
       ByteOrder byteOrder,
       CompressionStrategy compression
   )
   {
+    this.columnName = columnName;
     this.flattener = GenericIndexedWriter.ofCompressedByteBuffers(
         segmentWriteOutMedium,
         filenameBase,
@@ -94,6 +97,9 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerial
     }
     endBuffer.putFloat(value);
     ++numInserted;
+    if (numInserted < 0) {
+      throw new ColumnCapacityExceededException(columnName);
+    }
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
index cb40402..ff47a34 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
@@ -42,6 +42,7 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ
       .writeInt(x -> x.sizePer)
       .writeSomething(CompressionFactory.longEncodingWriter(x -> x.writer, x -> x.compression));
 
+  private final String columnName;
   private final int sizePer;
   private final CompressionFactory.LongEncodingWriter writer;
   private final GenericIndexedWriter<ByteBuffer> flattener;
@@ -53,6 +54,7 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ
   private ByteBuffer endBuffer;
 
   BlockLayoutColumnarLongsSerializer(
+      String columnName,
       SegmentWriteOutMedium segmentWriteOutMedium,
       String filenameBase,
       ByteOrder byteOrder,
@@ -60,6 +62,7 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ
       CompressionStrategy compression
   )
   {
+    this.columnName = columnName;
     this.sizePer = writer.getBlockSize(CompressedPools.BUFFER_SIZE);
     int bufferSize = writer.getNumBytes(sizePer);
     this.flattener = GenericIndexedWriter.ofCompressedByteBuffers(segmentWriteOutMedium, filenameBase, compression, bufferSize);
@@ -100,6 +103,9 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ
 
     writer.write(value);
     ++numInserted;
+    if (numInserted < 0) {
+      throw new ColumnCapacityExceededException(columnName);
+    }
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/ColumnCapacityExceededException.java
similarity index 62%
copy from processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java
copy to processing/src/main/java/org/apache/druid/segment/data/ColumnCapacityExceededException.java
index da1b977..f7a2a60 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/ColumnCapacityExceededException.java
@@ -19,15 +19,21 @@
 
 package org.apache.druid.segment.data;
 
-import org.apache.druid.segment.serde.Serializer;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.java.util.common.StringUtils;
 
-import java.io.IOException;
-
-/**
- * Serializer that produces {@link ColumnarDoubles}.
- */
-public interface ColumnarDoublesSerializer extends Serializer
+public class ColumnCapacityExceededException extends RuntimeException
 {
-  void open() throws IOException;
-  void add(double value) throws IOException;
+  @VisibleForTesting
+  public static String formatMessage(String columnName)
+  {
+    return StringUtils.format(
+        "Too many values to store for %s column, try reducing maxRowsPerSegment",
+        columnName
+    );
+  }
+  public ColumnCapacityExceededException(String columnName)
+  {
+    super(formatMessage(columnName));
+  }
 }
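
Why a plain sign test suffices: Java int arithmetic wraps in two's complement, so the increment after the last representable count lands exactly on Integer.MIN_VALUE. A three-line demonstration (plain Java, run with assertions enabled via -ea):

    int counter = Integer.MAX_VALUE; // 2147483647, the last valid count
    counter++;                       // wraps to -2147483648
    assert counter < 0;              // the condition each add() now tests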
diff --git a/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java
index da1b977..f33cdee 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java
@@ -29,5 +29,6 @@ import java.io.IOException;
 public interface ColumnarDoublesSerializer extends Serializer
 {
   void open() throws IOException;
+  int size();
   void add(double value) throws IOException;
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java
index 0d82b4c..a0442d9 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java
@@ -25,7 +25,6 @@ import org.apache.druid.segment.serde.MetaSerdeHelper;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
 import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -44,6 +43,7 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer
       .writeInt(x -> x.chunkFactor)
       .writeByte(x -> x.compression.getId());
 
+  private final String columnName;
   private final int chunkFactor;
   private final CompressionStrategy compression;
   private final GenericIndexedWriter<ByteBuffer> flattener;
@@ -53,6 +53,7 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer
   private ByteBuffer endBuffer;
 
   CompressedColumnarIntsSerializer(
+      final String columnName,
       final SegmentWriteOutMedium segmentWriteOutMedium,
       final String filenameBase,
       final int chunkFactor,
@@ -61,6 +62,7 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer
   )
   {
     this(
+        columnName,
         segmentWriteOutMedium,
         chunkFactor,
         byteOrder,
@@ -75,6 +77,7 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer
   }
 
   CompressedColumnarIntsSerializer(
+      final String columnName,
       final SegmentWriteOutMedium segmentWriteOutMedium,
       final int chunkFactor,
       final ByteOrder byteOrder,
@@ -82,6 +85,7 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer
       final GenericIndexedWriter<ByteBuffer> flattener
   )
   {
+    this.columnName = columnName;
     this.chunkFactor = chunkFactor;
     this.compression = compression;
     this.flattener = flattener;
@@ -110,6 +114,9 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer
     }
     endBuffer.putInt(val);
     numInserted++;
+    if (numInserted < 0) {
+      throw new ColumnCapacityExceededException(columnName);
+    }
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java
index f58ea59..d9b3cf9 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java
@@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.guava.CloseQuietly;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.CompressedPools;
 import org.apache.druid.segment.serde.MetaSerdeHelper;
@@ -133,6 +134,25 @@ public class CompressedColumnarIntsSupplier implements WritableSupplier<Columnar
     throw new IAE("Unknown version[%s]", versionFromBuffer);
   }
 
+  public static CompressedColumnarIntsSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order, SmooshedFileMapper mapper)
+  {
+    byte versionFromBuffer = buffer.get();
+
+    if (versionFromBuffer == VERSION) {
+      final int totalSize = buffer.getInt();
+      final int sizePer = buffer.getInt();
+      final CompressionStrategy compression = CompressionStrategy.forId(buffer.get());
+      return new CompressedColumnarIntsSupplier(
+          totalSize,
+          sizePer,
+          GenericIndexed.read(buffer, new DecompressingByteBufferObjectStrategy(order, compression), mapper),
+          compression
+      );
+    }
+
+    throw new IAE("Unknown version[%s]", versionFromBuffer);
+  }
+
   @VisibleForTesting
   static CompressedColumnarIntsSupplier fromIntBuffer(
       final IntBuffer buffer,
diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java
index 0f6c54a..6060be2 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java
@@ -47,6 +47,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
       .writeByte(x -> x.compression.getId());
 
   public static CompressedVSizeColumnarIntsSerializer create(
+      final String columnName,
       final SegmentWriteOutMedium segmentWriteOutMedium,
       final String filenameBase,
       final int maxValue,
@@ -54,6 +55,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
   )
   {
     return new CompressedVSizeColumnarIntsSerializer(
+        columnName,
         segmentWriteOutMedium,
         filenameBase,
         maxValue,
@@ -63,6 +65,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
     );
   }
 
+  private final String columnName;
   private final int numBytes;
   private final int chunkFactor;
   private final boolean isBigEndian;
@@ -75,6 +78,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
   private ByteBuffer endBuffer;
 
   CompressedVSizeColumnarIntsSerializer(
+      final String columnName,
       final SegmentWriteOutMedium segmentWriteOutMedium,
       final String filenameBase,
       final int maxValue,
@@ -84,6 +88,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
   )
   {
     this(
+        columnName,
         segmentWriteOutMedium,
         maxValue,
         chunkFactor,
@@ -99,6 +104,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
   }
 
   CompressedVSizeColumnarIntsSerializer(
+      final String columnName,
       final SegmentWriteOutMedium segmentWriteOutMedium,
       final int maxValue,
       final int chunkFactor,
@@ -107,6 +113,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
       final GenericIndexedWriter<ByteBuffer> flattener
   )
   {
+    this.columnName = columnName;
     this.numBytes = VSizeColumnarInts.getNumBytesForMax(maxValue);
     this.chunkFactor = chunkFactor;
     int chunkBytes = chunkFactor * numBytes;
@@ -149,6 +156,9 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
       endBuffer.put(intBuffer.array(), 0, numBytes);
     }
     numInserted++;
+    if (numInserted < 0) {
+      throw new ColumnCapacityExceededException(columnName);
+    }
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
index 323945c..84d8b8d 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.guava.CloseQuietly;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.CompressedPools;
 import org.apache.druid.segment.serde.MetaSerdeHelper;
@@ -167,6 +168,34 @@ public class CompressedVSizeColumnarIntsSupplier implements WritableSupplier<Col
     throw new IAE("Unknown version[%s]", versionFromBuffer);
   }
 
+  public static CompressedVSizeColumnarIntsSupplier fromByteBuffer(
+      ByteBuffer buffer,
+      ByteOrder order,
+      SmooshedFileMapper mapper
+  )
+  {
+    byte versionFromBuffer = buffer.get();
+
+    if (versionFromBuffer == VERSION) {
+      final int numBytes = buffer.get();
+      final int totalSize = buffer.getInt();
+      final int sizePer = buffer.getInt();
+
+      final CompressionStrategy compression = CompressionStrategy.forId(buffer.get());
+
+      return new CompressedVSizeColumnarIntsSupplier(
+          totalSize,
+          sizePer,
+          numBytes,
+          GenericIndexed.read(buffer, new DecompressingByteBufferObjectStrategy(order, compression), mapper),
+          compression
+      );
+
+    }
+
+    throw new IAE("Unknown version[%s]", versionFromBuffer);
+  }
+
   @VisibleForTesting
   public static CompressedVSizeColumnarIntsSupplier fromList(
       final IntList list,
diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java b/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
index ddacb90..7bf647a 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
@@ -331,6 +331,7 @@ public class CompressionFactory
   }
 
   public static ColumnarLongsSerializer getLongSerializer(
+      String columnName,
       SegmentWriteOutMedium segmentWriteOutMedium,
       String filenameBase,
       ByteOrder order,
@@ -339,12 +340,23 @@ public class CompressionFactory
   )
   {
     if (encodingStrategy == LongEncodingStrategy.AUTO) {
-      return new IntermediateColumnarLongsSerializer(segmentWriteOutMedium, filenameBase, order, compressionStrategy);
+      return new IntermediateColumnarLongsSerializer(
+          columnName,
+          segmentWriteOutMedium,
+          filenameBase,
+          order,
+          compressionStrategy
+      );
     } else if (encodingStrategy == LongEncodingStrategy.LONGS) {
       if (compressionStrategy == CompressionStrategy.NONE) {
-        return new EntireLayoutColumnarLongsSerializer(segmentWriteOutMedium, new LongsLongEncodingWriter(order));
+        return new EntireLayoutColumnarLongsSerializer(
+            columnName,
+            segmentWriteOutMedium,
+            new LongsLongEncodingWriter(order)
+        );
       } else {
         return new BlockLayoutColumnarLongsSerializer(
+            columnName,
             segmentWriteOutMedium,
             filenameBase,
             order,
@@ -375,6 +387,7 @@ public class CompressionFactory
   }
 
   public static ColumnarFloatsSerializer getFloatSerializer(
+      String columnName,
       SegmentWriteOutMedium segmentWriteOutMedium,
       String filenameBase,
       ByteOrder order,
@@ -382,9 +395,15 @@ public class CompressionFactory
   )
   {
     if (compressionStrategy == CompressionStrategy.NONE) {
-      return new EntireLayoutColumnarFloatsSerializer(segmentWriteOutMedium, order);
+      return new EntireLayoutColumnarFloatsSerializer(columnName, segmentWriteOutMedium, order);
     } else {
-      return new BlockLayoutColumnarFloatsSerializer(segmentWriteOutMedium, filenameBase, order, compressionStrategy);
+      return new BlockLayoutColumnarFloatsSerializer(
+          columnName,
+          segmentWriteOutMedium,
+          filenameBase,
+          order,
+          compressionStrategy
+      );
     }
   }
 
@@ -406,6 +425,7 @@ public class CompressionFactory
   }
 
   public static ColumnarDoublesSerializer getDoubleSerializer(
+      String columnName,
       SegmentWriteOutMedium segmentWriteOutMedium,
       String filenameBase,
       ByteOrder byteOrder,
@@ -413,9 +433,15 @@ public class CompressionFactory
   )
   {
     if (compression == CompressionStrategy.NONE) {
-      return new EntireLayoutColumnarDoublesSerializer(segmentWriteOutMedium, byteOrder);
+      return new EntireLayoutColumnarDoublesSerializer(columnName, segmentWriteOutMedium, byteOrder);
     } else {
-      return new BlockLayoutColumnarDoublesSerializer(segmentWriteOutMedium, filenameBase, byteOrder, compression);
+      return new BlockLayoutColumnarDoublesSerializer(
+          columnName,
+          segmentWriteOutMedium,
+          filenameBase,
+          byteOrder,
+          compression
+      );
     }
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarDoublesSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarDoublesSerializer.java
index 540b0b7..b330319 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarDoublesSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarDoublesSerializer.java
@@ -40,14 +40,16 @@ public class EntireLayoutColumnarDoublesSerializer implements ColumnarDoublesSer
       .writeInt(x -> 0)
       .writeByte(x -> CompressionStrategy.NONE.getId());
 
+  private final String columnName;
   private final SegmentWriteOutMedium segmentWriteOutMedium;
   private final ByteBuffer orderBuffer;
   private WriteOutBytes valuesOut;
 
   private int numInserted = 0;
 
-  public EntireLayoutColumnarDoublesSerializer(SegmentWriteOutMedium segmentWriteOutMedium, ByteOrder order)
+  public EntireLayoutColumnarDoublesSerializer(String columnName, SegmentWriteOutMedium segmentWriteOutMedium, ByteOrder order)
   {
+    this.columnName = columnName;
     this.segmentWriteOutMedium = segmentWriteOutMedium;
     this.orderBuffer = ByteBuffer.allocate(Double.BYTES);
     orderBuffer.order(order);
@@ -60,12 +62,21 @@ public class EntireLayoutColumnarDoublesSerializer implements ColumnarDoublesSer
   }
 
   @Override
+  public int size()
+  {
+    return numInserted;
+  }
+
+  @Override
   public void add(double value) throws IOException
   {
     orderBuffer.rewind();
     orderBuffer.putDouble(value);
     valuesOut.write(orderBuffer.array());
     ++numInserted;
+    if (numInserted < 0) {
+      throw new ColumnCapacityExceededException(columnName);
+    }
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java
index 1209b76..75b7290 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java
@@ -39,16 +39,18 @@ public class EntireLayoutColumnarFloatsSerializer implements ColumnarFloatsSeria
       .writeInt(x -> 0)
       .writeByte(x -> CompressionStrategy.NONE.getId());
 
+  private final String columnName;
   private final boolean isLittleEndian;
   private final SegmentWriteOutMedium segmentWriteOutMedium;
   private WriteOutBytes valuesOut;
 
   private int numInserted = 0;
 
-  EntireLayoutColumnarFloatsSerializer(SegmentWriteOutMedium segmentWriteOutMedium, ByteOrder order)
+  EntireLayoutColumnarFloatsSerializer(String columnName, SegmentWriteOutMedium segmentWriteOutMedium, ByteOrder order)
   {
+    this.columnName = columnName;
     this.segmentWriteOutMedium = segmentWriteOutMedium;
-    isLittleEndian = order.equals(ByteOrder.LITTLE_ENDIAN);
+    this.isLittleEndian = order.equals(ByteOrder.LITTLE_ENDIAN);
   }
 
   @Override
@@ -73,6 +75,9 @@ public class EntireLayoutColumnarFloatsSerializer implements ColumnarFloatsSeria
     }
     valuesOut.writeInt(valueBits);
     ++numInserted;
+    if (numInserted < 0) {
+      throw new ColumnCapacityExceededException(columnName);
+    }
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarLongsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarLongsSerializer.java
index b29081b..9513836 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarLongsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarLongsSerializer.java
@@ -38,6 +38,7 @@ public class EntireLayoutColumnarLongsSerializer implements ColumnarLongsSeriali
       .writeInt(x -> 0)
       .writeSomething(CompressionFactory.longEncodingWriter(x -> x.writer, x -> CompressionStrategy.NONE));
 
+  private final String columnName;
   private final CompressionFactory.LongEncodingWriter writer;
   private final SegmentWriteOutMedium segmentWriteOutMedium;
   private WriteOutBytes valuesOut;
@@ -45,10 +46,12 @@ public class EntireLayoutColumnarLongsSerializer implements ColumnarLongsSeriali
   private int numInserted = 0;
 
   EntireLayoutColumnarLongsSerializer(
+      String columnName,
       SegmentWriteOutMedium segmentWriteOutMedium,
       CompressionFactory.LongEncodingWriter writer
   )
   {
+    this.columnName = columnName;
     this.segmentWriteOutMedium = segmentWriteOutMedium;
     this.writer = writer;
   }
@@ -71,6 +74,9 @@ public class EntireLayoutColumnarLongsSerializer implements ColumnarLongsSeriali
   {
     writer.write(value);
     ++numInserted;
+    if (numInserted < 0) {
+      throw new ColumnCapacityExceededException(columnName);
+    }
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java
index e08e463..c0f9355 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java
@@ -40,6 +40,7 @@ import java.nio.channels.WritableByteChannel;
  */
 public class IntermediateColumnarLongsSerializer implements ColumnarLongsSerializer
 {
+  private final String columnName;
   private final SegmentWriteOutMedium segmentWriteOutMedium;
   private final String filenameBase;
   private final ByteOrder order;
@@ -59,12 +60,14 @@ public class IntermediateColumnarLongsSerializer implements ColumnarLongsSeriali
   private ColumnarLongsSerializer delegate;
 
   IntermediateColumnarLongsSerializer(
+      String columnName,
       SegmentWriteOutMedium segmentWriteOutMedium,
       String filenameBase,
       ByteOrder order,
       CompressionStrategy compression
   )
   {
+    this.columnName = columnName;
     this.segmentWriteOutMedium = segmentWriteOutMedium;
     this.filenameBase = filenameBase;
     this.order = order;
@@ -92,6 +95,9 @@ public class IntermediateColumnarLongsSerializer implements ColumnarLongsSeriali
     }
     tempOut.add(value);
     ++numInserted;
+    if (numInserted < 0) {
+      throw new ColumnCapacityExceededException(columnName);
+    }
     if (uniqueValues.size() <= CompressionFactory.MAX_TABLE_SIZE && !uniqueValues.containsKey(value)) {
       uniqueValues.put(value, uniqueValues.size());
       valuesAddedInOrder.add(value);
@@ -127,9 +133,10 @@ public class IntermediateColumnarLongsSerializer implements ColumnarLongsSeriali
     }
 
     if (compression == CompressionStrategy.NONE) {
-      delegate = new EntireLayoutColumnarLongsSerializer(segmentWriteOutMedium, writer);
+      delegate = new EntireLayoutColumnarLongsSerializer(columnName, segmentWriteOutMedium, writer);
     } else {
       delegate = new BlockLayoutColumnarLongsSerializer(
+          columnName,
           segmentWriteOutMedium,
           filenameBase,
           order,
diff --git a/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java
index ebf0c4b..f669029 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java
@@ -36,6 +36,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer extends ColumnarMultiI
   private static final byte VERSION = V3CompressedVSizeColumnarMultiIntsSupplier.VERSION;
 
   public static V3CompressedVSizeColumnarMultiIntsSerializer create(
+      final String columnName,
       final SegmentWriteOutMedium segmentWriteOutMedium,
       final String filenameBase,
       final int maxValue,
@@ -43,7 +44,9 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer extends ColumnarMultiI
   )
   {
     return new V3CompressedVSizeColumnarMultiIntsSerializer(
+        columnName,
         new CompressedColumnarIntsSerializer(
+            columnName,
             segmentWriteOutMedium,
             filenameBase,
             CompressedColumnarIntsSupplier.MAX_INTS_IN_BUFFER,
@@ -51,6 +54,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer extends ColumnarMultiI
             compression
         ),
         new CompressedVSizeColumnarIntsSerializer(
+            columnName,
             segmentWriteOutMedium,
             filenameBase,
             maxValue,
@@ -61,16 +65,19 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer extends ColumnarMultiI
     );
   }
 
+  private final String columnName;
   private final CompressedColumnarIntsSerializer offsetWriter;
   private final CompressedVSizeColumnarIntsSerializer valueWriter;
   private int offset;
   private boolean lastOffsetWritten = false;
 
   V3CompressedVSizeColumnarMultiIntsSerializer(
+      String columnName,
       CompressedColumnarIntsSerializer offsetWriter,
       CompressedVSizeColumnarIntsSerializer valueWriter
   )
   {
+    this.columnName = columnName;
     this.offsetWriter = offsetWriter;
     this.valueWriter = valueWriter;
     this.offset = 0;
@@ -95,6 +102,9 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer extends ColumnarMultiI
       valueWriter.addValue(ints.get(i));
     }
     offset += numValues;
+    if (offset < 0) {
+      throw new ColumnCapacityExceededException(columnName);
+    }
   }
 
   @Override
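
Note that the multi-value serializer above guards a different quantity: offset accumulates the total number of values written across all rows, which for wide multi-value dimensions can wrap long before any per-row counter does. A fragment in the style of the earlier sketch (same hypothetical names, reusing the ColumnCapacityExceededException defined there):

      private int offset = 0;

      void addValues(int[] rowValues)
      {
        // ... each value is written through the delegate value writer ...
        offset += rowValues.length;
        // The running total of row lengths wraps negative past Integer.MAX_VALUE.
        if (offset < 0) {
          throw new ColumnCapacityExceededException(columnName);
        }
      }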
diff --git a/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSupplier.java
index 2beb43d..3bb934c 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSupplier.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSupplier.java
@@ -24,6 +24,7 @@ import it.unimi.dsi.fastutil.ints.IntArrayList;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -73,6 +74,26 @@ public class V3CompressedVSizeColumnarMultiIntsSupplier implements WritableSuppl
     throw new IAE("Unknown version[%s]", versionFromBuffer);
   }
 
+  public static V3CompressedVSizeColumnarMultiIntsSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order, SmooshedFileMapper mapper)
+  {
+    byte versionFromBuffer = buffer.get();
+
+    if (versionFromBuffer == VERSION) {
+      CompressedColumnarIntsSupplier offsetSupplier = CompressedColumnarIntsSupplier.fromByteBuffer(
+          buffer,
+          order,
+          mapper
+      );
+      CompressedVSizeColumnarIntsSupplier valueSupplier = CompressedVSizeColumnarIntsSupplier.fromByteBuffer(
+          buffer,
+          order,
+          mapper
+      );
+      return new V3CompressedVSizeColumnarMultiIntsSupplier(offsetSupplier, valueSupplier);
+    }
+    throw new IAE("Unknown version[%s]", versionFromBuffer);
+  }
+
   @VisibleForTesting
   public static V3CompressedVSizeColumnarMultiIntsSupplier fromIterable(
       final Iterable<IndexedInts> objectsIterable,
diff --git a/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarMultiIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarMultiIntsSerializer.java
index 16fe48d..088f2b5 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarMultiIntsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarMultiIntsSerializer.java
@@ -81,6 +81,7 @@ public class VSizeColumnarMultiIntsSerializer extends ColumnarMultiIntsSerialize
     abstract void write(WriteOutBytes out, int v) throws IOException;
   }
 
+  private final String columnName;
   private final int maxId;
   private final WriteInt writeInt;
 
@@ -92,8 +93,13 @@ public class VSizeColumnarMultiIntsSerializer extends ColumnarMultiIntsSerialize
   private int numWritten = 0;
   private boolean numBytesForMaxWritten = false;
 
-  public VSizeColumnarMultiIntsSerializer(SegmentWriteOutMedium segmentWriteOutMedium, int maxId)
+  public VSizeColumnarMultiIntsSerializer(
+      String columnName,
+      SegmentWriteOutMedium segmentWriteOutMedium,
+      int maxId
+  )
   {
+    this.columnName = columnName;
     this.segmentWriteOutMedium = segmentWriteOutMedium;
     this.maxId = maxId;
     this.writeInt = WriteInt.values()[VSizeColumnarInts.getNumBytesForMax(maxId) - 1];
@@ -120,6 +126,9 @@ public class VSizeColumnarMultiIntsSerializer extends ColumnarMultiIntsSerialize
     headerOut.writeInt(Ints.checkedCast(valuesOut.size()));
 
     ++numWritten;
+    if (numWritten < 0) {
+      throw new ColumnCapacityExceededException(columnName);
+    }
   }
 
   @Override
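
[Editor's note] The method above already guards its byte size with Guava's Ints.checkedCast on valuesOut.size(); the new numWritten < 0 check covers the row count, which checkedCast never sees. For intuition, a small demonstration of what checkedCast does (assumes Guava on the classpath, as in this module):

    import com.google.common.primitives.Ints;

    public class CheckedCastDemo
    {
      public static void main(String[] args)
      {
        // fits in an int: returned unchanged
        System.out.println(Ints.checkedCast(42L));
        // does not fit: throws IllegalArgumentException instead of truncating
        try {
          Ints.checkedCast(Integer.MAX_VALUE + 1L);
        }
        catch (IllegalArgumentException e) {
          System.out.println("rejected " + (Integer.MAX_VALUE + 1L));
        }
      }
    }
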
diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java
index 9c852ed..43ea5eb 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java
@@ -33,22 +33,27 @@ import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
 import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
 import org.apache.druid.segment.writeout.WriteOutBytes;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
 
 @RunWith(Parameterized.class)
 public class CompressedColumnarIntsSerializerTest
@@ -64,6 +69,9 @@ public class CompressedColumnarIntsSerializerTest
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   public CompressedColumnarIntsSerializerTest(
       CompressionStrategy compressionStrategy,
       ByteOrder byteOrder
@@ -99,6 +107,77 @@ public class CompressedColumnarIntsSerializerTest
     segmentWriteOutMedium.close();
   }
 
+  @Test
+  public void testSmallData() throws Exception
+  {
+    // less than one chunk
+    for (int maxValue : MAX_VALUES) {
+      for (int chunkFactor : CHUNK_FACTORS) {
+        generateVals(rand.nextInt(chunkFactor), maxValue);
+        checkSerializedSizeAndData(chunkFactor);
+      }
+    }
+  }
+
+  @Test
+  public void testLargeData() throws Exception
+  {
+    // more than one chunk
+    for (int maxValue : MAX_VALUES) {
+      for (int chunkFactor : CHUNK_FACTORS) {
+        generateVals((rand.nextInt(5) + 5) * chunkFactor + rand.nextInt(chunkFactor), maxValue);
+        checkSerializedSizeAndData(chunkFactor);
+      }
+    }
+  }
+
+  @Test
+  public void testWriteEmpty() throws Exception
+  {
+    vals = new int[0];
+    checkSerializedSizeAndData(2);
+  }
+
+  @Test
+  public void testMultiValueFileLargeData() throws Exception
+  {
+    // more than one chunk
+    for (int maxValue : MAX_VALUES) {
+      for (int chunkFactor : CHUNK_FACTORS) {
+        generateVals((rand.nextInt(5) + 5) * chunkFactor + rand.nextInt(chunkFactor), maxValue);
+        checkV2SerializedSizeAndData(chunkFactor);
+      }
+    }
+  }
+
+  // this test takes ~30 minutes to run
+  @Ignore
+  @Test
+  public void testTooManyValues() throws IOException
+  {
+    expectedException.expect(ColumnCapacityExceededException.class);
+    expectedException.expectMessage(ColumnCapacityExceededException.formatMessage("test"));
+    try (
+        SegmentWriteOutMedium segmentWriteOutMedium =
+            TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder())
+    ) {
+      CompressedColumnarIntsSerializer serializer = new CompressedColumnarIntsSerializer(
+          "test",
+          segmentWriteOutMedium,
+          "test",
+          CompressedColumnarIntsSupplier.MAX_INTS_IN_BUFFER,
+          byteOrder,
+          compressionStrategy
+      );
+      serializer.open();
+
+      final long numRows = Integer.MAX_VALUE + 100L;
+      for (long i = 0L; i < numRows; i++) {
+        serializer.addValue(ThreadLocalRandom.current().nextInt(0, Integer.MAX_VALUE));
+      }
+    }
+  }
+
   private void generateVals(final int totalSize, final int maxValue)
   {
     vals = new int[totalSize];
@@ -112,6 +191,7 @@ public class CompressedColumnarIntsSerializerTest
     FileSmoosher smoosher = new FileSmoosher(temporaryFolder.newFolder());
 
     CompressedColumnarIntsSerializer writer = new CompressedColumnarIntsSerializer(
+        "test",
         segmentWriteOutMedium,
         "test",
         chunkFactor,
@@ -149,44 +229,13 @@ public class CompressedColumnarIntsSerializerTest
     CloseQuietly.close(columnarInts);
   }
 
-  @Test
-  public void testSmallData() throws Exception
-  {
-    // less than one chunk
-    for (int maxValue : MAX_VALUES) {
-      for (int chunkFactor : CHUNK_FACTORS) {
-        generateVals(rand.nextInt(chunkFactor), maxValue);
-        checkSerializedSizeAndData(chunkFactor);
-      }
-    }
-  }
-
-  @Test
-  public void testLargeData() throws Exception
-  {
-    // more than one chunk
-    for (int maxValue : MAX_VALUES) {
-      for (int chunkFactor : CHUNK_FACTORS) {
-        generateVals((rand.nextInt(5) + 5) * chunkFactor + rand.nextInt(chunkFactor), maxValue);
-        checkSerializedSizeAndData(chunkFactor);
-      }
-    }
-  }
-
-  @Test
-  public void testWriteEmpty() throws Exception
-  {
-    vals = new int[0];
-    checkSerializedSizeAndData(2);
-  }
-
-
   private void checkV2SerializedSizeAndData(int chunkFactor) throws Exception
   {
     File tmpDirectory = FileUtils.createTempDir(StringUtils.format("CompressedIntsIndexedWriterTest_%d", chunkFactor));
     FileSmoosher smoosher = new FileSmoosher(tmpDirectory);
 
     CompressedColumnarIntsSerializer writer = new CompressedColumnarIntsSerializer(
+        "test",
         segmentWriteOutMedium,
         chunkFactor,
         byteOrder,
@@ -223,16 +272,4 @@ public class CompressedColumnarIntsSerializerTest
     CloseQuietly.close(columnarInts);
     mapper.close();
   }
-
-  @Test
-  public void testMultiValueFileLargeData() throws Exception
-  {
-    // more than one chunk
-    for (int maxValue : MAX_VALUES) {
-      for (int chunkFactor : CHUNK_FACTORS) {
-        generateVals((rand.nextInt(5) + 5) * chunkFactor + rand.nextInt(chunkFactor), maxValue);
-        checkV2SerializedSizeAndData(chunkFactor);
-      }
-    }
-  }
 }
diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
similarity index 64%
copy from processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java
copy to processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
index 4b78b52..284aea3 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
@@ -20,13 +20,19 @@
 package org.apache.druid.segment.data;
 
 import com.google.common.base.Supplier;
-import com.google.common.primitives.Floats;
+import com.google.common.primitives.Doubles;
 import it.unimi.dsi.fastutil.ints.IntArrays;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.guava.CloseQuietly;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
 import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -42,8 +48,15 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+/**
+ * This is a copy of {@link CompressedFloatsSerdeTest} without {@link CompressedFloatsSerdeTest#testSupplierSerde},
+ * because doubles do not have a supplier serde analogous to {@link CompressedColumnarFloatsSupplier} or
+ * {@link CompressedColumnarLongsSupplier}.
+ *
+ * It is not important that it remain a copy; the committer is just lazy.
+ */
 @RunWith(Parameterized.class)
-public class CompressedFloatsSerdeTest
+public class CompressedDoublesSerdeTest
 {
   @Parameterized.Parameters(name = "{0} {1} {2}")
   public static Iterable<Object[]> compressionStrategies()
@@ -58,22 +71,35 @@ public class CompressedFloatsSerdeTest
 
   private static final double DELTA = 0.00001;
 
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   protected final CompressionStrategy compressionStrategy;
   protected final ByteOrder order;
 
-  private final float[] values0 = {};
-  private final float[] values1 = {0f, 1f, 1f, 0f, 1f, 1f, 1f, 1f, 0f, 0f, 1f, 1f};
-  private final float[] values2 = {13.2f, 6.1f, 0.001f, 123f, 12572f, 123.1f, 784.4f, 6892.8634f, 8.341111f};
-  private final float[] values3 = {0.001f, 0.001f, 0.001f, 0.001f, 0.001f, 100f, 100f, 100f, 100f, 100f};
-  private final float[] values4 = {0f, 0f, 0f, 0f, 0.01f, 0f, 0f, 0f, 21.22f, 0f, 0f, 0f, 0f, 0f, 0f};
-  private final float[] values5 = {123.16f, 1.12f, 62.00f, 462.12f, 517.71f, 56.54f, 971.32f, 824.22f, 472.12f, 625.26f};
-  private final float[] values6 = {1000000f, 1000001f, 1000002f, 1000003f, 1000004f, 1000005f, 1000006f, 1000007f, 1000008f};
-  private final float[] values7 = {
-      Float.POSITIVE_INFINITY, Float.NEGATIVE_INFINITY, 12378.5734f, -12718243.7496f, -93653653.1f, 12743153.385534f,
-      21431.414538f, 65487435436632.123f, -43734526234564.65f
+  private final double[] values0 = {};
+  private final double[] values1 = {0, 1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1};
+  private final double[] values2 = {13.2, 6.1, 0.001, 123, 12572, 123.1, 784.4, 6892.8634, 8.341111};
+  private final double[] values3 = {0.001, 0.001, 0.001, 0.001, 0.001, 100, 100, 100, 100, 100};
+  private final double[] values4 = {0, 0, 0, 0, 0.01, 0, 0, 0, 21.22, 0, 0, 0, 0, 0, 0};
+  private final double[] values5 = {123.16, 1.12, 62.00, 462.12, 517.71, 56.54, 971.32, 824.22, 472.12, 625.26};
+  private final double[] values6 = {1000000, 1000001, 1000002, 1000003, 1000004, 1000005, 1000006, 1000007, 1000008};
+  private final double[] values7 = {
+      Double.POSITIVE_INFINITY,
+      Double.NEGATIVE_INFINITY,
+      12378.5734,
+      -12718243.7496,
+      -93653653.1,
+      12743153.385534,
+      21431.414538,
+      65487435436632.123,
+      -43734526234564.65
   };
 
-  public CompressedFloatsSerdeTest(
+  public CompressedDoublesSerdeTest(
       CompressionStrategy compressionStrategy,
       ByteOrder order
   )
@@ -98,16 +124,44 @@ public class CompressedFloatsSerdeTest
   @Test
   public void testChunkSerde() throws Exception
   {
-    float[] chunk = new float[10000];
+    double[] chunk = new double[10000];
     for (int i = 0; i < 10000; i++) {
       chunk[i] = i;
     }
     testWithValues(chunk);
   }
 
-  public void testWithValues(float[] values) throws Exception
+  // this test takes ~45 minutes to run
+  @Ignore
+  @Test
+  public void testTooManyValues() throws IOException
+  {
+    expectedException.expect(ColumnCapacityExceededException.class);
+    expectedException.expectMessage(ColumnCapacityExceededException.formatMessage("test"));
+    try (
+        SegmentWriteOutMedium segmentWriteOutMedium =
+            TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder())
+    ) {
+      ColumnarDoublesSerializer serializer = CompressionFactory.getDoubleSerializer(
+          "test",
+          segmentWriteOutMedium,
+          "test",
+          order,
+          compressionStrategy
+      );
+      serializer.open();
+
+      final long numRows = Integer.MAX_VALUE + 100L;
+      for (long i = 0L; i < numRows; i++) {
+        serializer.add(ThreadLocalRandom.current().nextDouble());
+      }
+    }
+  }
+
+  public void testWithValues(double[] values) throws Exception
   {
-    ColumnarFloatsSerializer serializer = CompressionFactory.getFloatSerializer(
+    ColumnarDoublesSerializer serializer = CompressionFactory.getDoubleSerializer(
+        "test",
         new OffHeapMemorySegmentWriteOutMedium(),
         "test",
         order,
@@ -115,7 +169,7 @@ public class CompressedFloatsSerdeTest
     );
     serializer.open();
 
-    for (float value : values) {
+    for (double value : values) {
       serializer.add(value);
     }
     Assert.assertEquals(values.length, serializer.size());
@@ -123,27 +177,26 @@ public class CompressedFloatsSerdeTest
     final ByteArrayOutputStream baos = new ByteArrayOutputStream();
     serializer.writeTo(Channels.newChannel(baos), null);
     Assert.assertEquals(baos.size(), serializer.getSerializedSize());
-    CompressedColumnarFloatsSupplier supplier = CompressedColumnarFloatsSupplier
+    Supplier<ColumnarDoubles> supplier = CompressedColumnarDoublesSuppliers
         .fromByteBuffer(ByteBuffer.wrap(baos.toByteArray()), order);
-    ColumnarFloats floats = supplier.get();
+    ColumnarDoubles doubles = supplier.get();
 
-    assertIndexMatchesVals(floats, values);
+    assertIndexMatchesVals(doubles, values);
     for (int i = 0; i < 10; i++) {
       int a = (int) (ThreadLocalRandom.current().nextDouble() * values.length);
       int b = (int) (ThreadLocalRandom.current().nextDouble() * values.length);
       int start = a < b ? a : b;
       int end = a < b ? b : a;
-      tryFill(floats, values, start, end - start);
+      tryFill(doubles, values, start, end - start);
     }
-    testSupplierSerde(supplier, values);
-    testConcurrentThreadReads(supplier, floats, values);
+    testConcurrentThreadReads(supplier, doubles, values);
 
-    floats.close();
+    doubles.close();
   }
 
-  private void tryFill(ColumnarFloats indexed, float[] vals, final int startIndex, final int size)
+  private void tryFill(ColumnarDoubles indexed, double[] vals, final int startIndex, final int size)
   {
-    float[] filled = new float[size];
+    double[] filled = new double[size];
     indexed.get(filled, startIndex, filled.length);
 
     for (int i = startIndex; i < filled.length; i++) {
@@ -151,7 +204,7 @@ public class CompressedFloatsSerdeTest
     }
   }
 
-  private void assertIndexMatchesVals(ColumnarFloats indexed, float[] vals)
+  private void assertIndexMatchesVals(ColumnarDoubles indexed, double[] vals)
   {
     Assert.assertEquals(vals.length, indexed.size());
 
@@ -171,25 +224,12 @@ public class CompressedFloatsSerdeTest
     }
   }
 
-  private void testSupplierSerde(CompressedColumnarFloatsSupplier supplier, float[] vals) throws IOException
-  {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    supplier.writeTo(Channels.newChannel(baos), null);
-
-    final byte[] bytes = baos.toByteArray();
-    Assert.assertEquals(supplier.getSerializedSize(), bytes.length);
-    CompressedColumnarFloatsSupplier anotherSupplier = CompressedColumnarFloatsSupplier.fromByteBuffer(
-        ByteBuffer.wrap(bytes), order
-    );
-    ColumnarFloats indexed = anotherSupplier.get();
-    assertIndexMatchesVals(indexed, vals);
-  }
-
   // This test attempts to cause a race condition with the DirectByteBuffers, it's non-deterministic in causing it,
   // which sucks but I can't think of a way to deterministically cause it...
   private void testConcurrentThreadReads(
-      final Supplier<ColumnarFloats> supplier,
-      final ColumnarFloats indexed, final float[] vals
+      final Supplier<ColumnarDoubles> supplier,
+      final ColumnarDoubles indexed,
+      final double[] vals
   ) throws Exception
   {
     final AtomicReference<String> reason = new AtomicReference<String>("none");
@@ -216,9 +256,9 @@ public class CompressedFloatsSerdeTest
         try {
           for (int i = 0; i < numRuns; ++i) {
             for (int j = 0; j < indexed.size(); ++j) {
-              final float val = vals[j];
-              final float indexedVal = indexed.get(j);
-              if (Floats.compare(val, indexedVal) != 0) {
+              final double val = vals[j];
+              final double indexedVal = indexed.get(j);
+              if (Doubles.compare(val, indexedVal) != 0) {
                 failureHappened.set(true);
                 reason.set(StringUtils.format("Thread1[%d]: %f != %f", j, val, indexedVal));
                 stopLatch.countDown();
@@ -237,7 +277,7 @@ public class CompressedFloatsSerdeTest
       }
     }).start();
 
-    final ColumnarFloats indexed2 = supplier.get();
+    final ColumnarDoubles indexed2 = supplier.get();
     try {
       new Thread(new Runnable()
       {
@@ -255,9 +295,9 @@ public class CompressedFloatsSerdeTest
           try {
             for (int i = 0; i < numRuns; ++i) {
               for (int j = indexed2.size() - 1; j >= 0; --j) {
-                final float val = vals[j];
-                final float indexedVal = indexed2.get(j);
-                if (Floats.compare(val, indexedVal) != 0) {
+                final double val = vals[j];
+                final double indexedVal = indexed2.get(j);
+                if (Doubles.compare(val, indexedVal) != 0) {
                   failureHappened.set(true);
                   reason.set(StringUtils.format("Thread2[%d]: %f != %f", j, val, indexedVal));
                   stopLatch.countDown();
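
[Editor's note] The concurrent-read test carried over above coordinates its two readers with latches so both hammer the same supplier at once. A stripped-down sketch of that harness, with the Druid column readers replaced by a plain array; only the coordination pattern is the point here:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class ConcurrentReadSketch
    {
      public static void main(String[] args) throws InterruptedException
      {
        final int[] vals = {1, 2, 3, 4, 5};
        final CountDownLatch startLatch = new CountDownLatch(1);
        final CountDownLatch stopLatch = new CountDownLatch(2);
        final AtomicBoolean failureHappened = new AtomicBoolean(false);

        // thread 1 reads forward, thread 2 reads backward, as in the real test
        new Thread(() -> readAll(vals, false, startLatch, stopLatch, failureHappened)).start();
        new Thread(() -> readAll(vals, true, startLatch, stopLatch, failureHappened)).start();

        startLatch.countDown(); // release both readers at once
        stopLatch.await();
        System.out.println("mismatch observed: " + failureHappened.get());
      }

      private static void readAll(
          int[] vals,
          boolean backward,
          CountDownLatch startLatch,
          CountDownLatch stopLatch,
          AtomicBoolean failureHappened
      )
      {
        try {
          startLatch.await();
          for (int i = 0; i < vals.length; i++) {
            int j = backward ? vals.length - 1 - i : i;
            if (vals[j] != j + 1) {
              failureHappened.set(true);
            }
          }
        }
        catch (InterruptedException ignored) {
          failureHappened.set(true);
        }
        finally {
          stopLatch.countDown();
        }
      }
    }
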
diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java
index 4b78b52..d11c089 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java
@@ -25,8 +25,14 @@ import it.unimi.dsi.fastutil.ints.IntArrays;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.guava.CloseQuietly;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
 import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -58,6 +64,12 @@ public class CompressedFloatsSerdeTest
 
   private static final double DELTA = 0.00001;
 
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   protected final CompressionStrategy compressionStrategy;
   protected final ByteOrder order;
 
@@ -105,9 +117,37 @@ public class CompressedFloatsSerdeTest
     testWithValues(chunk);
   }
 
+  // this test takes ~30 minutes to run
+  @Ignore
+  @Test
+  public void testTooManyValues() throws IOException
+  {
+    expectedException.expect(ColumnCapacityExceededException.class);
+    expectedException.expectMessage(ColumnCapacityExceededException.formatMessage("test"));
+    try (
+        SegmentWriteOutMedium segmentWriteOutMedium =
+            TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder())
+    ) {
+      ColumnarFloatsSerializer serializer = CompressionFactory.getFloatSerializer(
+          "test",
+          segmentWriteOutMedium,
+          "test",
+          order,
+          compressionStrategy
+      );
+      serializer.open();
+
+      final long numRows = Integer.MAX_VALUE + 100L;
+      for (long i = 0L; i < numRows; i++) {
+        serializer.add(ThreadLocalRandom.current().nextFloat());
+      }
+    }
+  }
+
   public void testWithValues(float[] values) throws Exception
   {
     ColumnarFloatsSerializer serializer = CompressionFactory.getFloatSerializer(
+        "test",
         new OffHeapMemorySegmentWriteOutMedium(),
         "test",
         order,
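
[Editor's note] One detail worth calling out in these overflow tests: the row target is written as Integer.MAX_VALUE + 100L. The L suffix matters, since without it the sum is computed in 32-bit arithmetic and itself wraps before the loop bound is ever compared:

    System.out.println(Integer.MAX_VALUE + 100);   // -2147483549: int overflow
    System.out.println(Integer.MAX_VALUE + 100L);  // 2147483747: promoted to long
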
diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java
index ca6dd69..1186d32 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java
@@ -95,6 +95,7 @@ public class CompressedLongsAutoEncodingSerdeTest
   public void testValues(long[] values) throws Exception
   {
     ColumnarLongsSerializer serializer = CompressionFactory.getLongSerializer(
+        "test",
         new OffHeapMemorySegmentWriteOutMedium(),
         "test",
         order,
diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java
index 0fde252..675c494 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java
@@ -25,8 +25,14 @@ import it.unimi.dsi.fastutil.ints.IntArrays;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.guava.CloseQuietly;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
 import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -58,6 +64,12 @@ public class CompressedLongsSerdeTest
     return data;
   }
 
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   protected final CompressionFactory.LongEncodingStrategy encodingStrategy;
   protected final CompressionStrategy compressionStrategy;
   protected final ByteOrder order;
@@ -121,6 +133,38 @@ public class CompressedLongsSerdeTest
     testWithValues(chunk);
   }
 
+  // this test takes ~50 minutes to run (even skipping 'auto')
+  @Ignore
+  @Test
+  public void testTooManyValues() throws IOException
+  {
+    // remove this skip once 'auto' long encoding's unbounded heap usage is reined in and this test can actually pass
+    if (encodingStrategy.equals(CompressionFactory.LongEncodingStrategy.AUTO)) {
+      return;
+    }
+    expectedException.expect(ColumnCapacityExceededException.class);
+    expectedException.expectMessage(ColumnCapacityExceededException.formatMessage("test"));
+    try (
+        SegmentWriteOutMedium segmentWriteOutMedium =
+            TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder())
+    ) {
+      ColumnarLongsSerializer serializer = CompressionFactory.getLongSerializer(
+          "test",
+          segmentWriteOutMedium,
+          "test",
+          order,
+          encodingStrategy,
+          compressionStrategy
+      );
+      serializer.open();
+
+      final long numRows = Integer.MAX_VALUE + 100L;
+      for (long i = 0L; i < numRows; i++) {
+        serializer.add(ThreadLocalRandom.current().nextLong());
+      }
+    }
+  }
+
   public void testWithValues(long[] values) throws Exception
   {
     testValues(values);
@@ -130,6 +174,7 @@ public class CompressedLongsSerdeTest
   public void testValues(long[] values) throws Exception
   {
     ColumnarLongsSerializer serializer = CompressionFactory.getLongSerializer(
+        "test",
         new OffHeapMemorySegmentWriteOutMedium(),
         "test",
         order,
diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java
index 150dd0d..5ba8420 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java
@@ -32,22 +32,27 @@ import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
 import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
 import org.apache.druid.segment.writeout.WriteOutBytes;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
 
 @RunWith(Parameterized.class)
 public class CompressedVSizeColumnarIntsSerializerTest
@@ -62,6 +67,9 @@ public class CompressedVSizeColumnarIntsSerializerTest
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   public CompressedVSizeColumnarIntsSerializerTest(
       CompressionStrategy compressionStrategy,
       ByteOrder byteOrder
@@ -108,8 +116,9 @@ public class CompressedVSizeColumnarIntsSerializerTest
   private void checkSerializedSizeAndData(int chunkSize) throws Exception
   {
     FileSmoosher smoosher = new FileSmoosher(temporaryFolder.newFolder());
-
+    final String columnName = "test";
     CompressedVSizeColumnarIntsSerializer writer = new CompressedVSizeColumnarIntsSerializer(
+        columnName,
         segmentWriteOutMedium,
         "test",
         vals.length > 0 ? Ints.max(vals) : 0,
@@ -170,6 +179,44 @@ public class CompressedVSizeColumnarIntsSerializerTest
     }
   }
 
+
+  // this test takes ~18 minutes to run
+  @Ignore
+  @Test
+  public void testTooManyValues() throws IOException
+  {
+    final int maxValue = 0x0FFFFFFF;
+    final int maxChunkSize = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue);
+    expectedException.expect(ColumnCapacityExceededException.class);
+    expectedException.expectMessage(ColumnCapacityExceededException.formatMessage("test"));
+    try (
+        SegmentWriteOutMedium segmentWriteOutMedium =
+            TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder())
+    ) {
+      GenericIndexedWriter genericIndexed = GenericIndexedWriter.ofCompressedByteBuffers(
+          segmentWriteOutMedium,
+          "test",
+          compressionStrategy,
+          Long.BYTES * 10000
+      );
+      CompressedVSizeColumnarIntsSerializer serializer = new CompressedVSizeColumnarIntsSerializer(
+          "test",
+          segmentWriteOutMedium,
+          maxValue,
+          maxChunkSize,
+          byteOrder,
+          compressionStrategy,
+          genericIndexed
+      );
+      serializer.open();
+
+      final long numRows = Integer.MAX_VALUE + 100L;
+      for (long i = 0L; i < numRows; i++) {
+        serializer.addValue(ThreadLocalRandom.current().nextInt(0, Integer.MAX_VALUE));
+      }
+    }
+  }
+
   @Test
   public void testEmpty() throws Exception
   {
@@ -181,7 +228,7 @@ public class CompressedVSizeColumnarIntsSerializerTest
   {
     File tmpDirectory = temporaryFolder.newFolder();
     FileSmoosher smoosher = new FileSmoosher(tmpDirectory);
-
+    final String columnName = "test";
     GenericIndexedWriter genericIndexed = GenericIndexedWriter.ofCompressedByteBuffers(
         segmentWriteOutMedium,
         "test",
@@ -189,6 +236,7 @@ public class CompressedVSizeColumnarIntsSerializerTest
         Long.BYTES * 10000
     );
     CompressedVSizeColumnarIntsSerializer writer = new CompressedVSizeColumnarIntsSerializer(
+        columnName,
         segmentWriteOutMedium,
         vals.length > 0 ? Ints.max(vals) : 0,
         chunkSize,
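
[Editor's note] The ignored overflow test above sizes its chunks with CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue), which works out how many variable-width ints fit in one decompression buffer. A hedged sketch of the per-value width computation in the spirit of VSizeColumnarInts.getNumBytesForMax (used elsewhere in this patch); the real implementation may differ in detail:

    // smallest whole number of bytes that can hold any value in [0, maxValue]
    static int numBytesForMax(int maxValue)
    {
      if (maxValue <= 0xFF) {
        return 1;
      }
      if (maxValue <= 0xFFFF) {
        return 2;
      }
      if (maxValue <= 0xFFFFFF) {
        return 3;
      }
      return 4;
    }

With maxValue = 0x0FFFFFFF as in testTooManyValues, each value needs 4 bytes, so a chunk holds roughly bufferSize / 4 ints.
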
diff --git a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java
index bb9f0b2..de6485f 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java
@@ -32,11 +32,14 @@ import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
 import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
 import org.apache.druid.segment.writeout.WriteOutBytes;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -54,6 +57,7 @@ import java.util.stream.IntStream;
 @RunWith(Parameterized.class)
 public class V3CompressedVSizeColumnarMultiIntsSerializerTest
 {
+  private static final String TEST_COLUMN_NAME = "test";
   private static final int[] OFFSET_CHUNK_FACTORS = new int[]{
       1,
       2,
@@ -69,6 +73,9 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   public V3CompressedVSizeColumnarMultiIntsSerializerTest(
       CompressionStrategy compressionStrategy,
       ByteOrder byteOrder
@@ -92,19 +99,101 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
     );
   }
 
-  private void generateVals(final int totalSize, final int maxValue)
+  @Before
+  public void setUp()
+  {
+    vals = null;
+  }
+
+  @Test
+  public void testSmallData() throws Exception
+  {
+    // less than one chunk
+    for (int offsetChunk : OFFSET_CHUNK_FACTORS) {
+      for (int maxValue : MAX_VALUES) {
+        final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue);
+        generateVals(rand.nextInt(valueChunk), maxValue, 2);
+        checkSerializedSizeAndData(offsetChunk, valueChunk);
+      }
+    }
+  }
+
+  @Test
+  public void testLargeData() throws Exception
   {
-    vals = new ArrayList<>(totalSize);
-    for (int i = 0; i < totalSize; ++i) {
-      int len = rand.nextInt(2) + 1;
-      int[] subVals = new int[len];
-      for (int j = 0; j < len; ++j) {
-        subVals[j] = rand.nextInt(maxValue);
+    // more than one chunk
+    for (int offsetChunk : OFFSET_CHUNK_FACTORS) {
+      for (int maxValue : MAX_VALUES) {
+        final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue);
+        generateVals((rand.nextInt(2) + 1) * valueChunk + rand.nextInt(valueChunk), maxValue, 2);
+        checkSerializedSizeAndData(offsetChunk, valueChunk);
+      }
+    }
+  }
+
+  @Test
+  public void testEmpty() throws Exception
+  {
+    vals = new ArrayList<>();
+    checkSerializedSizeAndData(1, 2);
+  }
+
+  @Test
+  public void testMultiValueFileLargeData() throws Exception
+  {
+    // more than one chunk
+    for (int offsetChunk : OFFSET_CHUNK_FACTORS) {
+      for (int maxValue : MAX_VALUES) {
+        final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue);
+        generateVals((rand.nextInt(2) + 1) * valueChunk + rand.nextInt(valueChunk), maxValue, 2);
+        checkV2SerializedSizeAndData(offsetChunk, valueChunk);
       }
-      vals.add(subVals);
     }
   }
 
+  // this test takes ~30 minutes to run
+  @Ignore
+  @Test
+  public void testTooManyValues() throws Exception
+  {
+    expectedException.expect(ColumnCapacityExceededException.class);
+    expectedException.expectMessage(ColumnCapacityExceededException.formatMessage("test"));
+    // more than one chunk
+    final int offsetChunk = CompressedColumnarIntsSupplier.MAX_INTS_IN_BUFFER;
+    final int maxValue = 0x0FFFFFFF;
+    final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue);
+    final int numRows = 10000000;
+    final int maxValuesPerRow = 1000;
+    generateV2SerializedSizeAndData(numRows, maxValue, maxValuesPerRow, offsetChunk, valueChunk);
+  }
+
+  private void generateVals(final int rowCount, final int maxValue, final int numValuesPerRow)
+  {
+    vals = new ArrayList<>(rowCount);
+    for (int i = 0; i < rowCount; ++i) {
+      vals.add(generateRow(rand, maxValue, numValuesPerRow));
+    }
+  }
+
+  private int[] generateRow(Random rand, final int maxValue, final int numValuesPerRow)
+  {
+    int len = rand.nextInt(numValuesPerRow) + 1;
+    int[] subVals = new int[len];
+    for (int j = 0; j < len; ++j) {
+      subVals[j] = rand.nextInt(maxValue);
+    }
+    return subVals;
+  }
+
+  private int getMaxValue(final List<int[]> vals)
+  {
+    return vals
+        .stream()
+        .mapToInt(array -> IntStream.of(array).max().orElse(0))
+        .max()
+        .orElseThrow(NoSuchElementException::new);
+  }
+
   private void checkSerializedSizeAndData(int offsetChunkFactor, int valueChunkFactor) throws Exception
   {
     FileSmoosher smoosher = new FileSmoosher(temporaryFolder.newFolder());
@@ -112,6 +201,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
     try (SegmentWriteOutMedium segmentWriteOutMedium = new OffHeapMemorySegmentWriteOutMedium()) {
       int maxValue = vals.size() > 0 ? getMaxValue(vals) : 0;
       CompressedColumnarIntsSerializer offsetWriter = new CompressedColumnarIntsSerializer(
+          TEST_COLUMN_NAME,
           segmentWriteOutMedium,
           "offset",
           offsetChunkFactor,
@@ -119,6 +209,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
           compressionStrategy
       );
       CompressedVSizeColumnarIntsSerializer valueWriter = new CompressedVSizeColumnarIntsSerializer(
+          TEST_COLUMN_NAME,
           segmentWriteOutMedium,
           "value",
           maxValue,
@@ -127,7 +218,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
           compressionStrategy
       );
       V3CompressedVSizeColumnarMultiIntsSerializer writer =
-          new V3CompressedVSizeColumnarMultiIntsSerializer(offsetWriter, valueWriter);
+          new V3CompressedVSizeColumnarMultiIntsSerializer(TEST_COLUMN_NAME, offsetWriter, valueWriter);
       V3CompressedVSizeColumnarMultiIntsSupplier supplierFromIterable =
           V3CompressedVSizeColumnarMultiIntsSupplier.fromIterable(
               Iterables.transform(vals, ArrayBasedIndexedInts::new),
@@ -167,54 +258,6 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
     }
   }
 
-  private int getMaxValue(final List<int[]> vals)
-  {
-    return vals
-        .stream()
-        .mapToInt(array -> IntStream.of(array).max().orElse(0))
-        .max()
-        .orElseThrow(NoSuchElementException::new);
-  }
-
-  @Before
-  public void setUp()
-  {
-    vals = null;
-  }
-
-  @Test
-  public void testSmallData() throws Exception
-  {
-    // less than one chunk
-    for (int offsetChunk : OFFSET_CHUNK_FACTORS) {
-      for (int maxValue : MAX_VALUES) {
-        final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue);
-        generateVals(rand.nextInt(valueChunk), maxValue);
-        checkSerializedSizeAndData(offsetChunk, valueChunk);
-      }
-    }
-  }
-
-  @Test
-  public void testLargeData() throws Exception
-  {
-    // more than one chunk
-    for (int offsetChunk : OFFSET_CHUNK_FACTORS) {
-      for (int maxValue : MAX_VALUES) {
-        final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue);
-        generateVals((rand.nextInt(2) + 1) * valueChunk + rand.nextInt(valueChunk), maxValue);
-        checkSerializedSizeAndData(offsetChunk, valueChunk);
-      }
-    }
-  }
-
-  @Test
-  public void testEmpty() throws Exception
-  {
-    vals = new ArrayList<>();
-    checkSerializedSizeAndData(1, 2);
-  }
-
   private void checkV2SerializedSizeAndData(int offsetChunkFactor, int valueChunkFactor) throws Exception
   {
     File tmpDirectory = FileUtils.createTempDir(StringUtils.format(
@@ -227,6 +270,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
 
     try (SegmentWriteOutMedium segmentWriteOutMedium = new OffHeapMemorySegmentWriteOutMedium()) {
       CompressedColumnarIntsSerializer offsetWriter = new CompressedColumnarIntsSerializer(
+          TEST_COLUMN_NAME,
           segmentWriteOutMedium,
           offsetChunkFactor,
           byteOrder,
@@ -246,6 +290,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
           Long.BYTES * 250000
       );
       CompressedVSizeColumnarIntsSerializer valueWriter = new CompressedVSizeColumnarIntsSerializer(
+          TEST_COLUMN_NAME,
           segmentWriteOutMedium,
           maxValue,
           valueChunkFactor,
@@ -254,7 +299,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
           genericIndexed
       );
       V3CompressedVSizeColumnarMultiIntsSerializer writer =
-          new V3CompressedVSizeColumnarMultiIntsSerializer(offsetWriter, valueWriter);
+          new V3CompressedVSizeColumnarMultiIntsSerializer(TEST_COLUMN_NAME, offsetWriter, valueWriter);
       writer.open();
       for (int[] val : vals) {
         writer.addValues(new ArrayBasedIndexedInts(val));
@@ -282,16 +327,76 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
     }
   }
 
-  @Test
-  public void testMultiValueFileLargeData() throws Exception
+  private void generateV2SerializedSizeAndData(long numRows, int maxValue, int maxValuesPerRow, int offsetChunkFactor, int valueChunkFactor) throws Exception
   {
-    // more than one chunk
-    for (int offsetChunk : OFFSET_CHUNK_FACTORS) {
-      for (int maxValue : MAX_VALUES) {
-        final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue);
-        generateVals((rand.nextInt(2) + 1) * valueChunk + rand.nextInt(valueChunk), maxValue);
-        checkV2SerializedSizeAndData(offsetChunk, valueChunk);
+    File tmpDirectory = FileUtils.createTempDir(StringUtils.format(
+        "CompressedVSizeIndexedV3WriterTest_%d_%d",
+        offsetChunkFactor,
+        valueChunkFactor
+    ));
+    FileSmoosher smoosher = new FileSmoosher(tmpDirectory);
+
+    try (
+        SegmentWriteOutMedium segmentWriteOutMedium =
+            TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder())
+    ) {
+      CompressedColumnarIntsSerializer offsetWriter = new CompressedColumnarIntsSerializer(
+          TEST_COLUMN_NAME,
+          segmentWriteOutMedium,
+          offsetChunkFactor,
+          byteOrder,
+          compressionStrategy,
+          GenericIndexedWriter.ofCompressedByteBuffers(
+              segmentWriteOutMedium,
+              "offset",
+              compressionStrategy,
+              Long.BYTES * 250000
+          )
+      );
+
+      GenericIndexedWriter genericIndexed = GenericIndexedWriter.ofCompressedByteBuffers(
+          segmentWriteOutMedium,
+          "value",
+          compressionStrategy,
+          Long.BYTES * 250000
+      );
+      CompressedVSizeColumnarIntsSerializer valueWriter = new CompressedVSizeColumnarIntsSerializer(
+          TEST_COLUMN_NAME,
+          segmentWriteOutMedium,
+          maxValue,
+          valueChunkFactor,
+          byteOrder,
+          compressionStrategy,
+          genericIndexed
+      );
+      V3CompressedVSizeColumnarMultiIntsSerializer writer =
+          new V3CompressedVSizeColumnarMultiIntsSerializer(TEST_COLUMN_NAME, offsetWriter, valueWriter);
+      writer.open();
+      for (long l = 0L; l < numRows; l++) {
+        writer.addValues(new ArrayBasedIndexedInts(generateRow(rand, maxValue, maxValuesPerRow)));
       }
+
+      final SmooshedWriter channel = smoosher.addWithSmooshedWriter("test", writer.getSerializedSize());
+      writer.writeTo(channel, smoosher);
+      channel.close();
+      smoosher.close();
+      SmooshedFileMapper mapper = Smoosh.map(tmpDirectory);
+
+      V3CompressedVSizeColumnarMultiIntsSupplier supplierFromByteBuffer =
+          V3CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(mapper.mapFile("test"), byteOrder, mapper);
+      ColumnarMultiInts columnarMultiInts = supplierFromByteBuffer.get();
+      Assert.assertEquals(numRows, columnarMultiInts.size());
+      Random verifier = new Random(0);
+      for (int i = 0; i < numRows; ++i) {
+        IndexedInts subVals = columnarMultiInts.get(i);
+        int[] expected = generateRow(verifier, maxValue, maxValuesPerRow);
+        Assert.assertEquals(expected.length, subVals.size());
+        for (int j = 0, size = subVals.size(); j < size; ++j) {
+          Assert.assertEquals(expected[j], subVals.get(j));
+        }
+      }
+      CloseQuietly.close(columnarMultiInts);
+      mapper.close();
     }
   }
 }
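
[Editor's note] Finally, generateV2SerializedSizeAndData() above verifies ten million random rows without buffering them, by replaying the writer's pseudo-random sequence with a second Random built from the same seed (the test's verifier is new Random(0), which assumes the writing rand was seeded identically). A minimal illustration of the trick:

    import java.util.Random;

    public class SeededReplaySketch
    {
      public static void main(String[] args)
      {
        Random writer = new Random(0);
        Random verifier = new Random(0);
        for (int i = 0; i < 5; i++) {
          // identical seeds yield identical sequences, so these always match
          System.out.println(writer.nextInt(1000) == verifier.nextInt(1000));
        }
      }
    }
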

