Posted to commits@druid.apache.org by ch...@apache.org on 2022/09/07 03:01:05 UTC

[druid] branch master updated: Improve String Last/First Storage Efficiency (#12879)

This is an automated email from the ASF dual-hosted git repository.

cheddar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ed26e2d634 Improve String Last/First Storage Efficiency (#12879)
ed26e2d634 is described below

commit ed26e2d6348772e91977ae67b747105ac0e5e5f3
Author: sr <sa...@gmail.com>
AuthorDate: Tue Sep 6 20:00:54 2022 -0700

    Improve String Last/First Storage Efficiency (#12879)
    
    - Add classes for writing cell values in an LZ4 block-compressed format.
      Payloads are indexed by element number for efficient random lookup.
    - Update SerializablePairLongStringComplexMetricSerde to use block
      compression.
    - SerializablePairLongStringComplexMetricSerde also delta-encodes the
      Long via 2-pass encoding: it buffers first to find the min/max values
      and encodes deltas as integers when they fit.
    
    The entry points for block-compressed storage of byte[] payloads
    are the CellWriter and CellReader classes. See
    SerializablePairLongStringComplexMetricSerde for how these are used,
    along with how to do full column-based storage (delta encoding here),
    which includes 2-pass encoding to compute a column header.
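    
    A hypothetical write/read round trip with these entry points (sketch
    only: OnHeapMemorySegmentWriteOutMedium is assumed as the temporary
    medium, and payloads/columnByteBuffer are hypothetical inputs; the
    builder and method names are those added by this commit):
    
        SegmentWriteOutMedium medium = new OnHeapMemorySegmentWriteOutMedium();
        CellWriter cellWriter = new CellWriter.Builder(medium)
            .setByteBufferProvider(NativeClearedByteBufferProvider.INSTANCE)
            .build();
        for (byte[] payload : payloads) {
          cellWriter.write(payload);   // cells are indexed by element number
        }
        cellWriter.close();            // finalizes the block index and data
        // cellWriter.writeTo(channel, smoosher) transfers the compressed column
    
        CellReader cellReader = new CellReader.Builder(columnByteBuffer)
            .setByteBufferProvider(NativeClearedByteBufferProvider.INSTANCE)
            .build();
        ByteBuffer cell = cellReader.getCell(0);   // random access by cell number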
---
 .../apache/druid/jackson/AggregatorsModule.java    |   4 +-
 .../SerializablePairLongStringBufferStore.java     | 162 ++++++++
 .../SerializablePairLongStringColumnHeader.java    | 111 ++++++
 ...SerializablePairLongStringColumnSerializer.java | 123 +++++++
 .../SerializablePairLongStringComplexColumn.java   | 133 +++++++
 ...ializablePairLongStringComplexMetricSerde.java} |  78 ++--
 ...zablePairLongStringDeltaEncodedStagedSerde.java | 124 +++++++
 ...erializablePairLongStringSimpleStagedSerde.java |  95 +++++
 .../druid/query/aggregation/SerializedStorage.java | 153 ++++++++
 .../serde/cell/BlockCompressedPayloadBuffer.java   | 122 +++++++
 .../cell/BlockCompressedPayloadBufferFactory.java  |  65 ++++
 .../serde/cell/BlockCompressedPayloadReader.java   | 173 +++++++++
 .../cell/BlockCompressedPayloadSerializer.java     |  58 +++
 .../serde/cell/BlockCompressedPayloadWriter.java   | 125 +++++++
 .../druid/segment/serde/cell/BlockIndexWriter.java |  30 ++
 .../segment/serde/cell/ByteBufferProvider.java     |  34 ++
 .../druid/segment/serde/cell/CellIndexReader.java  |  54 +++
 .../druid/segment/serde/cell/CellIndexWriter.java  |  76 ++++
 .../druid/segment/serde/cell/CellReader.java       | 176 +++++++++
 .../druid/segment/serde/cell/CellWriter.java       | 239 ++++++++++++
 .../druid/segment/serde/cell/IOIterator.java       |  33 ++
 .../druid/segment/serde/cell/IndexWriter.java      |  80 ++++
 .../druid/segment/serde/cell/IntIndexView.java     |  69 ++++
 .../druid/segment/serde/cell/IntSerializer.java    |  45 +++
 .../druid/segment/serde/cell/LongSerializer.java   |  43 +++
 .../cell/NativeClearedByteBufferProvider.java      |  45 +++
 .../druid/segment/serde/cell/NumberSerializer.java |  29 ++
 .../druid/segment/serde/cell/PayloadEntrySpan.java |  42 +++
 .../druid/segment/serde/cell/StagedSerde.java      | 103 ++++++
 .../druid/segment/serde/cell/StorableBuffer.java   |  54 +++
 .../SerializablePairLongStringBufferStoreTest.java | 388 ++++++++++++++++++++
 ...izablePairLongStringComplexMetricSerdeTest.java | 237 ++++++++++++
 ...ePairLongStringDeltaEncodedStagedSerdeTest.java |  99 +++++
 ...lizablePairLongStringSimpleStagedSerdeTest.java |  70 ++++
 .../SingleValueColumnValueSelector.java            |  79 ++++
 .../first/StringFirstTimeseriesQueryTest.java      |   4 +-
 .../last/StringLastTimeseriesQueryTest.java        |   4 +-
 .../druid/segment/serde/ComplexMetricsTest.java    |   6 +-
 .../BlockCompressedPayloadWriterReaderTest.java    |  61 ++++
 .../BlockCompressedPayloadWriterToBytesWriter.java |  98 +++++
 .../segment/serde/cell/ByteWriterTestHelper.java   | 406 +++++++++++++++++++++
 .../segment/serde/cell/BytesReadWriteTest.java     |  49 +++
 .../segment/serde/cell/BytesReadWriteTestBase.java | 254 +++++++++++++
 .../serde/cell/BytesReadWriteTestCases.java        |  28 ++
 .../druid/segment/serde/cell/BytesWriter.java      |  43 +++
 .../segment/serde/cell/BytesWriterBuilder.java     |  38 ++
 .../segment/serde/cell/CellWriterReaderTest.java   | 110 ++++++
 .../serde/cell/CellWriterToBytesWriter.java        |  98 +++++
 .../segment/serde/cell/RandomStringUtils.java      |  49 +++
 .../druid/segment/serde/cell/TestCaseResult.java   |  54 +++
 .../druid/segment/serde/cell/TestCasesConfig.java  | 199 ++++++++++
 51 files changed, 5002 insertions(+), 50 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java
index 155b8e7e7b..9f79273050 100644
--- a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java
+++ b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java
@@ -39,7 +39,7 @@ import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
 import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.aggregation.PostAggregator;
-import org.apache.druid.query.aggregation.SerializablePairLongStringSerde;
+import org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde;
 import org.apache.druid.query.aggregation.any.DoubleAnyAggregatorFactory;
 import org.apache.druid.query.aggregation.any.FloatAnyAggregatorFactory;
 import org.apache.druid.query.aggregation.any.LongAnyAggregatorFactory;
@@ -81,7 +81,7 @@ public class AggregatorsModule extends SimpleModule
 
     ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
     ComplexMetrics.registerSerde("preComputedHyperUnique", new PreComputedHyperUniquesSerde());
-    ComplexMetrics.registerSerde("serializablePairLongString", new SerializablePairLongStringSerde());
+    ComplexMetrics.registerSerde("serializablePairLongString", new SerializablePairLongStringComplexMetricSerde());
 
     setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
     setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class);
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringBufferStore.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringBufferStore.java
new file mode 100644
index 0000000000..843e1a4eff
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringBufferStore.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.serde.Serializer;
+import org.apache.druid.segment.serde.cell.ByteBufferProvider;
+import org.apache.druid.segment.serde.cell.CellWriter;
+import org.apache.druid.segment.serde.cell.IOIterator;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+
+public class SerializablePairLongStringBufferStore
+{
+  private final SerializedStorage<SerializablePairLongString> serializedStorage;
+
+  private long minValue = Long.MAX_VALUE;
+  private long maxValue = Long.MIN_VALUE;
+
+  public SerializablePairLongStringBufferStore(SerializedStorage<SerializablePairLongString> serializedStorage)
+  {
+    this.serializedStorage = serializedStorage;
+  }
+
+  public void store(@Nullable SerializablePairLongString pairLongString) throws IOException
+  {
+    if (pairLongString != null && pairLongString.lhs != null) {
+      minValue = Math.min(minValue, pairLongString.lhs);
+      maxValue = Math.max(maxValue, pairLongString.lhs);
+    }
+
+    serializedStorage.store(pairLongString);
+  }
+
+  /**
+   * Each call transfers the temporary buffer into an encoded, block-compressed buffer for the segment, ready to be
+   * transferred to a {@link WritableByteChannel}.
+   *
+   * @param byteBufferProvider    - provides a ByteBuffer used for block-compressed encoding
+   * @param segmentWriteOutMedium - used to create temporary storage
+   * @return encoded buffer ready to be stored
+   * @throws IOException
+   */
+  public TransferredBuffer transferToRowWriter(
+      ByteBufferProvider byteBufferProvider,
+      SegmentWriteOutMedium segmentWriteOutMedium
+  ) throws IOException
+  {
+    SerializablePairLongStringColumnHeader columnHeader = createColumnHeader();
+    SerializablePairLongStringDeltaEncodedStagedSerde serde =
+        new SerializablePairLongStringDeltaEncodedStagedSerde(
+            columnHeader.getMinValue(),
+            columnHeader.isUseIntegerDeltas()
+        );
+
+    // try-with-resources will call cellWriter.close() an extra time in the normal case, but it protects against
+    // buffer leaking in the case of an exception (close() is idempotent). In the normal path, close() performs some
+    // finalization of the CellWriter object. We want that object state finalized before creating the TransferredBuffer
+    // as a point of good style (though strictly speaking, it works fine to pass it in before calling close since
+    // TransferredBuffer does not do anything in the constructor with the object)
+    try (CellWriter cellWriter =
+             new CellWriter.Builder(segmentWriteOutMedium).setByteBufferProvider(byteBufferProvider).build()) {
+      try (IOIterator<SerializablePairLongString> bufferIterator = iterator()) {
+        while (bufferIterator.hasNext()) {
+          SerializablePairLongString pairLongString = bufferIterator.next();
+          byte[] serialized = serde.serialize(pairLongString);
+
+          cellWriter.write(serialized);
+        }
+
+        cellWriter.close();
+
+        return new TransferredBuffer(cellWriter, columnHeader);
+      }
+    }
+  }
+
+  @Nonnull
+  public SerializablePairLongStringColumnHeader createColumnHeader()
+  {
+    long maxDelta = maxValue - minValue;
+    SerializablePairLongStringColumnHeader columnHeader;
+
+    if (minValue < maxValue && maxDelta < 0 || minValue > maxValue) {
+      // true iff
+      // 1. we have overflow in our range || 2. we have only seen null values
+      // in this case, effectively disable delta encoding by using longs and a min value of 0
+      maxDelta = Long.MAX_VALUE;
+      minValue = 0;
+    }
+
+    if (maxDelta <= Integer.MAX_VALUE) {
+      columnHeader = new SerializablePairLongStringColumnHeader(
+          SerializablePairLongStringComplexMetricSerde.EXPECTED_VERSION,
+          true,
+          minValue
+      );
+    } else {
+      columnHeader = new SerializablePairLongStringColumnHeader(
+          SerializablePairLongStringComplexMetricSerde.EXPECTED_VERSION,
+          false,
+          minValue
+      );
+    }
+    return columnHeader;
+  }
+
+  public IOIterator<SerializablePairLongString> iterator() throws IOException
+  {
+    return serializedStorage.iterator();
+  }
+
+  /**
+   * Contains serialized data that is compressed and delta-encoded (the Long).
+   * It is ready to be transferred to a {@link WritableByteChannel}.
+   */
+  public static class TransferredBuffer implements Serializer
+  {
+    private final CellWriter cellWriter;
+    private final SerializablePairLongStringColumnHeader columnHeader;
+
+    public TransferredBuffer(CellWriter cellWriter, SerializablePairLongStringColumnHeader columnHeader)
+    {
+      this.cellWriter = cellWriter;
+      this.columnHeader = columnHeader;
+    }
+
+    @Override
+    public void writeTo(WritableByteChannel channel, @Nullable FileSmoosher smoosher) throws IOException
+    {
+      columnHeader.transferTo(channel);
+      cellWriter.writeTo(channel, smoosher);
+    }
+
+    @Override
+    public long getSerializedSize()
+    {
+      return columnHeader.getSerializedSize() + cellWriter.getSerializedSize();
+    }
+  }
+}
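
A worked example of the delta-range decision in createColumnHeader() above
(illustrative numbers): for one day of millisecond timestamps, maxDelta =
maxValue - minValue = 86,400,000, which fits comfortably under
Integer.MAX_VALUE (2,147,483,647, roughly 24.8 days of milliseconds), so
useIntegerDeltas is set and each stored Long shrinks to a 4-byte int. If the
subtraction overflows (maxDelta goes negative), or only null values were seen
(minValue > maxValue), the header falls back to plain longs with a minValue
of 0.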
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringColumnHeader.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringColumnHeader.java
new file mode 100644
index 0000000000..e0aa21af03
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringColumnHeader.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import org.apache.druid.segment.serde.cell.LongSerializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+public class SerializablePairLongStringColumnHeader
+{
+  // header size is 4 bytes for word alignment for LZ4 (minmatch) compression
+  private static final int HEADER_SIZE_BYTES = 4;
+  private static final int USE_INTEGER_MASK = 0x80;
+  private static final int VERSION_INDEX = 0;
+  private static final int ENCODING_INDEX = 1;
+
+  private final byte[] bytes;
+  private final long minValue;
+
+  private SerializablePairLongStringColumnHeader(byte[] bytes, long minTimestamp)
+  {
+    this.bytes = bytes;
+    this.minValue = minTimestamp;
+  }
+
+  public SerializablePairLongStringColumnHeader(int version, boolean useIntegerDeltas, long minTimestamp)
+  {
+    this.minValue = minTimestamp;
+    bytes = new byte[HEADER_SIZE_BYTES];
+    Preconditions.checkArgument(version <= 255, "max version 255");
+    bytes[VERSION_INDEX] = (byte) version;
+
+    if (useIntegerDeltas) {
+      bytes[ENCODING_INDEX] |= USE_INTEGER_MASK;
+    }
+  }
+
+  public static SerializablePairLongStringColumnHeader fromBuffer(ByteBuffer byteBuffer)
+  {
+    byte[] bytes = new byte[HEADER_SIZE_BYTES];
+
+    byteBuffer.get(bytes);
+
+    long minTimestamp = byteBuffer.getLong();
+
+    return new SerializablePairLongStringColumnHeader(bytes, minTimestamp);
+  }
+
+  public SerializablePairLongStringDeltaEncodedStagedSerde createSerde()
+  {
+    return new SerializablePairLongStringDeltaEncodedStagedSerde(minValue, isUseIntegerDeltas());
+  }
+
+  public void transferTo(WritableByteChannel channel) throws IOException
+  {
+    LongSerializer longSerializer = new LongSerializer();
+
+    channel.write(ByteBuffer.wrap(bytes));
+    channel.write(longSerializer.serialize(minValue));
+  }
+
+  public int getVersion()
+  {
+    return 0xFF & bytes[VERSION_INDEX];
+  }
+
+  public boolean isUseIntegerDeltas()
+  {
+    return (bytes[ENCODING_INDEX] & USE_INTEGER_MASK) != 0;
+  }
+
+  public long getMinValue()
+  {
+    return minValue;
+  }
+
+  public int getSerializedSize()
+  {
+    return HEADER_SIZE_BYTES + Long.BYTES;
+  }
+
+  @Override
+  public String toString()
+  {
+    return Objects.toStringHelper(this)
+                  .add("bytes", bytes)
+                  .add("minValue", minValue)
+                  .toString();
+  }
+}
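
Putting the header pieces together, the column prefix written by transferTo()
is 12 bytes (HEADER_SIZE_BYTES + Long.BYTES); a sketch of the layout:

    byte 0     : version (EXPECTED_VERSION = 3)
    byte 1     : encoding flags; bit 0x80 set => integer deltas
    bytes 2-3  : padding (4-byte word alignment for LZ4)
    bytes 4-11 : minValue as a long (written via LongSerializer)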
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringColumnSerializer.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringColumnSerializer.java
new file mode 100644
index 0000000000..3930ee6060
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringColumnSerializer.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.GenericColumnSerializer;
+import org.apache.druid.segment.serde.cell.ByteBufferProvider;
+import org.apache.druid.segment.serde.cell.StagedSerde;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * valid call sequence
+ * <p>
+ * open()+serialize()*(getSerializedSize()|writeTo())*
+ * <p>
+ * getSerializedSize() / writeTo() effectively function as a close call, but each may be called multiple times and
+ * neither has any effect on the other.
+ */
+@SuppressWarnings("NotNullFieldNotInitialized")
+public class SerializablePairLongStringColumnSerializer implements GenericColumnSerializer<SerializablePairLongString>
+{
+  public static final StagedSerde<SerializablePairLongString> STAGED_SERDE =
+      new SerializablePairLongStringSimpleStagedSerde();
+
+  private final SegmentWriteOutMedium segmentWriteOutMedium;
+  private final ByteBufferProvider byteBufferProvider;
+
+  private State state = State.START;
+  private SerializablePairLongStringBufferStore bufferStore;
+  private SerializablePairLongStringBufferStore.TransferredBuffer transferredBuffer;
+
+  public SerializablePairLongStringColumnSerializer(
+      SegmentWriteOutMedium segmentWriteOutMedium,
+      ByteBufferProvider byteBufferProvider
+  )
+  {
+    this.segmentWriteOutMedium = segmentWriteOutMedium;
+    this.byteBufferProvider = byteBufferProvider;
+  }
+
+  @Override
+  public void open() throws IOException
+  {
+    Preconditions.checkState(state == State.START || state == State.OPEN, "open called in invalid state %s", state);
+
+    if (state == State.START) {
+      bufferStore = new SerializablePairLongStringBufferStore(
+          new SerializedStorage<>(segmentWriteOutMedium.makeWriteOutBytes(), STAGED_SERDE)
+      );
+      state = State.OPEN;
+    }
+  }
+
+  @Override
+  public void serialize(ColumnValueSelector<? extends SerializablePairLongString> selector) throws IOException
+  {
+    Preconditions.checkState(state == State.OPEN, "serialize called in invalid state %s", state);
+
+    SerializablePairLongString pairLongString = selector.getObject();
+
+    bufferStore.store(pairLongString);
+  }
+
+  @Override
+  public long getSerializedSize() throws IOException
+  {
+    Preconditions.checkState(
+        state != State.START,
+        "getSerializedSize called in invalid state %s (must have opened at least)",
+        state
+    );
+
+    transferToRowWriterIfNecessary();
+
+    return transferredBuffer.getSerializedSize();
+  }
+
+  @Override
+  public void writeTo(WritableByteChannel channel, @Nullable FileSmoosher smoosher) throws IOException
+  {
+    Preconditions.checkState(state != State.START, "writeTo called in invalid state %s", state);
+    transferToRowWriterIfNecessary();
+    transferredBuffer.writeTo(channel, smoosher);
+  }
+
+  private void transferToRowWriterIfNecessary() throws IOException
+  {
+    if (state == State.OPEN) {
+      transferredBuffer = bufferStore.transferToRowWriter(byteBufferProvider, segmentWriteOutMedium);
+      state = State.CLOSED;
+    }
+  }
+
+  private enum State
+  {
+    START,
+    OPEN,
+    CLOSED,
+  }
+}
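
A minimal sketch of the call sequence documented on the class (hypothetical
driver code; selector is assumed to be a ColumnValueSelector positioned on
each row in turn):

    SerializablePairLongStringColumnSerializer serializer =
        new SerializablePairLongStringColumnSerializer(
            segmentWriteOutMedium,
            NativeClearedByteBufferProvider.INSTANCE
        );
    serializer.open();                          // START -> OPEN
    for (int i = 0; i < rowCount; i++) {
      serializer.serialize(selector);           // buffers each row
    }
    long size = serializer.getSerializedSize(); // OPEN -> CLOSED; does the transfer
    serializer.writeTo(channel, smoosher);      // idempotent after the transfer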
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexColumn.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexColumn.java
new file mode 100644
index 0000000000..d3b4dcfbb4
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexColumn.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.segment.column.ComplexColumn;
+import org.apache.druid.segment.serde.cell.ByteBufferProvider;
+import org.apache.druid.segment.serde.cell.CellReader;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class SerializablePairLongStringComplexColumn implements ComplexColumn
+{
+  private final Closer closer;
+  private final int serializedSize;
+  private final CellReader cellReader;
+  private final SerializablePairLongStringDeltaEncodedStagedSerde serde;
+
+  public SerializablePairLongStringComplexColumn(
+      CellReader cellReader,
+      SerializablePairLongStringDeltaEncodedStagedSerde serde,
+      Closer closer,
+      int serializedSize
+  )
+  {
+    this.cellReader = cellReader;
+    this.serde = serde;
+    this.closer = closer;
+    this.serializedSize = serializedSize;
+  }
+
+  @Override
+  public Class<?> getClazz()
+  {
+    return SerializablePairLongString.class;
+  }
+
+  @Override
+  public String getTypeName()
+  {
+    return SerializablePairLongStringComplexMetricSerde.TYPE_NAME;
+  }
+
+  @SuppressWarnings("ConstantConditions")
+  @Override
+  public Object getRowValue(int rowNum)
+  {
+    // This can return null, meaning that anything reading from this column is expected to do
+    // something "good" with null. At the time of writing, the relevant aggregators handle null properly.
+    return serde.deserialize(cellReader.getCell(rowNum));
+  }
+
+  @Override
+  public int getLength()
+  {
+    return serializedSize;
+  }
+
+  @Override
+  public void close()
+  {
+    try {
+      closer.close();
+    }
+    catch (IOException e) {
+      throw new RE(e, "error closing " + getClass().getName());
+    }
+  }
+
+  public static class Builder
+  {
+    private final int serializedSize;
+    private final SerializablePairLongStringDeltaEncodedStagedSerde serde;
+    private final CellReader.Builder cellReaderBuilder;
+
+    public Builder(ByteBuffer buffer)
+    {
+      ByteBuffer masterByteBuffer = buffer.asReadOnlyBuffer().order(ByteOrder.nativeOrder());
+
+      serializedSize = masterByteBuffer.remaining();
+
+      SerializablePairLongStringColumnHeader columnHeader =
+          SerializablePairLongStringColumnHeader.fromBuffer(masterByteBuffer);
+
+      Preconditions.checkArgument(
+          columnHeader.getVersion() == SerializablePairLongStringComplexMetricSerde.EXPECTED_VERSION,
+          "version %s expected, got %s",
+          SerializablePairLongStringComplexMetricSerde.EXPECTED_VERSION,
+          columnHeader.getVersion()
+      );
+      serde = columnHeader.createSerde();
+      cellReaderBuilder = new CellReader.Builder(masterByteBuffer);
+    }
+
+    public Builder setByteBufferProvider(ByteBufferProvider byteBufferProvider)
+    {
+      cellReaderBuilder.setByteBufferProvider(byteBufferProvider);
+
+      return this;
+    }
+
+    public SerializablePairLongStringComplexColumn build()
+    {
+      Closer closer = Closer.create();
+      CellReader cellReader = cellReaderBuilder.build();
+
+      closer.register(cellReader);
+
+      return new SerializablePairLongStringComplexColumn(cellReader, serde, closer, serializedSize);
+    }
+  }
+}
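
For illustration, reading a value back out of a version-3 column (assuming
columnByteBuffer holds the mapped column bytes, header included):

    SerializablePairLongStringComplexColumn column =
        new SerializablePairLongStringComplexColumn.Builder(columnByteBuffer)
            .setByteBufferProvider(NativeClearedByteBufferProvider.INSTANCE)
            .build();
    SerializablePairLongString pair =
        (SerializablePairLongString) column.getRowValue(rowNum); // may be null
    column.close();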
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java
similarity index 61%
rename from processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringSerde.java
rename to processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java
index 49300ff531..deb1623701 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringSerde.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java
@@ -21,7 +21,6 @@ package org.apache.druid.query.aggregation;
 
 import org.apache.druid.collections.SerializablePair;
 import org.apache.druid.data.input.InputRow;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.GenericColumnSerializer;
 import org.apache.druid.segment.column.ColumnBuilder;
 import org.apache.druid.segment.data.GenericIndexed;
@@ -29,7 +28,7 @@ import org.apache.druid.segment.data.ObjectStrategy;
 import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
 import org.apache.druid.segment.serde.ComplexMetricExtractor;
 import org.apache.druid.segment.serde.ComplexMetricSerde;
-import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
+import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider;
 import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
 
 import javax.annotation.Nullable;
@@ -39,14 +38,19 @@ import java.util.Comparator;
 /**
  * The SerializablePairLongStringSerde serializes a Long-String pair (SerializablePairLongString).
  * The serialization structure is: Long:Integer:String
+ * The Long is delta-encoded for the column in order to potentially reduce its size to an integer, so it may be stored
+ * as: Integer:Integer:String
+ * <p>
+ * Future work: dictionary encoding of the String may be performed
  * <p>
  * The class is used on first/last String aggregators to store the time and the first/last string.
- * Long:Integer:String -> Timestamp:StringSize:StringData
+ * [Integer|Long]:Integer:String -> delta:StringSize:StringData --(delta decoded)--> Timestamp:StringSize:StringData
+ * (see {@link SerializablePairLongStringDeltaEncodedStagedSerde})
  */
-public class SerializablePairLongStringSerde extends ComplexMetricSerde
+public class SerializablePairLongStringComplexMetricSerde extends ComplexMetricSerde
 {
-
-  private static final String TYPE_NAME = "serializablePairLongString";
+  public static final int EXPECTED_VERSION = 3;
+  public static final String TYPE_NAME = "serializablePairLongString";
   // Null SerializablePairLongString values are put first
   private static final Comparator<SerializablePairLongString> COMPARATOR = Comparator.nullsFirst(
       // assumes that the LHS of the pair will never be null
@@ -54,6 +58,9 @@ public class SerializablePairLongStringSerde extends ComplexMetricSerde
                 .thenComparing(SerializablePair::getRhs, Comparator.nullsFirst(Comparator.naturalOrder()))
   );
 
+  private static final SerializablePairLongStringSimpleStagedSerde SERDE =
+      new SerializablePairLongStringSimpleStagedSerde();
+
   @Override
   public String getTypeName()
   {
@@ -61,9 +68,9 @@ public class SerializablePairLongStringSerde extends ComplexMetricSerde
   }
 
   @Override
-  public ComplexMetricExtractor getExtractor()
+  public ComplexMetricExtractor<?> getExtractor()
   {
-    return new ComplexMetricExtractor()
+    return new ComplexMetricExtractor<Object>()
     {
       @Override
       public Class<SerializablePairLongString> extractedClass()
@@ -82,12 +89,21 @@ public class SerializablePairLongStringSerde extends ComplexMetricSerde
   @Override
   public void deserializeColumn(ByteBuffer buffer, ColumnBuilder columnBuilder)
   {
-    final GenericIndexed column = GenericIndexed.read(buffer, getObjectStrategy(), columnBuilder.getFileMapper());
-    columnBuilder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column));
+    byte version = buffer.get(buffer.position());
+
+    if (version == 0 || version == 1 || version == 2) {
+      GenericIndexed<?> column = GenericIndexed.read(buffer, getObjectStrategy(), columnBuilder.getFileMapper());
+      columnBuilder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column));
+    } else {
+      SerializablePairLongStringComplexColumn.Builder builder =
+          new SerializablePairLongStringComplexColumn.Builder(buffer)
+              .setByteBufferProvider(NativeClearedByteBufferProvider.INSTANCE);
+      columnBuilder.setComplexColumnSupplier(builder::build);
+    }
   }
 
   @Override
-  public ObjectStrategy getObjectStrategy()
+  public ObjectStrategy<?> getObjectStrategy()
   {
     return new ObjectStrategy<SerializablePairLongString>()
     {
@@ -106,48 +122,28 @@ public class SerializablePairLongStringSerde extends ComplexMetricSerde
       @Override
       public SerializablePairLongString fromByteBuffer(ByteBuffer buffer, int numBytes)
       {
-        final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
-
-        long lhs = readOnlyBuffer.getLong();
-        int stringSize = readOnlyBuffer.getInt();
+        ByteBuffer readOnlyByteBuffer = buffer.asReadOnlyBuffer().order(buffer.order());
 
-        String lastString = null;
-        if (stringSize > 0) {
-          byte[] stringBytes = new byte[stringSize];
-          readOnlyBuffer.get(stringBytes, 0, stringSize);
-          lastString = StringUtils.fromUtf8(stringBytes);
-        }
+        readOnlyByteBuffer.limit(buffer.position() + numBytes);
 
-        return new SerializablePairLongString(lhs, lastString);
+        return SERDE.deserialize(readOnlyByteBuffer);
       }
 
+      @SuppressWarnings("NullableProblems")
       @Override
       public byte[] toBytes(SerializablePairLongString val)
       {
-        String rhsString = val.rhs;
-        ByteBuffer bbuf;
-
-        if (rhsString != null) {
-          byte[] rhsBytes = StringUtils.toUtf8(rhsString);
-          bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + rhsBytes.length);
-          bbuf.putLong(val.lhs);
-          bbuf.putInt(Long.BYTES, rhsBytes.length);
-          bbuf.position(Long.BYTES + Integer.BYTES);
-          bbuf.put(rhsBytes);
-        } else {
-          bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
-          bbuf.putLong(val.lhs);
-          bbuf.putInt(Long.BYTES, 0);
-        }
-
-        return bbuf.array();
+        return SERDE.serialize(val);
       }
     };
   }
 
   @Override
-  public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
+  public GenericColumnSerializer<?> getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
   {
-    return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy());
+    return new SerializablePairLongStringColumnSerializer(
+        segmentWriteOutMedium,
+        NativeClearedByteBufferProvider.INSTANCE
+    );
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringDeltaEncodedStagedSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringDeltaEncodedStagedSerde.java
new file mode 100644
index 0000000000..e1c070127b
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringDeltaEncodedStagedSerde.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.serde.cell.StagedSerde;
+import org.apache.druid.segment.serde.cell.StorableBuffer;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * Serializes a Long/String pair in the context of a column/segment. Uses minValue to perform delta
+ * encoding/decoding. If the range of the segment fits in an integer (useIntegerDelta), the format is
+ * Integer:Integer:bytes;
+ * <p>
+ * otherwise
+ * Long:Integer:bytes
+ */
+public class SerializablePairLongStringDeltaEncodedStagedSerde implements StagedSerde<SerializablePairLongString>
+{
+  private final long minValue;
+  private final boolean useIntegerDelta;
+
+  public SerializablePairLongStringDeltaEncodedStagedSerde(long minValue, boolean useIntegerDelta)
+  {
+    this.minValue = minValue;
+    this.useIntegerDelta = useIntegerDelta;
+  }
+
+  @Override
+  public StorableBuffer serializeDelayed(@Nullable SerializablePairLongString value)
+  {
+    if (value == null) {
+      return StorableBuffer.EMPTY;
+    }
+
+    String rhsString = value.rhs;
+    byte[] rhsBytes = StringUtils.toUtf8WithNullToEmpty(rhsString);
+
+    return new StorableBuffer()
+    {
+      @Override
+      public void store(ByteBuffer byteBuffer)
+      {
+        Preconditions.checkNotNull(value.lhs, "Long in SerializablePairLongString must be non-null");
+
+        long delta = value.lhs - minValue;
+
+        Preconditions.checkState(delta >= 0 || delta == value.lhs);
+
+        if (useIntegerDelta) {
+          byteBuffer.putInt(Ints.checkedCast(delta));
+        } else {
+          byteBuffer.putLong(delta);
+        }
+
+        byteBuffer.putInt(rhsBytes.length);
+
+        if (rhsBytes.length > 0) {
+          byteBuffer.put(rhsBytes);
+        }
+      }
+
+      @Override
+      public int getSerializedSize()
+      {
+        return (useIntegerDelta ? Integer.BYTES : Long.BYTES) + Integer.BYTES + rhsBytes.length;
+      }
+    };
+  }
+
+  @Nullable
+  @Override
+  public SerializablePairLongString deserialize(ByteBuffer byteBuffer)
+  {
+    if (byteBuffer.remaining() == 0) {
+      return null;
+    }
+
+    ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer().order(ByteOrder.nativeOrder());
+    long lhs;
+
+    if (useIntegerDelta) {
+      lhs = readOnlyBuffer.getInt();
+    } else {
+      lhs = readOnlyBuffer.getLong();
+    }
+
+    lhs += minValue;
+
+    int stringSize = readOnlyBuffer.getInt();
+    String lastString = null;
+
+    if (stringSize > 0) {
+      byte[] stringBytes = new byte[stringSize];
+
+      readOnlyBuffer.get(stringBytes, 0, stringSize);
+      lastString = StringUtils.fromUtf8(stringBytes);
+    }
+
+    return new SerializablePairLongString(lhs, lastString);
+  }
+}
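
A quick arithmetic check of the savings (illustrative values): with
minValue = 1,609,459,200,000 and the row (1,609,459,200,123, "foo"), the delta
is 123, so with useIntegerDelta the row serializes to 4 + 4 + 3 = 11 bytes
instead of 8 + 4 + 3 = 15 bytes for the plain Long:Integer:String layout,
before block compression is applied on top.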
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringSimpleStagedSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringSimpleStagedSerde.java
new file mode 100644
index 0000000000..acdd937d2a
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringSimpleStagedSerde.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.serde.cell.StagedSerde;
+import org.apache.druid.segment.serde.cell.StorableBuffer;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * Serializes a Long/String pair as
+ * Long:Integer:bytes
+ * <p>
+ * i.e.
+ * Long:StringSize:StringData
+ */
+public class SerializablePairLongStringSimpleStagedSerde implements StagedSerde<SerializablePairLongString>
+{
+  @Override
+  public StorableBuffer serializeDelayed(@Nullable SerializablePairLongString value)
+  {
+    if (value == null) {
+      return StorableBuffer.EMPTY;
+    }
+
+    String rhsString = value.rhs;
+    byte[] rhsBytes = StringUtils.toUtf8WithNullToEmpty(rhsString);
+
+    return new StorableBuffer()
+    {
+      @Override
+      public void store(ByteBuffer byteBuffer)
+      {
+        Preconditions.checkNotNull(value.lhs, "Long in SerializablePairLongString must be non-null");
+
+        byteBuffer.putLong(value.lhs);
+        byteBuffer.putInt(rhsBytes.length);
+
+        if (rhsBytes.length > 0) {
+          byteBuffer.put(rhsBytes);
+        }
+      }
+
+      @Override
+      public int getSerializedSize()
+      {
+        return Long.BYTES + Integer.BYTES + rhsBytes.length;
+      }
+    };
+  }
+
+  @Nullable
+  @Override
+  public SerializablePairLongString deserialize(ByteBuffer byteBuffer)
+  {
+    if (byteBuffer.remaining() == 0) {
+      return null;
+    }
+
+    ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer().order(ByteOrder.nativeOrder());
+    long lhs = readOnlyBuffer.getLong();
+    int stringSize = readOnlyBuffer.getInt();
+    String lastString = null;
+
+    if (stringSize > 0) {
+      byte[] stringBytes = new byte[stringSize];
+
+      readOnlyBuffer.get(stringBytes, 0, stringSize);
+      lastString = StringUtils.fromUtf8(stringBytes);
+    }
+
+    return new SerializablePairLongString(lhs, lastString);
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializedStorage.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializedStorage.java
new file mode 100644
index 0000000000..a6fee46b3c
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializedStorage.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.segment.serde.cell.IOIterator;
+import org.apache.druid.segment.serde.cell.IntSerializer;
+import org.apache.druid.segment.serde.cell.StagedSerde;
+import org.apache.druid.segment.writeout.WriteOutBytes;
+
+import javax.annotation.Nullable;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.NoSuchElementException;
+
+/**
+ * Simple utility class for when multiple passes over the input are needed for encoding (e.g. delta or dictionary
+ * encoding).
+ * <p/>
+ * This allows objects to be serialized to some temporary storage and iterated over for final processing.
+ * <p/>
+ * @param <T> type of the objects stored
+ */
+public class SerializedStorage<T>
+{
+  private final WriteOutBytes writeOutBytes;
+  private final StagedSerde<T> serde;
+  private final IntSerializer intSerializer = new IntSerializer();
+
+  public SerializedStorage(WriteOutBytes writeOutBytes, StagedSerde<T> serde)
+  {
+    this.writeOutBytes = writeOutBytes;
+    this.serde = serde;
+  }
+
+  public void store(@Nullable T value) throws IOException
+  {
+    byte[] bytes = serde.serialize(value);
+
+    writeOutBytes.write(intSerializer.serialize(bytes.length));
+    writeOutBytes.write(bytes);
+  }
+
+  public IOIterator<T> iterator() throws IOException
+  {
+    return new DeserializingIOIterator<>(writeOutBytes.asInputStream(), serde);
+  }
+
+  private static class DeserializingIOIterator<T> implements IOIterator<T>
+  {
+    private static final int NEEDS_READ = -2;
+    private static final int EOF = -1;
+
+    private final byte[] intBytes;
+    private final BufferedInputStream inputStream;
+    private final StagedSerde<T> serde;
+
+    private int nextSize;
+
+    public DeserializingIOIterator(InputStream inputStream, StagedSerde<T> serde)
+    {
+      this.inputStream = new BufferedInputStream(inputStream);
+      this.serde = serde;
+      intBytes = new byte[Integer.BYTES];
+      nextSize = NEEDS_READ;
+    }
+
+    @Override
+    public boolean hasNext() throws IOException
+    {
+      return getNextSize() > EOF;
+    }
+
+    @Override
+    public T next() throws IOException
+    {
+      int currentNextSize = getNextSize();
+
+      if (currentNextSize == -1) {
+        throw new NoSuchElementException("end of buffer reached");
+      }
+
+      byte[] nextBytes = new byte[currentNextSize];
+      int bytesRead = 0;
+
+      while (bytesRead < currentNextSize) {
+        int result = inputStream.read(nextBytes, bytesRead, currentNextSize - bytesRead);
+
+        if (result == -1) {
+          throw new NoSuchElementException("unexpected end of buffer reached");
+        }
+
+        bytesRead += result;
+      }
+
+      Preconditions.checkState(bytesRead == currentNextSize);
+      T value = serde.deserialize(nextBytes);
+
+      nextSize = NEEDS_READ;
+
+      return value;
+    }
+
+    private int getNextSize() throws IOException
+    {
+      if (nextSize == NEEDS_READ) {
+        int bytesRead = 0;
+
+        while (bytesRead < Integer.BYTES) {
+          int result = inputStream.read(intBytes, bytesRead, Integer.BYTES - bytesRead);
+
+          if (result == -1) {
+            nextSize = EOF;
+            return EOF;
+          } else {
+            bytesRead += result;
+          }
+        }
+        Preconditions.checkState(bytesRead == Integer.BYTES);
+
+        nextSize = ByteBuffer.wrap(intBytes).order(ByteOrder.nativeOrder()).getInt();
+      }
+
+      return nextSize;
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+      inputStream.close();
+    }
+  }
+}
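
A condensed sketch of the two-pass pattern this enables (hypothetical driver
code; the serde and medium are the ones used elsewhere in this patch):

    SerializedStorage<SerializablePairLongString> storage = new SerializedStorage<>(
        segmentWriteOutMedium.makeWriteOutBytes(),
        new SerializablePairLongStringSimpleStagedSerde()
    );
    // pass 1: spool values to temporary storage, tracking min/max on the side
    storage.store(new SerializablePairLongString(1L, "a"));
    storage.store(new SerializablePairLongString(9L, "b"));
    // pass 2: replay with the encoding chosen from the observed range
    try (IOIterator<SerializablePairLongString> iterator = storage.iterator()) {
      while (iterator.hasNext()) {
        SerializablePairLongString value = iterator.next();
        // re-serialize value with the delta-encoded serde
      }
    }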
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadBuffer.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadBuffer.java
new file mode 100644
index 0000000000..cf5309b6a3
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadBuffer.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.writeout.WriteOutBytes;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class BlockCompressedPayloadBuffer implements Closeable
+{
+  private final ByteBuffer currentBlock;
+  private final ByteBuffer compressedByteBuffer;
+  private final BlockIndexWriter blockIndexWriter;
+  private final WriteOutBytes dataOutBytes;
+  private final Closer closer;
+  private final CompressionStrategy.Compressor compressor;
+
+  private boolean open = true;
+
+  public BlockCompressedPayloadBuffer(
+      ByteBuffer currentBlock,
+      ByteBuffer compressedByteBuffer,
+      BlockIndexWriter blockIndexWriter,
+      WriteOutBytes dataOutBytes,
+      Closer closer,
+      CompressionStrategy.Compressor compressor
+  )
+  {
+    currentBlock.clear();
+    compressedByteBuffer.clear();
+    this.currentBlock = currentBlock;
+    this.compressedByteBuffer = compressedByteBuffer;
+    this.closer = closer;
+    this.blockIndexWriter = blockIndexWriter;
+    this.dataOutBytes = dataOutBytes;
+    this.compressor = compressor;
+  }
+
+  public void write(byte[] payload) throws IOException
+  {
+    Preconditions.checkNotNull(payload);
+    write(ByteBuffer.wrap(payload).order(ByteOrder.nativeOrder()));
+  }
+
+  public void write(ByteBuffer masterPayload) throws IOException
+  {
+    Preconditions.checkNotNull(masterPayload);
+    Preconditions.checkState(open, "cannot write to closed BlockCompressedPayloadBuffer");
+    ByteBuffer payload = masterPayload.asReadOnlyBuffer().order(masterPayload.order());
+
+    while (payload.hasRemaining()) {
+      int writeSize = Math.min(payload.remaining(), currentBlock.remaining());
+
+      payload.limit(payload.position() + writeSize);
+      currentBlock.put(payload);
+
+      if (!currentBlock.hasRemaining()) {
+        flush();
+      }
+
+      payload.limit(masterPayload.limit());
+    }
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    closer.close();
+  }
+
+  public BlockCompressedPayloadSerializer closeToSerializer() throws IOException
+  {
+    if (open) {
+      if (currentBlock.position() > 0) {
+        flush();
+      }
+
+      blockIndexWriter.close();
+      closer.close();
+      open = false;
+    }
+
+    return new BlockCompressedPayloadSerializer(blockIndexWriter, dataOutBytes);
+  }
+
+  private void flush() throws IOException
+  {
+    Preconditions.checkState(open, "flush() on closed BlockCompressedPayloadBuffer");
+    currentBlock.flip();
+
+    ByteBuffer actualCompressedByteBuffer = compressor.compress(currentBlock, compressedByteBuffer);
+    int compressedBlockSize = actualCompressedByteBuffer.limit();
+
+    blockIndexWriter.persistAndIncrement(compressedBlockSize);
+    dataOutBytes.write(actualCompressedByteBuffer);
+    currentBlock.clear();
+    compressedByteBuffer.clear();
+  }
+}
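
To make the write loop above concrete (illustrative numbers): with a 64 KiB
uncompressed block and a 100,000-byte payload, the first pass copies 65,536
bytes and fills the block, flush() LZ4-compresses it and records the
compressed size in the block index, and the second pass copies the remaining
34,464 bytes into the cleared block. A payload may therefore span any number
of blocks; the per-block index is what later allows random access without
decompressing the whole stream.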
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadBufferFactory.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadBufferFactory.java
new file mode 100644
index 0000000000..d4bd6a4b9d
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadBufferFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class BlockCompressedPayloadBufferFactory
+{
+  private final ByteBufferProvider byteBufferProvider;
+  private final SegmentWriteOutMedium writeOutMedium;
+  private final CompressionStrategy.Compressor compressor;
+
+  public BlockCompressedPayloadBufferFactory(
+      ByteBufferProvider byteBufferProvider,
+      SegmentWriteOutMedium writeOutMedium,
+      CompressionStrategy.Compressor compressor
+  )
+  {
+    this.byteBufferProvider = byteBufferProvider;
+    this.writeOutMedium = writeOutMedium;
+    this.compressor = compressor;
+  }
+
+  public BlockCompressedPayloadBuffer create() throws IOException
+  {
+    Closer closer = Closer.create();
+    ResourceHolder<ByteBuffer> currentBlockHolder = byteBufferProvider.get();
+
+    closer.register(currentBlockHolder);
+
+    ByteBuffer compressedBlockByteBuffer = compressor.allocateOutBuffer(currentBlockHolder.get().limit(), closer);
+
+    return new BlockCompressedPayloadBuffer(
+        currentBlockHolder.get(),
+        compressedBlockByteBuffer,
+        new BlockIndexWriter(writeOutMedium.makeWriteOutBytes()),
+        writeOutMedium.makeWriteOutBytes(),
+        closer,
+        compressor
+    );
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadReader.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadReader.java
new file mode 100644
index 0000000000..2c6263be58
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadReader.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.segment.data.CompressionStrategy;
+
+import javax.annotation.Nonnull;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class BlockCompressedPayloadReader implements Closeable
+{
+  private static final ByteBuffer NULL_CELL = ByteBuffer.wrap(new byte[0]);
+  private final IntIndexView blockIndexView;
+  private final ByteBuffer compressedBlocksByteBuffer;
+  private final ByteBuffer uncompressedByteBuffer;
+  private final Closer closer;
+  private final int blockSize;
+  private final long maxValidUncompressedOffset;
+  private final CompressionStrategy.Decompressor decompressor;
+
+  private int currentUncompressedBlockNumber = -1;
+
+  private BlockCompressedPayloadReader(
+      IntIndexView blockIndexView,
+      ByteBuffer compressedBlocksByteBuffer,
+      ByteBuffer uncompressedByteBuffer,
+      CompressionStrategy.Decompressor decompressor,
+      Closer closer
+  )
+  {
+    this.blockIndexView = blockIndexView;
+    this.compressedBlocksByteBuffer = compressedBlocksByteBuffer;
+    this.uncompressedByteBuffer = uncompressedByteBuffer;
+    this.closer = closer;
+    uncompressedByteBuffer.clear();
+    blockSize = uncompressedByteBuffer.remaining();
+    maxValidUncompressedOffset = Integer.MAX_VALUE * (long) blockSize;
+    this.decompressor = decompressor;
+  }
+
+  /**
+   * @param originalByteBuffer - buffer as written by {@link BlockCompressedPayloadWriter}. Not modified.
+   * @param byteBufferProvider - should be native ordered ByteBuffer
+   * @param decompressor       - decompressor for block compression
+   * @return BlockCompressedPayloadReader
+   */
+  public static BlockCompressedPayloadReader create(
+      ByteBuffer originalByteBuffer,
+      ByteBufferProvider byteBufferProvider,
+      CompressionStrategy.Decompressor decompressor
+  )
+  {
+    ByteBuffer masterByteBuffer = originalByteBuffer.asReadOnlyBuffer().order(ByteOrder.nativeOrder());
+
+    int blockIndexSize = masterByteBuffer.getInt();
+    ByteBuffer blockIndexBuffer = masterByteBuffer.asReadOnlyBuffer().order(masterByteBuffer.order());
+    blockIndexBuffer.limit(blockIndexBuffer.position() + blockIndexSize);
+
+    masterByteBuffer.position(masterByteBuffer.position() + blockIndexSize);
+
+    int dataStreamSize = masterByteBuffer.getInt();
+    ByteBuffer compressedBlockStreamByteBuffer = masterByteBuffer.asReadOnlyBuffer().order(masterByteBuffer.order());
+    compressedBlockStreamByteBuffer.limit(compressedBlockStreamByteBuffer.position() + dataStreamSize);
+
+    Closer closer = Closer.create();
+    ResourceHolder<ByteBuffer> byteBufferResourceHolder = byteBufferProvider.get();
+
+    closer.register(byteBufferResourceHolder);
+
+    return new BlockCompressedPayloadReader(
+        new IntIndexView(blockIndexBuffer),
+        compressedBlockStreamByteBuffer,
+        byteBufferResourceHolder.get(),
+        decompressor,
+        closer
+    );
+  }
+
+  public ByteBuffer read(long uncompressedStart, int size)
+  {
+    if (size == 0) {
+      return NULL_CELL;
+    }
+
+    Preconditions.checkArgument(uncompressedStart + size < maxValidUncompressedOffset);
+
+    int blockNumber = (int) (uncompressedStart / blockSize);
+    int blockOffset = (int) (uncompressedStart % blockSize);
+    ByteBuffer currentUncompressedBlock = getUncompressedBlock(blockNumber);
+
+    currentUncompressedBlock.position(blockOffset);
+
+    if (size <= currentUncompressedBlock.remaining()) {
+      ByteBuffer resultByteBuffer = currentUncompressedBlock.asReadOnlyBuffer().order(ByteOrder.nativeOrder());
+
+      resultByteBuffer.limit(blockOffset + size);
+
+      return resultByteBuffer;
+    } else {
+      byte[] payload = readMultiBlock(size, blockNumber, blockOffset);
+
+      return ByteBuffer.wrap(payload).order(ByteOrder.nativeOrder());
+    }
+  }
+
+  @Nonnull
+  private byte[] readMultiBlock(int size, int blockNumber, int blockOffset)
+  {
+    byte[] payload = new byte[size];
+    int bytesRead = 0;
+
+    do {
+      ByteBuffer currentUncompressedBlock = getUncompressedBlock(blockNumber);
+
+      currentUncompressedBlock.position(blockOffset);
+
+      int readSizeBytes = Math.min(size - bytesRead, currentUncompressedBlock.remaining());
+
+      currentUncompressedBlock.get(payload, bytesRead, readSizeBytes);
+      bytesRead += readSizeBytes;
+      blockNumber++;
+      blockOffset = 0;
+    } while (bytesRead < size);
+
+    return payload;
+  }
+
+  private ByteBuffer getUncompressedBlock(int blockNumber)
+  {
+    if (currentUncompressedBlockNumber != blockNumber) {
+      IntIndexView.EntrySpan span = blockIndexView.getEntrySpan(blockNumber);
+      ByteBuffer compressedBlock = compressedBlocksByteBuffer.asReadOnlyBuffer()
+                                                             .order(compressedBlocksByteBuffer.order());
+      compressedBlock.position(compressedBlock.position() + span.getStart());
+      compressedBlock.limit(compressedBlock.position() + span.getSize());
+      uncompressedByteBuffer.clear();
+
+      decompressor.decompress(compressedBlock, span.getSize(), uncompressedByteBuffer);
+      currentUncompressedBlockNumber = blockNumber;
+    }
+
+    return uncompressedByteBuffer;
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    closer.close();
+  }
+}
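As an aside on the read() logic above: the reader maps an offset in the logical uncompressed stream to a
(block number, offset-in-block) pair, then either slices within a single block or copies across blocks. A
minimal standalone sketch of the same arithmetic, using a hypothetical in-memory byte[][] in place of the
lazily decompressed blocks the real reader caches one at a time:

    // hypothetical: blocks[] stands in for the decompressed blocks
    static byte[] readAcrossBlocks(byte[][] blocks, int blockSize, long start, int size)
    {
      byte[] payload = new byte[size];
      int blockNumber = (int) (start / blockSize);
      int blockOffset = (int) (start % blockSize);
      int bytesRead = 0;

      while (bytesRead < size) {
        int readSizeBytes = Math.min(size - bytesRead, blocks[blockNumber].length - blockOffset);

        System.arraycopy(blocks[blockNumber], blockOffset, payload, bytesRead, readSizeBytes);
        bytesRead += readSizeBytes;
        blockNumber++;
        blockOffset = 0; // every block after the first is read from its start
      }

      return payload;
    }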
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadSerializer.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadSerializer.java
new file mode 100644
index 0000000000..9b3f4c4d30
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadSerializer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import com.google.common.primitives.Ints;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.serde.Serializer;
+import org.apache.druid.segment.writeout.WriteOutBytes;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+
+public class BlockCompressedPayloadSerializer implements Serializer
+{
+  private final IntSerializer intSerializer = new IntSerializer();
+  private final BlockIndexWriter blockIndexWriter;
+  private final WriteOutBytes dataOutBytes;
+
+  public BlockCompressedPayloadSerializer(BlockIndexWriter blockIndexWriter, WriteOutBytes dataOutBytes)
+  {
+    this.blockIndexWriter = blockIndexWriter;
+    this.dataOutBytes = dataOutBytes;
+  }
+
+  @Override
+  public void writeTo(WritableByteChannel channel, @Nullable FileSmoosher smoosher) throws IOException
+  {
+    blockIndexWriter.transferTo(channel);
+    channel.write(intSerializer.serialize(dataOutBytes.size()));
+    dataOutBytes.writeTo(channel);
+  }
+
+  @Override
+  public long getSerializedSize()
+  {
+    return blockIndexWriter.getSerializedSize()
+           + intSerializer.getSerializedSize()
+           + Ints.checkedCast(dataOutBytes.size());
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadWriter.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadWriter.java
new file mode 100644
index 0000000000..794272fb24
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadWriter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.serde.Serializer;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+public class BlockCompressedPayloadWriter implements Serializer, Closeable
+{
+  private final BlockCompressedPayloadBuffer buffer;
+  private BlockCompressedPayloadSerializer serializer;
+  private State state = State.OPEN;
+
+  private BlockCompressedPayloadWriter(BlockCompressedPayloadBuffer buffer)
+  {
+    this.buffer = buffer;
+  }
+
+  public void write(byte[] payload) throws IOException
+  {
+    Preconditions.checkState(state == State.OPEN);
+    buffer.write(payload);
+  }
+
+  public void write(ByteBuffer payload) throws IOException
+  {
+    Preconditions.checkState(state == State.OPEN);
+    buffer.write(payload);
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    if (state == State.OPEN) {
+      serializer = buffer.closeToSerializer();
+      state = State.CLOSED;
+    }
+  }
+
+  @Override
+  public void writeTo(WritableByteChannel channel, @Nullable FileSmoosher smoosher) throws IOException
+  {
+    Preconditions.checkState(state == State.CLOSED);
+    serializer.writeTo(channel, smoosher);
+  }
+
+  @Override
+  public long getSerializedSize()
+  {
+    Preconditions.checkState(state == State.CLOSED);
+    return serializer.getSerializedSize();
+  }
+
+  private enum State
+  {
+    OPEN,
+    CLOSED
+  }
+
+  public static class Builder
+  {
+    private ByteBufferProvider byteBufferProvider = NativeClearedByteBufferProvider.INSTANCE;
+    private final SegmentWriteOutMedium writeOutMedium;
+
+    private CompressionStrategy compressionStrategy = CompressionStrategy.LZ4;
+
+    public Builder(SegmentWriteOutMedium writeOutMedium)
+    {
+      this.writeOutMedium = writeOutMedium;
+    }
+
+    public Builder setCompressionStrategy(CompressionStrategy compressionStrategy)
+    {
+      this.compressionStrategy = compressionStrategy;
+
+      return this;
+    }
+
+    public Builder setByteBufferProvider(ByteBufferProvider byteBufferProvider)
+    {
+      this.byteBufferProvider = byteBufferProvider;
+
+      return this;
+    }
+
+    public BlockCompressedPayloadWriter build() throws IOException
+    {
+      BlockCompressedPayloadBufferFactory bufferFactory = new BlockCompressedPayloadBufferFactory(
+          byteBufferProvider,
+          writeOutMedium,
+          compressionStrategy.getCompressor()
+      );
+      BlockCompressedPayloadBuffer payloadBuffer = bufferFactory.create();
+      BlockCompressedPayloadWriter payloadWriter = new BlockCompressedPayloadWriter(payloadBuffer);
+
+      return payloadWriter;
+    }
+  }
+}
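A usage sketch for the writer/reader pair above (a sketch only: it assumes an
OnHeapMemorySegmentWriteOutMedium, as the tests below use, and captures the output via Channels.newChannel
over a ByteArrayOutputStream; any means of surfacing the written bytes as a native-ordered ByteBuffer works):

    SegmentWriteOutMedium medium = new OnHeapMemorySegmentWriteOutMedium();
    BlockCompressedPayloadWriter writer = new BlockCompressedPayloadWriter.Builder(medium).build();

    writer.write(new byte[]{1, 2, 3});
    writer.close(); // must precede writeTo() so the block index is finalized

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    writer.writeTo(Channels.newChannel(baos), null); // smoosher currently unused

    ByteBuffer buffer = ByteBuffer.wrap(baos.toByteArray()).order(ByteOrder.nativeOrder());
    try (BlockCompressedPayloadReader reader = BlockCompressedPayloadReader.create(
        buffer,
        NativeClearedByteBufferProvider.INSTANCE,
        CompressionStrategy.LZ4.getDecompressor()
    )) {
      ByteBuffer payload = reader.read(0, 3); // bytes 0..2 of the uncompressed stream
    }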
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockIndexWriter.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockIndexWriter.java
new file mode 100644
index 0000000000..986628690c
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockIndexWriter.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.segment.writeout.WriteOutBytes;
+
+public class BlockIndexWriter extends IndexWriter
+{
+  public BlockIndexWriter(WriteOutBytes outBytes)
+  {
+    super(outBytes, new IntSerializer());
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/ByteBufferProvider.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/ByteBufferProvider.java
new file mode 100644
index 0000000000..170e046ec8
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/ByteBufferProvider.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.collections.ResourceHolder;
+
+import java.nio.ByteBuffer;
+import java.util.function.Supplier;
+
+public interface ByteBufferProvider extends Supplier<ResourceHolder<ByteBuffer>>
+{
+  /**
+   * @return a resource holder of a ByteBuffer
+   */
+  @Override
+  ResourceHolder<ByteBuffer> get();
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/CellIndexReader.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/CellIndexReader.java
new file mode 100644
index 0000000000..7346dab24f
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/CellIndexReader.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import com.google.common.primitives.Ints;
+
+import javax.annotation.Nonnull;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class CellIndexReader implements Closeable
+{
+  private final BlockCompressedPayloadReader payloadReader;
+
+  public CellIndexReader(BlockCompressedPayloadReader payloadReader)
+  {
+    this.payloadReader = payloadReader;
+  }
+
+  @Nonnull
+  public PayloadEntrySpan getEntrySpan(int entryNumber)
+  {
+    int position = entryNumber * Long.BYTES;
+    ByteBuffer payload = payloadReader.read(position, 2 * Long.BYTES);
+    long payloadValue = payload.getLong();
+    long nextPayloadValue = payload.getLong();
+
+    return new PayloadEntrySpan(payloadValue, Ints.checkedCast(nextPayloadValue - payloadValue));
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    payloadReader.close();
+  }
+}
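The span arithmetic above works because the writer stores n+1 longs, so entry i's size is the difference of
adjacent offsets. The same computation over a plain long[] (hypothetical values) in place of the
block-compressed index:

    long[] index = {0L, 3L, 3L, 8L}; // offsets for payloads of size 3, 0, 5, plus the final entry

    // equivalent of getEntrySpan(1): read the longs at entries 1 and 2
    long start = index[1];                             // 3
    int size = Ints.checkedCast(index[2] - index[1]);  // 0, i.e. a zero-length (null) cell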
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/CellIndexWriter.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/CellIndexWriter.java
new file mode 100644
index 0000000000..1f311f56de
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/CellIndexWriter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.serde.Serializer;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+
+
+public class CellIndexWriter implements Serializer, Closeable
+{
+  private final LongSerializer longSerializer = new LongSerializer();
+  private final BlockCompressedPayloadWriter payloadWriter;
+
+  private long position = 0;
+  private boolean open = true;
+
+  public CellIndexWriter(BlockCompressedPayloadWriter payloadWriter)
+  {
+    this.payloadWriter = payloadWriter;
+  }
+
+  public void persistAndIncrement(int increment) throws IOException
+  {
+    Preconditions.checkArgument(increment >= 0);
+    Preconditions.checkState(open, "cannot write to closed CellIndex");
+    payloadWriter.write(longSerializer.serialize(position));
+    position += increment;
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    if (open) {
+      payloadWriter.write(longSerializer.serialize(position));
+      payloadWriter.close();
+      open = false;
+    }
+  }
+
+  @Override
+  public void writeTo(WritableByteChannel channel, @Nullable FileSmoosher smoosher) throws IOException
+  {
+    Preconditions.checkState(!open, "cannot transfer a CellIndex that is not closed and finalized");
+
+    payloadWriter.writeTo(channel, smoosher);
+  }
+
+  @Override
+  public long getSerializedSize()
+  {
+    return payloadWriter.getSerializedSize();
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/CellReader.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/CellReader.java
new file mode 100644
index 0000000000..678920e6d2
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/CellReader.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.segment.data.CompressionStrategy;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ *  CellReader is intended to read the data written by {@link CellWriter}. The {@code CellWriter.writeTo()} method's
+ *  output must be made available as a ByteBuffer. While this provides relatively efficient random access, it is
+ *  optimized for sequential access by caching the last decompressed block in both the index (which is
+ *  block-compressed) and the data.
+ *  <p/>
+ *  A random access incurs the following costs:
+ *  <pre>
+ *    1. seek to compressed block location in index
+ *    2. decompress index block
+ *    3. read data location
+ *    4. decompress data block
+ *    5. wrap or copy data from uncompressed block (copy for data that spans more than one block)
+ *  </pre>
+ *  Sequential access amortizes the decompression cost by storing the last decompressed block (cache of size 1,
+ *  effectively).
+ *  <p/>
+ *  Note also that the index itself is compressed, so random accesses potentially incur an additional decompression
+ *  step for large datasets.
+ *  <p/>
+ *  <pre>{@code
+ *  ByteBuffer byteBuffer = ....; // ByteBuffer created from writableChannel output of CellWriter.writeTo()
+ *  try (CellReader cellReader = new CellReader.Builder(byteBuffer).build()) {
+ *    for (int i = 0; i < numPayloads; i++) {
+ *      byte[] payload = cellReader.getCell(i);
+ *
+ *      processPayload(payload); // may deserialize and perform work
+ *    }
+ *  }
+ *  </pre>
+ *
+ *  While you may allocate your own 64k buffers, it is recommended that you use {@code NativeClearedByteBufferProvider},
+ *  which provides direct 64k ByteBuffers from a pool, wrapped in a ResourceHolder. These objects may be
+ *  registered in a Closer.
+ *
+ *  To speed up future random accesses, a decompressed block cache of some size k (k=10, etc.) may be added;
+ *  at present, we effectively have a block cache of size 1.
+ */
+public class CellReader implements Closeable
+{
+  private final CellIndexReader cellIndexReader;
+  private final BlockCompressedPayloadReader dataReader;
+  private final Closer closer;
+
+  private CellReader(CellIndexReader cellIndexReader, BlockCompressedPayloadReader dataReader, Closer closer)
+  {
+    this.cellIndexReader = cellIndexReader;
+    this.dataReader = dataReader;
+    this.closer = closer;
+  }
+
+  public ByteBuffer getCell(int rowNumber)
+  {
+    PayloadEntrySpan payloadEntrySpan = cellIndexReader.getEntrySpan(rowNumber);
+    ByteBuffer payload = dataReader.read(payloadEntrySpan.getStart(), payloadEntrySpan.getSize());
+
+    return payload;
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    closer.close();
+  }
+
+  public static Builder builder(ByteBuffer originalByteBuffer)
+  {
+    return new Builder(originalByteBuffer);
+  }
+
+  public static class Builder
+  {
+    private final ByteBuffer cellIndexBuffer;
+    private final ByteBuffer dataStorageBuffer;
+
+    private CompressionStrategy compressionStrategy = CompressionStrategy.LZ4;
+    private ByteBufferProvider byteBufferProvider = NativeClearedByteBufferProvider.INSTANCE;
+
+    /**
+     * The default block size is 64k as provided by NativeClearedByteBufferProvider. You may change this, but
+     * be sure the size of the ByteBuffers matches what the CellWriter used, as that is the block size stored.
+     * Reading will fail unpredictably if a different block size is used when reading.
+     *
+     * @param originalByteBuffer - buffer from {@code CellWriter.writeTo()} as written to the WritableChannel
+     */
+    public Builder(ByteBuffer originalByteBuffer)
+    {
+      ByteBuffer masterByteBuffer = originalByteBuffer.asReadOnlyBuffer().order(ByteOrder.nativeOrder());
+
+      int cellIndexSize = masterByteBuffer.getInt();
+      cellIndexBuffer = masterByteBuffer.asReadOnlyBuffer().order(masterByteBuffer.order());
+      cellIndexBuffer.limit(cellIndexBuffer.position() + cellIndexSize);
+
+      masterByteBuffer.position(masterByteBuffer.position() + cellIndexSize);
+
+      int dataStorageSize = masterByteBuffer.getInt();
+      dataStorageBuffer = masterByteBuffer.asReadOnlyBuffer().order(masterByteBuffer.order());
+      dataStorageBuffer.limit(dataStorageBuffer.position() + dataStorageSize);
+    }
+
+    /**
+     *
+     * @param compressionStrategy - this must match the CellWriter compressionStrategy. Default is LZ4
+     * @return this Builder
+     */
+    public Builder setCompressionStrategy(CompressionStrategy compressionStrategy)
+    {
+      this.compressionStrategy = compressionStrategy;
+
+      return this;
+    }
+
+    /**
+     *
+     * @param byteBufferProvider - ByteBuffers returned must match the size used in the CellWriter
+     * @return Builder
+     */
+    public Builder setByteBufferProvider(ByteBufferProvider byteBufferProvider)
+    {
+      this.byteBufferProvider = byteBufferProvider;
+
+      return this;
+    }
+
+    public CellReader build()
+    {
+      Closer closer = Closer.create();
+      CellIndexReader cellIndexReader = new CellIndexReader(BlockCompressedPayloadReader.create(
+          cellIndexBuffer,
+          byteBufferProvider,
+          compressionStrategy.getDecompressor()
+      ));
+      BlockCompressedPayloadReader dataReader = BlockCompressedPayloadReader.create(
+          dataStorageBuffer,
+          byteBufferProvider,
+          compressionStrategy.getDecompressor()
+      );
+
+      closer.register(cellIndexReader);
+      closer.register(dataReader);
+
+      CellReader cellReader = new CellReader(cellIndexReader, dataReader, closer);
+
+      return cellReader;
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/CellWriter.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/CellWriter.java
new file mode 100644
index 0000000000..2c732d806c
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/CellWriter.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.serde.Serializer;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * <h2>usage:</h2>
+ * <p/>
+ * CellWriter effectively stores a list of byte[] payloads that are retrievable randomly by index. The entirety of
+ * the data is block compressed. For reading, see {@link CellReader}. Example usage:
+ * <p/>
+ * <pre>{@code
+ *
+ * StagedSerde<Fuu> fuuSerDe = new ...
+ * // note that cellWriter.close() *must* be called before writeTo() in order to finalize the index
+ * try (CellWriter cellWriter = new CellWriter.Builder(segmentWriteOutMedium).build()) {
+ *
+ *    fuuList.stream().map(fuuSerDe::serialize).forEach(cellWriter::write);
+ *  }
+ *  // at this point cellWriter contains the index and compressed data
+ *
+ *
+ *  // transfers the index and compressed data in the format specified below. This method is idempotent and copies
+ *  // the data each time.
+ *  cellWriter.writeTo(writableChannel, fileSmoosher); // 2nd argument currently unused, may be null
+ *
+ * } </pre>
+ * <p/>
+ * Note that for use with CellReader, the contents written to the writableChannel must be available as a ByteBuffer
+ * <p/>
+ * <h2>Internal Storage Details</h2>
+ * <p/>
+ * <pre>
+ * serialized data is of the form:
+ *
+ *    [cell index]
+ *    [payload storage]
+ *
+ * each of these items is stored in compressed streams of blocks with a block index.
+ *
+ * A BlockCompressedPayloadWriter stores byte[] payloads. These may be accessed by creating a
+ * BlockCompressedPayloadReader over the produced ByteBuffer. Reads may be done by giving a location in the
+ * uncompressed stream and a size
+ *
+ * NOTE: {@link BlockCompressedPayloadBuffer} does not store nulls on write(). However, the cellIndex stores an entry
+ * with a size of 0 for nulls and {@link CellReader} will return null for any null written
+ *
+ *  [blockIndexSize:int]
+ * |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+ * |      block index
+ * |      compressed block # -> block start in compressed stream position (relative to data start)
+ * |
+ * |      0: [block position: int]
+ * |      1: [block position: int]
+ * |      ...
+ * |      i: [block position: int]
+ * |      ...
+ * |      n: [block position: int]
+ * |      n+1: [total compressed size] // stored to simplify the invariant entry(n+1) - entry(n) = length(n)
+ * |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+ * [dataSize:int]
+ * |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+ * | [compressed payload block 1]
+ * | [compressed payload block 2]
+ * | ...
+ * | [compressed payload block n]
+ * |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+ * the CellIndexWriter stores an array of longs using the BlockCompressedPayloadWriter
+ *
+ * logically this is an array of longs
+ *
+ * |    0: start_0 : long
+ * |    1: start_1 : long
+ * |    ...
+ * |    n: start_n : long
+ * |    n+1: start_n + length_n : long  // i.e., the next position that would have been written to;
+ * |                                    // used again for the invariant length_i = start_i+1 - start_i
+ * |
+ * |    but this will be stored as block compressed. Reads are done by addressing it as a long array of bytes
+ * |
+ * |    [block index size]
+ * |    [block index]
+ * |
+ * |    [data stream size]
+ * |    [block compressed payload stream]
+ *
+ * resulting in
+ *
+ * |    [cell index size]
+ * | ----cell index------------------------
+ * |    [block index size]
+ * |    [block index]
+ * |    [data stream size]
+ * |    [block compressed payload stream]
+ * | -------------------------------------
+ * |    [data stream size]
+ * | ----data stream------------------------
+ * |    [block index size]
+ * |    [block index]
+ * |    [data stream size]
+ * |    [block compressed payload stream]
+ * | -------------------------------------
+ * </pre>
+ */
+
+public class CellWriter implements Serializer, Closeable
+{
+  private final IntSerializer intSerializer = new IntSerializer();
+  private final CellIndexWriter cellIndexWriter;
+  private final BlockCompressedPayloadWriter payloadWriter;
+
+  private CellWriter(CellIndexWriter cellIndexWriter, BlockCompressedPayloadWriter payloadWriter)
+  {
+    this.cellIndexWriter = cellIndexWriter;
+    this.payloadWriter = payloadWriter;
+  }
+
+  public void write(byte[] cellBytes) throws IOException
+  {
+    if (cellBytes == null) {
+      cellIndexWriter.persistAndIncrement(0);
+    } else {
+      cellIndexWriter.persistAndIncrement(cellBytes.length);
+      payloadWriter.write(cellBytes);
+    }
+  }
+
+  public void write(ByteBuffer cellByteBuffer) throws IOException
+  {
+    if (cellByteBuffer == null) {
+      cellIndexWriter.persistAndIncrement(0);
+    } else {
+      cellIndexWriter.persistAndIncrement(cellByteBuffer.remaining());
+      payloadWriter.write(cellByteBuffer);
+    }
+  }
+
+  @Override
+  public void writeTo(WritableByteChannel channel, @Nullable FileSmoosher smoosher) throws IOException
+  {
+    channel.write(intSerializer.serialize(cellIndexWriter.getSerializedSize()));
+    cellIndexWriter.writeTo(channel, smoosher);
+    channel.write(intSerializer.serialize(payloadWriter.getSerializedSize()));
+    payloadWriter.writeTo(channel, smoosher);
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    cellIndexWriter.close();
+    payloadWriter.close();
+  }
+
+  @Override
+  public long getSerializedSize()
+  {
+    return intSerializer.getSerializedSize()
+           + cellIndexWriter.getSerializedSize()
+           + intSerializer.getSerializedSize()
+           + payloadWriter.getSerializedSize();
+  }
+
+  public static Builder builder(SegmentWriteOutMedium segmentWriteOutMedium)
+  {
+    return new Builder(segmentWriteOutMedium);
+  }
+
+  public static class Builder
+  {
+    private final BlockCompressedPayloadWriter.Builder blockCompressedPayloadWriterBuilder;
+
+    /**
+     * Default instance with a {@link NativeClearedByteBufferProvider}
+     *
+     * @param segmentWriteOutMedium - used to store the block-compressed index and data
+     */
+    public Builder(SegmentWriteOutMedium segmentWriteOutMedium)
+    {
+      blockCompressedPayloadWriterBuilder =
+          new BlockCompressedPayloadWriter.Builder(segmentWriteOutMedium);
+    }
+
+    /**
+     * Change the compression strategy. The default is LZ4.
+     *
+     * @param compressionStrategy - a valid {@link CompressionStrategy}
+     * @return this Builder
+     */
+    public Builder setCompressionStrategy(CompressionStrategy compressionStrategy)
+    {
+      blockCompressedPayloadWriterBuilder.setCompressionStrategy(compressionStrategy);
+
+      return this;
+    }
+
+    public Builder setByteBufferProvider(ByteBufferProvider byteBufferProvider)
+    {
+      blockCompressedPayloadWriterBuilder.setByteBufferProvider(byteBufferProvider);
+
+      return this;
+    }
+
+    public CellWriter build() throws IOException
+    {
+      BlockCompressedPayloadWriter cellIndexPayloadWriter = blockCompressedPayloadWriterBuilder.build();
+      BlockCompressedPayloadWriter payloadWriter = blockCompressedPayloadWriterBuilder.build();
+      CellIndexWriter cellIndexWriter = new CellIndexWriter(cellIndexPayloadWriter);
+
+      return new CellWriter(cellIndexWriter, payloadWriter);
+    }
+  }
+}
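A round-trip sketch combining CellWriter and CellReader (same assumptions as earlier: an
OnHeapMemorySegmentWriteOutMedium plus a ByteArrayOutputStream capture; StringUtils here is Druid's
org.apache.druid.java.util.common.StringUtils):

    SegmentWriteOutMedium medium = new OnHeapMemorySegmentWriteOutMedium();
    byte[][] rows = {StringUtils.toUtf8("fuu"), null, StringUtils.toUtf8("bar")};

    CellWriter cellWriter = CellWriter.builder(medium).build();

    for (byte[] row : rows) {
      cellWriter.write(row); // null stores only a zero-length entry in the cell index
    }

    cellWriter.close(); // finalizes the cell index; required before writeTo()

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    cellWriter.writeTo(Channels.newChannel(baos), null);

    ByteBuffer buffer = ByteBuffer.wrap(baos.toByteArray()).order(ByteOrder.nativeOrder());
    try (CellReader cellReader = CellReader.builder(buffer).build()) {
      ByteBuffer cell = cellReader.getCell(2); // the utf8 bytes of "bar"
    }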
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/IOIterator.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/IOIterator.java
new file mode 100644
index 0000000000..887f1fb65a
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/IOIterator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public interface IOIterator<T> extends Closeable
+{
+  boolean hasNext() throws IOException;
+
+  T next() throws IOException;
+
+  @Override
+  void close() throws IOException;
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/IndexWriter.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/IndexWriter.java
new file mode 100644
index 0000000000..46f26ad988
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/IndexWriter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.druid.segment.writeout.WriteOutBytes;
+
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+
+public class IndexWriter
+{
+  private final WriteOutBytes outBytes;
+  private final NumberSerializer positionSerializer;
+  private final NumberSerializer indexSizeSerializer;
+
+  private boolean open = true;
+  private long position = 0;
+
+  public IndexWriter(
+      WriteOutBytes outBytes,
+      NumberSerializer positionSerializer,
+      NumberSerializer indexSizeSerializer
+  )
+  {
+    this.outBytes = outBytes;
+    this.positionSerializer = positionSerializer;
+    this.indexSizeSerializer = indexSizeSerializer;
+  }
+
+  public IndexWriter(WriteOutBytes outBytes, NumberSerializer positionSerializer)
+  {
+    this(outBytes, positionSerializer, new IntSerializer());
+  }
+
+  public void persistAndIncrement(int increment) throws IOException
+  {
+    Preconditions.checkArgument(increment >= 0, "increment must be non-negative");
+    Preconditions.checkState(open, "peristAndIncrement() must be called when open");
+    outBytes.write(positionSerializer.serialize(position));
+    position += increment;
+  }
+
+  public void close() throws IOException
+  {
+    // when done, write an n+1'th entry for the next unused block; this lets us use the invariant
+    // length of block i = entry i+1 - entry i for all i < n
+    outBytes.write(positionSerializer.serialize(position));
+    open = false;
+  }
+
+  public void transferTo(WritableByteChannel channel) throws IOException
+  {
+    channel.write(indexSizeSerializer.serialize(outBytes.size()));
+    outBytes.writeTo(channel);
+  }
+
+  public long getSerializedSize()
+  {
+    return indexSizeSerializer.getSerializedSize() + Ints.checkedCast(outBytes.size());
+  }
+}
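Concretely, the n+1-entry layout written above makes every size lookup a subtraction of adjacent entries. A
sketch (assuming a SegmentWriteOutMedium named writeOutMedium, as in the tests below) for hypothetical
payload sizes 3, 0, 5:

    IndexWriter indexWriter = new IndexWriter(writeOutMedium.makeWriteOutBytes(), new IntSerializer());

    indexWriter.persistAndIncrement(3); // writes position 0
    indexWriter.persistAndIncrement(0); // writes position 3 (a zero-length entry, e.g. a null)
    indexWriter.persistAndIncrement(5); // writes position 3 again
    indexWriter.close();                // writes position 8, the n+1'th entry

    // stored positions: 0, 3, 3, 8 -> length(i) = entry(i+1) - entry(i) = 3, 0, 5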
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/IntIndexView.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/IntIndexView.java
new file mode 100644
index 0000000000..fd40cd22d6
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/IntIndexView.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import com.google.common.base.Preconditions;
+
+import java.nio.ByteBuffer;
+
+public class IntIndexView
+{
+  private final ByteBuffer byteBuffer;
+  private final int numberOfEntries;
+
+  public IntIndexView(ByteBuffer byteBuffer)
+  {
+    this.byteBuffer = byteBuffer;
+    numberOfEntries = byteBuffer.remaining() / Integer.BYTES;
+  }
+
+  public EntrySpan getEntrySpan(int entryNumber)
+  {
+    Preconditions.checkArgument(
+        entryNumber < numberOfEntries, "invalid entry number %s [%s]", entryNumber, numberOfEntries
+    );
+    int start = byteBuffer.getInt(byteBuffer.position() + entryNumber * Integer.BYTES);
+    int nextStart = byteBuffer.getInt(byteBuffer.position() + ((entryNumber + 1) * Integer.BYTES));
+
+    return new EntrySpan(start, nextStart - start);
+  }
+
+  public static class EntrySpan
+  {
+    private final int start;
+    private final int size;
+
+    public EntrySpan(int start, int size)
+    {
+      this.start = start;
+      this.size = size;
+    }
+
+    public int getStart()
+    {
+      return start;
+    }
+
+    public int getSize()
+    {
+      return size;
+    }
+  }
+}
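A small worked example of the view above, over hand-built index entries (hypothetical values):

    // four int entries describing three blocks: starts 0, 5, 12, and total compressed size 20
    ByteBuffer index = ByteBuffer.allocate(4 * Integer.BYTES).order(ByteOrder.nativeOrder());
    index.putInt(0).putInt(5).putInt(12).putInt(20).flip();

    IntIndexView view = new IntIndexView(index);
    IntIndexView.EntrySpan span = view.getEntrySpan(1); // start = 5, size = 12 - 5 = 7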
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/IntSerializer.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/IntSerializer.java
new file mode 100644
index 0000000000..20e893500c
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/IntSerializer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import com.google.common.primitives.Ints;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class IntSerializer implements NumberSerializer
+{
+  private final ByteBuffer intValueByteBuffer = ByteBuffer.allocate(Integer.BYTES).order(ByteOrder.nativeOrder());
+
+  @Override
+  public ByteBuffer serialize(long value)
+  {
+    intValueByteBuffer.clear();
+    intValueByteBuffer.putInt(Ints.checkedCast(value)).flip();
+
+    return intValueByteBuffer;
+  }
+
+  @Override
+  public int getSerializedSize()
+  {
+    return Integer.BYTES;
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/LongSerializer.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/LongSerializer.java
new file mode 100644
index 0000000000..cfcaad5b8d
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/LongSerializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class LongSerializer implements NumberSerializer
+{
+  private final ByteBuffer longValueByteBuffer = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.nativeOrder());
+
+  @Override
+  public ByteBuffer serialize(long value)
+  {
+    longValueByteBuffer.clear();
+    longValueByteBuffer.putLong(value).flip();
+
+    return longValueByteBuffer;
+  }
+
+  @Override
+  public int getSerializedSize()
+  {
+    return Long.BYTES;
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/NativeClearedByteBufferProvider.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/NativeClearedByteBufferProvider.java
new file mode 100644
index 0000000000..8ee4bc33d1
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/NativeClearedByteBufferProvider.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.segment.CompressedPools;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * supplies direct, 64k, clear()'d ByteBuffers wrapped in a ResourceHolder. Caller is responsible for calling
+ * close() on the {@link ResourceHolder} in order to return it to the pool
+ */
+public class NativeClearedByteBufferProvider implements ByteBufferProvider
+{
+  public static final NativeClearedByteBufferProvider INSTANCE = new NativeClearedByteBufferProvider();
+
+  @Override
+  public ResourceHolder<ByteBuffer> get()
+  {
+    ResourceHolder<ByteBuffer> byteBufferResourceHolder = CompressedPools.getByteBuf(ByteOrder.nativeOrder());
+
+    byteBufferResourceHolder.get().clear();
+
+    return byteBufferResourceHolder;
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/NumberSerializer.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/NumberSerializer.java
new file mode 100644
index 0000000000..b4deafe548
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/NumberSerializer.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import java.nio.ByteBuffer;
+
+public interface NumberSerializer
+{
+  ByteBuffer serialize(long value);
+
+  int getSerializedSize();
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/PayloadEntrySpan.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/PayloadEntrySpan.java
new file mode 100644
index 0000000000..86ff673af5
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/PayloadEntrySpan.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+public class PayloadEntrySpan
+{
+  private final long start;
+  private final int size;
+
+  public PayloadEntrySpan(long start, int size)
+  {
+    this.start = start;
+    this.size = size;
+  }
+
+  public long getStart()
+  {
+    return start;
+  }
+
+  public int getSize()
+  {
+    return size;
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/StagedSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/StagedSerde.java
new file mode 100644
index 0000000000..ffbf00a9c4
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/StagedSerde.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * {@code StagedSerde} is useful when you have objects that have their own internal logic to serialize, but you wish to
+ * compose the results of multiple serialized objects into a single ByteBuffer (or wrapped {@code byte[]}). Serializers
+ * can implement {@code serializeDelayed} and return a {@code StorableBuffer}. This object allows the serialization to
+ * be broken up so that serializers do whatever work is necessary to report how many bytes are needed. The caller can
+ * then allocate a large enough byte[], wrap it in a ByteBuffer, and use the {@code StorableBuffer.store()} method.
+ * <p/>
+ * This results in superior efficiency over a {@code byte[] toBytes()} method because repeated copies of byte[] are
+ * avoided.
+ * <p/>
+ * Since any serialization that returns a byte[] must reach a point at which it allocates said byte[], that code
+ * may instead be executed to create the {@code StorableBuffer}. The code that would have written to a byte[] then
+ * writes to a ByteBuffer in the store() method.
+ * <p/>
+ * For the cases when it is not easy to break apart the serialization code, increased efficiency may be obtained by
+ * overriding serialize() and directly returning bytes
+ *
+ * <p/>
+ * example
+ * <pre>{@code
+ *   StagedSerde<Fuu> fuuSerde = new ...;
+ *   StagedSerde<Bar> barSerde = new ...;
+ *   StorableBuffer fuuBuffer = fuuSerde.serializeDelayed(fuuInstance);
+ *   StorableBuffer barBuffer = barSerde.serializeDelayed(barInstance);
+ *   int size = fuuBuffer.getSerializedSize() + barBuffer.getSerializedSize();
+ *   byte[] bytes = new byte[size];
+ *   ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
+ *
+ *   fuuBuffer.store(buffer);
+ *   barBuffer.store(buffer);
+ *
+ * }</pre>
+ * <p/>
+ * Note that for the common case in which you want a byte[] for a single object, a default implementation of
+ * serialize() is provided that performs the above steps for a single object.
+ * <p/>
+ *
+ * @param <T> the type of value serialized
+ */
+public interface StagedSerde<T>
+{
+  /**
+   * Useful method when some computation is necessary to prepare for serialization without actually writing out
+   * all the bytes in order to determine the serialized size. It allows encapsulation of the size computation and
+   * the final logic to actually store into a ByteBuffer. It also allows for callers to pack multiple serialized
+   * objects into a single ByteBuffer without extra copies of a byte[]/ByteBuffer by using the {@link StorableBuffer}
+   * instance returned
+   *
+   * @param value - object to serialize
+   * @return an object that reports its serialized size and how to serialize the object to a ByteBuffer
+   */
+  StorableBuffer serializeDelayed(@Nullable T value);
+
+  /**
+   * Default implementation for when a byte[] is desired. Typically, this default should suffice. Implementing
+   * serializeDelayed() includes the logic of how to store into a ByteBuffer
+   *
+   * @param value - object to serialize
+   * @return serialized byte[] of value
+   */
+  default byte[] serialize(T value)
+  {
+    StorableBuffer storableBuffer = serializeDelayed(value);
+    ByteBuffer byteBuffer = ByteBuffer.allocate(storableBuffer.getSerializedSize()).order(ByteOrder.nativeOrder());
+
+    storableBuffer.store(byteBuffer);
+
+    return byteBuffer.array();
+  }
+
+  @Nullable
+  T deserialize(ByteBuffer byteBuffer);
+
+  default T deserialize(byte[] bytes)
+  {
+    return deserialize(ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder()));
+  }
+}
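A minimal StagedSerde implementation for plain strings, to make the contract concrete (illustrative only;
the serdes this commit actually uses are SerializablePairLongStringSimpleStagedSerde and
SerializablePairLongStringDeltaEncodedStagedSerde):

    public class StringStagedSerde implements StagedSerde<String>
    {
      @Override
      public StorableBuffer serializeDelayed(@Nullable String value)
      {
        if (value == null) {
          return StorableBuffer.EMPTY;
        }

        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);

        return new StorableBuffer()
        {
          @Override
          public void store(ByteBuffer byteBuffer)
          {
            byteBuffer.put(utf8); // the caller sized the buffer using getSerializedSize()
          }

          @Override
          public int getSerializedSize()
          {
            return utf8.length;
          }
        };
      }

      @Override
      @Nullable
      public String deserialize(ByteBuffer byteBuffer)
      {
        byte[] bytes = new byte[byteBuffer.remaining()];

        byteBuffer.get(bytes);

        return new String(bytes, StandardCharsets.UTF_8);
      }
    }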
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/StorableBuffer.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/StorableBuffer.java
new file mode 100644
index 0000000000..f228a81904
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/StorableBuffer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import java.nio.ByteBuffer;
+
+/**
+ * It's useful to return this object when multiple serializable objects are to be composed into a single ByteBuffer
+ * or byte[]. This allows serializable objects to report their size so that callers may get the total size of all
+ * objects and allocate a sufficiently large ByteBuffer/byte[]. Then the store() methods may be used to serialize.
+ * <p/>
+ * This avoids extra copies and wasted memory/gc pressure in the case of just returning a byte[].
+ * <p/>
+ * The getSerializedSize() method is provided because an object may need to use private data in order to calculate
+ * the size needed.
+ *
+ **/
+public interface StorableBuffer
+{
+  StorableBuffer EMPTY = new StorableBuffer()
+  {
+    @Override
+    public void store(ByteBuffer byteBuffer)
+    {
+    }
+
+    @Override
+    public int getSerializedSize()
+    {
+      return 0;
+    }
+  };
+
+  void store(ByteBuffer byteBuffer);
+
+  int getSerializedSize();
+}
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringBufferStoreTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringBufferStoreTest.java
new file mode 100644
index 0000000000..8075e273c2
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringBufferStoreTest.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.google.common.primitives.Ints;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.column.ColumnBuilder;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.serde.cell.IOIterator;
+import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider;
+import org.apache.druid.segment.writeout.HeapByteBufferWriteOutBytes;
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Random;
+
+public class SerializablePairLongStringBufferStoreTest
+{
+  private final Random random = new Random(0);
+  private static final int MIN_INTEGER = 100;
+  private static final long MIN_LONG = 0L;
+  private final SerializablePairLongString[] integerRangeArr = new SerializablePairLongString[]{
+      new SerializablePairLongString((long) MIN_INTEGER, "fuu"),
+      new SerializablePairLongString(101L, "bar"),
+      new SerializablePairLongString(102L, "baz"),
+      };
+  private final SerializablePairLongString[] longRangeArr = new SerializablePairLongString[]{
+      new SerializablePairLongString(MIN_LONG, "fuu"),
+      new SerializablePairLongString(100L, "bar"),
+      new SerializablePairLongString((long) Integer.MAX_VALUE, "baz"),
+      new SerializablePairLongString(Long.MAX_VALUE, "fuubarbaz"),
+      };
+
+  private final SegmentWriteOutMedium writeOutMedium = new OnHeapMemorySegmentWriteOutMedium();
+
+  private SerializablePairLongStringBufferStore bufferStore;
+
+  @Before
+  public void setup() throws Exception
+  {
+    bufferStore = new SerializablePairLongStringBufferStore(
+        new SerializedStorage<>(
+            writeOutMedium.makeWriteOutBytes(),
+            SerializablePairLongStringColumnSerializer.STAGED_SERDE
+        ));
+  }
+
+  @Test
+  public void testIteratorSimple() throws Exception
+  {
+    for (SerializablePairLongString value : integerRangeArr) {
+      bufferStore.store(value);
+    }
+
+    IOIterator<SerializablePairLongString> iterator = bufferStore.iterator();
+
+    int i = 0;
+    while (iterator.hasNext()) {
+      Assert.assertEquals(integerRangeArr[i], iterator.next());
+      i++;
+    }
+  }
+
+  @Test
+  public void testIteratorEmptyBuffer() throws Exception
+  {
+    IOIterator<SerializablePairLongString> iterator = bufferStore.iterator();
+
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  @Test
+  public void testIteratorNull() throws Exception
+  {
+    bufferStore.store(null);
+    IOIterator<SerializablePairLongString> iterator = bufferStore.iterator();
+    Assert.assertTrue(iterator.hasNext());
+    Assert.assertNull(iterator.next());
+  }
+
+  @Test
+  public void testIteratorIdempotentHasNext() throws Exception
+  {
+    bufferStore.store(integerRangeArr[0]);
+
+    IOIterator<SerializablePairLongString> iterator = bufferStore.iterator();
+
+    Assert.assertTrue(iterator.hasNext());
+    // expect hasNext() to not modify state
+    Assert.assertTrue(iterator.hasNext());
+  }
+
+  @Test(expected = NoSuchElementException.class)
+  public void testIteratorEmptyThrows() throws Exception
+  {
+    IOIterator<SerializablePairLongString> iterator = bufferStore.iterator();
+    iterator.next();
+  }
+
+  @Test
+  public void testIteratorEmptyHasNext() throws Exception
+  {
+    IOIterator<SerializablePairLongString> iterator = bufferStore.iterator();
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  @Test
+  public void testMinValueUsesInteger() throws Exception
+  {
+    for (SerializablePairLongString value : integerRangeArr) {
+      bufferStore.store(value);
+    }
+
+    SerializablePairLongStringColumnHeader columnHeader = bufferStore.createColumnHeader();
+    Assert.assertEquals(integerRangeArr[0].lhs.longValue(), columnHeader.getMinValue());
+    Assert.assertTrue(columnHeader.isUseIntegerDeltas());
+  }
+
+  @Test
+  public void testMinValueUsesLong() throws Exception
+  {
+    for (SerializablePairLongString value : longRangeArr) {
+      bufferStore.store(value);
+    }
+
+    SerializablePairLongStringColumnHeader columnHeader = bufferStore.createColumnHeader();
+    Assert.assertEquals(MIN_LONG, columnHeader.getMinValue());
+    Assert.assertFalse(columnHeader.isUseIntegerDeltas());
+  }
+
+  @Test
+  public void testMinValueUsesIntegerSerialization() throws Exception
+  {
+    for (SerializablePairLongString value : integerRangeArr) {
+      bufferStore.store(value);
+    }
+
+    SerializablePairLongStringColumnHeader columnHeader = bufferStore.createColumnHeader();
+
+    HeapByteBufferWriteOutBytes channel = new HeapByteBufferWriteOutBytes();
+    try (ResourceHolder<ByteBuffer> resourceHolder = NativeClearedByteBufferProvider.INSTANCE.get()) {
+      columnHeader.transferTo(channel);
+
+      ByteBuffer byteBuffer = resourceHolder.get();
+      channel.writeTo(byteBuffer);
+      byteBuffer.flip();
+
+      SerializablePairLongStringColumnHeader deserializedColumnHeader =
+          SerializablePairLongStringColumnHeader.fromBuffer(byteBuffer);
+      Assert.assertEquals(MIN_INTEGER, deserializedColumnHeader.getMinValue());
+      Assert.assertTrue(deserializedColumnHeader.isUseIntegerDeltas());
+    }
+  }
+
+  @Test
+  public void testMinValueSerialization() throws Exception
+  {
+    for (SerializablePairLongString value : longRangeArr) {
+      bufferStore.store(value);
+    }
+
+    SerializablePairLongStringColumnHeader columnHeader = bufferStore.createColumnHeader();
+
+    HeapByteBufferWriteOutBytes channel = new HeapByteBufferWriteOutBytes();
+    try (ResourceHolder<ByteBuffer> resourceHolder = NativeClearedByteBufferProvider.INSTANCE.get()) {
+      columnHeader.transferTo(channel);
+
+      ByteBuffer byteBuffer = resourceHolder.get();
+
+      channel.writeTo(byteBuffer);
+      byteBuffer.flip();
+
+      SerializablePairLongStringColumnHeader deserializedColumnHeader =
+          SerializablePairLongStringColumnHeader.fromBuffer(byteBuffer);
+      Assert.assertEquals(MIN_LONG, deserializedColumnHeader.getMinValue());
+      Assert.assertFalse(deserializedColumnHeader.isUseIntegerDeltas());
+    }
+  }
+
+  @Test
+  public void testVariedSize() throws Exception
+  {
+    int rowCount = 100;
+    int maxStringSize = 1024 * 1024;
+    int minStringSize = 1024;
+    List<SerializablePairLongString> input = new ArrayList<>(rowCount);
+
+    for (int i = 0; i < rowCount; i++) {
+      long longValue = random.nextLong();
+      SerializablePairLongString value =
+          new SerializablePairLongString(longValue, RandomStringUtils.randomAlphabetic(minStringSize, maxStringSize));
+
+      input.add(value);
+
+      bufferStore.store(value);
+    }
+
+    IOIterator<SerializablePairLongString> iterator = bufferStore.iterator();
+    int i = 0;
+
+    while (iterator.hasNext()) {
+      Assert.assertEquals(input.get(i), iterator.next());
+      i++;
+    }
+  }
+
+  @Test
+  public void testLargeBuffer() throws Exception
+  {
+    // note: tests a single element larger than 64KB
+    int stringSize = 128 * 1024;
+    SerializablePairLongString value =
+        new SerializablePairLongString(Long.MAX_VALUE, RandomStringUtils.randomAlphabetic(stringSize));
+
+    bufferStore.store(value);
+
+    IOIterator<SerializablePairLongString> iterator = bufferStore.iterator();
+
+    Assert.assertTrue(iterator.hasNext());
+    Assert.assertEquals(value, iterator.next());
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  @Test
+  public void testLargeValueCount() throws Exception
+  {
+    List<SerializablePairLongString> valueList = new ArrayList<>();
+
+    for (int i = 0; i < 10000; i++) {
+      valueList.add(new SerializablePairLongString(Integer.MAX_VALUE + (long) i, "the same string"));
+    }
+
+    assertBufferedValuesEqual(valueList);
+  }
+
+  @Test
+  public void testOverflowTransfer() throws Exception
+  {
+    bufferStore.store(new SerializablePairLongString(Long.MIN_VALUE, "fuu"));
+    bufferStore.store(new SerializablePairLongString(Long.MAX_VALUE, "fuu"));
+
+    SerializablePairLongStringColumnHeader columnHeader = bufferStore.createColumnHeader();
+
+    Assert.assertEquals(0, columnHeader.getMinValue());
+
+    SerializablePairLongStringBufferStore.TransferredBuffer transferredBuffer = bufferStore.transferToRowWriter(
+        NativeClearedByteBufferProvider.INSTANCE,
+        writeOutMedium
+    );
+
+    Assert.assertEquals(94, transferredBuffer.getSerializedSize());
+  }
+
+  @Test
+  public void testNullOnlyTransfer() throws Exception
+  {
+    bufferStore.store(null);
+
+    bufferStore.store(null);
+
+    bufferStore.store(null);
+
+    SerializablePairLongStringColumnHeader columnHeader = bufferStore.createColumnHeader();
+
+    Assert.assertEquals(0, columnHeader.getMinValue());
+
+    SerializablePairLongStringBufferStore.TransferredBuffer transferredBuffer = bufferStore.transferToRowWriter(
+        NativeClearedByteBufferProvider.INSTANCE,
+        writeOutMedium
+    );
+
+    Assert.assertEquals(59, transferredBuffer.getSerializedSize());
+  }
+
+  @Test
+  public void testTransferIntegerRange() throws Exception
+  {
+    for (SerializablePairLongString value : integerRangeArr) {
+      bufferStore.store(value);
+    }
+
+    Assert.assertTrue(bufferStore.createColumnHeader().isUseIntegerDeltas());
+
+    assertTransferredValuesEqual(integerRangeArr);
+  }
+
+  @Test
+  public void testTransferLongRange() throws Exception
+  {
+    for (SerializablePairLongString value : longRangeArr) {
+      bufferStore.store(value);
+    }
+
+    Assert.assertFalse(bufferStore.createColumnHeader().isUseIntegerDeltas());
+
+    assertTransferredValuesEqual(longRangeArr);
+  }
+
+  private void assertBufferedValuesEqual(List<SerializablePairLongString> input) throws IOException
+  {
+    for (SerializablePairLongString pairLongString : input) {
+      bufferStore.store(pairLongString);
+    }
+
+    IOIterator<SerializablePairLongString> iterator = bufferStore.iterator();
+    int i = 0;
+
+    while (iterator.hasNext()) {
+      Assert.assertEquals(input.get(i), iterator.next());
+      i++;
+    }
+
+    Assert.assertEquals(
+        StringUtils.format("element count mismatch: expected %s, got %s", input.size(), i),
+        input.size(),
+        i
+    );
+  }
+
+  private void assertTransferredValuesEqual(SerializablePairLongString[] input) throws IOException
+  {
+    SerializablePairLongStringBufferStore.TransferredBuffer transferredBuffer =
+        bufferStore.transferToRowWriter(NativeClearedByteBufferProvider.INSTANCE, writeOutMedium);
+    HeapByteBufferWriteOutBytes resultChannel = new HeapByteBufferWriteOutBytes();
+
+    transferredBuffer.writeTo(resultChannel, null);
+
+    try (SerializablePairLongStringComplexColumn column = createComplexColumn(transferredBuffer, resultChannel)) {
+      for (int i = 0; i < input.length; i++) {
+        Assert.assertEquals(input[i], column.getRowValue(i));
+      }
+    }
+  }
+
+  private static SerializablePairLongStringComplexColumn createComplexColumn(
+      SerializablePairLongStringBufferStore.TransferredBuffer transferredBuffer,
+      HeapByteBufferWriteOutBytes resultChannel
+  )
+  {
+    ByteBuffer byteBuffer = ByteBuffer.allocate(Ints.checkedCast(transferredBuffer.getSerializedSize()));
+
+    resultChannel.readFully(0, byteBuffer);
+    byteBuffer.flip();
+
+    SerializablePairLongStringComplexMetricSerde complexMetricSerde = new SerializablePairLongStringComplexMetricSerde();
+    ColumnBuilder builder = new ColumnBuilder();
+
+    complexMetricSerde.deserializeColumn(byteBuffer, builder);
+    builder.setType(ValueType.COMPLEX);
+
+    ColumnHolder columnHolder = builder.build();
+    SerializablePairLongStringComplexColumn column = (SerializablePairLongStringComplexColumn) columnHolder.getColumn();
+
+    return column;
+  }
+}
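
    The tests above all follow the same two-pass flow; a condensed sketch, using only calls
    that appear in this test class (the literal values are illustrative):

        // pass 1: buffer the raw values; the min/max of the long component is tracked as they arrive
        bufferStore.store(new SerializablePairLongString(100L, "fuu"));

        // the header records the min value and whether deltas from it fit in an integer
        SerializablePairLongStringColumnHeader header = bufferStore.createColumnHeader();

        // pass 2: re-encode the buffered values (delta encoding plus block compression)
        SerializablePairLongStringBufferStore.TransferredBuffer transferred =
            bufferStore.transferToRowWriter(NativeClearedByteBufferProvider.INSTANCE, writeOutMedium);

        HeapByteBufferWriteOutBytes channel = new HeapByteBufferWriteOutBytes();
        transferred.writeTo(channel, null);
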
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerdeTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerdeTest.java
new file mode 100644
index 0000000000..c8605f4695
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerdeTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.segment.GenericColumnSerializer;
+import org.apache.druid.segment.column.ColumnBuilder;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.serde.cell.RandomStringUtils;
+import org.apache.druid.segment.writeout.HeapByteBufferWriteOutBytes;
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+public class SerializablePairLongStringComplexMetricSerdeTest
+{
+  private static final SerializablePairLongStringComplexMetricSerde COMPLEX_METRIC_SERDE =
+      new SerializablePairLongStringComplexMetricSerde();
+
+  // want deterministic test input
+  private final Random random = new Random(0);
+  private final RandomStringUtils randomStringUtils = new RandomStringUtils(random);
+
+  private GenericColumnSerializer<SerializablePairLongString> serializer;
+
+  @SuppressWarnings("unchecked")
+  @Before
+  public void setup()
+  {
+    SegmentWriteOutMedium writeOutMedium = new OnHeapMemorySegmentWriteOutMedium();
+    serializer = (GenericColumnSerializer<SerializablePairLongString>) COMPLEX_METRIC_SERDE.getSerializer(
+        writeOutMedium,
+        "not-used"
+    );
+  }
+
+  @Test
+  public void testSingle() throws Exception
+  {
+    assertExpected(ImmutableList.of(new SerializablePairLongString(100L, "fuu")), 77);
+  }
+
+  @Test
+  public void testLargeString() throws Exception
+  {
+    // single entry spans more than one block in underlying storage
+    assertExpected(ImmutableList.of(new SerializablePairLongString(
+        100L,
+        randomStringUtils.randomAlphanumeric(2 * 1024 * 1024)
+    )), 2103140);
+  }
+
+  @Test
+  public void testCompressable() throws Exception
+  {
+    int numStrings = 10;
+    List<SerializablePairLongString> valueList = new ArrayList<>();
+    List<String> stringList = new ArrayList<>();
+
+    for (int i = 0; i < numStrings; i++) {
+      stringList.add(randomStringUtils.randomAlphanumeric(1024));
+    }
+    for (int i = 0; i < 10000; i++) {
+      valueList.add(new SerializablePairLongString(Integer.MAX_VALUE + (long) i, stringList.get(i % numStrings)));
+    }
+
+    // actual input size with naive encoding is ~10MB
+    assertExpected(valueList, 1746026);
+  }
+
+  @Test
+  public void testHighlyCompressable() throws Exception
+  {
+    List<SerializablePairLongString> valueList = new ArrayList<>();
+
+    String stringValue = randomStringUtils.randomAlphanumeric(1024);
+    for (int i = 0; i < 10000; i++) {
+      valueList.add(new SerializablePairLongString(Integer.MAX_VALUE + (long) i, stringValue));
+    }
+
+    // actual input size with naive encoding is ~10MB
+    assertExpected(valueList, 289645);
+  }
+
+  @Test
+  public void testRandom() throws Exception
+  {
+    List<SerializablePairLongString> valueList = new ArrayList<>();
+
+    for (int i = 0; i < 10000; i++) {
+      valueList.add(new SerializablePairLongString(random.nextLong(), randomStringUtils.randomAlphanumeric(1024)));
+    }
+
+    assertExpected(valueList, 10428975);
+  }
+
+  @Test
+  public void testNullString() throws Exception
+  {
+    assertExpected(ImmutableList.of(new SerializablePairLongString(100L, null)), 74);
+  }
+
+  @Test
+  public void testEmpty() throws Exception
+  {
+    // minimum size for empty data
+    assertExpected(Collections.emptyList(), 57);
+  }
+
+  @Test
+  public void testSingleNull() throws Exception
+  {
+    assertExpected(Arrays.asList(new SerializablePairLongString[]{null}), 58);
+  }
+
+  @Test
+  public void testMultipleNull() throws Exception
+  {
+    assertExpected(Arrays.asList(null, null, null, null), 59);
+  }
+
+  private void assertExpected(List<SerializablePairLongString> expected) throws IOException
+  {
+    assertExpected(expected, -1);
+  }
+
+  private void assertExpected(List<SerializablePairLongString> expected, int expectedSize) throws IOException
+  {
+    List<SerializablePairLongStringValueSelector> valueSelectors =
+        expected.stream().map(SerializablePairLongStringValueSelector::new).collect(Collectors.toList());
+    ByteBuffer byteBuffer = serializeAllValuesToByteBuffer(valueSelectors, serializer, expectedSize);
+
+    try (SerializablePairLongStringComplexColumn complexColumn = createComplexColumn(byteBuffer)) {
+      for (int i = 0; i < valueSelectors.size(); i++) {
+        Assert.assertEquals(expected.get(i), complexColumn.getRowValue(i));
+      }
+    }
+  }
+
+  private SerializablePairLongStringComplexColumn createComplexColumn(ByteBuffer byteBuffer)
+  {
+    ColumnBuilder builder = new ColumnBuilder();
+    int serializedSize = byteBuffer.remaining();
+
+    COMPLEX_METRIC_SERDE.deserializeColumn(byteBuffer, builder);
+    builder.setType(ValueType.COMPLEX);
+
+    ColumnHolder columnHolder = builder.build();
+
+    SerializablePairLongStringComplexColumn column = (SerializablePairLongStringComplexColumn) columnHolder.getColumn();
+
+    Assert.assertEquals(serializedSize, column.getLength());
+    Assert.assertEquals("serializablePairLongString", column.getTypeName());
+    Assert.assertEquals(SerializablePairLongString.class, column.getClazz());
+
+    return column;
+  }
+
+  private static ByteBuffer serializeAllValuesToByteBuffer(
+      Collection<SerializablePairLongStringValueSelector> valueSelectors,
+      GenericColumnSerializer<SerializablePairLongString> serializer,
+      int expectedSize
+  ) throws IOException
+  {
+    serializer.open();
+
+    for (SerializablePairLongStringValueSelector valueSelector : valueSelectors) {
+      serializer.serialize(valueSelector);
+    }
+
+    return serializeToByteBuffer(serializer, expectedSize);
+  }
+
+  private static ByteBuffer serializeToByteBuffer(
+      GenericColumnSerializer<SerializablePairLongString> serializer,
+      int expectedSize
+  ) throws IOException
+  {
+    HeapByteBufferWriteOutBytes channel = new HeapByteBufferWriteOutBytes();
+
+    serializer.writeTo(channel, null);
+
+    ByteBuffer byteBuffer = ByteBuffer.allocate((int) channel.size()).order(ByteOrder.nativeOrder());
+
+    channel.readFully(0, byteBuffer);
+    byteBuffer.flip();
+
+    if (expectedSize > -1) {
+      Assert.assertEquals(expectedSize, serializer.getSerializedSize());
+    }
+
+    Assert.assertEquals(serializer.getSerializedSize(), byteBuffer.limit());
+
+    return byteBuffer;
+  }
+
+  private static class SerializablePairLongStringValueSelector
+      extends SingleValueColumnValueSelector<SerializablePairLongString>
+  {
+    public SerializablePairLongStringValueSelector(SerializablePairLongString value)
+    {
+      super(SerializablePairLongString.class, value);
+    }
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringDeltaEncodedStagedSerdeTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringDeltaEncodedStagedSerdeTest.java
new file mode 100644
index 0000000000..d0489cf928
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringDeltaEncodedStagedSerdeTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.segment.serde.cell.RandomStringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+public class SerializablePairLongStringDeltaEncodedStagedSerdeTest
+{
+  private static final SerializablePairLongStringDeltaEncodedStagedSerde INTEGER_SERDE =
+      new SerializablePairLongStringDeltaEncodedStagedSerde(0L, true);
+
+  private static final SerializablePairLongStringDeltaEncodedStagedSerde LONG_SERDE =
+      new SerializablePairLongStringDeltaEncodedStagedSerde(0L, false);
+
+  private final RandomStringUtils randomStringUtils = new RandomStringUtils();
+
+  @Test
+  public void testNull()
+  {
+    assertValueEquals(null, 0, INTEGER_SERDE);
+  }
+
+  @Test
+  public void testSimpleInteger()
+  {
+    assertValueEquals(new SerializablePairLongString(100L, "fuu"), 11, INTEGER_SERDE);
+  }
+
+  @Test
+  public void testNullStringInteger()
+  {
+    assertValueEquals(new SerializablePairLongString(100L, null), 8, INTEGER_SERDE);
+  }
+
+  @Test
+  public void testLargeStringInteger()
+  {
+    assertValueEquals(
+        new SerializablePairLongString(100L, randomStringUtils.randomAlphanumeric(1024 * 1024)),
+        1048584,
+        INTEGER_SERDE
+    );
+  }
+
+  @Test
+  public void testSimpleLong()
+  {
+    assertValueEquals(new SerializablePairLongString(100L, "fuu"), 15, LONG_SERDE);
+  }
+
+  @Test
+  public void testNullStringLong()
+  {
+    assertValueEquals(new SerializablePairLongString(100L, null), 12, LONG_SERDE);
+  }
+
+  @Test
+  public void testLargeStringLong()
+  {
+    assertValueEquals(
+        new SerializablePairLongString(100L, randomStringUtils.randomAlphanumeric(10 * 1024 * 1024)),
+        10485772,
+        LONG_SERDE
+    );
+  }
+
+  private static void assertValueEquals(
+      @Nullable SerializablePairLongString value,
+      int size,
+      SerializablePairLongStringDeltaEncodedStagedSerde serde
+  )
+  {
+    byte[] bytes = serde.serialize(value);
+    Assert.assertEquals(size, bytes.length);
+    SerializablePairLongString deserialized = serde.deserialize(bytes);
+    Assert.assertEquals(value, deserialized);
+  }
+}
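
    From the sizes asserted above, the layout appears to be the delta (4 bytes for the
    integer serde, 8 for the long serde) + a 4-byte string length + the UTF-8 bytes:
    11 = 4 + 4 + 3 and 15 = 8 + 4 + 3 for "fuu". A usage sketch with a nonzero delta base
    (the literal values are illustrative):

        // the base (100L here) would come from the column header's min value
        SerializablePairLongStringDeltaEncodedStagedSerde serde =
            new SerializablePairLongStringDeltaEncodedStagedSerde(100L, true);

        byte[] bytes = serde.serialize(new SerializablePairLongString(150L, "fuu")); // delta of 50 stored
        SerializablePairLongString roundTripped = serde.deserialize(bytes);          // expect (150L, "fuu")
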
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringSimpleStagedSerdeTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringSimpleStagedSerdeTest.java
new file mode 100644
index 0000000000..23d57f0aa9
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringSimpleStagedSerdeTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.segment.serde.cell.RandomStringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.Random;
+
+public class SerializablePairLongStringSimpleStagedSerdeTest
+{
+  private static final SerializablePairLongStringSimpleStagedSerde SERDE =
+      new SerializablePairLongStringSimpleStagedSerde();
+
+  private final RandomStringUtils randomStringUtils = new RandomStringUtils(new Random(0));
+
+  @Test
+  public void testSimple()
+  {
+    assertValueEquals(new SerializablePairLongString(Long.MAX_VALUE, "fuu"), 15);
+  }
+
+  @Test
+  public void testNull()
+  {
+    assertValueEquals(null, 0);
+  }
+
+  @Test
+  public void testNullString()
+  {
+    assertValueEquals(new SerializablePairLongString(Long.MAX_VALUE, null), 12);
+  }
+
+  @Test
+  public void testLargeString()
+  {
+    assertValueEquals(
+        new SerializablePairLongString(Long.MAX_VALUE, randomStringUtils.randomAlphanumeric(1024 * 1024)),
+        1048588
+    );
+  }
+
+  private static void assertValueEquals(@Nullable SerializablePairLongString value, int size)
+  {
+    byte[] bytes = SERDE.serialize(value);
+    Assert.assertEquals(size, bytes.length);
+    SerializablePairLongString deserialized = SERDE.deserialize(bytes);
+    Assert.assertEquals(value, deserialized);
+  }
+}
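
    The expected sizes above decompose the same way as in the delta-encoded serde tests:
    15 = 8 (long) + 4 (string length) + 3 ("fuu"); 12 = 8 + 4 with a null string
    contributing no bytes beyond its length marker; a wholly null pair serializes to zero
    bytes; and 1048588 = 8 + 4 + 1048576.
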
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/SingleValueColumnValueSelector.java b/processing/src/test/java/org/apache/druid/query/aggregation/SingleValueColumnValueSelector.java
new file mode 100644
index 0000000000..8c9d232ca9
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/SingleValueColumnValueSelector.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+
+public class SingleValueColumnValueSelector<T> implements ColumnValueSelector<T>
+{
+  private final Class<T> valueClass;
+  private final T value;
+
+  public SingleValueColumnValueSelector(Class<T> valueClass, T value)
+  {
+    this.valueClass = valueClass;
+    this.value = value;
+  }
+
+  @Override
+  public double getDouble()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public float getFloat()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getLong()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+  {
+  }
+
+  @Override
+  public boolean isNull()
+  {
+    return false;
+  }
+
+  @Nullable
+  @Override
+  public T getObject()
+  {
+    return value;
+  }
+
+  @Override
+  public Class<? extends T> classOfObject()
+  {
+    return valueClass;
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
index 250525ca1b..267fa52ab6 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
@@ -30,7 +30,7 @@ import org.apache.druid.query.QueryRunnerTestHelper;
 import org.apache.druid.query.Result;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.SerializablePairLongString;
-import org.apache.druid.query.aggregation.SerializablePairLongStringSerde;
+import org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde;
 import org.apache.druid.query.timeseries.DefaultTimeseriesQueryMetrics;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
@@ -68,7 +68,7 @@ public class StringFirstTimeseriesQueryTest extends InitializedNullHandlingTest
   @Before
   public void setUp() throws IndexSizeExceededException
   {
-    final SerializablePairLongStringSerde serde = new SerializablePairLongStringSerde();
+    final SerializablePairLongStringComplexMetricSerde serde = new SerializablePairLongStringComplexMetricSerde();
     ComplexMetrics.registerSerde(serde.getTypeName(), serde);
 
     incrementalIndex = new OnheapIncrementalIndex.Builder()
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
index 6c017bab4a..33bff8146f 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
@@ -30,7 +30,7 @@ import org.apache.druid.query.QueryRunnerTestHelper;
 import org.apache.druid.query.Result;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.SerializablePairLongString;
-import org.apache.druid.query.aggregation.SerializablePairLongStringSerde;
+import org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde;
 import org.apache.druid.query.timeseries.DefaultTimeseriesQueryMetrics;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
@@ -67,7 +67,7 @@ public class StringLastTimeseriesQueryTest
   @Before
   public void setUp() throws IndexSizeExceededException
   {
-    final SerializablePairLongStringSerde serde = new SerializablePairLongStringSerde();
+    final SerializablePairLongStringComplexMetricSerde serde = new SerializablePairLongStringComplexMetricSerde();
     ComplexMetrics.registerSerde(serde.getTypeName(), serde);
 
     incrementalIndex = new OnheapIncrementalIndex.Builder()
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/ComplexMetricsTest.java b/processing/src/test/java/org/apache/druid/segment/serde/ComplexMetricsTest.java
index df580a5c3a..d9e6b7ba54 100644
--- a/processing/src/test/java/org/apache/druid/segment/serde/ComplexMetricsTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/serde/ComplexMetricsTest.java
@@ -19,7 +19,7 @@
 
 package org.apache.druid.segment.serde;
 
-import org.apache.druid.query.aggregation.SerializablePairLongStringSerde;
+import org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde;
 import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -67,9 +67,9 @@ public class ComplexMetricsTest
     Assert.assertTrue(serde instanceof HyperUniquesSerde);
 
     expectedException.expect(IllegalStateException.class);
-    expectedException.expectMessage("Incompatible serializer for type[hyperUnique] already exists. Expected [org.apache.druid.query.aggregation.SerializablePairLongStringSerde], found [org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde");
+    expectedException.expectMessage("Incompatible serializer for type[hyperUnique] already exists. Expected [org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde], found [org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde");
 
-    ComplexMetrics.registerSerde("hyperUnique", new SerializablePairLongStringSerde());
+    ComplexMetrics.registerSerde("hyperUnique", new SerializablePairLongStringComplexMetricSerde());
 
     serde = ComplexMetrics.getSerdeForType("hyperUnique");
     Assert.assertNotNull(serde);
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadWriterReaderTest.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadWriterReaderTest.java
new file mode 100644
index 0000000000..9e23d8e30f
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadWriterReaderTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;
+
+/**
+ * tests both {@link BlockCompressedPayloadWriter} and {@link BlockCompressedPayloadReader}
+ */
+public class BlockCompressedPayloadWriterReaderTest extends BytesReadWriteTestBase
+{
+  public BlockCompressedPayloadWriterReaderTest()
+  {
+    super(
+        new BlockCompressedPayloadWriterToBytesWriter.Builder(
+            new BlockCompressedPayloadWriter.Builder(
+                new OnHeapMemorySegmentWriteOutMedium()
+            )
+        ),
+        ByteWriterTestHelper.ValidationFunctionBuilder.PAYLOAD_WRITER_VALIDATION_FUNCTION_FACTORY,
+        new BytesReadWriteTestCases()
+            .setTestCaseValue(BytesReadWriteTest::testSingleWriteBytes, TestCaseResult.of(4115))
+            .setTestCaseValue(BytesReadWriteTest::testSingleMultiBlockWriteBytes, TestCaseResult.of(1049169))
+            .setTestCaseValue(BytesReadWriteTest::testSingleMultiBlockWriteBytesWithPrelude, TestCaseResult.of(1053238))
+            // BytesReadWriteTest::testEmptyByteArray -> compression header is 12 bytes
+            .setTestCaseValue(BytesReadWriteTest::testEmptyByteArray, TestCaseResult.of(12))
+            .setTestCaseValue(BytesReadWriteTest::testNull, TestCaseResult.of(new NullPointerException()))
+            .setTestCaseValue(BytesReadWriteTest::testSingleLong, TestCaseResult.of(25))
+            .setTestCaseValue(BytesReadWriteTest::testVariableSizedCompressablePayloads, TestCaseResult.of(1180))
+            .setTestCaseValue(
+                BytesReadWriteTest::testOutliersInNormalDataUncompressablePayloads,
+                TestCaseResult.of(574302)
+            )
+            .setTestCaseValue(BytesReadWriteTest::testOutliersInNormalDataCompressablePayloads, TestCaseResult.of(5997))
+            .setTestCaseValue(BytesReadWriteTest::testSingleUncompressableBlock, TestCaseResult.of(65715))
+            .setTestCaseValue(BytesReadWriteTest::testSingleWriteByteBufferZSTD, TestCaseResult.of(796))
+            .setTestCaseValue(
+                BytesReadWriteTest::testSingleWriteByteBufferAlternateByteBufferProvider,
+                TestCaseResult.of(1077)
+            )
+            .setTestCaseValue(BytesReadWriteTest::testRandomBlockAccess, TestCaseResult.of(3124842))
+    );
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadWriterToBytesWriter.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadWriterToBytesWriter.java
new file mode 100644
index 0000000000..fed4032fc3
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadWriterToBytesWriter.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.segment.data.CompressionStrategy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+public class BlockCompressedPayloadWriterToBytesWriter implements BytesWriter
+{
+  private final BlockCompressedPayloadWriter blockCompressedPayloadWriter;
+
+  public BlockCompressedPayloadWriterToBytesWriter(BlockCompressedPayloadWriter blockCompressedPayloadWriter)
+  {
+    this.blockCompressedPayloadWriter = blockCompressedPayloadWriter;
+  }
+
+  @Override
+  public void write(byte[] payload) throws IOException
+  {
+    blockCompressedPayloadWriter.write(payload);
+  }
+
+  @Override
+  public void write(ByteBuffer cellByteBuffer) throws IOException
+  {
+    blockCompressedPayloadWriter.write(cellByteBuffer);
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    blockCompressedPayloadWriter.close();
+  }
+
+  @Override
+  public void transferTo(WritableByteChannel channel) throws IOException
+  {
+    blockCompressedPayloadWriter.writeTo(channel, null);
+  }
+
+  @Override
+  public long getSerializedSize()
+  {
+    return blockCompressedPayloadWriter.getSerializedSize();
+  }
+
+  public static class Builder implements BytesWriterBuilder
+  {
+    private final BlockCompressedPayloadWriter.Builder builder;
+
+    public Builder(BlockCompressedPayloadWriter.Builder builder)
+    {
+      this.builder = builder;
+    }
+
+    @Override
+    public BytesWriter build() throws IOException
+    {
+      return new BlockCompressedPayloadWriterToBytesWriter(builder.build());
+    }
+
+    @Override
+    public BytesWriterBuilder setCompressionStrategy(CompressionStrategy compressionStrategy)
+    {
+      builder.setCompressionStrategy(compressionStrategy);
+
+      return this;
+    }
+
+    @Override
+    public BytesWriterBuilder setByteBufferProvider(ByteBufferProvider byteBufferProvider)
+    {
+      builder.setByteBufferProvider(byteBufferProvider);
+
+      return this;
+    }
+  }
+}
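
    A write/read round trip using only the calls these tests exercise (imports as in
    ByteWriterTestHelper below); the wrapper method is illustrative:

        static void roundTripSketch() throws IOException
        {
          BlockCompressedPayloadWriter.Builder builder =
              new BlockCompressedPayloadWriter.Builder(new OnHeapMemorySegmentWriteOutMedium());

          builder.setCompressionStrategy(CompressionStrategy.LZ4); // must match the decompressor below

          BlockCompressedPayloadWriter writer = builder.build();

          writer.write(new byte[]{1, 2, 3});
          writer.close();

          HeapByteBufferWriteOutBytes channel = new HeapByteBufferWriteOutBytes();

          writer.writeTo(channel, null);

          ByteBuffer serialized = ByteBuffer.allocate(Ints.checkedCast(writer.getSerializedSize()))
                                            .order(ByteOrder.nativeOrder());

          channel.readFully(0, serialized);
          serialized.flip();

          try (BlockCompressedPayloadReader reader = BlockCompressedPayloadReader.create(
              serialized,
              NativeClearedByteBufferProvider.INSTANCE,
              CompressionStrategy.LZ4.getDecompressor()
          )) {
            // (position, size) address the uncompressed logical stream, independent of block boundaries
            ByteBuffer payload = reader.read(0, 3);
          }
        }
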
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/ByteWriterTestHelper.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/ByteWriterTestHelper.java
new file mode 100644
index 0000000000..89e9b77bf9
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/ByteWriterTestHelper.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import com.google.common.primitives.Ints;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.writeout.HeapByteBufferWriteOutBytes;
+import org.junit.Assert;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+@SuppressWarnings("UnusedReturnValue")
+public class ByteWriterTestHelper
+{
+  private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new byte[0]);
+
+  private final BytesWriterBuilder bytesWriterBuilder;
+  private final ValidationFunctionBuilder validationFunctionBuilder;
+  private CompressionStrategy compressionStrategy = CompressionStrategy.LZ4;
+  private ByteBufferProvider byteBufferProvider = NativeClearedByteBufferProvider.INSTANCE;
+
+  public ByteWriterTestHelper(
+      BytesWriterBuilder bytesWriterBuilder,
+      ValidationFunctionBuilder validationFunctionBuilder
+  )
+  {
+    this.bytesWriterBuilder = bytesWriterBuilder;
+    this.validationFunctionBuilder = validationFunctionBuilder;
+  }
+
+  public ByteWriterTestHelper setCompressionStrategy(CompressionStrategy compressionStrategy)
+  {
+    this.compressionStrategy = compressionStrategy;
+    bytesWriterBuilder.setCompressionStrategy(compressionStrategy);
+
+    return this;
+  }
+
+  public ByteWriterTestHelper setByteBufferProvider(ByteBufferProvider byteBufferProvider)
+  {
+    this.byteBufferProvider = byteBufferProvider;
+    bytesWriterBuilder.setByteBufferProvider(byteBufferProvider);
+
+    return this;
+  }
+
+  public ByteBuffer writePayloadAsByteArray(ByteBuffer payload) throws IOException
+  {
+    return writePayload(payload, BufferWriterAsBytes.INSTANCE);
+  }
+
+  public ByteBuffer writePayloadAsByteBuffer(ByteBuffer payload) throws IOException
+  {
+    return writePayload(payload, BufferWriterAsBuffer.INSTANCE);
+  }
+
+  public List<ByteBuffer> generateRaggedPayloadBuffer(
+      int baseMin,
+      int baseMax,
+      int stepSize,
+      int largeSize,
+      int largeCount
+  )
+  {
+    return generateRaggedPayloadBuffer(baseMin, baseMax, stepSize, largeSize, largeCount, Integer.MAX_VALUE);
+  }
+
+  public List<ByteBuffer> generateRaggedPayloadBuffer(
+      int baseMin,
+      int baseMax,
+      int stepSize,
+      int largeSize,
+      int largeCount,
+      int modulo
+  )
+  {
+    List<ByteBuffer> byteBufferList = new ArrayList<>();
+
+    for (int i = baseMin; i < baseMax; i += stepSize) {
+      byteBufferList.add(generateIntPayloads(baseMin + i, modulo));
+    }
+
+    for (int j = 0; j < largeCount; j++) {
+      byteBufferList.add(generateIntPayloads(largeSize, modulo));
+
+      for (int i = baseMin; i < baseMax; i += stepSize) {
+        byteBufferList.add(generateIntPayloads(baseMin + i, modulo));
+      }
+    }
+
+    return byteBufferList;
+  }
+
+  public void validateRead(List<ByteBuffer> byteBufferList) throws Exception
+  {
+    ValidationFunction validationFunction = validationFunctionBuilder.build(this);
+    validationFunction.validateBufferList(byteBufferList);
+  }
+
+  public void validateReadAndSize(List<ByteBuffer> byteBufferList, int expectedSize) throws Exception
+  {
+    ValidationFunction validationFunction = validationFunctionBuilder.build(this);
+    ByteBuffer masterByteBuffer = validationFunction.validateBufferList(byteBufferList);
+    int actualSize = masterByteBuffer.limit();
+
+    if (expectedSize > -1) {
+      Assert.assertEquals(expectedSize, actualSize);
+    }
+  }
+
+  public ByteBuffer writePayload(ByteBuffer sourcePayload, BufferWriter bufferWriter) throws IOException
+  {
+    return writePayloadList(Collections.singletonList(sourcePayload), bufferWriter);
+  }
+
+  public ByteBuffer writePayloadList(List<ByteBuffer> payloadList) throws IOException
+  {
+    return writePayloadList(payloadList, BufferWriterAsBuffer.INSTANCE);
+  }
+
+  public ByteBuffer writePayloadList(List<ByteBuffer> payloadList, BufferWriter bufferWriter) throws IOException
+  {
+    BytesWriter bytesWriter = bytesWriterBuilder.build();
+
+    try {
+      for (ByteBuffer payload : payloadList) {
+        bufferWriter.writeTo(bytesWriter, payload);
+      }
+    }
+    finally {
+      bytesWriter.close();
+    }
+
+    HeapByteBufferWriteOutBytes bufferWriteOutBytes = new HeapByteBufferWriteOutBytes();
+
+    bytesWriter.transferTo(bufferWriteOutBytes);
+
+    int payloadSerializedSize = Ints.checkedCast(bytesWriter.getSerializedSize());
+    ByteBuffer masterByteBuffer = ByteBuffer.allocate(payloadSerializedSize).order(ByteOrder.nativeOrder());
+
+    bufferWriteOutBytes.readFully(0, masterByteBuffer);
+    masterByteBuffer.flip();
+
+    Assert.assertEquals(bytesWriter.getSerializedSize(), masterByteBuffer.limit());
+
+    return masterByteBuffer;
+  }
+
+  public ByteBuffer generateIntPayloads(int intCount)
+  {
+    return generateIntPayloads(intCount, Integer.MAX_VALUE);
+  }
+
+  public ByteBuffer generateIntPayloads(int intCount, int modulo)
+  {
+    ByteBuffer payload = ByteBuffer.allocate(Integer.BYTES * intCount).order(ByteOrder.nativeOrder());
+
+    for (int i = intCount - 1; i >= 0; i--) {
+      payload.putInt(i % modulo);
+    }
+
+    payload.flip();
+
+    return payload;
+  }
+
+  @Nonnull
+  public ByteBuffer generateBufferWithLongs(int longCount)
+  {
+    ByteBuffer longPayload = ByteBuffer.allocate(Long.BYTES * longCount).order(ByteOrder.nativeOrder());
+
+    for (int i = 0; i < longCount; i++) {
+      longPayload.putLong(longCount - i - 1);
+    }
+
+    longPayload.flip();
+
+    return longPayload;
+  }
+
+  public ByteBuffer validateBufferWriteAndReadBlockCompressed(
+      List<ByteBuffer> bufferList,
+      boolean useRandom
+  ) throws IOException
+  {
+    long position = 0;
+    List<PayloadEntrySpan> payloadReadList = new ArrayList<>();
+
+    for (ByteBuffer byteBuffer : bufferList) {
+      int expectedSize = byteBuffer == null ? 0 : byteBuffer.limit();
+      payloadReadList.add(new PayloadEntrySpan(position, expectedSize));
+      position += expectedSize;
+    }
+
+    ByteBuffer masterByteBuffer = writePayloadList(bufferList, new BufferWriterAsBytes());
+
+    return readAndValidatePayloads(bufferList, useRandom, payloadReadList, masterByteBuffer);
+  }
+
+  @Nonnull
+  private ByteBuffer readAndValidatePayloads(
+      List<ByteBuffer> bufferList,
+      boolean useRandom,
+      List<PayloadEntrySpan> payloadReadList,
+      ByteBuffer masterByteBuffer
+  ) throws IOException
+  {
+    try (BlockCompressedPayloadReader payloadReader = BlockCompressedPayloadReader.create(
+        masterByteBuffer,
+        byteBufferProvider,
+        compressionStrategy.getDecompressor()
+    )) {
+      List<Integer> positions = new ArrayList<>(bufferList.size());
+
+      for (int i = 0; i < bufferList.size(); i++) {
+        positions.add(i);
+      }
+
+      Random random = new Random(0);
+
+      if (useRandom) {
+        Collections.shuffle(positions, random);
+      }
+
+      for (int index : positions) {
+        ByteBuffer expectedByteBuffer = bufferList.get(index);
+        PayloadEntrySpan payloadEntrySpan = payloadReadList.get(index);
+        ByteBuffer readByteBuffer = payloadReader.read(payloadEntrySpan.getStart(), payloadEntrySpan.getSize());
+
+        if (expectedByteBuffer == null) {
+          Assert.assertEquals(StringUtils.format("expected empty buffer %s", index), EMPTY_BYTE_BUFFER, readByteBuffer);
+        } else {
+          Assert.assertEquals(StringUtils.format("failure on buffer %s", index), expectedByteBuffer, readByteBuffer);
+        }
+      }
+
+      return masterByteBuffer;
+    }
+  }
+
+  public ByteBuffer validateBufferWriteAndReadCells(List<ByteBuffer> bufferList, boolean useRandomRead)
+      throws IOException
+  {
+    ByteBuffer masterByteBuffer = writePayloadList(bufferList, new BufferWriterAsBytes());
+
+    return readAndValidateCells(bufferList, useRandomRead, masterByteBuffer);
+  }
+
+  @Nonnull
+  private ByteBuffer readAndValidateCells(
+      List<ByteBuffer> bufferList,
+      boolean useRandomRead,
+      ByteBuffer masterByteBuffer
+  ) throws IOException
+  {
+    try (CellReader cellReader = new CellReader.Builder(masterByteBuffer)
+        .setByteBufferProvider(byteBufferProvider)
+        .setCompressionStrategy(compressionStrategy)
+        .build()) {
+
+      List<Integer> positions = new ArrayList<>(bufferList.size());
+
+      for (int i = 0; i < bufferList.size(); i++) {
+        positions.add(i);
+      }
+
+      Random random = new Random(0);
+
+      if (useRandomRead) {
+        Collections.shuffle(positions, random);
+      }
+
+      for (int index : positions) {
+        ByteBuffer expectedByteBuffer = bufferList.get(index);
+
+        ByteBuffer readByteBuffer = cellReader.getCell(index);
+        if (expectedByteBuffer == null) {
+          Assert.assertEquals(StringUtils.format("failure on buffer %s", index), 0L, readByteBuffer.remaining());
+        } else {
+          Assert.assertEquals(StringUtils.format("failure on buffer %s", index), expectedByteBuffer, readByteBuffer);
+        }
+      }
+
+      return masterByteBuffer;
+    }
+  }
+
+  public ByteWriterTestHelper setUseRandomReadOrder(boolean useRandomRead)
+  {
+    validationFunctionBuilder.setReadRandom(useRandomRead);
+
+    return this;
+  }
+
+  public interface BufferWriter
+  {
+    void writeTo(BytesWriter writer, ByteBuffer payload) throws IOException;
+  }
+
+  public static class BufferWriterAsBytes implements BufferWriter
+  {
+    public static final BufferWriterAsBytes INSTANCE = new BufferWriterAsBytes();
+
+    @Override
+    public void writeTo(BytesWriter writer, ByteBuffer payload) throws IOException
+    {
+      if (payload == null) {
+        writer.write((byte[]) null);
+      } else {
+        writer.write(payload.array());
+      }
+    }
+  }
+
+  public static class BufferWriterAsBuffer implements BufferWriter
+  {
+    public static final BufferWriterAsBuffer INSTANCE = new BufferWriterAsBuffer();
+
+    @Override
+    public void writeTo(BytesWriter writer, ByteBuffer payload) throws IOException
+    {
+      writer.write(payload);
+    }
+  }
+
+  public interface ValidationFunction
+  {
+    ByteBuffer validateBufferList(List<ByteBuffer> byteBufferList) throws Exception;
+  }
+
+  public interface ValidationFunctionBuilder
+  {
+    ValidationFunctionBuilder PAYLOAD_WRITER_VALIDATION_FUNCTION_FACTORY = new PayloadWriterValidationFunctionBuilder();
+
+    ValidationFunctionBuilder CELL_READER_VALIDATION_FUNCTION_FACTORY = new CellReaderValidationFunctionBuilder();
+
+    ValidationFunction build(ByteWriterTestHelper testHelper);
+
+    ValidationFunctionBuilder setReadRandom(boolean useRandomRead);
+  }
+
+  public static class PayloadWriterValidationFunctionBuilder implements ValidationFunctionBuilder
+  {
+    private boolean useRandomRead;
+
+    @Override
+    public ValidationFunctionBuilder setReadRandom(boolean useRandomRead)
+    {
+      this.useRandomRead = useRandomRead;
+
+      return this;
+    }
+
+    @Override
+    public ValidationFunction build(ByteWriterTestHelper testHelper)
+    {
+      return bufferList -> testHelper.validateBufferWriteAndReadBlockCompressed(bufferList, useRandomRead);
+    }
+  }
+
+  public static class CellReaderValidationFunctionBuilder implements ValidationFunctionBuilder
+  {
+    private boolean useRandomRead;
+
+    @Override
+    public ValidationFunction build(ByteWriterTestHelper testHelper)
+    {
+      return bufferList -> testHelper.validateBufferWriteAndReadCells(bufferList, useRandomRead);
+    }
+
+    @Override
+    public ValidationFunctionBuilder setReadRandom(boolean useRandomRead)
+    {
+      this.useRandomRead = useRandomRead;
+      return this;
+    }
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesReadWriteTest.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesReadWriteTest.java
new file mode 100644
index 0000000000..516b913111
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesReadWriteTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+public interface BytesReadWriteTest
+{
+  void testSingleWriteBytes() throws Exception;
+
+  void testSingleMultiBlockWriteBytes() throws Exception;
+
+  void testSingleMultiBlockWriteBytesWithPrelude() throws Exception;
+
+  void testEmptyByteArray() throws Exception;
+
+  void testNull() throws Exception;
+
+  void testSingleLong() throws Exception;
+
+  void testVariableSizedCompressablePayloads() throws Exception;
+
+  void testOutliersInNormalDataUncompressablePayloads() throws Exception;
+
+  void testOutliersInNormalDataCompressablePayloads() throws Exception;
+
+  void testSingleUncompressableBlock() throws Exception;
+
+  void testSingleWriteByteBufferZSTD() throws Exception;
+
+  void testSingleWriteByteBufferAlternateByteBufferProvider() throws Exception;
+
+  void testRandomBlockAccess() throws Exception;
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesReadWriteTestBase.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesReadWriteTestBase.java
new file mode 100644
index 0000000000..f8c79ab606
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesReadWriteTestBase.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+// base class intended only for extension; subclasses bind a writer builder, a validation strategy, and
+// per-implementation expected results
+public abstract class BytesReadWriteTestBase implements BytesReadWriteTest
+{
+  protected final BytesWriterBuilder bytesWriterBuilder;
+
+  private final TestCasesConfig<BytesReadWriteTest> testCases;
+  private final ByteWriterTestHelper.ValidationFunctionBuilder validationFunctionBuilder;
+
+  private ByteWriterTestHelper testHelper;
+
+  protected BytesReadWriteTestBase(
+      BytesWriterBuilder bytesWriterBuilder,
+      ByteWriterTestHelper.ValidationFunctionBuilder validationFunctionBuilder,
+      TestCasesConfig<BytesReadWriteTest> testCases
+  )
+  {
+    this.testCases = testCases;
+    this.bytesWriterBuilder = bytesWriterBuilder;
+    this.validationFunctionBuilder = validationFunctionBuilder;
+  }
+
+  protected ByteWriterTestHelper getTestHelper()
+  {
+    return testHelper;
+  }
+
+  @Before
+  public void setup()
+  {
+    testHelper = new ByteWriterTestHelper(bytesWriterBuilder, validationFunctionBuilder);
+  }
+
+  @Test
+  @Override
+  public void testSingleWriteBytes() throws Exception
+  {
+    Assume.assumeTrue(testCases.isCurrentTestEnabled());
+
+    ByteBuffer payload = testHelper.generateBufferWithLongs(1024);
+
+    runTestWithExceptionHandling(Collections.singletonList(payload), testCases.currentTestValue());
+  }
+
+  @Test
+  @Override
+  public void testSingleMultiBlockWriteBytes() throws Exception
+  {
+    Assume.assumeTrue(testCases.isCurrentTestEnabled());
+
+    ByteBuffer payload = testHelper.generateBufferWithLongs(256 * 1024); // 256k longs * 8 bytes = 2mb
+
+    runTestWithExceptionHandling(Collections.singletonList(payload), testCases.currentTestValue());
+  }
+
+  @Test
+  @Override
+  public void testSingleMultiBlockWriteBytesWithPrelude() throws Exception
+  {
+    Assume.assumeTrue(testCases.isCurrentTestEnabled());
+    ByteBuffer payload1 = testHelper.generateBufferWithLongs(1024); // 1024 longs * 8 bytes = 8kb
+    ByteBuffer payload2 = testHelper.generateBufferWithLongs(256 * 1024); // 256k longs * 8 bytes = 2mb
+
+    runTestWithExceptionHandling(Arrays.asList(payload1, payload2), testCases.currentTestValue());
+  }
+
+  @Test
+  @Override
+  public void testEmptyByteArray() throws Exception
+  {
+    Assume.assumeTrue(testCases.isCurrentTestEnabled());
+    // an empty payload stores no cell data, but the index and size fields below are still written
+    ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
+    // block index size: "8" : 4 bytes
+    // block index entry 0: "0": 4 bytes
+    // block index entry 1: "1": 4 bytes
+    // data stream size : "0" : 4 bytes
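+    // => 16 bytes for the block-compressed payload alone; per-implementation totals are configured in each
+    //    subclass's TestCasesConfig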
+    runTestWithExceptionHandling(Collections.singletonList(payload), testCases.currentTestValue());
+  }
+
+  @Test
+  @Override
+  public void testNull() throws Exception
+  {
+    Assume.assumeTrue(testCases.isCurrentTestEnabled());
+    TestCaseResult testCaseResult = testCases.currentTestValue();
+
+    runTestWithExceptionHandling(Collections.singletonList(null), testCaseResult);
+  }
+
+  @Test
+  @Override
+  public void testSingleLong() throws Exception
+  {
+    Assume.assumeTrue(testCases.isCurrentTestEnabled());
+    ByteBuffer payload = testHelper.generateBufferWithLongs(1);
+    // block index size: "8" : 4 bytes
+    // block index entry 0: "0": 4 bytes
+    // block index entry 1: "0": 4 bytes
+    // data stream size : "1" : 4 bytes
+    // compressed single 8 bytes: 9 bytes (compressed: "0")
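+    // => 25 bytes for the block-compressed payload alone; per-implementation totals are configured in each
+    //    subclass's TestCasesConfig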
+    runTestWithExceptionHandling(Collections.singletonList(payload), testCases.currentTestValue());
+  }
+
+  @Test
+  @Override
+  public void testVariableSizedCompressablePayloads() throws Exception
+  {
+    Assume.assumeTrue(testCases.isCurrentTestEnabled());
+
+    List<ByteBuffer> bufferList = testHelper.generateRaggedPayloadBuffer(100, 1024, 10, 0, 0, 10);
+
+    runTestWithExceptionHandling(bufferList, testCases.currentTestValue());
+  }
+
+  @Test
+  @Override
+  public void testOutliersInNormalDataUncompressablePayloads() throws Exception
+  {
+    Assume.assumeTrue(testCases.isCurrentTestEnabled());
+
+    // every integer within a payload is unique
+    List<ByteBuffer> bufferList = testHelper.generateRaggedPayloadBuffer(100, 1024, 10, 64 * 1024, 2);
+
+    runTestWithExceptionHandling(bufferList, testCases.currentTestValue());
+  }
+
+  @Test
+  @Override
+  public void testOutliersInNormalDataCompressablePayloads() throws Exception
+  {
+    Assume.assumeTrue(testCases.isCurrentTestEnabled());
+
+    // same # of payloads and size of payloads as testOutliersInNormalDataUncompressablePayloads()
+    // integer values range 0-9
+    List<ByteBuffer> bufferList = testHelper.generateRaggedPayloadBuffer(100, 1024, 10, 64 * 1024, 2, 10);
+
+    runTestWithExceptionHandling(bufferList, testCases.currentTestValue());
+  }
+
+  @Test
+  @Override
+  public void testSingleUncompressableBlock() throws Exception
+  {
+    Assume.assumeTrue(testCases.isCurrentTestEnabled());
+
+    // every integer within a payload is unique
+    ByteBuffer byteBuffer = testHelper.generateIntPayloads(16 * 1024);
+
+    Assert.assertEquals(64 * 1024, byteBuffer.limit());
+    // uncompressable 64k block size
+    runTestWithExceptionHandling(Collections.singletonList(byteBuffer), testCases.currentTestValue());
+  }
+
+  @Test
+  @Override
+  public void testSingleWriteByteBufferZSTD() throws Exception
+  {
+    Assume.assumeTrue(testCases.isCurrentTestEnabled());
+
+    ByteBuffer sourcePayLoad = testHelper.generateBufferWithLongs(1024); // 8k
+
+    testHelper.setCompressionStrategy(CompressionStrategy.ZSTD);
+    runTestWithExceptionHandling(Collections.singletonList(sourcePayLoad), testCases.currentTestValue());
+  }
+
+  @Test
+  @Override
+  public void testSingleWriteByteBufferAlternateByteBufferProvider() throws Exception
+  {
+    Assume.assumeTrue(testCases.isCurrentTestEnabled());
+
+    List<ByteBuffer> bufferList = testHelper.generateRaggedPayloadBuffer(100, 1024, 10, 0, 0, 10);
+
+    testHelper.setByteBufferProvider(() -> new ResourceHolder<ByteBuffer>()
+    {
+      @Override
+      public ByteBuffer get()
+      {
+        return ByteBuffer.allocate(128 * 1024);
+      }
+
+      @Override
+      public void close()
+      {
+      }
+    });
+    runTestWithExceptionHandling(bufferList, testCases.currentTestValue());
+  }
+
+  @Test
+  @Override
+  public void testRandomBlockAccess() throws Exception
+  {
+    Assume.assumeTrue(testCases.isCurrentTestEnabled());
+    // verifies that blocks are accessed in random order and that the same block may be returned to more than once
+    List<ByteBuffer> bufferList = testHelper.generateRaggedPayloadBuffer(8192, 32 * 1024, 256, 256 * 1024, 3, 1024);
+
+    testHelper.setUseRandomReadOrder(true);
+    runTestWithExceptionHandling(bufferList, testCases.currentTestValue());
+  }
+
+  private void runTestWithExceptionHandling(List<ByteBuffer> bufferList, TestCaseResult testCaseResult) throws Exception
+  {
+    try {
+      testHelper.validateReadAndSize(bufferList, testCaseResult.size);
+
+      if (testCaseResult.exception != null) {
+        Assert.fail("expected exception " + testCaseResult.exception.getClass().getName());
+      }
+    }
+    catch (Exception e) {
+      if (testCaseResult.exception != null) {
+        Assert.assertTrue(testCaseResult.exception.getClass().isAssignableFrom(e.getClass()));
+      } else {
+        throw e;
+      }
+    }
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesReadWriteTestCases.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesReadWriteTestCases.java
new file mode 100644
index 0000000000..2bedcfb821
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesReadWriteTestCases.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+public class BytesReadWriteTestCases extends TestCasesConfig<BytesReadWriteTest>
+{
+  public BytesReadWriteTestCases()
+  {
+    super(BytesReadWriteTest.class, BytesReadWriteTestBase.class);
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesWriter.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesWriter.java
new file mode 100644
index 0000000000..13d3c595fe
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesWriter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * This interface is used so that both CellWriter[.Builder] and BlockCompressedPayloadWriter[.Builder] may use the
+ * same test code.
+ */
+public interface BytesWriter extends Closeable
+{
+  void write(byte[] cellBytes) throws IOException;
+
+  void write(ByteBuffer cellByteBuffer) throws IOException;
+
+  @Override
+  void close() throws IOException;
+
+  void transferTo(WritableByteChannel channel) throws IOException;
+
+  long getSerializedSize();
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesWriterBuilder.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesWriterBuilder.java
new file mode 100644
index 0000000000..7b2703bb7b
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesWriterBuilder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.segment.data.CompressionStrategy;
+
+import java.io.IOException;
+
+/**
+ * This interface is used so that both CellWriter[.Builder] and BlockCompressedPayloadWriter[.Builder] may use
+ * the same test code. Production code should not use this; use the classes directly.
+ */
+public interface BytesWriterBuilder
+{
+  BytesWriter build() throws IOException;
+
+  BytesWriterBuilder setCompressionStrategy(CompressionStrategy compressionStrategy);
+
+  BytesWriterBuilder setByteBufferProvider(ByteBufferProvider byteBufferProvider);
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/CellWriterReaderTest.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/CellWriterReaderTest.java
new file mode 100644
index 0000000000..ddc338dfdb
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/CellWriterReaderTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import com.google.common.primitives.Ints;
+import org.apache.druid.segment.writeout.HeapByteBufferWriteOutBytes;
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * tests both {@link CellWriter} and {@link CellReader}
+ */
+public class CellWriterReaderTest extends BytesReadWriteTestBase
+{
+  public CellWriterReaderTest()
+  {
+    super(
+        new CellWriterToBytesWriter.Builder(
+            new CellWriter.Builder(new OnHeapMemorySegmentWriteOutMedium())
+        ),
+        ByteWriterTestHelper.ValidationFunctionBuilder.CELL_READER_VALIDATION_FUNCTION_FACTORY,
+        new BytesReadWriteTestCases()
+            .setTestCaseValue(BytesReadWriteTest::testSingleLong, TestCaseResult.of(62))
+            .setTestCaseValue(BytesReadWriteTest::testEmptyByteArray, TestCaseResult.of(46))
+            .setTestCaseValue(BytesReadWriteTest::testNull, TestCaseResult.of(46))
+            .setTestCaseValue(BytesReadWriteTest::testSingleWriteBytes, TestCaseResult.of(4151))
+            .setTestCaseValue(BytesReadWriteTest::testSingleMultiBlockWriteBytes, TestCaseResult.of(1049204))
+            .setTestCaseValue(BytesReadWriteTest::testSingleMultiBlockWriteBytesWithPrelude, TestCaseResult.of(1053277))
+            .setTestCaseValue(BytesReadWriteTest::testVariableSizedCompressablePayloads, TestCaseResult.of(1655))
+            .setTestCaseValue(BytesReadWriteTest::testOutliersInNormalDataCompressablePayloads, TestCaseResult.of(7368))
+            .setTestCaseValue(
+                BytesReadWriteTest::testOutliersInNormalDataUncompressablePayloads,
+                TestCaseResult.of(575673)
+            )
+            .setTestCaseValue(BytesReadWriteTest::testSingleUncompressableBlock, TestCaseResult.of(65750))
+            .setTestCaseValue(BytesReadWriteTest::testSingleWriteByteBufferZSTD, TestCaseResult.of(845))
+            .setTestCaseValue(
+                BytesReadWriteTest::testSingleWriteByteBufferAlternateByteBufferProvider,
+                TestCaseResult.of(1552)
+            )
+            .setTestCaseValue(BytesReadWriteTest::testRandomBlockAccess, TestCaseResult.of(3126618))
+    );
+  }
+
+  @Test
+  public void testBasic() throws Exception
+  {
+    // generates a list of randomly variable-sized payloads in a range
+    List<ByteBuffer> byteBufferList = getTestHelper().generateRaggedPayloadBuffer(
+        500,
+        2048,
+        25,
+        64 * 1024,
+        2,
+        10 * 1024
+    );
+    // for test purposes only, we store the bytes in heap buffers
+    HeapByteBufferWriteOutBytes writableChannel = new HeapByteBufferWriteOutBytes();
+    long size;
+
+    try (CellWriter cellWriter = CellWriter.builder(new OnHeapMemorySegmentWriteOutMedium()).build()) {
+      // write our payloads
+      for (ByteBuffer byteBuffer : byteBufferList) {
+        cellWriter.write(byteBuffer);
+      }
+      // finalize the internal buffer in the CellWriter
+      cellWriter.close();
+      // transfer the buffer to our WritableByteChannel
+      cellWriter.writeTo(writableChannel, null);
+      size = cellWriter.getSerializedSize();
+    }
+
+    // transfer the bytes into a ByteBuffer. Normally the WritableByteChannel would be to a file that could be
+    // memory mapped into a ByteBuffer
+    ByteBuffer storedByteBuffer = ByteBuffer.allocate(Ints.checkedCast(size));
+
+    writableChannel.readFully(0, storedByteBuffer);
+    storedByteBuffer.flip();
+
+    try (CellReader cellReader = CellReader.builder(storedByteBuffer).build()) {
+      for (int i = 0; i < byteBufferList.size(); i++) {
+        ByteBuffer buffer = byteBufferList.get(i);
+        ByteBuffer readCell = cellReader.getCell(i);
+
+        Assert.assertEquals(buffer, readCell);
+      }
+    }
+  }
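+
+  // A minimal sketch (illustrative only, not exercised by this test) of the file-backed path described
+  // above: write the cells to a FileChannel, then memory-map the file and wrap it in a CellReader. The
+  // "path" variable is a hypothetical java.nio.file.Path.
+  //
+  //   try (FileChannel outChannel = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
+  //     cellWriter.writeTo(outChannel, null);
+  //   }
+  //
+  //   try (FileChannel inChannel = FileChannel.open(path, StandardOpenOption.READ)) {
+  //     ByteBuffer mapped = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
+  //
+  //     try (CellReader cellReader = CellReader.builder(mapped).build()) {
+  //       ByteBuffer cell = cellReader.getCell(0);
+  //     }
+  //   }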
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/CellWriterToBytesWriter.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/CellWriterToBytesWriter.java
new file mode 100644
index 0000000000..d60eec3fae
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/CellWriterToBytesWriter.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.segment.data.CompressionStrategy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
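+/**
+ * Adapts {@link CellWriter} to the {@link BytesWriter} test interface so the shared BytesReadWriteTestBase
+ * cases can exercise it.
+ */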
+public class CellWriterToBytesWriter implements BytesWriter
+{
+  private final CellWriter cellWriter;
+
+  public CellWriterToBytesWriter(CellWriter cellWriter)
+  {
+    this.cellWriter = cellWriter;
+  }
+
+  @Override
+  public void write(byte[] cellBytes) throws IOException
+  {
+    cellWriter.write(cellBytes);
+  }
+
+  @Override
+  public void write(ByteBuffer cellByteBuffer) throws IOException
+  {
+    cellWriter.write(cellByteBuffer);
+  }
+
+  @Override
+  public void transferTo(WritableByteChannel channel) throws IOException
+  {
+    cellWriter.writeTo(channel, null);
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    cellWriter.close();
+  }
+
+  @Override
+  public long getSerializedSize()
+  {
+    return cellWriter.getSerializedSize();
+  }
+
+  public static class Builder implements BytesWriterBuilder
+  {
+    private final CellWriter.Builder builder;
+
+    public Builder(CellWriter.Builder builder)
+    {
+      this.builder = builder;
+    }
+
+    @Override
+    public BytesWriterBuilder setCompressionStrategy(CompressionStrategy compressionStrategy)
+    {
+      builder.setCompressionStrategy(compressionStrategy);
+
+      return this;
+    }
+
+    @Override
+    public BytesWriterBuilder setByteBufferProvider(ByteBufferProvider byteBufferProvider)
+    {
+      builder.setByteBufferProvider(byteBufferProvider);
+
+      return this;
+    }
+
+    @Override
+    public BytesWriter build() throws IOException
+    {
+      return new CellWriterToBytesWriter(builder.build());
+    }
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/RandomStringUtils.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/RandomStringUtils.java
new file mode 100644
index 0000000000..a7b148fcda
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/RandomStringUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import java.util.Random;
+
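+/**
+ * Deterministic variant of commons-lang3 RandomStringUtils: defaults to a fixed-seed Random so generated test
+ * data is reproducible across runs.
+ */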
+public class RandomStringUtils
+{
+  private final Random random;
+
+  public RandomStringUtils()
+  {
+    random = new Random(0);
+  }
+
+  public RandomStringUtils(Random random)
+  {
+    this.random = random;
+  }
+
+  public String randomAlphanumeric(int length)
+  {
+    return org.apache.commons.lang3.RandomStringUtils.random(length, 0, 0, true, true, null, random);
+  }
+
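+  /**
+   * Returns a random alphanumeric string with length in [minLength, maxLength). Note that maxLength is
+   * exclusive and must be strictly greater than minLength, or Random.nextInt() will throw.
+   */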
+  public String randomAlphanumeric(int minLength, int maxLength)
+  {
+    int length = random.nextInt(maxLength - minLength) + minLength;
+
+    return org.apache.commons.lang3.RandomStringUtils.random(length, 0, 0, true, true, null, random);
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/TestCaseResult.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/TestCaseResult.java
new file mode 100644
index 0000000000..a75b29fd0e
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/TestCaseResult.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
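+/**
+ * Expected outcome of a shared test case: a serialized size in bytes (optionally with the exact expected
+ * bytes) or an expected exception type. A size of -1, as used by TestCasesConfig.enableTestCase(), enables a
+ * case without pinning a specific size.
+ */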
+public class TestCaseResult
+{
+  public final byte[] bytes;
+  public final int size;
+  public final Exception exception;
+
+  private TestCaseResult(byte[] bytes, int size, Exception exception)
+  {
+    this.bytes = bytes;
+    this.size = size;
+    this.exception = exception;
+  }
+
+  public static TestCaseResult of(Exception exception)
+  {
+    return new TestCaseResult(null, -1, exception);
+  }
+
+  public static TestCaseResult of(int sizeBytes)
+  {
+    return new TestCaseResult(null, sizeBytes, null);
+  }
+
+  public static TestCaseResult of(byte[] bytes)
+  {
+    return new TestCaseResult(bytes, bytes.length, null);
+  }
+
+  public static TestCaseResult of(byte[] bytes, int sizeBytes)
+  {
+    return new TestCaseResult(bytes, sizeBytes, null);
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/TestCasesConfig.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/TestCasesConfig.java
new file mode 100644
index 0000000000..078113cc72
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/TestCasesConfig.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
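+/**
+ * Tracks which shared test cases are enabled for a given implementation, along with their expected results.
+ * Test methods are identified by method reference and resolved to names via a dynamic proxy (see
+ * MethodCallCapturer below). Typical usage, as in CellWriterReaderTest:
+ *
+ *   new BytesReadWriteTestCases()
+ *       .setTestCaseValue(BytesReadWriteTest::testSingleLong, TestCaseResult.of(62))
+ */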
+public class TestCasesConfig<T>
+{
+  private final MethodCallCapturer<T> methodCallCapturer;
+  private final Class<T> testCasesInterface;
+  private final Class<? extends T> testClassImpl;
+  private final Map<TestMethodHandle, TestCaseResult> testCasesToRun = new LinkedHashMap<>();
+
+  public TestCasesConfig(Class<T> testCasesInterface, Class<? extends T> testClassImpl)
+  {
+    methodCallCapturer = new MethodCallCapturer<>(testCasesInterface);
+    this.testCasesInterface = testCasesInterface;
+    this.testClassImpl = testClassImpl;
+  }
+
+  public TestCasesConfig<T> setTestCaseValue(TestMethodHandle testMethodHandle, TestCaseResult expectedResult)
+  {
+    testCasesToRun.put(testMethodHandle, expectedResult);
+
+    return this;
+  }
+
+  public TestCasesConfig<T> setTestCaseValue(MethodAccess<T, Exception> methodAccess, TestCaseResult expectedResult)
+  {
+    TestMethodHandle testMethodHandle = capture(methodAccess);
+    testCasesToRun.put(testMethodHandle, expectedResult);
+
+    return this;
+  }
+
+  public TestCasesConfig<T> setTestCaseValue(MethodAccess<T, Exception> methodAccess, int sizeBytes)
+  {
+    TestMethodHandle testMethodHandle = capture(methodAccess);
+    testCasesToRun.put(testMethodHandle, TestCaseResult.of(sizeBytes));
+
+    return this;
+  }
+
+  public TestCasesConfig<T> setTestCaseValue(MethodAccess<T, Exception> methodAccess, byte[] bytes)
+  {
+    TestMethodHandle testMethodHandle = capture(methodAccess);
+    testCasesToRun.put(testMethodHandle, TestCaseResult.of(bytes));
+
+    return this;
+  }
+
+  public TestCasesConfig<T> enableTestCase(MethodAccess<T, Exception> methodAccess)
+  {
+    TestMethodHandle testMethodHandle = capture(methodAccess);
+    testCasesToRun.put(testMethodHandle, TestCaseResult.of(-1));
+
+    return this;
+  }
+
+  public TestCaseResult currentTestValue()
+  {
+    TestMethodHandle currentTestMethodHandle = getCurrentTestMethod();
+    return testCasesToRun.get(currentTestMethodHandle);
+  }
+
+  public boolean isCurrentTestEnabled()
+  {
+    TestMethodHandle currentTestMethodHandle = getCurrentTestMethod();
+    return testCasesToRun.containsKey(currentTestMethodHandle);
+  }
+
+  private TestMethodHandle capture(MethodAccess<T, Exception> access)
+  {
+    try {
+      Method method = methodCallCapturer.captureMethod(access);
+      TestMethodHandle testMethodHandle = new TestMethodHandle(method.getName());
+
+      return testMethodHandle;
+    }
+    catch (Throwable e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private TestMethodHandle getCurrentTestMethod()
+  {
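+    // stack: [0] Thread.getStackTrace, [1] getCurrentTestMethod, [2] currentTestValue()/isCurrentTestEnabled(),
+    // [3] the test method whose configuration we want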
+    StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+    String thisMethodName = stackTrace[3].getMethodName();
+
+    return new TestMethodHandle(thisMethodName);
+  }
+
+  public class TestMethodHandle
+  {
+    private final String name;
+
+    public TestMethodHandle(String name)
+    {
+      this.name = name;
+      try {
+        // validate method exists
+        MethodHandles.lookup()
+                     .findVirtual(testCasesInterface, name, MethodType.methodType(void.class));
+        // validate method exists
+        MethodHandles.lookup()
+                     .findVirtual(testClassImpl, name, MethodType.methodType(void.class));
+      }
+      catch (NoSuchMethodException | IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    public String getName()
+    {
+      return testCasesInterface.getName() + "::void " + name + "()";
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return getName().hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (obj != null && this.getClass().equals(obj.getClass())) {
+        return getName().equals(((TestMethodHandle) obj).getName());
+      }
+
+      return false;
+    }
+
+    @Override
+    public String toString()
+    {
+      return getName();
+    }
+  }
+
+  public interface MethodAccess<I, T extends Throwable>
+  {
+    void access(I input) throws T;
+  }
+
+  private static class MethodCallCapturer<T> implements InvocationHandler
+  {
+    private volatile Method lastMethod = null;
+    private final T wrapper;
+
+    @SuppressWarnings("unchecked")
+    public MethodCallCapturer(Class<T> clazz)
+    {
+      wrapper = (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
+    }
+
+    public <E extends Throwable> Method captureMethod(MethodAccess<T, E> access) throws Throwable
+    {
+      access.access(wrapper);
+
+      return lastMethod;
+    }
+
+    @SuppressWarnings("ReturnOfNull")
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args)
+    {
+      lastMethod = method;
+
+      // unused
+      return null;
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org