Posted to commits@druid.apache.org by ch...@apache.org on 2022/09/07 03:01:05 UTC
[druid] branch master updated: Improve String Last/First Storage Efficiency (#12879)
This is an automated email from the ASF dual-hosted git repository.
cheddar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new ed26e2d634 Improve String Last/First Storage Efficiency (#12879)
ed26e2d634 is described below
commit ed26e2d6348772e91977ae67b747105ac0e5e5f3
Author: sr <sa...@gmail.com>
AuthorDate: Tue Sep 6 20:00:54 2022 -0700
Improve String Last/First Storage Efficiency (#12879)
-Add classes for writing cell values in LZ4 block-compressed format.
Payloads are indexed by element number for efficient random lookup.
-Update SerializablePairLongStringComplexMetricSerde to use block
compression.
-SerializablePairLongStringComplexMetricSerde also delta-encodes the
Long using a 2-pass encoding: it buffers the values first to find the
min/max, then encodes the deltas as integers if the range fits.

The entry points for block-compressed storage of byte[] payloads
are the CellWriter and CellReader classes. See
SerializablePairLongStringComplexMetricSerde for how these are used,
along with how to do full column-based storage (delta encoding here),
which includes the 2-pass encoding to compute a column header.
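
For orientation, the following is a minimal usage sketch (not part of this
commit) of the CellWriter/CellReader entry points added below. The class name
CellStorageSketch and its methods are hypothetical; it assumes the caller
supplies a SegmentWriteOutMedium, a WritableByteChannel, and, for reading, the
previously written bytes as a ByteBuffer.

import org.apache.druid.segment.serde.cell.CellReader;
import org.apache.druid.segment.serde.cell.CellWriter;
import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;

public class CellStorageSketch
{
  // writes each byte[] payload as one cell; cells are addressable later by element number
  public static void writeCells(
      SegmentWriteOutMedium medium,
      WritableByteChannel channel,
      byte[][] payloads
  ) throws IOException
  {
    try (CellWriter cellWriter = new CellWriter.Builder(medium)
        .setByteBufferProvider(NativeClearedByteBufferProvider.INSTANCE)
        .build()) {
      for (byte[] payload : payloads) {
        cellWriter.write(payload);
      }

      cellWriter.close();                // finalize the index/sizes before transferring
      cellWriter.writeTo(channel, null); // no FileSmoosher in this sketch
    }
  }

  // reads cell number rowNum back out of the bytes produced by writeCells()
  public static byte[] readCell(ByteBuffer storedBytes, int rowNum) throws IOException
  {
    ByteBuffer buffer = storedBytes.asReadOnlyBuffer().order(ByteOrder.nativeOrder());

    try (CellReader cellReader = new CellReader.Builder(buffer)
        .setByteBufferProvider(NativeClearedByteBufferProvider.INSTANCE)
        .build()) {
      ByteBuffer cell = cellReader.getCell(rowNum); // random access by element number
      byte[] copy = new byte[cell.remaining()];

      cell.get(copy);                               // copy out before the reader is closed
      return copy;
    }
  }
}

Deserializing the individual cells (e.g. via
SerializablePairLongStringDeltaEncodedStagedSerde) is then up to the caller, as
SerializablePairLongStringComplexColumn does below.
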
---
.../apache/druid/jackson/AggregatorsModule.java | 4 +-
.../SerializablePairLongStringBufferStore.java | 162 ++++++++
.../SerializablePairLongStringColumnHeader.java | 111 ++++++
...SerializablePairLongStringColumnSerializer.java | 123 +++++++
.../SerializablePairLongStringComplexColumn.java | 133 +++++++
...ializablePairLongStringComplexMetricSerde.java} | 78 ++--
...zablePairLongStringDeltaEncodedStagedSerde.java | 124 +++++++
...erializablePairLongStringSimpleStagedSerde.java | 95 +++++
.../druid/query/aggregation/SerializedStorage.java | 153 ++++++++
.../serde/cell/BlockCompressedPayloadBuffer.java | 122 +++++++
.../cell/BlockCompressedPayloadBufferFactory.java | 65 ++++
.../serde/cell/BlockCompressedPayloadReader.java | 173 +++++++++
.../cell/BlockCompressedPayloadSerializer.java | 58 +++
.../serde/cell/BlockCompressedPayloadWriter.java | 125 +++++++
.../druid/segment/serde/cell/BlockIndexWriter.java | 30 ++
.../segment/serde/cell/ByteBufferProvider.java | 34 ++
.../druid/segment/serde/cell/CellIndexReader.java | 54 +++
.../druid/segment/serde/cell/CellIndexWriter.java | 76 ++++
.../druid/segment/serde/cell/CellReader.java | 176 +++++++++
.../druid/segment/serde/cell/CellWriter.java | 239 ++++++++++++
.../druid/segment/serde/cell/IOIterator.java | 33 ++
.../druid/segment/serde/cell/IndexWriter.java | 80 ++++
.../druid/segment/serde/cell/IntIndexView.java | 69 ++++
.../druid/segment/serde/cell/IntSerializer.java | 45 +++
.../druid/segment/serde/cell/LongSerializer.java | 43 +++
.../cell/NativeClearedByteBufferProvider.java | 45 +++
.../druid/segment/serde/cell/NumberSerializer.java | 29 ++
.../druid/segment/serde/cell/PayloadEntrySpan.java | 42 +++
.../druid/segment/serde/cell/StagedSerde.java | 103 ++++++
.../druid/segment/serde/cell/StorableBuffer.java | 54 +++
.../SerializablePairLongStringBufferStoreTest.java | 388 ++++++++++++++++++++
...izablePairLongStringComplexMetricSerdeTest.java | 237 ++++++++++++
...ePairLongStringDeltaEncodedStagedSerdeTest.java | 99 +++++
...lizablePairLongStringSimpleStagedSerdeTest.java | 70 ++++
.../SingleValueColumnValueSelector.java | 79 ++++
.../first/StringFirstTimeseriesQueryTest.java | 4 +-
.../last/StringLastTimeseriesQueryTest.java | 4 +-
.../druid/segment/serde/ComplexMetricsTest.java | 6 +-
.../BlockCompressedPayloadWriterReaderTest.java | 61 ++++
.../BlockCompressedPayloadWriterToBytesWriter.java | 98 +++++
.../segment/serde/cell/ByteWriterTestHelper.java | 406 +++++++++++++++++++++
.../segment/serde/cell/BytesReadWriteTest.java | 49 +++
.../segment/serde/cell/BytesReadWriteTestBase.java | 254 +++++++++++++
.../serde/cell/BytesReadWriteTestCases.java | 28 ++
.../druid/segment/serde/cell/BytesWriter.java | 43 +++
.../segment/serde/cell/BytesWriterBuilder.java | 38 ++
.../segment/serde/cell/CellWriterReaderTest.java | 110 ++++++
.../serde/cell/CellWriterToBytesWriter.java | 98 +++++
.../segment/serde/cell/RandomStringUtils.java | 49 +++
.../druid/segment/serde/cell/TestCaseResult.java | 54 +++
.../druid/segment/serde/cell/TestCasesConfig.java | 199 ++++++++++
51 files changed, 5002 insertions(+), 50 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java
index 155b8e7e7b..9f79273050 100644
--- a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java
+++ b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java
@@ -39,7 +39,7 @@ import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
-import org.apache.druid.query.aggregation.SerializablePairLongStringSerde;
+import org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde;
import org.apache.druid.query.aggregation.any.DoubleAnyAggregatorFactory;
import org.apache.druid.query.aggregation.any.FloatAnyAggregatorFactory;
import org.apache.druid.query.aggregation.any.LongAnyAggregatorFactory;
@@ -81,7 +81,7 @@ public class AggregatorsModule extends SimpleModule
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
ComplexMetrics.registerSerde("preComputedHyperUnique", new PreComputedHyperUniquesSerde());
- ComplexMetrics.registerSerde("serializablePairLongString", new SerializablePairLongStringSerde());
+ ComplexMetrics.registerSerde("serializablePairLongString", new SerializablePairLongStringComplexMetricSerde());
setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
setMixInAnnotation(PostAggregator.class, PostAggregatorMixin.class);
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringBufferStore.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringBufferStore.java
new file mode 100644
index 0000000000..843e1a4eff
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringBufferStore.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.serde.Serializer;
+import org.apache.druid.segment.serde.cell.ByteBufferProvider;
+import org.apache.druid.segment.serde.cell.CellWriter;
+import org.apache.druid.segment.serde.cell.IOIterator;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+
+public class SerializablePairLongStringBufferStore
+{
+ private final SerializedStorage<SerializablePairLongString> serializedStorage;
+
+ private long minValue = Long.MAX_VALUE;
+ private long maxValue = Long.MIN_VALUE;
+
+ public SerializablePairLongStringBufferStore(SerializedStorage<SerializablePairLongString> serializedStorage)
+ {
+ this.serializedStorage = serializedStorage;
+ }
+
+ public void store(@Nullable SerializablePairLongString pairLongString) throws IOException
+ {
+ if (pairLongString != null && pairLongString.lhs != null) {
+ minValue = Math.min(minValue, pairLongString.lhs);
+ maxValue = Math.max(maxValue, pairLongString.lhs);
+ }
+
+ serializedStorage.store(pairLongString);
+ }
+
+ /**
+ * each call transfers the temporary buffer into an encoded, block-compressed buffer of the segment. It is ready to be
+ * transferred to a {@link WritableByteChannel}
+ *
+ * @param byteBufferProvider - provides a ByteBuffer used for block compressed encoding
+ * @param segmentWriteOutMedium - used to create temporary storage
+ * @return encoded buffer ready to be stored
+ * @throws IOException
+ */
+ public TransferredBuffer transferToRowWriter(
+ ByteBufferProvider byteBufferProvider,
+ SegmentWriteOutMedium segmentWriteOutMedium
+ ) throws IOException
+ {
+ SerializablePairLongStringColumnHeader columnHeader = createColumnHeader();
+ SerializablePairLongStringDeltaEncodedStagedSerde serde =
+ new SerializablePairLongStringDeltaEncodedStagedSerde(
+ columnHeader.getMinValue(),
+ columnHeader.isUseIntegerDeltas()
+ );
+
+ // try-with-resources will call cellWriter.close() an extra time in the normal case, but it protects against
+ // buffer leaking in the case of an exception (close() is idempotent). In the normal path, close() performs some
+ // finalization of the CellWriter object. We want that object state finalized before creating the TransferredBuffer
+ // as a point of good style (though strictly speaking, it works fine to pass it in before calling close since
+ // TransferredBuffer does not do anything in the constructor with the object)
+ try (CellWriter cellWriter =
+ new CellWriter.Builder(segmentWriteOutMedium).setByteBufferProvider(byteBufferProvider).build()) {
+ try (IOIterator<SerializablePairLongString> bufferIterator = iterator()) {
+ while (bufferIterator.hasNext()) {
+ SerializablePairLongString pairLongString = bufferIterator.next();
+ byte[] serialized = serde.serialize(pairLongString);
+
+ cellWriter.write(serialized);
+ }
+
+ cellWriter.close();
+
+ return new TransferredBuffer(cellWriter, columnHeader);
+ }
+ }
+ }
+
+ @Nonnull
+ public SerializablePairLongStringColumnHeader createColumnHeader()
+ {
+ long maxDelta = maxValue - minValue;
+ SerializablePairLongStringColumnHeader columnHeader;
+
+ if (minValue < maxValue && maxDelta < 0 || minValue > maxValue) {
+ // true iff
+ // 1. we have overflow in our range || 2. we have only seen null values
+ // in this case, effectively disable delta encoding by using longs and a min value of 0
+ maxDelta = Long.MAX_VALUE;
+ minValue = 0;
+ }
+
+ if (maxDelta <= Integer.MAX_VALUE) {
+ columnHeader = new SerializablePairLongStringColumnHeader(
+ SerializablePairLongStringComplexMetricSerde.EXPECTED_VERSION,
+ true,
+ minValue
+ );
+ } else {
+ columnHeader = new SerializablePairLongStringColumnHeader(
+ SerializablePairLongStringComplexMetricSerde.EXPECTED_VERSION,
+ false,
+ minValue
+ );
+ }
+ return columnHeader;
+ }
+
+ public IOIterator<SerializablePairLongString> iterator() throws IOException
+ {
+ return serializedStorage.iterator();
+ }
+
+ /**
+ * contains serialized data that is compressed and delta-encoded (Long)
+ * It's ready to be transferred to a {@link WritableByteChannel}
+ */
+ public static class TransferredBuffer implements Serializer
+ {
+ private final CellWriter cellWriter;
+ private final SerializablePairLongStringColumnHeader columnHeader;
+
+ public TransferredBuffer(CellWriter cellWriter, SerializablePairLongStringColumnHeader columnHeader)
+ {
+ this.cellWriter = cellWriter;
+ this.columnHeader = columnHeader;
+ }
+
+ @Override
+ public void writeTo(WritableByteChannel channel, @Nullable FileSmoosher smoosher) throws IOException
+ {
+ columnHeader.transferTo(channel);
+ cellWriter.writeTo(channel, smoosher);
+ }
+
+ @Override
+ public long getSerializedSize()
+ {
+ return columnHeader.getSerializedSize() + cellWriter.getSerializedSize();
+ }
+ }
+}
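
As a usage example (again, not part of the commit), a hedged sketch of the 2-pass
flow around SerializablePairLongStringBufferStore: pass one buffers the raw values
while tracking the min/max Long, and transferToRowWriter() performs the second,
delta-encoded, block-compressed pass. The class name BufferStoreSketch, the method
writeColumn, and its parameters are hypothetical.

import org.apache.druid.query.aggregation.SerializablePairLongString;
import org.apache.druid.query.aggregation.SerializablePairLongStringBufferStore;
import org.apache.druid.query.aggregation.SerializablePairLongStringBufferStore.TransferredBuffer;
import org.apache.druid.query.aggregation.SerializablePairLongStringColumnSerializer;
import org.apache.druid.query.aggregation.SerializedStorage;
import org.apache.druid.segment.serde.cell.ByteBufferProvider;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;

import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.util.List;

public class BufferStoreSketch
{
  public static void writeColumn(
      SegmentWriteOutMedium medium,
      ByteBufferProvider byteBufferProvider,
      WritableByteChannel channel,
      List<SerializablePairLongString> values
  ) throws IOException
  {
    SerializablePairLongStringBufferStore store = new SerializablePairLongStringBufferStore(
        new SerializedStorage<>(medium.makeWriteOutBytes(), SerializablePairLongStringColumnSerializer.STAGED_SERDE)
    );

    // pass 1: buffer the raw values; the store tracks the min/max Long as it goes
    for (SerializablePairLongString value : values) {
      store.store(value);
    }

    // pass 2: re-read the buffered values, delta-encode them against the column header's
    // minValue (integer deltas when the range fits), and block-compress the cells
    TransferredBuffer transferred = store.transferToRowWriter(byteBufferProvider, medium);

    // the column header plus compressed cell data is now ready for the segment channel
    transferred.writeTo(channel, null);
  }
}
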
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringColumnHeader.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringColumnHeader.java
new file mode 100644
index 0000000000..e0aa21af03
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringColumnHeader.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import org.apache.druid.segment.serde.cell.LongSerializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+public class SerializablePairLongStringColumnHeader
+{
+ // header size is 4 bytes for word alignment for LZ4 (minmatch) compression
+ private static final int HEADER_SIZE_BYTES = 4;
+ private static final int USE_INTEGER_MASK = 0x80;
+ private static final int VERSION_INDEX = 0;
+ private static final int ENCODING_INDEX = 1;
+
+ private final byte[] bytes;
+ private final long minValue;
+
+ private SerializablePairLongStringColumnHeader(byte[] bytes, long minTimestamp)
+ {
+ this.bytes = bytes;
+ this.minValue = minTimestamp;
+ }
+
+ public SerializablePairLongStringColumnHeader(int version, boolean useIntegerDeltas, long minTimestamp)
+ {
+ this.minValue = minTimestamp;
+ bytes = new byte[HEADER_SIZE_BYTES];
+ Preconditions.checkArgument(version <= 255, "max version 255");
+ bytes[VERSION_INDEX] = (byte) version;
+
+ if (useIntegerDeltas) {
+ bytes[ENCODING_INDEX] |= USE_INTEGER_MASK;
+ }
+ }
+
+ public static SerializablePairLongStringColumnHeader fromBuffer(ByteBuffer byteBuffer)
+ {
+ byte[] bytes = new byte[HEADER_SIZE_BYTES];
+
+ byteBuffer.get(bytes);
+
+ long minTimestamp = byteBuffer.getLong();
+
+ return new SerializablePairLongStringColumnHeader(bytes, minTimestamp);
+ }
+
+ public SerializablePairLongStringDeltaEncodedStagedSerde createSerde()
+ {
+ return new SerializablePairLongStringDeltaEncodedStagedSerde(minValue, isUseIntegerDeltas());
+ }
+
+ public void transferTo(WritableByteChannel channel) throws IOException
+ {
+ LongSerializer longSerializer = new LongSerializer();
+
+ channel.write(ByteBuffer.wrap(bytes));
+ channel.write(longSerializer.serialize(minValue));
+ }
+
+ public int getVersion()
+ {
+ return 0XFF & bytes[VERSION_INDEX];
+ }
+
+ public boolean isUseIntegerDeltas()
+ {
+ return (bytes[ENCODING_INDEX] & USE_INTEGER_MASK) != 0;
+ }
+
+ public long getMinValue()
+ {
+ return minValue;
+ }
+
+ public int getSerializedSize()
+ {
+ return HEADER_SIZE_BYTES + Long.BYTES;
+ }
+
+ @Override
+ public String toString()
+ {
+ return Objects.toStringHelper(this)
+ .add("bytes", bytes)
+ .add("minValue", minValue)
+ .toString();
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringColumnSerializer.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringColumnSerializer.java
new file mode 100644
index 0000000000..3930ee6060
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringColumnSerializer.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.GenericColumnSerializer;
+import org.apache.druid.segment.serde.cell.ByteBufferProvider;
+import org.apache.druid.segment.serde.cell.StagedSerde;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * valid call sequence
+ * <p>
+ * open()+serialize()*(getSerializedSize()|writeTo())*
+ * <p>
+ * getSerializedSize() / writeTo() effectively function as a close call, but each may be called multiple times and
+ * neither has any effect on the other.
+ */
+@SuppressWarnings("NotNullFieldNotInitialized")
+public class SerializablePairLongStringColumnSerializer implements GenericColumnSerializer<SerializablePairLongString>
+{
+ public static final StagedSerde<SerializablePairLongString> STAGED_SERDE =
+ new SerializablePairLongStringSimpleStagedSerde();
+
+ private final SegmentWriteOutMedium segmentWriteOutMedium;
+ private final ByteBufferProvider byteBufferProvider;
+
+ private State state = State.START;
+ private SerializablePairLongStringBufferStore bufferStore;
+ private SerializablePairLongStringBufferStore.TransferredBuffer transferredBuffer;
+
+ public SerializablePairLongStringColumnSerializer(
+ SegmentWriteOutMedium segmentWriteOutMedium,
+ ByteBufferProvider byteBufferProvider
+ )
+ {
+ this.segmentWriteOutMedium = segmentWriteOutMedium;
+ this.byteBufferProvider = byteBufferProvider;
+ }
+
+ @Override
+ public void open() throws IOException
+ {
+ Preconditions.checkState(state == State.START || state == State.OPEN, "open called in invalid state %s", state);
+
+ if (state == State.START) {
+ bufferStore = new SerializablePairLongStringBufferStore(
+ new SerializedStorage<>(segmentWriteOutMedium.makeWriteOutBytes(), STAGED_SERDE)
+ );
+ state = State.OPEN;
+ }
+ }
+
+ @Override
+ public void serialize(ColumnValueSelector<? extends SerializablePairLongString> selector) throws IOException
+ {
+ Preconditions.checkState(state == State.OPEN, "serialize called in invalid state %s", state);
+
+ SerializablePairLongString pairLongString = selector.getObject();
+
+ bufferStore.store(pairLongString);
+ }
+
+ @Override
+ public long getSerializedSize() throws IOException
+ {
+ Preconditions.checkState(
+ state != State.START,
+ "getSerializedSize called in invalid state %s (must have opened at least)",
+ state
+ );
+
+ transferToRowWriterIfNecessary();
+
+ return transferredBuffer.getSerializedSize();
+ }
+
+ @Override
+ public void writeTo(WritableByteChannel channel, @Nullable FileSmoosher smoosher) throws IOException
+ {
+ Preconditions.checkState(state != State.START, "writeTo called in invalid state %s", state);
+ transferToRowWriterIfNecessary();
+ transferredBuffer.writeTo(channel, smoosher);
+ }
+
+ private void transferToRowWriterIfNecessary() throws IOException
+ {
+ if (state == State.OPEN) {
+ transferredBuffer = bufferStore.transferToRowWriter(byteBufferProvider, segmentWriteOutMedium);
+ state = State.CLOSED;
+ }
+ }
+
+ private enum State
+ {
+ START,
+ OPEN,
+ CLOSED,
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexColumn.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexColumn.java
new file mode 100644
index 0000000000..d3b4dcfbb4
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexColumn.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.segment.column.ComplexColumn;
+import org.apache.druid.segment.serde.cell.ByteBufferProvider;
+import org.apache.druid.segment.serde.cell.CellReader;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class SerializablePairLongStringComplexColumn implements ComplexColumn
+{
+ private final Closer closer;
+ private final int serializedSize;
+ private final CellReader cellReader;
+ private final SerializablePairLongStringDeltaEncodedStagedSerde serde;
+
+ public SerializablePairLongStringComplexColumn(
+ CellReader cellReader,
+ SerializablePairLongStringDeltaEncodedStagedSerde serde,
+ Closer closer,
+ int serializedSize
+ )
+ {
+ this.cellReader = cellReader;
+ this.serde = serde;
+ this.closer = closer;
+ this.serializedSize = serializedSize;
+ }
+
+ @Override
+ public Class<?> getClazz()
+ {
+ return SerializablePairLongString.class;
+ }
+
+ @Override
+ public String getTypeName()
+ {
+ return SerializablePairLongStringComplexMetricSerde.TYPE_NAME;
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ @Override
+ public Object getRowValue(int rowNum)
+ {
+ // This can return nulls, meaning that it is expected that anything reading from this does
+ // something "good" with null. At time of writing, the relevant aggregators handle null properly
+ return serde.deserialize(cellReader.getCell(rowNum));
+ }
+
+ @Override
+ public int getLength()
+ {
+ return serializedSize;
+ }
+
+ @Override
+ public void close()
+ {
+ try {
+ closer.close();
+ }
+ catch (IOException e) {
+ throw new RE(e, "error closing " + getClass().getName());
+ }
+ }
+
+ public static class Builder
+ {
+ private final int serializedSize;
+ private final SerializablePairLongStringDeltaEncodedStagedSerde serde;
+ private final CellReader.Builder cellReaderBuilder;
+
+ public Builder(ByteBuffer buffer)
+ {
+ ByteBuffer masterByteBuffer = buffer.asReadOnlyBuffer().order(ByteOrder.nativeOrder());
+
+ serializedSize = masterByteBuffer.remaining();
+
+ SerializablePairLongStringColumnHeader columnHeader =
+ SerializablePairLongStringColumnHeader.fromBuffer(masterByteBuffer);
+
+ Preconditions.checkArgument(
+ columnHeader.getVersion() == SerializablePairLongStringComplexMetricSerde.EXPECTED_VERSION,
+ "version %s expected, got %s",
+ SerializablePairLongStringComplexMetricSerde.EXPECTED_VERSION,
+ columnHeader.getVersion()
+ );
+ serde = columnHeader.createSerde();
+ cellReaderBuilder = new CellReader.Builder(masterByteBuffer);
+ }
+
+ public Builder setByteBufferProvider(ByteBufferProvider byteBufferProvider)
+ {
+ cellReaderBuilder.setByteBufferProvider(byteBufferProvider);
+
+ return this;
+ }
+
+ public SerializablePairLongStringComplexColumn build()
+ {
+ Closer closer = Closer.create();
+ CellReader cellReader = cellReaderBuilder.build();
+
+ closer.register(cellReader);
+
+ return new SerializablePairLongStringComplexColumn(cellReader, serde, closer, serializedSize);
+ }
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java
similarity index 61%
rename from processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringSerde.java
rename to processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java
index 49300ff531..deb1623701 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringSerde.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java
@@ -21,7 +21,6 @@ package org.apache.druid.query.aggregation;
import org.apache.druid.collections.SerializablePair;
import org.apache.druid.data.input.InputRow;
-import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.data.GenericIndexed;
@@ -29,7 +28,7 @@ import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
-import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
+import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
@@ -39,14 +38,19 @@ import java.util.Comparator;
/**
* The SerializablePairLongStringSerde serializes a Long-String pair (SerializablePairLongString).
* The serialization structure is: Long:Integer:String
+ * The Long is delta-encoded for the column in order to potentially reduce the size to an integer so it may be stored
+ * as: Integer:Integer:String
+ * <p>
+ * Future work: dictionary encoding of the String may be performed
* <p>
* The class is used on first/last String aggregators to store the time and the first/last string.
- * Long:Integer:String -> Timestamp:StringSize:StringData
+ * [Integer|Long]:Integer:String -> delta:StringSize:StringData --(delta decoded)--> Timestamp:StringSize:StringData
+ * (see {@link SerializablePairLongStringDeltaEncodedStagedSerde})
*/
-public class SerializablePairLongStringSerde extends ComplexMetricSerde
+public class SerializablePairLongStringComplexMetricSerde extends ComplexMetricSerde
{
-
- private static final String TYPE_NAME = "serializablePairLongString";
+ public static final int EXPECTED_VERSION = 3;
+ public static final String TYPE_NAME = "serializablePairLongString";
// Null SerializablePairLongString values are put first
private static final Comparator<SerializablePairLongString> COMPARATOR = Comparator.nullsFirst(
// assumes that the LHS of the pair will never be null
@@ -54,6 +58,9 @@ public class SerializablePairLongStringSerde extends ComplexMetricSerde
.thenComparing(SerializablePair::getRhs, Comparator.nullsFirst(Comparator.naturalOrder()))
);
+ private static final SerializablePairLongStringSimpleStagedSerde SERDE =
+ new SerializablePairLongStringSimpleStagedSerde();
+
@Override
public String getTypeName()
{
@@ -61,9 +68,9 @@ public class SerializablePairLongStringSerde extends ComplexMetricSerde
}
@Override
- public ComplexMetricExtractor getExtractor()
+ public ComplexMetricExtractor<?> getExtractor()
{
- return new ComplexMetricExtractor()
+ return new ComplexMetricExtractor<Object>()
{
@Override
public Class<SerializablePairLongString> extractedClass()
@@ -82,12 +89,21 @@ public class SerializablePairLongStringSerde extends ComplexMetricSerde
@Override
public void deserializeColumn(ByteBuffer buffer, ColumnBuilder columnBuilder)
{
- final GenericIndexed column = GenericIndexed.read(buffer, getObjectStrategy(), columnBuilder.getFileMapper());
- columnBuilder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column));
+ byte version = buffer.get(buffer.position());
+
+ if (version == 0 || version == 1 || version == 2) {
+ GenericIndexed<?> column = GenericIndexed.read(buffer, getObjectStrategy(), columnBuilder.getFileMapper());
+ columnBuilder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column));
+ } else {
+ SerializablePairLongStringComplexColumn.Builder builder =
+ new SerializablePairLongStringComplexColumn.Builder(buffer)
+ .setByteBufferProvider(NativeClearedByteBufferProvider.INSTANCE);
+ columnBuilder.setComplexColumnSupplier(builder::build);
+ }
}
@Override
- public ObjectStrategy getObjectStrategy()
+ public ObjectStrategy<?> getObjectStrategy()
{
return new ObjectStrategy<SerializablePairLongString>()
{
@@ -106,48 +122,28 @@ public class SerializablePairLongStringSerde extends ComplexMetricSerde
@Override
public SerializablePairLongString fromByteBuffer(ByteBuffer buffer, int numBytes)
{
- final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
-
- long lhs = readOnlyBuffer.getLong();
- int stringSize = readOnlyBuffer.getInt();
+ ByteBuffer readOnlyByteBuffer = buffer.asReadOnlyBuffer().order(buffer.order());
- String lastString = null;
- if (stringSize > 0) {
- byte[] stringBytes = new byte[stringSize];
- readOnlyBuffer.get(stringBytes, 0, stringSize);
- lastString = StringUtils.fromUtf8(stringBytes);
- }
+ readOnlyByteBuffer.limit(buffer.position() + numBytes);
- return new SerializablePairLongString(lhs, lastString);
+ return SERDE.deserialize(readOnlyByteBuffer);
}
+ @SuppressWarnings("NullableProblems")
@Override
public byte[] toBytes(SerializablePairLongString val)
{
- String rhsString = val.rhs;
- ByteBuffer bbuf;
-
- if (rhsString != null) {
- byte[] rhsBytes = StringUtils.toUtf8(rhsString);
- bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + rhsBytes.length);
- bbuf.putLong(val.lhs);
- bbuf.putInt(Long.BYTES, rhsBytes.length);
- bbuf.position(Long.BYTES + Integer.BYTES);
- bbuf.put(rhsBytes);
- } else {
- bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
- bbuf.putLong(val.lhs);
- bbuf.putInt(Long.BYTES, 0);
- }
-
- return bbuf.array();
+ return SERDE.serialize(val);
}
};
}
@Override
- public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
+ public GenericColumnSerializer<?> getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
{
- return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy());
+ return new SerializablePairLongStringColumnSerializer(
+ segmentWriteOutMedium,
+ NativeClearedByteBufferProvider.INSTANCE
+ );
}
}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringDeltaEncodedStagedSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringDeltaEncodedStagedSerde.java
new file mode 100644
index 0000000000..e1c070127b
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringDeltaEncodedStagedSerde.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.serde.cell.StagedSerde;
+import org.apache.druid.segment.serde.cell.StorableBuffer;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * serializes a Long/String pair in the context of a column/segment. Uses the minValue to perform delta
+ * encoding/decoding and if the range of the segment fits in an integer (useIntegerDelta), the format is
+ * Integer:Integer:bytes
+ *
+ * otherwise
+ * Long:Integer:bytes
+ */
+public class SerializablePairLongStringDeltaEncodedStagedSerde implements StagedSerde<SerializablePairLongString>
+{
+ private final long minValue;
+ private final boolean useIntegerDelta;
+
+ public SerializablePairLongStringDeltaEncodedStagedSerde(long minValue, boolean useIntegerDelta)
+ {
+ this.minValue = minValue;
+ this.useIntegerDelta = useIntegerDelta;
+ }
+
+ @Override
+ public StorableBuffer serializeDelayed(@Nullable SerializablePairLongString value)
+ {
+ if (value == null) {
+ return StorableBuffer.EMPTY;
+ }
+
+ String rhsString = value.rhs;
+ byte[] rhsBytes = StringUtils.toUtf8WithNullToEmpty(rhsString);
+
+ return new StorableBuffer()
+ {
+ @Override
+ public void store(ByteBuffer byteBuffer)
+ {
+ Preconditions.checkNotNull(value.lhs, "Long in SerializablePairLongString must be non-null");
+
+ long delta = value.lhs - minValue;
+
+ Preconditions.checkState(delta >= 0 || delta == value.lhs);
+
+ if (useIntegerDelta) {
+ byteBuffer.putInt(Ints.checkedCast(delta));
+ } else {
+ byteBuffer.putLong(delta);
+ }
+
+ byteBuffer.putInt(rhsBytes.length);
+
+ if (rhsBytes.length > 0) {
+ byteBuffer.put(rhsBytes);
+ }
+ }
+
+ @Override
+ public int getSerializedSize()
+ {
+ return (useIntegerDelta ? Integer.BYTES : Long.BYTES) + Integer.BYTES + rhsBytes.length;
+ }
+ };
+ }
+
+ @Nullable
+ @Override
+ public SerializablePairLongString deserialize(ByteBuffer byteBuffer)
+ {
+ if (byteBuffer.remaining() == 0) {
+ return null;
+ }
+
+ ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer().order(ByteOrder.nativeOrder());
+ long lhs;
+
+ if (useIntegerDelta) {
+ lhs = readOnlyBuffer.getInt();
+ } else {
+ lhs = readOnlyBuffer.getLong();
+ }
+
+ lhs += minValue;
+
+ int stringSize = readOnlyBuffer.getInt();
+ String lastString = null;
+
+ if (stringSize > 0) {
+ byte[] stringBytes = new byte[stringSize];
+
+ readOnlyBuffer.get(stringBytes, 0, stringSize);
+ lastString = StringUtils.fromUtf8(stringBytes);
+ }
+
+ return new SerializablePairLongString(lhs, lastString);
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringSimpleStagedSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringSimpleStagedSerde.java
new file mode 100644
index 0000000000..acdd937d2a
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringSimpleStagedSerde.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.serde.cell.StagedSerde;
+import org.apache.druid.segment.serde.cell.StorableBuffer;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * serializes a Long/String pair as
+ * Long:Integer:bytes
+ * <p>
+ * or
+ * Long:StringSize:StringData
+ */
+public class SerializablePairLongStringSimpleStagedSerde implements StagedSerde<SerializablePairLongString>
+{
+ @Override
+ public StorableBuffer serializeDelayed(@Nullable SerializablePairLongString value)
+ {
+ if (value == null) {
+ return StorableBuffer.EMPTY;
+ }
+
+ String rhsString = value.rhs;
+ byte[] rhsBytes = StringUtils.toUtf8WithNullToEmpty(rhsString);
+
+ return new StorableBuffer()
+ {
+ @Override
+ public void store(ByteBuffer byteBuffer)
+ {
+ Preconditions.checkNotNull(value.lhs, "Long in SerializablePairLongString must be non-null");
+
+ byteBuffer.putLong(value.lhs);
+ byteBuffer.putInt(rhsBytes.length);
+
+ if (rhsBytes.length > 0) {
+ byteBuffer.put(rhsBytes);
+ }
+ }
+
+ @Override
+ public int getSerializedSize()
+ {
+ return Long.BYTES + Integer.BYTES + rhsBytes.length;
+ }
+ };
+ }
+
+ @Nullable
+ @Override
+ public SerializablePairLongString deserialize(ByteBuffer byteBuffer)
+ {
+ if (byteBuffer.remaining() == 0) {
+ return null;
+ }
+
+ ByteBuffer readOnlyBuffer = byteBuffer.asReadOnlyBuffer().order(ByteOrder.nativeOrder());
+ long lhs = readOnlyBuffer.getLong();
+ int stringSize = readOnlyBuffer.getInt();
+ String lastString = null;
+
+ if (stringSize > 0) {
+ byte[] stringBytes = new byte[stringSize];
+
+ readOnlyBuffer.get(stringBytes, 0, stringSize);
+ lastString = StringUtils.fromUtf8(stringBytes);
+ }
+
+ return new SerializablePairLongString(lhs, lastString);
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializedStorage.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializedStorage.java
new file mode 100644
index 0000000000..a6fee46b3c
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializedStorage.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.segment.serde.cell.IOIterator;
+import org.apache.druid.segment.serde.cell.IntSerializer;
+import org.apache.druid.segment.serde.cell.StagedSerde;
+import org.apache.druid.segment.writeout.WriteOutBytes;
+
+import javax.annotation.Nullable;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.NoSuchElementException;
+
+/**
+ * simple utility class, useful when multiple passes over the input are needed for encoding (e.g. delta or dictionary
+ * encoding).
+ * <p/>
+ * This allows objects to be serialized to some temporary storage and iterated over for final processing.
+ * <p/>
+ * @param <T>
+ */
+public class SerializedStorage<T>
+{
+ private final WriteOutBytes writeOutBytes;
+ private final StagedSerde<T> serde;
+ private final IntSerializer intSerializer = new IntSerializer();
+
+ public SerializedStorage(WriteOutBytes writeOutBytes, StagedSerde<T> serde)
+ {
+ this.writeOutBytes = writeOutBytes;
+ this.serde = serde;
+ }
+
+ public void store(@Nullable T value) throws IOException
+ {
+ byte[] bytes = serde.serialize(value);
+
+ writeOutBytes.write(intSerializer.serialize(bytes.length));
+ writeOutBytes.write(bytes);
+ }
+
+ public IOIterator<T> iterator() throws IOException
+ {
+ return new DeserializingIOIterator<>(writeOutBytes.asInputStream(), serde);
+ }
+
+ private static class DeserializingIOIterator<T> implements IOIterator<T>
+ {
+ private static final int NEEDS_READ = -2;
+ private static final int EOF = -1;
+
+ private final byte[] intBytes;
+ private final BufferedInputStream inputStream;
+ private final StagedSerde<T> serde;
+
+ private int nextSize;
+
+ public DeserializingIOIterator(InputStream inputStream, StagedSerde<T> serde)
+ {
+ this.inputStream = new BufferedInputStream(inputStream);
+ this.serde = serde;
+ intBytes = new byte[Integer.BYTES];
+ nextSize = NEEDS_READ;
+ }
+
+ @Override
+ public boolean hasNext() throws IOException
+ {
+ return getNextSize() > EOF;
+ }
+
+ @Override
+ public T next() throws IOException
+ {
+ int currentNextSize = getNextSize();
+
+ if (currentNextSize == -1) {
+ throw new NoSuchElementException("end of buffer reached");
+ }
+
+ byte[] nextBytes = new byte[currentNextSize];
+ int bytesRead = 0;
+
+ while (bytesRead < currentNextSize) {
+ int result = inputStream.read(nextBytes, bytesRead, currentNextSize - bytesRead);
+
+ if (result == -1) {
+ throw new NoSuchElementException("unexpected end of buffer reached");
+ }
+
+ bytesRead += result;
+ }
+
+ Preconditions.checkState(bytesRead == currentNextSize);
+ T value = serde.deserialize(nextBytes);
+
+ nextSize = NEEDS_READ;
+
+ return value;
+ }
+
+ private int getNextSize() throws IOException
+ {
+ if (nextSize == NEEDS_READ) {
+ int bytesRead = 0;
+
+ while (bytesRead < Integer.BYTES) {
+ int result = inputStream.read(intBytes, bytesRead, Integer.BYTES - bytesRead);
+
+ if (result == -1) {
+ nextSize = EOF;
+ return EOF;
+ } else {
+ bytesRead += result;
+ }
+ }
+ Preconditions.checkState(bytesRead == Integer.BYTES);
+
+ nextSize = ByteBuffer.wrap(intBytes).order(ByteOrder.nativeOrder()).getInt();
+ }
+
+ return nextSize;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ inputStream.close();
+ }
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadBuffer.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadBuffer.java
new file mode 100644
index 0000000000..cf5309b6a3
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadBuffer.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.writeout.WriteOutBytes;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class BlockCompressedPayloadBuffer implements Closeable
+{
+ private final ByteBuffer currentBlock;
+ private final ByteBuffer compressedByteBuffer;
+ private final BlockIndexWriter blockIndexWriter;
+ private final WriteOutBytes dataOutBytes;
+ private final Closer closer;
+ private final CompressionStrategy.Compressor compressor;
+
+ private boolean open = true;
+
+ public BlockCompressedPayloadBuffer(
+ ByteBuffer currentBlock,
+ ByteBuffer compressedByteBuffer,
+ BlockIndexWriter blockIndexWriter,
+ WriteOutBytes dataOutBytes,
+ Closer closer,
+ CompressionStrategy.Compressor compressor
+ )
+ {
+ currentBlock.clear();
+ compressedByteBuffer.clear();
+ this.currentBlock = currentBlock;
+ this.compressedByteBuffer = compressedByteBuffer;
+ this.closer = closer;
+ this.blockIndexWriter = blockIndexWriter;
+ this.dataOutBytes = dataOutBytes;
+ this.compressor = compressor;
+ }
+
+ public void write(byte[] payload) throws IOException
+ {
+ Preconditions.checkNotNull(payload);
+ write(ByteBuffer.wrap(payload).order(ByteOrder.nativeOrder()));
+ }
+
+ public void write(ByteBuffer masterPayload) throws IOException
+ {
+ Preconditions.checkNotNull(masterPayload);
+ Preconditions.checkState(open, "cannot write to closed BlockCompressedPayloadWriter");
+ ByteBuffer payload = masterPayload.asReadOnlyBuffer().order(masterPayload.order());
+
+ while (payload.hasRemaining()) {
+ int writeSize = Math.min(payload.remaining(), currentBlock.remaining());
+
+ payload.limit(payload.position() + writeSize);
+ currentBlock.put(payload);
+
+ if (!currentBlock.hasRemaining()) {
+ flush();
+ }
+
+ payload.limit(masterPayload.limit());
+ }
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ closer.close();
+ }
+
+ public BlockCompressedPayloadSerializer closeToSerializer() throws IOException
+ {
+ if (open) {
+ if (currentBlock.position() > 0) {
+ flush();
+ }
+
+ blockIndexWriter.close();
+ closer.close();
+ open = false;
+ }
+
+ return new BlockCompressedPayloadSerializer(blockIndexWriter, dataOutBytes);
+ }
+
+ private void flush() throws IOException
+ {
+ Preconditions.checkState(open, "flush() on closed BlockCompressedPayloadWriter");
+ currentBlock.flip();
+
+ ByteBuffer actualCompressedByteBuffer = compressor.compress(currentBlock, compressedByteBuffer);
+ int compressedBlockSize = actualCompressedByteBuffer.limit();
+
+ blockIndexWriter.persistAndIncrement(compressedBlockSize);
+ dataOutBytes.write(actualCompressedByteBuffer);
+ currentBlock.clear();
+ compressedByteBuffer.clear();
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadBufferFactory.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadBufferFactory.java
new file mode 100644
index 0000000000..d4bd6a4b9d
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadBufferFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class BlockCompressedPayloadBufferFactory
+{
+ private final ByteBufferProvider byteBufferProvider;
+ private final SegmentWriteOutMedium writeOutMedium;
+ private final CompressionStrategy.Compressor compressor;
+
+ public BlockCompressedPayloadBufferFactory(
+ ByteBufferProvider byteBufferProvider,
+ SegmentWriteOutMedium writeOutMedium,
+ CompressionStrategy.Compressor compressor
+ )
+ {
+ this.byteBufferProvider = byteBufferProvider;
+ this.writeOutMedium = writeOutMedium;
+ this.compressor = compressor;
+ }
+
+ public BlockCompressedPayloadBuffer create() throws IOException
+ {
+ Closer closer = Closer.create();
+ ResourceHolder<ByteBuffer> currentBlockHolder = byteBufferProvider.get();
+
+ closer.register(currentBlockHolder);
+
+ ByteBuffer compressedBlockByteBuffer = compressor.allocateOutBuffer(currentBlockHolder.get().limit(), closer);
+
+ return new BlockCompressedPayloadBuffer(
+ currentBlockHolder.get(),
+ compressedBlockByteBuffer,
+ new BlockIndexWriter(writeOutMedium.makeWriteOutBytes()),
+ writeOutMedium.makeWriteOutBytes(),
+ closer,
+ compressor
+ );
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadReader.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadReader.java
new file mode 100644
index 0000000000..2c6263be58
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadReader.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.segment.data.CompressionStrategy;
+
+import javax.annotation.Nonnull;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class BlockCompressedPayloadReader implements Closeable
+{
+ private static final ByteBuffer NULL_CELL = ByteBuffer.wrap(new byte[0]);
+ private final IntIndexView blockIndexView;
+ private final ByteBuffer compressedBlocksByteBuffer;
+ private final ByteBuffer uncompressedByteBuffer;
+ private final Closer closer;
+ private final int blockSize;
+ private final long maxValidUncompressedOffset;
+ private final CompressionStrategy.Decompressor decompressor;
+
+ private int currentUncompressedBlockNumber = -1;
+
+ private BlockCompressedPayloadReader(
+ IntIndexView blockIndexView,
+ ByteBuffer compressedBlocksByteBuffer,
+ ByteBuffer uncompressedByteBuffer,
+ CompressionStrategy.Decompressor decompressor,
+ Closer closer
+ )
+ {
+ this.blockIndexView = blockIndexView;
+ this.compressedBlocksByteBuffer = compressedBlocksByteBuffer;
+ this.uncompressedByteBuffer = uncompressedByteBuffer;
+ this.closer = closer;
+ uncompressedByteBuffer.clear();
+ blockSize = uncompressedByteBuffer.remaining();
+ maxValidUncompressedOffset = Integer.MAX_VALUE * (long) blockSize;
+ this.decompressor = decompressor;
+ }
+
+ /**
+ * @param originalByteBuffer - buffer as written by {@link BlockCompressedPayloadWriter}. Not modified.
+ * @param byteBufferProvider - should be native ordered ByteBuffer
+ * @param decompressor - decompressor for block compression
+ * @return BlockCompressedPayloadReader
+ */
+ public static BlockCompressedPayloadReader create(
+ ByteBuffer originalByteBuffer,
+ ByteBufferProvider byteBufferProvider,
+ CompressionStrategy.Decompressor decompressor
+ )
+ {
+ ByteBuffer masterByteBuffer = originalByteBuffer.asReadOnlyBuffer().order(ByteOrder.nativeOrder());
+
+ int blockIndexSize = masterByteBuffer.getInt();
+ ByteBuffer blockIndexBuffer = masterByteBuffer.asReadOnlyBuffer().order(masterByteBuffer.order());
+ blockIndexBuffer.limit(blockIndexBuffer.position() + blockIndexSize);
+
+ masterByteBuffer.position(masterByteBuffer.position() + blockIndexSize);
+
+ int dataStreamSize = masterByteBuffer.getInt();
+ ByteBuffer compressedBlockStreamByteBuffer = masterByteBuffer.asReadOnlyBuffer().order(masterByteBuffer.order());
+ compressedBlockStreamByteBuffer.limit(compressedBlockStreamByteBuffer.position() + dataStreamSize);
+
+ Closer closer = Closer.create();
+ ResourceHolder<ByteBuffer> byteBufferResourceHolder = byteBufferProvider.get();
+
+ closer.register(byteBufferResourceHolder);
+
+ return new BlockCompressedPayloadReader(
+ new IntIndexView(blockIndexBuffer),
+ compressedBlockStreamByteBuffer,
+ byteBufferResourceHolder.get(),
+ decompressor,
+ closer
+ );
+ }
+
+ public ByteBuffer read(long uncompressedStart, int size)
+ {
+ if (size == 0) {
+ return NULL_CELL;
+ }
+
+ Preconditions.checkArgument(uncompressedStart + size < maxValidUncompressedOffset);
+
+ int blockNumber = (int) (uncompressedStart / blockSize);
+ int blockOffset = (int) (uncompressedStart % blockSize);
+ ByteBuffer currentUncompressedBlock = getUncompressedBlock(blockNumber);
+
+ currentUncompressedBlock.position(blockOffset);
+
+ if (size <= currentUncompressedBlock.remaining()) {
+ ByteBuffer resultByteBuffer = currentUncompressedBlock.asReadOnlyBuffer().order(ByteOrder.nativeOrder());
+
+ resultByteBuffer.limit(blockOffset + size);
+
+ return resultByteBuffer;
+ } else {
+ byte[] payload = readMultiBlock(size, blockNumber, blockOffset);
+
+ return ByteBuffer.wrap(payload).order(ByteOrder.nativeOrder());
+ }
+ }
+
+ @Nonnull
+ private byte[] readMultiBlock(int size, int blockNumber, int blockOffset)
+ {
+ byte[] payload = new byte[size];
+ int bytesRead = 0;
+
+ do {
+ ByteBuffer currentUncompressedBlock = getUncompressedBlock(blockNumber);
+
+ currentUncompressedBlock.position(blockOffset);
+
+ int readSizeBytes = Math.min(size - bytesRead, currentUncompressedBlock.remaining());
+
+ currentUncompressedBlock.get(payload, bytesRead, readSizeBytes);
+ bytesRead += readSizeBytes;
+ blockNumber++;
+ blockOffset = 0;
+ } while (bytesRead < size);
+
+ return payload;
+ }
+
+ private ByteBuffer getUncompressedBlock(int blockNumber)
+ {
+ if (currentUncompressedBlockNumber != blockNumber) {
+ IntIndexView.EntrySpan span = blockIndexView.getEntrySpan(blockNumber);
+ ByteBuffer compressedBlock = compressedBlocksByteBuffer.asReadOnlyBuffer()
+ .order(compressedBlocksByteBuffer.order());
+ compressedBlock.position(compressedBlock.position() + span.getStart());
+ compressedBlock.limit(compressedBlock.position() + span.getSize());
+ uncompressedByteBuffer.clear();
+
+ decompressor.decompress(compressedBlock, span.getSize(), uncompressedByteBuffer);
+ currentUncompressedBlockNumber = blockNumber;
+ }
+
+ return uncompressedByteBuffer;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ closer.close();
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadSerializer.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadSerializer.java
new file mode 100644
index 0000000000..9b3f4c4d30
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadSerializer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import com.google.common.primitives.Ints;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.serde.Serializer;
+import org.apache.druid.segment.writeout.WriteOutBytes;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+
+public class BlockCompressedPayloadSerializer implements Serializer
+{
+ private final IntSerializer intSerializer = new IntSerializer();
+ private final BlockIndexWriter blockIndexWriter;
+ private final WriteOutBytes dataOutBytes;
+
+ public BlockCompressedPayloadSerializer(BlockIndexWriter blockIndexWriter, WriteOutBytes dataOutBytes)
+ {
+ this.blockIndexWriter = blockIndexWriter;
+ this.dataOutBytes = dataOutBytes;
+ }
+
+ @Override
+ public void writeTo(WritableByteChannel channel, @Nullable FileSmoosher smoosher) throws IOException
+ {
+ blockIndexWriter.transferTo(channel);
+ channel.write(intSerializer.serialize(dataOutBytes.size()));
+ dataOutBytes.writeTo(channel);
+ }
+
+ @Override
+ public long getSerializedSize()
+ {
+ return blockIndexWriter.getSerializedSize()
+ + intSerializer.getSerializedSize()
+ + Ints.checkedCast(dataOutBytes.size());
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadWriter.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadWriter.java
new file mode 100644
index 0000000000..794272fb24
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadWriter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.serde.Serializer;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+public class BlockCompressedPayloadWriter implements Serializer, Closeable
+{
+ private final BlockCompressedPayloadBuffer buffer;
+ private BlockCompressedPayloadSerializer serializer;
+ private State state = State.OPEN;
+
+ private BlockCompressedPayloadWriter(BlockCompressedPayloadBuffer buffer)
+ {
+ this.buffer = buffer;
+ }
+
+ public void write(byte[] payload) throws IOException
+ {
+ Preconditions.checkState(state == State.OPEN);
+ buffer.write(payload);
+ }
+
+ public void write(ByteBuffer payload) throws IOException
+ {
+ Preconditions.checkState(state == State.OPEN);
+ buffer.write(payload);
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ if (state == State.OPEN) {
+ serializer = buffer.closeToSerializer();
+ state = State.CLOSED;
+ }
+ }
+
+ @Override
+ public void writeTo(WritableByteChannel channel, @Nullable FileSmoosher smoosher) throws IOException
+ {
+ Preconditions.checkState(state == State.CLOSED);
+ serializer.writeTo(channel, smoosher);
+ }
+
+ @Override
+ public long getSerializedSize()
+ {
+ Preconditions.checkState(state == State.CLOSED);
+ return serializer.getSerializedSize();
+ }
+
+ private enum State
+ {
+ OPEN,
+ CLOSED
+ }
+
+ public static class Builder
+ {
+ private ByteBufferProvider byteBufferProvider = NativeClearedByteBufferProvider.INSTANCE;
+ private final SegmentWriteOutMedium writeOutMedium;
+
+ private CompressionStrategy compressionStrategy = CompressionStrategy.LZ4;
+
+ public Builder(SegmentWriteOutMedium writeOutMedium)
+ {
+ this.writeOutMedium = writeOutMedium;
+ }
+
+ public Builder setCompressionStrategy(CompressionStrategy compressionStrategy)
+ {
+ this.compressionStrategy = compressionStrategy;
+
+ return this;
+ }
+
+ public Builder setByteBufferProvider(ByteBufferProvider byteBufferProvider)
+ {
+ this.byteBufferProvider = byteBufferProvider;
+
+ return this;
+ }
+
+ public BlockCompressedPayloadWriter build() throws IOException
+ {
+ BlockCompressedPayloadBufferFactory bufferFactory = new BlockCompressedPayloadBufferFactory(
+ byteBufferProvider,
+ writeOutMedium,
+ compressionStrategy.getCompressor()
+ );
+ BlockCompressedPayloadBuffer payloadBuffer = bufferFactory.create();
+ BlockCompressedPayloadWriter payloadWriter = new BlockCompressedPayloadWriter(payloadBuffer);
+
+ return payloadWriter;
+ }
+ }
+}
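
A minimal sketch of using the writer above, assuming an OnHeapMemorySegmentWriteOutMedium (the same medium the
tests below use) and the default LZ4 strategy, inside a method that throws IOException; the payloads and output
stream are illustrative only (assumed imports: java.io.ByteArrayOutputStream, java.nio.channels.Channels):

    SegmentWriteOutMedium medium = new OnHeapMemorySegmentWriteOutMedium();
    BlockCompressedPayloadWriter payloadWriter = new BlockCompressedPayloadWriter.Builder(medium).build();

    payloadWriter.write(new byte[]{1, 2, 3});
    payloadWriter.write(ByteBuffer.wrap(new byte[]{4, 5, 6}));
    payloadWriter.close(); // OPEN -> CLOSED; builds the serializer and must precede writeTo()

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    payloadWriter.writeTo(Channels.newChannel(out), null); // smoosher is unused here, may be null
    long serializedSize = payloadWriter.getSerializedSize();
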
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockIndexWriter.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockIndexWriter.java
new file mode 100644
index 0000000000..986628690c
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/BlockIndexWriter.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.segment.writeout.WriteOutBytes;
+
+public class BlockIndexWriter extends IndexWriter
+{
+ public BlockIndexWriter(WriteOutBytes outBytes)
+ {
+ super(outBytes, new IntSerializer());
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/ByteBufferProvider.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/ByteBufferProvider.java
new file mode 100644
index 0000000000..170e046ec8
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/ByteBufferProvider.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.collections.ResourceHolder;
+
+import java.nio.ByteBuffer;
+import java.util.function.Supplier;
+
+public interface ByteBufferProvider extends Supplier<ResourceHolder<ByteBuffer>>
+{
+ /**
+ * @return a resource holder of a ByteBuffer
+ */
+ @Override
+ ResourceHolder<ByteBuffer> get();
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/CellIndexReader.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/CellIndexReader.java
new file mode 100644
index 0000000000..7346dab24f
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/CellIndexReader.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import com.google.common.primitives.Ints;
+
+import javax.annotation.Nonnull;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class CellIndexReader implements Closeable
+{
+ private final BlockCompressedPayloadReader payloadReader;
+
+ public CellIndexReader(BlockCompressedPayloadReader payloadReader)
+ {
+ this.payloadReader = payloadReader;
+ }
+
+ @Nonnull
+ public PayloadEntrySpan getEntrySpan(int entryNumber)
+ {
+ int position = entryNumber * Long.BYTES;
+ ByteBuffer payload = payloadReader.read(position, 2 * Long.BYTES);
+ long payloadValue = payload.getLong();
+ long nextPayloadValue = payload.getLong();
+
+ return new PayloadEntrySpan(payloadValue, Ints.checkedCast(nextPayloadValue - payloadValue));
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ payloadReader.close();
+ }
+}
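
getEntrySpan() reads two adjacent longs from the block-compressed index and subtracts them to recover a
payload's start and length. A worked example, assuming the index holds the start offsets [0, 3, 3, 8]
(payload sizes 3, 0, and 5):

    // getEntrySpan(1): reads the longs at byte offsets 8 and 16 -> 3 and 3
    //   => new PayloadEntrySpan(start = 3, size = 3 - 3 = 0)   (a null payload)
    // getEntrySpan(2): reads the longs at byte offsets 16 and 24 -> 3 and 8
    //   => new PayloadEntrySpan(start = 3, size = 8 - 3 = 5)
    PayloadEntrySpan span = cellIndexReader.getEntrySpan(2);
    long start = span.getStart(); // 3
    int size = span.getSize();    // 5
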
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/CellIndexWriter.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/CellIndexWriter.java
new file mode 100644
index 0000000000..1f311f56de
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/CellIndexWriter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.serde.Serializer;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+
+
+public class CellIndexWriter implements Serializer, Closeable
+{
+ private final LongSerializer longSerializer = new LongSerializer();
+ private final BlockCompressedPayloadWriter payloadWriter;
+
+ private long position = 0;
+ private boolean open = true;
+
+ public CellIndexWriter(BlockCompressedPayloadWriter payloadWriter)
+ {
+ this.payloadWriter = payloadWriter;
+ }
+
+ public void persistAndIncrement(int increment) throws IOException
+ {
+ Preconditions.checkArgument(increment >= 0);
+ Preconditions.checkState(open, "cannot write to closed CellIndex");
+ payloadWriter.write(longSerializer.serialize(position));
+ position += increment;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ if (open) {
+ payloadWriter.write(longSerializer.serialize(position));
+ payloadWriter.close();
+ open = false;
+ }
+ }
+
+ @Override
+ public void writeTo(WritableByteChannel channel, @Nullable FileSmoosher smoosher) throws IOException
+ {
+ Preconditions.checkState(!open, "cannot transfer a CellIndex that is not closed and finalized");
+
+ payloadWriter.writeTo(channel, smoosher);
+ }
+
+ @Override
+ public long getSerializedSize()
+ {
+ return payloadWriter.getSerializedSize();
+ }
+}
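
On the write side, the caller passes each payload's length to persistAndIncrement(), and close() appends one
final position so that the invariant length_i = entry_(i+1) - entry_i always holds. A sketch using the same
hypothetical payload sizes of 3, 0 (null), and 5:

    cellIndexWriter.persistAndIncrement(3); // writes 0, position -> 3
    cellIndexWriter.persistAndIncrement(0); // writes 3, position -> 3  (null payload)
    cellIndexWriter.persistAndIncrement(5); // writes 3, position -> 8
    cellIndexWriter.close();                // writes the trailing 8

    // logical index contents, as longs: [0, 3, 3, 8]
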
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/CellReader.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/CellReader.java
new file mode 100644
index 0000000000..678920e6d2
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/CellReader.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.segment.data.CompressionStrategy;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * CellReader is intended to read the data written by {@link CellWriter}. The {@code CellWriter.writeTo()} method's
+ * output must be made available as a ByteBuffer. While this provides relatively efficient random access, it is
+ * optimized for sequential access by caching the last decompressed block in both the index (which is
+ * block-compressed) and the data.
+ * <p/>
+ * A random access incurs the following costs:
+ * <pre>
+ * 1. seek to compressed block location in index
+ * 2. decompress index block
+ * 3. read data location
+ * 4. decompress data block
+ * 5. wrap or copy data from uncompressed block (copy for data that spans more than one block)<br>
+ * </pre>
+ * Sequential access amortizes the decompression cost by storing the last decompressed block (cache of size 1,
+ * effectively).
+ * <p/>
+ * Note also that the index itself is compressed, so random accesses potentially incur an additional decompression
+ * step for large datasets.
+ * <p/>
+ * <pre>{@code
+ * ByteBuffer byteBuffer = ....; // ByteBuffer created from writableChannel output of CellWriter.writeTo()
+ * try (CellReader cellReader = new CellReader.Builder(byteBuffer).build()) {
+ * for (int i = 0; i < numPayloads; i++) {
+ * ByteBuffer payload = cellReader.getCell(i);
+ *
+ * processPayload(payload); // may deserialize and perform work
+ * }
+ * }
+ * </pre>
+ *
+ * While you may allocate your own 64k buffers, it is recommended you use {@code NativeClearedByteBufferProvider},
+ * which provides direct 64k ByteBuffers from a pool, wrapped in a ResourceHolder. These objects may be
+ * registered in a Closer.
+ *
+ * To enhance future random access, a decompressed block cache of some size k (=10, etc.) may be added.
+ * At present, we effectively have a block cache of size 1.
+ */
+public class CellReader implements Closeable
+{
+ private final CellIndexReader cellIndexReader;
+ private final BlockCompressedPayloadReader dataReader;
+ private final Closer closer;
+
+ private CellReader(CellIndexReader cellIndexReader, BlockCompressedPayloadReader dataReader, Closer closer)
+ {
+ this.cellIndexReader = cellIndexReader;
+ this.dataReader = dataReader;
+ this.closer = closer;
+ }
+
+ public ByteBuffer getCell(int rowNumber)
+ {
+ PayloadEntrySpan payloadEntrySpan = cellIndexReader.getEntrySpan(rowNumber);
+ ByteBuffer payload = dataReader.read(payloadEntrySpan.getStart(), payloadEntrySpan.getSize());
+
+ return payload;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ closer.close();
+ }
+
+ public static Builder builder(ByteBuffer originalByteBuffer)
+ {
+ return new Builder(originalByteBuffer);
+ }
+
+ public static class Builder
+ {
+ private final ByteBuffer cellIndexBuffer;
+ private final ByteBuffer dataStorageBuffer;
+
+ private CompressionStrategy compressionStrategy = CompressionStrategy.LZ4;
+ private ByteBufferProvider byteBufferProvider = NativeClearedByteBufferProvider.INSTANCE;
+
+ /**
+ * The default block size is 64k as provided by NativeClearedByteBufferProvider. You may change this, but
+ * be sure the size of the ByteBuffers matches what the CellWriter used, as this is the stored block size.
+ * Reading will fail unpredictably if a different block size is used when reading.
+ *
+ * @param originalByteBuffer - buffer from {@code CellWriter.writeTo()} as written to the WritableChannel
+ */
+ public Builder(ByteBuffer originalByteBuffer)
+ {
+ ByteBuffer masterByteBuffer = originalByteBuffer.asReadOnlyBuffer().order(ByteOrder.nativeOrder());
+
+ int cellIndexSize = masterByteBuffer.getInt();
+ cellIndexBuffer = masterByteBuffer.asReadOnlyBuffer().order(masterByteBuffer.order());
+ cellIndexBuffer.limit(cellIndexBuffer.position() + cellIndexSize);
+
+ masterByteBuffer.position(masterByteBuffer.position() + cellIndexSize);
+
+ int dataStorageSize = masterByteBuffer.getInt();
+ dataStorageBuffer = masterByteBuffer.asReadOnlyBuffer().order(masterByteBuffer.order());
+ dataStorageBuffer.limit(dataStorageBuffer.position() + dataStorageSize);
+ }
+
+ /**
+ *
+ * @param compressionStrategy - must match the compressionStrategy used by the CellWriter. Default is LZ4
+ * @return Builder
+ */
+ public Builder setCompressionStrategy(CompressionStrategy compressionStrategy)
+ {
+ this.compressionStrategy = compressionStrategy;
+
+ return this;
+ }
+
+ /**
+ *
+ * @param byteBufferProvider - ByteBuffers returned must match the size used in the CellWriter
+ * @return Builder
+ */
+ public Builder setByteBufferProvider(ByteBufferProvider byteBufferProvider)
+ {
+ this.byteBufferProvider = byteBufferProvider;
+
+ return this;
+ }
+
+ public CellReader build()
+ {
+ Closer closer = Closer.create();
+ CellIndexReader cellIndexReader = new CellIndexReader(BlockCompressedPayloadReader.create(
+ cellIndexBuffer,
+ byteBufferProvider,
+ compressionStrategy.getDecompressor()
+ ));
+ BlockCompressedPayloadReader dataReader = BlockCompressedPayloadReader.create(
+ dataStorageBuffer,
+ byteBufferProvider,
+ compressionStrategy.getDecompressor()
+ );
+
+ closer.register(cellIndexReader);
+ closer.register(dataReader);
+
+ CellReader cellReader = new CellReader(cellIndexReader, dataReader, closer);
+
+ return cellReader;
+ }
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/CellWriter.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/CellWriter.java
new file mode 100644
index 0000000000..2c732d806c
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/CellWriter.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.serde.Serializer;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * <h2>usage:</h2>
+ * <p/>
+ * CellWriter effectively stores a list of byte[] payloads that are retrievable randomly by index. The entirety of
+ * the data is block compressed. For reading, see {@link CellReader}. Example usage:
+ *<p/>
+ * <pre>{@code
+ *
+ * StagedSerde<Fuu> fuuSerDe = new ...
+ * // note that cellWriter.close() *must* be called before writeTo() in order to finalize the index
+ * try (CellWriter cellWriter = new CellWriter.Builder(segmentWriteOutMedium).build()) {
+ *
+ * fuuList.stream().map(fuuSerDe::serialize).forEach(cellWriter::write);
+ * }
+ * // at this point cellWriter contains the index and compressed data
+ *
+ *
+ * // transfers the index and compressed data in the format specified below. This method is idempotent and copies
+ * // the data each time.
+ * cellWriter.writeTo(writableChannel, fileSmoosher); // 2nd argument currently unused, may be null
+ *
+ * } </pre>
+ * <p/>
+ * Note that for use with CellReader, the contents written to the writableChannel must be available as a ByteBuffer
+ * <p/>
+ * <h2>Internal Storage Details</h2>
+ * <p/>
+ * <pre>
+ * serialized data is of the form:
+ *
+ * [cell index]
+ * [payload storage]
+ *
+ * each of these items is stored in compressed streams of blocks with a block index.
+ *
+ * A BlockCompressedPayloadWriter stores byte[] payloads. These may be accessed by creating a
+ * BlockCompressedPayloadReader over the produced ByteBuffer. Reads may be done by giving a location in the
+ * uncompressed stream and a size
+ *
+ * NOTE: {@link BlockCompressedPayloadBuffer} does not store nulls on write(). However, the cellIndex stores an entry
+ * with a size of 0 for nulls and {@link CellReader} will return null for any null written
+ *
+ * [blockIndexSize:int]
+ * |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+ * | block index
+ * | compressed block # -> block start in compressed stream position (relative to data start)
+ * |
+ * | 0: [block position: int]
+ * | 1: [block position: int]
+ * | ...
+ * | i: [block position: int]
+ * | ...
+ * | n: [block position: int]
+ * | n+1: [total compressed size ] // stored to simplify invariant of n+1 - n = length(n)
+ * |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+ * [dataSize:int]
+ * |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+ * | [compressed payload block 1]
+ * | [compressed payload block 2]
+ * | ...
+ * | [compressed payload block n]
+ * |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
+ * the CellIndexWriter stores an array of longs using the BlockCompressedPayloadWriter
+ *
+ * logically this is an array of longs
+ *
+ * | 0: start_0 : long
+ * | 1: start_1 : long
+ * | ...
+ * | n: start_n : long
+ * | n+1: start_n + length_n : long //ie, next position that would have been written to
+ * | //used again for invariant of length_i = row_i+1 - row_i
+ * |
+ * | but this will be stored as block compressed. Reads are done by addressing it as a long array of bytes
+ * |
+ * | [block index size]
+ * | [block index]
+ * |
+ * | [data stream size]
+ * | [block compressed payload stream]
+ *
+ * resulting in
+ *
+ * | [cell index size]
+ * | ----cell index------------------------
+ * | [block index size]
+ * | [block index]
+ * | [data stream size]
+ * | [block compressed payload stream]
+ * | -------------------------------------
+ * | [data stream size]
+ * | ----data stream------------------------
+ * | [block index size]
+ * | [block index]
+ * | [data stream size]
+ * | [block compressed payload stream]
+ * | -------------------------------------
+ * </pre>
+ */
+
+public class CellWriter implements Serializer, Closeable
+{
+ private final IntSerializer intSerializer = new IntSerializer();
+ private final CellIndexWriter cellIndexWriter;
+ private final BlockCompressedPayloadWriter payloadWriter;
+
+ private CellWriter(CellIndexWriter cellIndexWriter, BlockCompressedPayloadWriter payloadWriter)
+ {
+ this.cellIndexWriter = cellIndexWriter;
+ this.payloadWriter = payloadWriter;
+ }
+
+ public void write(byte[] cellBytes) throws IOException
+ {
+ if (cellBytes == null) {
+ cellIndexWriter.persistAndIncrement(0);
+ } else {
+ cellIndexWriter.persistAndIncrement(cellBytes.length);
+ payloadWriter.write(cellBytes);
+ }
+ }
+
+ public void write(ByteBuffer cellByteBuffer) throws IOException
+ {
+ if (cellByteBuffer == null) {
+ cellIndexWriter.persistAndIncrement(0);
+ } else {
+ cellIndexWriter.persistAndIncrement(cellByteBuffer.remaining());
+ payloadWriter.write(cellByteBuffer);
+ }
+ }
+
+ @Override
+ public void writeTo(WritableByteChannel channel, @Nullable FileSmoosher smoosher) throws IOException
+ {
+ channel.write(intSerializer.serialize(cellIndexWriter.getSerializedSize()));
+ cellIndexWriter.writeTo(channel, smoosher);
+ channel.write(intSerializer.serialize(payloadWriter.getSerializedSize()));
+ payloadWriter.writeTo(channel, smoosher);
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ cellIndexWriter.close();
+ payloadWriter.close();
+ }
+
+ @Override
+ public long getSerializedSize()
+ {
+ return intSerializer.getSerializedSize()
+ + cellIndexWriter.getSerializedSize()
+ + intSerializer.getSerializedSize()
+ + payloadWriter.getSerializedSize();
+ }
+
+ public static Builder builder(SegmentWriteOutMedium segmentWriteOutMedium)
+ {
+ return new Builder(segmentWriteOutMedium);
+ }
+
+ public static class Builder
+ {
+ private final BlockCompressedPayloadWriter.Builder blockCompressedPayloadWriterBuilder;
+
+ /**
+ * Default instance with a {@link NativeClearedByteBufferProvider}
+ *
+ * @param segmentWriteOutMedium - used to store the block-compressed index and data
+ */
+ public Builder(SegmentWriteOutMedium segmentWriteOutMedium)
+ {
+ blockCompressedPayloadWriterBuilder =
+ new BlockCompressedPayloadWriter.Builder(segmentWriteOutMedium);
+ }
+
+ /**
+ * change the compression strategy. The default is LZ4
+ *
+ * @param compressionStrategy - a valid {@link CompressionStrategy}
+ * @return Builder
+ */
+ public Builder setCompressionStrategy(CompressionStrategy compressionStrategy)
+ {
+ blockCompressedPayloadWriterBuilder.setCompressionStrategy(compressionStrategy);
+
+ return this;
+ }
+
+ public Builder setByteBufferProvider(ByteBufferProvider byteBufferProvider)
+ {
+ blockCompressedPayloadWriterBuilder.setByteBufferProvider(byteBufferProvider);
+
+ return this;
+ }
+
+ public CellWriter build() throws IOException
+ {
+ BlockCompressedPayloadWriter cellIndexPayloadWriter = blockCompressedPayloadWriterBuilder.build();
+ BlockCompressedPayloadWriter payloadWriter = blockCompressedPayloadWriterBuilder.build();
+ CellIndexWriter cellIndexWriter = new CellIndexWriter(cellIndexPayloadWriter);
+
+ return new CellWriter(cellIndexWriter, payloadWriter);
+ }
+ }
+}
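
A minimal end-to-end sketch pairing CellWriter with CellReader, assuming an OnHeapMemorySegmentWriteOutMedium
(as in the tests below) and a method that throws IOException; the row contents and channel handling are
illustrative only (assumed imports: java.io.ByteArrayOutputStream, java.nio.channels.Channels,
org.apache.druid.java.util.common.StringUtils):

    SegmentWriteOutMedium medium = new OnHeapMemorySegmentWriteOutMedium();
    CellWriter cellWriter = CellWriter.builder(medium).build();

    cellWriter.write(StringUtils.toUtf8("row-0"));
    cellWriter.write((byte[]) null);               // recorded only as a zero-length entry in the cell index
    cellWriter.write(StringUtils.toUtf8("row-2"));
    cellWriter.close();                            // finalizes the cell index; required before writeTo()

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    cellWriter.writeTo(Channels.newChannel(out), null); // smoosher currently unused, may be null

    ByteBuffer serialized = ByteBuffer.wrap(out.toByteArray());
    try (CellReader cellReader = CellReader.builder(serialized).build()) {
      ByteBuffer cell0 = cellReader.getCell(0); // bytes of "row-0"
      ByteBuffer cell2 = cellReader.getCell(2); // bytes of "row-2"
    }
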
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/IOIterator.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/IOIterator.java
new file mode 100644
index 0000000000..887f1fb65a
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/IOIterator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public interface IOIterator<T> extends Closeable
+{
+ boolean hasNext() throws IOException;
+
+ T next() throws IOException;
+
+ @Override
+ void close() throws IOException;
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/IndexWriter.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/IndexWriter.java
new file mode 100644
index 0000000000..46f26ad988
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/IndexWriter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.druid.segment.writeout.WriteOutBytes;
+
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+
+public class IndexWriter
+{
+ private final WriteOutBytes outBytes;
+ private final NumberSerializer positionSerializer;
+ private final NumberSerializer indexSizeSerializer;
+
+ private boolean open = true;
+ private long position = 0;
+
+ public IndexWriter(
+ WriteOutBytes outBytes,
+ NumberSerializer positionSerializer,
+ NumberSerializer indexSizeSerializer
+ )
+ {
+ this.outBytes = outBytes;
+ this.positionSerializer = positionSerializer;
+ this.indexSizeSerializer = indexSizeSerializer;
+ }
+
+ public IndexWriter(WriteOutBytes outBytes, NumberSerializer positionSerializer)
+ {
+ this(outBytes, positionSerializer, new IntSerializer());
+ }
+
+ public void persistAndIncrement(int increment) throws IOException
+ {
+ Preconditions.checkArgument(increment >= 0, "increment must be non-negative");
+ Preconditions.checkState(open, "persistAndIncrement() must be called when open");
+ outBytes.write(positionSerializer.serialize(position));
+ position += increment;
+ }
+
+ public void close() throws IOException
+ {
+ // when done, write an n+1'th entry for the next unused block; this lets us use the invariant
+ // of length of block i = entry i+1 - entry i for all i < n
+ outBytes.write(positionSerializer.serialize(position));
+ open = false;
+ }
+
+ public void transferTo(WritableByteChannel channel) throws IOException
+ {
+ channel.write(indexSizeSerializer.serialize(outBytes.size()));
+ outBytes.writeTo(channel);
+ }
+
+ public long getSerializedSize()
+ {
+ return indexSizeSerializer.getSerializedSize() + Ints.checkedCast(outBytes.size());
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/IntIndexView.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/IntIndexView.java
new file mode 100644
index 0000000000..fd40cd22d6
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/IntIndexView.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import com.google.common.base.Preconditions;
+
+import java.nio.ByteBuffer;
+
+public class IntIndexView
+{
+ private final ByteBuffer byteBuffer;
+ private final int numberOfEntries;
+
+ public IntIndexView(ByteBuffer byteBuffer)
+ {
+ this.byteBuffer = byteBuffer;
+ numberOfEntries = byteBuffer.remaining() / Integer.BYTES;
+ }
+
+ public EntrySpan getEntrySpan(int entryNumber)
+ {
+ Preconditions.checkArgument(
+ entryNumber < numberOfEntries, "invalid entry number %s [%s]", entryNumber, numberOfEntries
+ );
+ int start = byteBuffer.getInt(byteBuffer.position() + entryNumber * Integer.BYTES);
+ int nextStart = byteBuffer.getInt(byteBuffer.position() + ((entryNumber + 1) * Integer.BYTES));
+
+ return new EntrySpan(start, nextStart - start);
+ }
+
+ public static class EntrySpan
+ {
+ private final int start;
+ private final int size;
+
+ public EntrySpan(int start, int size)
+ {
+ this.start = start;
+ this.size = size;
+ }
+
+ public int getStart()
+ {
+ return start;
+ }
+
+ public int getSize()
+ {
+ return size;
+ }
+ }
+}
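
IntIndexView applies the same adjacent-entry subtraction to the int-based block index written by
BlockIndexWriter: entry i is a block's start in the compressed stream and entry i+1 bounds its length (the final
entry is the total compressed size). A worked example, assuming index entries [0, 100, 250, 260]; the
blockIndexView variable name is illustrative:

    // getEntrySpan(0) -> EntrySpan(start = 0,   size = 100)
    // getEntrySpan(1) -> EntrySpan(start = 100, size = 150)
    // getEntrySpan(2) -> EntrySpan(start = 250, size = 10)
    IntIndexView.EntrySpan span = blockIndexView.getEntrySpan(1);
    int start = span.getStart(); // 100
    int size = span.getSize();   // 150
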
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/IntSerializer.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/IntSerializer.java
new file mode 100644
index 0000000000..20e893500c
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/IntSerializer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import com.google.common.primitives.Ints;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class IntSerializer implements NumberSerializer
+{
+ private final ByteBuffer intValueByteBuffer = ByteBuffer.allocate(Integer.BYTES).order(ByteOrder.nativeOrder());
+
+ @Override
+ public ByteBuffer serialize(long value)
+ {
+ intValueByteBuffer.clear();
+ intValueByteBuffer.putInt(Ints.checkedCast(value)).flip();
+
+ return intValueByteBuffer;
+ }
+
+ @Override
+ public int getSerializedSize()
+ {
+ return Integer.BYTES;
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/LongSerializer.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/LongSerializer.java
new file mode 100644
index 0000000000..cfcaad5b8d
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/LongSerializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class LongSerializer implements NumberSerializer
+{
+ private final ByteBuffer longValueByteBuffer = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.nativeOrder());
+
+ @Override
+ public ByteBuffer serialize(long value)
+ {
+ longValueByteBuffer.clear();
+ longValueByteBuffer.putLong(value).flip();
+
+ return longValueByteBuffer;
+ }
+
+ @Override
+ public int getSerializedSize()
+ {
+ return Long.BYTES;
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/NativeClearedByteBufferProvider.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/NativeClearedByteBufferProvider.java
new file mode 100644
index 0000000000..8ee4bc33d1
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/NativeClearedByteBufferProvider.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.segment.CompressedPools;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * supplies direct, 64k, clear()'d ByteBuffers wrapped in a ResourceHolder. Caller is responsible for calling
+ * close() on the {@link ResourceHolder} in order to return it to the pool
+ */
+public class NativeClearedByteBufferProvider implements ByteBufferProvider
+{
+ public static final NativeClearedByteBufferProvider INSTANCE = new NativeClearedByteBufferProvider();
+
+ @Override
+ public ResourceHolder<ByteBuffer> get()
+ {
+ ResourceHolder<ByteBuffer> byteBufferResourceHolder = CompressedPools.getByteBuf(ByteOrder.nativeOrder());
+
+ byteBufferResourceHolder.get().clear();
+
+ return byteBufferResourceHolder;
+ }
+}
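
The provider hands out pooled buffers wrapped in a ResourceHolder; closing the holder returns the buffer to the
pool, typically via try-with-resources as the tests below do. A minimal sketch:

    try (ResourceHolder<ByteBuffer> holder = NativeClearedByteBufferProvider.INSTANCE.get()) {
      ByteBuffer buffer = holder.get(); // direct, 64k, already clear()'d
      // ... use buffer as decompression scratch space ...
    } // close() returns the buffer to the pool
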
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/NumberSerializer.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/NumberSerializer.java
new file mode 100644
index 0000000000..b4deafe548
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/NumberSerializer.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import java.nio.ByteBuffer;
+
+public interface NumberSerializer
+{
+ ByteBuffer serialize(long value);
+
+ int getSerializedSize();
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/PayloadEntrySpan.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/PayloadEntrySpan.java
new file mode 100644
index 0000000000..86ff673af5
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/PayloadEntrySpan.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+public class PayloadEntrySpan
+{
+ private final long start;
+ private final int size;
+
+ public PayloadEntrySpan(long start, int size)
+ {
+ this.start = start;
+ this.size = size;
+ }
+
+ public long getStart()
+ {
+ return start;
+ }
+
+ public int getSize()
+ {
+ return size;
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/StagedSerde.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/StagedSerde.java
new file mode 100644
index 0000000000..ffbf00a9c4
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/StagedSerde.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * {@code StagedSerde} is useful when you have objects that have their own internal logic to serialize, but you wish to
+ * compose the results of multiple serialized objects into a single ByteBuffer (or wrapped {@code byte[]}). Serializers
+ * can implement {@code serializeDelayed} and return a {@code StorableBuffer}. This object allows the serialization to
+ * be broken up so that serializers do whatever work is necessary to report how many bytes are needed. The caller can
+ * then allocate a large enough byte[], wrap it in a ByteBuffer, and use the {@code StorableBuffer.store()} method.
+ * <p/>
+ * This results in superior efficiency over a {@code byte[] toBytes()} method because repeated copies of byte[] are
+ * avoided.
+ * <p/>
+ * Since any serialization that returns a byte[] must reach a point in its serialization that it allocates
+ * said byte[], that code may be executed to create the {@code StorableBuffer}. What code would have written to
+ * a byte[] then makes calls to a ByteBuffer in the store() method.
+ * <p/>
+ * For the cases when it is not easy to break apart the serialization code, increased efficiency may be obtained by
+ * overriding serialize() and directly returning bytes
+ *
+ * <p/>
+ * example
+ * <pre>{@code
+ * StagedSerde<Fuu> fuuSerde = new ...;
+ * StagedSerde<Bar> barSerde = new ...;
+ * StorableBuffer fuuBuffer = fuuSerde.serializeDelayed(fuuInstance);
+ * StorableBuffer barBuffer = barSerde.serializeDelayed(barInstance);
+ * int size = fuuBuffer.getSerializedSize() + barBuffer.getSerializedSize();
+ * byte[] bytes = new byte[size];
+ * ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
+ *
+ * fuuBuffer.store(buffer);
+ * barBuffer.store(buffer);
+ *
+ * }</pre>
+ * <p/>
+ * Note that for a common case in which you want a byte[] for a single object, a default implementation is provided
+ * that does the above code for a single object.
+ * <p/>
+ *
+ * @param <T>
+ */
+public interface StagedSerde<T>
+{
+ /**
+ * Useful method when some computation is necessary to prepare for serialization without actually writing out
+ * all the bytes in order to determine the serialized size. It allows encapsulation of the size computation and
+ * the final logic to actually store into a ByteBuffer. It also allows for callers to pack multiple serialized
+ * objects into a single ByteBuffer without extra copies of a byte[]/ByteBuffer by using the {@link StorableBuffer}
+ * instance returned
+ *
+ * @param value - object to serialize
+ * @return an object that reports its serialized size and how to serialize the object to a ByteBuffer
+ */
+ StorableBuffer serializeDelayed(@Nullable T value);
+
+ /**
+ * Default implementation for when a byte[] is desired. Typically, this default should suffice. Implementing
+ * serializeDelayed() includes the logic of how to store into a ByteBuffer
+ *
+ * @param value - object to serialize
+ * @return serialized byte[] of value
+ */
+ default byte[] serialize(T value)
+ {
+ StorableBuffer storableBuffer = serializeDelayed(value);
+ ByteBuffer byteBuffer = ByteBuffer.allocate(storableBuffer.getSerializedSize()).order(ByteOrder.nativeOrder());
+
+ storableBuffer.store(byteBuffer);
+
+ return byteBuffer.array();
+ }
+
+ @Nullable
+ T deserialize(ByteBuffer byteBuffer);
+
+ default T deserialize(byte[] bytes)
+ {
+ return deserialize(ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder()));
+ }
+}
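
A minimal sketch of implementing StagedSerde for a simple value type; the Point class and its two-long layout are
hypothetical and only serve to show how serializeDelayed() defers the actual store() while still reporting the
size up front:

    // hypothetical value type: Point with long getX() and long getY()
    public class PointSerde implements StagedSerde<Point>
    {
      @Override
      public StorableBuffer serializeDelayed(@Nullable Point value)
      {
        if (value == null) {
          return StorableBuffer.EMPTY;
        }
        return new StorableBuffer()
        {
          @Override
          public void store(ByteBuffer byteBuffer)
          {
            byteBuffer.putLong(value.getX());
            byteBuffer.putLong(value.getY());
          }

          @Override
          public int getSerializedSize()
          {
            return 2 * Long.BYTES;
          }
        };
      }

      @Nullable
      @Override
      public Point deserialize(ByteBuffer byteBuffer)
      {
        if (byteBuffer.remaining() == 0) {
          return null;
        }
        return new Point(byteBuffer.getLong(), byteBuffer.getLong());
      }
    }

The default serialize() above then allocates a buffer of getSerializedSize() bytes, calls store(), and returns the
backing array, so an implementation only needs the two methods shown.
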
diff --git a/processing/src/main/java/org/apache/druid/segment/serde/cell/StorableBuffer.java b/processing/src/main/java/org/apache/druid/segment/serde/cell/StorableBuffer.java
new file mode 100644
index 0000000000..f228a81904
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/serde/cell/StorableBuffer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import java.nio.ByteBuffer;
+
+/**
+ * It's useful to return this object when multiple serializable objects are to be composed into a single ByteBuffer
+ * or byte[]. This allows serializable objects to report their size so that callers may get the total size of all
+ * objects and allocate a sufficiently large ByteBuffer/byte[]. Then the store() methods may be used to serialize.
+ * <p/>
+ * This avoids extra copies and wasted memory/gc pressure in the case of just returning a byte[].
+ * <p/>
+ * The getSerializedSize() method is provided because an object may need to use private data in order to calculate
+ * the size needed.
+ *
+ **/
+public interface StorableBuffer
+{
+ StorableBuffer EMPTY = new StorableBuffer()
+ {
+ @Override
+ public void store(ByteBuffer byteBuffer)
+ {
+ }
+
+ @Override
+ public int getSerializedSize()
+ {
+ return 0;
+ }
+ };
+
+ void store(ByteBuffer byteBuffer);
+
+ int getSerializedSize();
+}
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringBufferStoreTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringBufferStoreTest.java
new file mode 100644
index 0000000000..8075e273c2
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringBufferStoreTest.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.google.common.primitives.Ints;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.column.ColumnBuilder;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.serde.cell.IOIterator;
+import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider;
+import org.apache.druid.segment.writeout.HeapByteBufferWriteOutBytes;
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Random;
+
+public class SerializablePairLongStringBufferStoreTest
+{
+ private final Random random = new Random(0);
+ private static final int MIN_INTEGER = 100;
+ private static final long MIN_LONG = 0L;
+ private final SerializablePairLongString[] integerRangeArr = new SerializablePairLongString[]{
+ new SerializablePairLongString((long) MIN_INTEGER, "fuu"),
+ new SerializablePairLongString(101L, "bar"),
+ new SerializablePairLongString(102L, "baz"),
+ };
+ private final SerializablePairLongString[] longRangeArr = new SerializablePairLongString[]{
+ new SerializablePairLongString(MIN_LONG, "fuu"),
+ new SerializablePairLongString(100L, "bar"),
+ new SerializablePairLongString((long) Integer.MAX_VALUE, "baz"),
+ new SerializablePairLongString(Long.MAX_VALUE, "fuubarbaz"),
+ };
+
+ private final SegmentWriteOutMedium writeOutMedium = new OnHeapMemorySegmentWriteOutMedium();
+
+ private SerializablePairLongStringBufferStore bufferStore;
+
+ @Before
+ public void setup() throws Exception
+ {
+ bufferStore = new SerializablePairLongStringBufferStore(
+ new SerializedStorage<>(
+ writeOutMedium.makeWriteOutBytes(),
+ SerializablePairLongStringColumnSerializer.STAGED_SERDE
+ ));
+ }
+
+ @Test
+ public void testIteratorSimple() throws Exception
+ {
+ for (SerializablePairLongString value : integerRangeArr) {
+ bufferStore.store(value);
+ }
+
+ IOIterator<SerializablePairLongString> iterator = bufferStore.iterator();
+
+ int i = 0;
+ while (iterator.hasNext()) {
+ Assert.assertEquals(integerRangeArr[i], iterator.next());
+ i++;
+ }
+ }
+
+ @Test
+ public void testIteratorEmptyBuffer() throws Exception
+ {
+ IOIterator<SerializablePairLongString> iterator = bufferStore.iterator();
+
+ Assert.assertFalse(iterator.hasNext());
+ }
+
+ @Test
+ public void testIteratorNull() throws Exception
+ {
+ bufferStore.store(null);
+ IOIterator<SerializablePairLongString> iterator = bufferStore.iterator();
+ Assert.assertTrue(iterator.hasNext());
+ Assert.assertNull(iterator.next());
+ }
+
+ @Test
+ public void testIteratorIdempotentHasNext() throws Exception
+ {
+ bufferStore.store(integerRangeArr[0]);
+
+ IOIterator<SerializablePairLongString> iterator = bufferStore.iterator();
+
+ Assert.assertTrue(iterator.hasNext());
+ // expect hasNext() to not modify state
+ Assert.assertTrue(iterator.hasNext());
+ }
+
+ @Test(expected = NoSuchElementException.class)
+ public void testIteratorEmptyThrows() throws Exception
+ {
+ IOIterator<SerializablePairLongString> iterator = bufferStore.iterator();
+ iterator.next();
+ }
+
+ @Test
+ public void testIteratorEmptyHasNext() throws Exception
+ {
+ IOIterator<SerializablePairLongString> iterator = bufferStore.iterator();
+ Assert.assertFalse(iterator.hasNext());
+ }
+
+ @Test
+ public void testMinValueUsesInteger() throws Exception
+ {
+ for (SerializablePairLongString value : integerRangeArr) {
+ bufferStore.store(value);
+ }
+
+ SerializablePairLongStringColumnHeader columnHeader = bufferStore.createColumnHeader();
+ Assert.assertEquals(integerRangeArr[0].lhs.longValue(), columnHeader.getMinValue());
+ Assert.assertTrue(columnHeader.isUseIntegerDeltas());
+ }
+
+ @Test
+ public void testMinValueUsesLong() throws Exception
+ {
+ for (SerializablePairLongString value : longRangeArr) {
+ bufferStore.store(value);
+ }
+
+ SerializablePairLongStringColumnHeader columnHeader = bufferStore.createColumnHeader();
+ Assert.assertEquals(MIN_LONG, columnHeader.getMinValue());
+ Assert.assertFalse(columnHeader.isUseIntegerDeltas());
+ }
+
+ @Test
+ public void testMinValueUsesIntegerSerialization() throws Exception
+ {
+ for (SerializablePairLongString value : integerRangeArr) {
+ bufferStore.store(value);
+ }
+
+ SerializablePairLongStringColumnHeader columnHeader = bufferStore.createColumnHeader();
+
+ HeapByteBufferWriteOutBytes channel = new HeapByteBufferWriteOutBytes();
+ try (ResourceHolder<ByteBuffer> resourceHolder = NativeClearedByteBufferProvider.INSTANCE.get()) {
+ columnHeader.transferTo(channel);
+
+ ByteBuffer byteBuffer = resourceHolder.get();
+ channel.writeTo(byteBuffer);
+ byteBuffer.flip();
+
+ SerializablePairLongStringColumnHeader deserializedColumnHeader =
+ SerializablePairLongStringColumnHeader.fromBuffer(byteBuffer);
+ Assert.assertEquals(MIN_INTEGER, deserializedColumnHeader.getMinValue());
+ Assert.assertTrue(deserializedColumnHeader.isUseIntegerDeltas());
+ }
+ }
+
+ @Test
+ public void testMinValueSerialization() throws Exception
+ {
+ for (SerializablePairLongString value : longRangeArr) {
+ bufferStore.store(value);
+ }
+
+ SerializablePairLongStringColumnHeader columnHeader = bufferStore.createColumnHeader();
+
+ HeapByteBufferWriteOutBytes channel = new HeapByteBufferWriteOutBytes();
+ try (ResourceHolder<ByteBuffer> resourceHolder = NativeClearedByteBufferProvider.INSTANCE.get()) {
+ columnHeader.transferTo(channel);
+
+ ByteBuffer byteBuffer = resourceHolder.get();
+
+ channel.writeTo(byteBuffer);
+ byteBuffer.flip();
+
+ SerializablePairLongStringColumnHeader deserializedColumnHeader =
+ SerializablePairLongStringColumnHeader.fromBuffer(byteBuffer);
+ Assert.assertEquals(MIN_LONG, deserializedColumnHeader.getMinValue());
+ Assert.assertFalse(deserializedColumnHeader.isUseIntegerDeltas());
+ }
+ }
+
+ @Test
+ public void testVariedSize() throws Exception
+ {
+ int rowCount = 100;
+ int maxStringSize = 1024 * 1024;
+ int minStringSize = 1024;
+ List<SerializablePairLongString> input = new ArrayList<>(rowCount);
+ int totalCount = 0;
+
+ for (int i = 0; i < rowCount; i++) {
+ long longValue = random.nextLong();
+ SerializablePairLongString value =
+ new SerializablePairLongString(longValue, RandomStringUtils.randomAlphabetic(minStringSize, maxStringSize));
+
+ input.add(value);
+ totalCount += longValue;
+ totalCount = Math.max(totalCount, 0);
+
+ bufferStore.store(value);
+ }
+
+ IOIterator<SerializablePairLongString> iterator = bufferStore.iterator();
+ int i = 0;
+
+ while (iterator.hasNext()) {
+ Assert.assertEquals(input.get(i), iterator.next());
+ i++;
+ }
+ }
+
+ @Test
+ public void testLargeBuffer() throws Exception
+ {
+ // note: tests single element larger than 64k
+ int stringSize = 128 * 1024;
+ SerializablePairLongString value =
+ new SerializablePairLongString(Long.MAX_VALUE, RandomStringUtils.randomAlphabetic(stringSize));
+
+ bufferStore.store(value);
+
+ IOIterator<SerializablePairLongString> iterator = bufferStore.iterator();
+
+ Assert.assertTrue(iterator.hasNext());
+ Assert.assertEquals(value, iterator.next());
+ Assert.assertFalse(iterator.hasNext());
+ }
+
+ @Test
+ public void testLargeValueCount() throws Exception
+ {
+ List<SerializablePairLongString> valueList = new ArrayList<>();
+
+ for (int i = 0; i < 10000; i++) {
+ valueList.add(new SerializablePairLongString(Integer.MAX_VALUE + (long) i, "the same string"));
+ }
+
+ assertBufferedValuesEqual(valueList);
+ }
+
+ @Test
+ public void testOverflowTransfer() throws Exception
+ {
+ bufferStore.store(new SerializablePairLongString(Long.MIN_VALUE, "fuu"));
+ bufferStore.store(new SerializablePairLongString(Long.MAX_VALUE, "fuu"));
+
+ SerializablePairLongStringColumnHeader columnHeader = bufferStore.createColumnHeader();
+
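+ // max - min overflows a long here, so delta encoding cannot be used and the header reports a min value of 0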
+ Assert.assertEquals(0, columnHeader.getMinValue());
+
+ SerializablePairLongStringBufferStore.TransferredBuffer transferredBuffer = bufferStore.transferToRowWriter(
+ NativeClearedByteBufferProvider.INSTANCE,
+ writeOutMedium
+ );
+
+ Assert.assertEquals(94, transferredBuffer.getSerializedSize());
+ }
+
+ @Test
+ public void testNullOnlyTransfer() throws Exception
+ {
+ bufferStore.store(null);
+
+ bufferStore.store(null);
+
+ bufferStore.store(null);
+
+ SerializablePairLongStringColumnHeader columnHeader = bufferStore.createColumnHeader();
+
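+ // only nulls are stored, so there is no observed min/max and the header defaults to a min value of 0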
+ Assert.assertEquals(0, columnHeader.getMinValue());
+
+ SerializablePairLongStringBufferStore.TransferredBuffer transferredBuffer = bufferStore.transferToRowWriter(
+ NativeClearedByteBufferProvider.INSTANCE,
+ writeOutMedium
+ );
+
+ Assert.assertEquals(59, transferredBuffer.getSerializedSize());
+ }
+
+ @Test
+ public void testTransferIntegerRange() throws Exception
+ {
+ for (SerializablePairLongString value : integerRangeArr) {
+ bufferStore.store(value);
+ }
+
+ Assert.assertTrue(bufferStore.createColumnHeader().isUseIntegerDeltas());
+
+ assertTransferredValuesEqual(integerRangeArr);
+ }
+
+ @Test
+ public void testTransferLongRange() throws Exception
+ {
+ for (SerializablePairLongString value : longRangeArr) {
+ bufferStore.store(value);
+ }
+
+ Assert.assertFalse(bufferStore.createColumnHeader().isUseIntegerDeltas());
+
+ assertTransferredValuesEqual(longRangeArr);
+ }
+
+ private void assertBufferedValuesEqual(List<SerializablePairLongString> input) throws IOException
+ {
+ for (SerializablePairLongString pairLongString : input) {
+ bufferStore.store(pairLongString);
+ }
+
+ IOIterator<SerializablePairLongString> iterator = bufferStore.iterator();
+ int i = 0;
+
+ while (iterator.hasNext()) {
+ Assert.assertEquals(input.get(i), iterator.next());
+ i++;
+ }
+
+ Assert.assertEquals(
+ StringUtils.format("element count mismatch: expected %s, got %s", input.size(), i),
+ input.size(),
+ i
+ );
+ }
+
+ private void assertTransferredValuesEqual(SerializablePairLongString[] input) throws IOException
+ {
+ SerializablePairLongStringBufferStore.TransferredBuffer transferredBuffer =
+ bufferStore.transferToRowWriter(NativeClearedByteBufferProvider.INSTANCE, writeOutMedium);
+ HeapByteBufferWriteOutBytes resultChannel = new HeapByteBufferWriteOutBytes();
+
+ transferredBuffer.writeTo(resultChannel, null);
+
+ try (SerializablePairLongStringComplexColumn column = createComplexColumn(transferredBuffer, resultChannel)) {
+ for (int i = 0; i < input.length; i++) {
+ Assert.assertEquals(input[i], column.getRowValue(i));
+ }
+ }
+ }
+
+ private static SerializablePairLongStringComplexColumn createComplexColumn(
+ SerializablePairLongStringBufferStore.TransferredBuffer transferredBuffer,
+ HeapByteBufferWriteOutBytes resultChannel
+ )
+ {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(Ints.checkedCast(transferredBuffer.getSerializedSize()));
+
+ resultChannel.readFully(0, byteBuffer);
+ byteBuffer.flip();
+
+ SerializablePairLongStringComplexMetricSerde complexMetricSerde = new SerializablePairLongStringComplexMetricSerde();
+ ColumnBuilder builder = new ColumnBuilder();
+
+ complexMetricSerde.deserializeColumn(byteBuffer, builder);
+ builder.setType(ValueType.COMPLEX);
+
+ ColumnHolder columnHolder = builder.build();
+ SerializablePairLongStringComplexColumn column = (SerializablePairLongStringComplexColumn) columnHolder.getColumn();
+
+ return column;
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerdeTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerdeTest.java
new file mode 100644
index 0000000000..c8605f4695
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerdeTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.segment.GenericColumnSerializer;
+import org.apache.druid.segment.column.ColumnBuilder;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.serde.cell.RandomStringUtils;
+import org.apache.druid.segment.writeout.HeapByteBufferWriteOutBytes;
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+public class SerializablePairLongStringComplexMetricSerdeTest
+{
+ private static final SerializablePairLongStringComplexMetricSerde COMPLEX_METRIC_SERDE =
+ new SerializablePairLongStringComplexMetricSerde();
+
+ // want deterministic test input
+ private final Random random = new Random(0);
+ private final RandomStringUtils randomStringUtils = new RandomStringUtils(random);
+
+ private GenericColumnSerializer<SerializablePairLongString> serializer;
+
+ @SuppressWarnings("unchecked")
+ @Before
+ public void setup()
+ {
+ SegmentWriteOutMedium writeOutMedium = new OnHeapMemorySegmentWriteOutMedium();
+ serializer = (GenericColumnSerializer<SerializablePairLongString>) COMPLEX_METRIC_SERDE.getSerializer(
+ writeOutMedium,
+ "not-used"
+ );
+ }
+
+ @Test
+ public void testSingle() throws Exception
+ {
+ assertExpected(ImmutableList.of(new SerializablePairLongString(100L, "fuu")), 77);
+ }
+
+ @Test
+ public void testLargeString() throws Exception
+ {
+ // single entry spans more than one block in underlying storage
+ assertExpected(ImmutableList.of(new SerializablePairLongString(
+ 100L,
+ randomStringUtils.randomAlphanumeric(2 * 1024 * 1024)
+ )), 2103140);
+ }
+
+ @Test
+ public void testCompressable() throws Exception
+ {
+ int numStrings = 10;
+ List<SerializablePairLongString> valueList = new ArrayList<>();
+ List<String> stringList = new ArrayList<>();
+
+ for (int i = 0; i < numStrings; i++) {
+ stringList.add(randomStringUtils.randomAlphanumeric(1024));
+ }
+ for (int i = 0; i < 10000; i++) {
+ valueList.add(new SerializablePairLongString(Integer.MAX_VALUE + (long) i, stringList.get(i % numStrings)));
+ }
+
+ // actual input size in naive encoding is ~10 MB
+ assertExpected(valueList, 1746026);
+ }
+
+ @Test
+ public void testHighlyCompressable() throws Exception
+ {
+ List<SerializablePairLongString> valueList = new ArrayList<>();
+
+ String stringValue = randomStringUtils.randomAlphanumeric(1024);
+ for (int i = 0; i < 10000; i++) {
+ valueList.add(new SerializablePairLongString(Integer.MAX_VALUE + (long) i, stringValue));
+ }
+
+ // actual input size in naive encoding is ~10 MB
+ assertExpected(valueList, 289645);
+ }
+
+ @Test
+ public void testRandom() throws Exception
+ {
+ List<SerializablePairLongString> valueList = new ArrayList<>();
+
+ for (int i = 0; i < 10000; i++) {
+ valueList.add(new SerializablePairLongString(random.nextLong(), randomStringUtils.randomAlphanumeric(1024)));
+ }
+
+ assertExpected(valueList, 10428975);
+ }
+
+ @Test
+ public void testNullString() throws Exception
+ {
+ assertExpected(ImmutableList.of(new SerializablePairLongString(100L, null)), 74);
+ }
+
+ @Test
+ public void testEmpty() throws Exception
+ {
+ // minimum size for empty data
+ assertExpected(Collections.emptyList(), 57);
+ }
+
+ @Test
+ public void testSingleNull() throws Exception
+ {
+ assertExpected(Arrays.asList(new SerializablePairLongString[]{null}), 58);
+ }
+
+ @Test
+ public void testMultipleNull() throws Exception
+ {
+ assertExpected(Arrays.asList(null, null, null, null), 59);
+ }
+
+ private void assertExpected(List<SerializablePairLongString> expected) throws IOException
+ {
+ assertExpected(expected, -1);
+ }
+
+ private void assertExpected(List<SerializablePairLongString> expected, int expectedSize) throws IOException
+ {
+ List<SerializablePairLongStringValueSelector> valueSelectors =
+ expected.stream().map(SerializablePairLongStringValueSelector::new).collect(Collectors.toList());
+ ByteBuffer byteBuffer = serializeAllValuesToByteBuffer(valueSelectors, serializer, expectedSize);
+
+ try (SerializablePairLongStringComplexColumn complexColumn = createComplexColumn(byteBuffer)) {
+ for (int i = 0; i < valueSelectors.size(); i++) {
+ Assert.assertEquals(expected.get(i), complexColumn.getRowValue(i));
+ }
+ }
+ }
+
+ private SerializablePairLongStringComplexColumn createComplexColumn(ByteBuffer byteBuffer)
+ {
+ ColumnBuilder builder = new ColumnBuilder();
+ int serializedSize = byteBuffer.remaining();
+
+ COMPLEX_METRIC_SERDE.deserializeColumn(byteBuffer, builder);
+ builder.setType(ValueType.COMPLEX);
+
+ ColumnHolder columnHolder = builder.build();
+
+ SerializablePairLongStringComplexColumn column = (SerializablePairLongStringComplexColumn) columnHolder.getColumn();
+
+ Assert.assertEquals(serializedSize, column.getLength());
+ Assert.assertEquals("serializablePairLongString", column.getTypeName());
+ Assert.assertEquals(SerializablePairLongString.class, column.getClazz());
+
+ return column;
+ }
+
+
+ private static ByteBuffer serializeAllValuesToByteBuffer(
+ Collection<SerializablePairLongStringValueSelector> valueSelectors,
+ GenericColumnSerializer<SerializablePairLongString> serializer,
+ int expectedSize
+ ) throws IOException
+ {
+ serializer.open();
+
+ for (SerializablePairLongStringValueSelector valueSelector : valueSelectors) {
+ serializer.serialize(valueSelector);
+ }
+
+ return serializeToByteBuffer(serializer, expectedSize);
+ }
+
+ private static ByteBuffer serializeToByteBuffer(
+ GenericColumnSerializer<SerializablePairLongString> serializer,
+ int expectedSize
+ ) throws IOException
+ {
+ HeapByteBufferWriteOutBytes channel = new HeapByteBufferWriteOutBytes();
+
+ serializer.writeTo(channel, null);
+
+ ByteBuffer byteBuffer = ByteBuffer.allocate((int) channel.size()).order(ByteOrder.nativeOrder());
+
+ channel.readFully(0, byteBuffer);
+ byteBuffer.flip();
+
+ if (expectedSize > -1) {
+ Assert.assertEquals(expectedSize, serializer.getSerializedSize());
+ }
+
+ Assert.assertEquals(serializer.getSerializedSize(), byteBuffer.limit());
+
+ return byteBuffer;
+ }
+
+ private static class SerializablePairLongStringValueSelector
+ extends SingleValueColumnValueSelector<SerializablePairLongString>
+ {
+ public SerializablePairLongStringValueSelector(SerializablePairLongString value)
+ {
+ super(SerializablePairLongString.class, value);
+ }
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringDeltaEncodedStagedSerdeTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringDeltaEncodedStagedSerdeTest.java
new file mode 100644
index 0000000000..d0489cf928
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringDeltaEncodedStagedSerdeTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.segment.serde.cell.RandomStringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+public class SerializablePairLongStringDeltaEncodedStagedSerdeTest
+{
+ private static final SerializablePairLongStringDeltaEncodedStagedSerde INTEGER_SERDE =
+ new SerializablePairLongStringDeltaEncodedStagedSerde(0L, true);
+
+ private static final SerializablePairLongStringDeltaEncodedStagedSerde LONG_SERDE =
+ new SerializablePairLongStringDeltaEncodedStagedSerde(0L, false);
+
+ private final RandomStringUtils randomStringUtils = new RandomStringUtils();
+
+ @Test
+ public void testNull()
+ {
+ assertValueEquals(null, 0, INTEGER_SERDE);
+ }
+
+ @Test
+ public void testSimpleInteger()
+ {
+ assertValueEquals(new SerializablePairLongString(100L, "fuu"), 11, INTEGER_SERDE);
+ }
+
+ @Test
+ public void testNullStringInteger()
+ {
+ assertValueEquals(new SerializablePairLongString(100L, null), 8, INTEGER_SERDE);
+ }
+
+ @Test
+ public void testLargeStringInteger()
+ {
+ assertValueEquals(
+ new SerializablePairLongString(100L, randomStringUtils.randomAlphanumeric(1024 * 1024)),
+ 1048584,
+ INTEGER_SERDE
+ );
+ }
+
+ @Test
+ public void testSimpleLong()
+ {
+ assertValueEquals(new SerializablePairLongString(100L, "fuu"), 15, LONG_SERDE);
+ }
+
+ @Test
+ public void testNullStringLong()
+ {
+ assertValueEquals(new SerializablePairLongString(100L, null), 12, LONG_SERDE);
+ }
+
+ @Test
+ public void testLargeStringLong()
+ {
+ assertValueEquals(
+ new SerializablePairLongString(100L, randomStringUtils.randomAlphanumeric(10 * 1024 * 1024)),
+ 10485772,
+ LONG_SERDE
+ );
+ }
+
+ private static void assertValueEquals(
+ @Nullable SerializablePairLongString value,
+ int size,
+ SerializablePairLongStringDeltaEncodedStagedSerde serde
+ )
+ {
+ byte[] bytes = serde.serialize(value);
+ Assert.assertEquals(size, bytes.length);
+ SerializablePairLongString deserialized = serde.deserialize(bytes);
+ Assert.assertEquals(value, deserialized);
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringSimpleStagedSerdeTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringSimpleStagedSerdeTest.java
new file mode 100644
index 0000000000..23d57f0aa9
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringSimpleStagedSerdeTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.segment.serde.cell.RandomStringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.Random;
+
+public class SerializablePairLongStringSimpleStagedSerdeTest
+{
+ private static final SerializablePairLongStringSimpleStagedSerde SERDE =
+ new SerializablePairLongStringSimpleStagedSerde();
+
+ private final RandomStringUtils randomStringUtils = new RandomStringUtils(new Random(0));
+
+ @Test
+ public void testSimple()
+ {
+ assertValueEquals(new SerializablePairLongString(Long.MAX_VALUE, "fuu"), 15);
+ }
+
+ @Test
+ public void testNull()
+ {
+ assertValueEquals(null, 0);
+ }
+
+ @Test
+ public void testNullString()
+ {
+ assertValueEquals(new SerializablePairLongString(Long.MAX_VALUE, null), 12);
+ }
+
+ @Test
+ public void testLargeString()
+ {
+ assertValueEquals(
+ new SerializablePairLongString(Long.MAX_VALUE, randomStringUtils.randomAlphanumeric(1024 * 1024)),
+ 1048588
+ );
+ }
+
+ private static void assertValueEquals(@Nullable SerializablePairLongString value, int size)
+ {
+ byte[] bytes = SERDE.serialize(value);
+ Assert.assertEquals(size, bytes.length);
+ SerializablePairLongString deserialized = SERDE.deserialize(bytes);
+ Assert.assertEquals(value, deserialized);
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/SingleValueColumnValueSelector.java b/processing/src/test/java/org/apache/druid/query/aggregation/SingleValueColumnValueSelector.java
new file mode 100644
index 0000000000..8c9d232ca9
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/SingleValueColumnValueSelector.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation;
+
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+
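+/**
+ * Test-only {@link ColumnValueSelector} that always returns the single value supplied at construction time; the
+ * primitive accessors are unsupported.
+ */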
+public class SingleValueColumnValueSelector<T> implements ColumnValueSelector<T>
+{
+ private final Class<T> valueClass;
+ private final T value;
+
+ public SingleValueColumnValueSelector(Class<T> valueClass, T value)
+ {
+ this.valueClass = valueClass;
+ this.value = value;
+ }
+
+ @Override
+ public double getDouble()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public float getFloat()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getLong()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+ {
+ }
+
+ @Override
+ public boolean isNull()
+ {
+ return false;
+ }
+
+ @Nullable
+ @Override
+ public T getObject()
+ {
+ return value;
+ }
+
+ @Override
+ public Class<? extends T> classOfObject()
+ {
+ return valueClass;
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
index 250525ca1b..267fa52ab6 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
@@ -30,7 +30,7 @@ import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.SerializablePairLongString;
-import org.apache.druid.query.aggregation.SerializablePairLongStringSerde;
+import org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde;
import org.apache.druid.query.timeseries.DefaultTimeseriesQueryMetrics;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
@@ -68,7 +68,7 @@ public class StringFirstTimeseriesQueryTest extends InitializedNullHandlingTest
@Before
public void setUp() throws IndexSizeExceededException
{
- final SerializablePairLongStringSerde serde = new SerializablePairLongStringSerde();
+ final SerializablePairLongStringComplexMetricSerde serde = new SerializablePairLongStringComplexMetricSerde();
ComplexMetrics.registerSerde(serde.getTypeName(), serde);
incrementalIndex = new OnheapIncrementalIndex.Builder()
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
index 6c017bab4a..33bff8146f 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
@@ -30,7 +30,7 @@ import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.SerializablePairLongString;
-import org.apache.druid.query.aggregation.SerializablePairLongStringSerde;
+import org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde;
import org.apache.druid.query.timeseries.DefaultTimeseriesQueryMetrics;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
@@ -67,7 +67,7 @@ public class StringLastTimeseriesQueryTest
@Before
public void setUp() throws IndexSizeExceededException
{
- final SerializablePairLongStringSerde serde = new SerializablePairLongStringSerde();
+ final SerializablePairLongStringComplexMetricSerde serde = new SerializablePairLongStringComplexMetricSerde();
ComplexMetrics.registerSerde(serde.getTypeName(), serde);
incrementalIndex = new OnheapIncrementalIndex.Builder()
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/ComplexMetricsTest.java b/processing/src/test/java/org/apache/druid/segment/serde/ComplexMetricsTest.java
index df580a5c3a..d9e6b7ba54 100644
--- a/processing/src/test/java/org/apache/druid/segment/serde/ComplexMetricsTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/serde/ComplexMetricsTest.java
@@ -19,7 +19,7 @@
package org.apache.druid.segment.serde;
-import org.apache.druid.query.aggregation.SerializablePairLongStringSerde;
+import org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import org.junit.Assert;
import org.junit.Rule;
@@ -67,9 +67,9 @@ public class ComplexMetricsTest
Assert.assertTrue(serde instanceof HyperUniquesSerde);
expectedException.expect(IllegalStateException.class);
- expectedException.expectMessage("Incompatible serializer for type[hyperUnique] already exists. Expected [org.apache.druid.query.aggregation.SerializablePairLongStringSerde], found [org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde");
+ expectedException.expectMessage("Incompatible serializer for type[hyperUnique] already exists. Expected [org.apache.druid.query.aggregation.SerializablePairLongStringComplexMetricSerde], found [org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde");
- ComplexMetrics.registerSerde("hyperUnique", new SerializablePairLongStringSerde());
+ ComplexMetrics.registerSerde("hyperUnique", new SerializablePairLongStringComplexMetricSerde());
serde = ComplexMetrics.getSerdeForType("hyperUnique");
Assert.assertNotNull(serde);
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadWriterReaderTest.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadWriterReaderTest.java
new file mode 100644
index 0000000000..9e23d8e30f
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadWriterReaderTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;
+
+/**
+ * tests both {@link BlockCompressedPayloadWriter} and {@link BlockCompressedPayloadReader}
+ */
+public class BlockCompressedPayloadWriterReaderTest extends BytesReadWriteTestBase
+{
+ public BlockCompressedPayloadWriterReaderTest()
+ {
+ super(
+ new BlockCompressedPayloadWriterToBytesWriter.Builder(
+ new BlockCompressedPayloadWriter.Builder(
+ new OnHeapMemorySegmentWriteOutMedium()
+ )
+ ),
+ ByteWriterTestHelper.ValidationFunctionBuilder.PAYLOAD_WRITER_VALIDATION_FUNCTION_FACTORY,
+ new BytesReadWriteTestCases()
+ .setTestCaseValue(BytesReadWriteTest::testSingleWriteBytes, TestCaseResult.of(4115))
+ .setTestCaseValue(BytesReadWriteTest::testSingleMultiBlockWriteBytes, TestCaseResult.of(1049169))
+ .setTestCaseValue(BytesReadWriteTest::testSingleMultiBlockWriteBytesWithPrelude, TestCaseResult.of(1053238))
+ // BytesReadWriteTest::testEmptyByteArray -> compression header is 12 bytes
+ .setTestCaseValue(BytesReadWriteTest::testEmptyByteArray, TestCaseResult.of(12))
+ .setTestCaseValue(BytesReadWriteTest::testNull, TestCaseResult.of(new NullPointerException()))
+ .setTestCaseValue(BytesReadWriteTest::testSingleLong, TestCaseResult.of(25))
+ .setTestCaseValue(BytesReadWriteTest::testVariableSizedCompressablePayloads, TestCaseResult.of(1180))
+ .setTestCaseValue(
+ BytesReadWriteTest::testOutliersInNormalDataUncompressablePayloads,
+ TestCaseResult.of(574302)
+ )
+ .setTestCaseValue(BytesReadWriteTest::testOutliersInNormalDataCompressablePayloads, TestCaseResult.of(5997))
+ .setTestCaseValue(BytesReadWriteTest::testSingleUncompressableBlock, TestCaseResult.of(65715))
+ .setTestCaseValue(BytesReadWriteTest::testSingleWriteByteBufferZSTD, TestCaseResult.of(796))
+ .setTestCaseValue(
+ BytesReadWriteTest::testSingleWriteByteBufferAlternateByteBufferProvider,
+ TestCaseResult.of(1077)
+ )
+ .setTestCaseValue(BytesReadWriteTest::testRandomBlockAccess, TestCaseResult.of(3124842))
+ );
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadWriterToBytesWriter.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadWriterToBytesWriter.java
new file mode 100644
index 0000000000..fed4032fc3
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/BlockCompressedPayloadWriterToBytesWriter.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.segment.data.CompressionStrategy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
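+/**
+ * Adapts {@link BlockCompressedPayloadWriter} to the {@link BytesWriter} test interface so the shared read/write
+ * test cases can exercise it.
+ */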
+public class BlockCompressedPayloadWriterToBytesWriter implements BytesWriter
+{
+ private final BlockCompressedPayloadWriter blockCompressedPayloadWriter;
+
+ public BlockCompressedPayloadWriterToBytesWriter(BlockCompressedPayloadWriter blockCompressedPayloadWriter)
+ {
+ this.blockCompressedPayloadWriter = blockCompressedPayloadWriter;
+ }
+
+ @Override
+ public void write(byte[] payload) throws IOException
+ {
+ blockCompressedPayloadWriter.write(payload);
+ }
+
+ @Override
+ public void write(ByteBuffer cellByteBuffer) throws IOException
+ {
+ blockCompressedPayloadWriter.write(cellByteBuffer);
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ blockCompressedPayloadWriter.close();
+ }
+
+ @Override
+ public void transferTo(WritableByteChannel channel) throws IOException
+ {
+ blockCompressedPayloadWriter.writeTo(channel, null);
+ }
+
+ @Override
+ public long getSerializedSize()
+ {
+ return blockCompressedPayloadWriter.getSerializedSize();
+ }
+
+ public static class Builder implements BytesWriterBuilder
+ {
+ private final BlockCompressedPayloadWriter.Builder builder;
+
+ public Builder(BlockCompressedPayloadWriter.Builder builder)
+ {
+ this.builder = builder;
+ }
+
+ @Override
+ public BytesWriter build() throws IOException
+ {
+ return new BlockCompressedPayloadWriterToBytesWriter(builder.build());
+ }
+
+ @Override
+ public BytesWriterBuilder setCompressionStrategy(CompressionStrategy compressionStrategy)
+ {
+ builder.setCompressionStrategy(compressionStrategy);
+
+ return this;
+ }
+
+ @Override
+ public BytesWriterBuilder setByteBufferProvider(ByteBufferProvider byteBufferProvider)
+ {
+ builder.setByteBufferProvider(byteBufferProvider);
+
+ return this;
+ }
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/ByteWriterTestHelper.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/ByteWriterTestHelper.java
new file mode 100644
index 0000000000..89e9b77bf9
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/ByteWriterTestHelper.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import com.google.common.primitives.Ints;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.writeout.HeapByteBufferWriteOutBytes;
+import org.junit.Assert;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+@SuppressWarnings("UnusedReturnValue")
+public class ByteWriterTestHelper
+{
+ private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new byte[0]);
+
+ private final BytesWriterBuilder bytesWriterBuilder;
+ private final ValidationFunctionBuilder validationFunctionBuilder;
+ private CompressionStrategy compressionStrategy = CompressionStrategy.LZ4;
+ private ByteBufferProvider byteBufferProvider = NativeClearedByteBufferProvider.INSTANCE;
+
+ public ByteWriterTestHelper(
+ BytesWriterBuilder bytesWriterBuilder,
+ ValidationFunctionBuilder validationFunctionBuilder
+ )
+ {
+ this.bytesWriterBuilder = bytesWriterBuilder;
+ this.validationFunctionBuilder = validationFunctionBuilder;
+ }
+
+ public ByteWriterTestHelper setCompressionStrategy(CompressionStrategy compressionStrategy)
+ {
+ this.compressionStrategy = compressionStrategy;
+ bytesWriterBuilder.setCompressionStrategy(compressionStrategy);
+
+ return this;
+ }
+
+ public ByteWriterTestHelper setByteBufferProvider(ByteBufferProvider byteBufferProvider)
+ {
+ this.byteBufferProvider = byteBufferProvider;
+ bytesWriterBuilder.setByteBufferProvider(byteBufferProvider);
+
+ return this;
+ }
+
+ public ByteBuffer writePayloadAsByteArray(ByteBuffer payload) throws IOException
+ {
+ return writePayload(payload, BufferWriterAsBytes.INSTANCE);
+ }
+
+ public ByteBuffer writePayloadAsByteBuffer(ByteBuffer payload) throws IOException
+ {
+ return writePayload(payload, BufferWriterAsBuffer.INSTANCE);
+ }
+
+ public List<ByteBuffer> generateRaggedPayloadBuffer(
+ int baseMin,
+ int baseMax,
+ int stepSize,
+ int largeSize,
+ int largeCount
+ )
+ {
+ return generateRaggedPayloadBuffer(baseMin, baseMax, stepSize, largeSize, largeCount, Integer.MAX_VALUE);
+ }
+
+ public List<ByteBuffer> generateRaggedPayloadBuffer(
+ int baseMin,
+ int baseMax,
+ int stepSize,
+ int largeSize,
+ int largeCount,
+ int modulo
+ )
+ {
+ List<ByteBuffer> byteBufferList = new ArrayList<>();
+
+ for (int i = baseMin; i < baseMax; i += stepSize) {
+ byteBufferList.add(generateIntPayloads(baseMin + i, modulo));
+ }
+
+ for (int j = 0; j < largeCount; j++) {
+ byteBufferList.add(generateIntPayloads(largeSize, modulo));
+
+ for (int i = baseMin; i < baseMax; i += stepSize) {
+ byteBufferList.add(generateIntPayloads(baseMin + i, modulo));
+ }
+ }
+
+ return byteBufferList;
+ }
+
+ public void validateRead(List<ByteBuffer> byteBufferList) throws Exception
+ {
+ ValidationFunction validationFunction = validationFunctionBuilder.build(this);
+ validationFunction.validateBufferList(byteBufferList);
+ }
+
+ public void validateReadAndSize(List<ByteBuffer> byteBufferList, int expectedSize) throws Exception
+ {
+ ValidationFunction validationFunction = validationFunctionBuilder.build(this);
+ ByteBuffer masterByteBuffer = validationFunction.validateBufferList(byteBufferList);
+ int actualSize = masterByteBuffer.limit();
+
+ if (expectedSize > -1) {
+ Assert.assertEquals(expectedSize, actualSize);
+ }
+ }
+
+ public ByteBuffer writePayload(ByteBuffer sourcePayLoad, BufferWriter bufferWriter) throws IOException
+ {
+ return writePayloadList(Collections.singletonList(sourcePayLoad), bufferWriter);
+ }
+
+ public ByteBuffer writePayloadList(List<ByteBuffer> payloadList) throws IOException
+ {
+ return writePayloadList(payloadList, BufferWriterAsBuffer.INSTANCE);
+ }
+
+ public ByteBuffer writePayloadList(List<ByteBuffer> payloadList, BufferWriter bufferWriter) throws IOException
+ {
+ BytesWriter bytesWriter = bytesWriterBuilder.build();
+
+ try {
+ for (ByteBuffer payload : payloadList) {
+ bufferWriter.writeTo(bytesWriter, payload);
+ }
+ }
+ finally {
+ bytesWriter.close();
+ }
+
+
+
+ bytesWriter.transferTo(bufferWriteOutBytes);
+
+ int payloadSerializedSize = Ints.checkedCast(bytesWriter.getSerializedSize());
+ ByteBuffer masterByteBuffer = ByteBuffer.allocate(payloadSerializedSize).order(ByteOrder.nativeOrder());
+
+ bufferWriteOutBytes.readFully(0, masterByteBuffer);
+ masterByteBuffer.flip();
+
+ Assert.assertEquals(bytesWriter.getSerializedSize(), masterByteBuffer.limit());
+
+ return masterByteBuffer;
+ }
+
+ public ByteBuffer generateIntPayloads(int intCount)
+ {
+ return generateIntPayloads(intCount, Integer.MAX_VALUE);
+ }
+
+ public ByteBuffer generateIntPayloads(int intCount, int modulo)
+ {
+ ByteBuffer payload = ByteBuffer.allocate(Integer.BYTES * intCount).order(ByteOrder.nativeOrder());
+
+ for (int i = intCount - 1; i >= 0; i--) {
+ payload.putInt(i % modulo);
+ }
+
+ payload.flip();
+
+ return payload;
+ }
+
+ @Nonnull
+ public ByteBuffer generateBufferWithLongs(int longCount)
+ {
+ ByteBuffer longPayload = ByteBuffer.allocate(Long.BYTES * longCount).order(ByteOrder.nativeOrder());
+
+ for (int i = 0; i < longCount; i++) {
+ longPayload.putLong(longCount - i - 1);
+ }
+
+ longPayload.flip();
+
+ return longPayload;
+ }
+
+ public ByteBuffer validateBufferWriteAndReadBlockCompressed(
+ List<ByteBuffer> bufferList,
+ boolean useRandom
+ ) throws IOException
+ {
+ long position = 0;
+ List<PayloadEntrySpan> payloadReadList = new ArrayList<>();
+
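+ // record each payload's (start, size) span within the concatenated uncompressed stream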
+ for (ByteBuffer byteBuffer : bufferList) {
+ int expectedSize = byteBuffer == null ? 0 : byteBuffer.limit();
+ payloadReadList.add(new PayloadEntrySpan(position, expectedSize));
+ position += expectedSize;
+ }
+
+ ByteBuffer masterByteBuffer = writePayloadList(bufferList, new BufferWriterAsBytes());
+
+ return readAndValidatePayloads(bufferList, useRandom, payloadReadList, masterByteBuffer);
+ }
+
+ @Nonnull
+ private ByteBuffer readAndValidatePayloads(
+ List<ByteBuffer> bufferList,
+ boolean useRandom,
+ List<PayloadEntrySpan> payloadReadList,
+ ByteBuffer masterByteBuffer
+ ) throws IOException
+ {
+ try (BlockCompressedPayloadReader payloadReader = BlockCompressedPayloadReader.create(
+ masterByteBuffer,
+ byteBufferProvider,
+ compressionStrategy.getDecompressor()
+ )) {
+ List<Integer> positions = new ArrayList<>(bufferList.size());
+
+ for (int i = 0; i < bufferList.size(); i++) {
+ positions.add(i);
+ }
+
+ Random random = new Random(0);
+
+ if (useRandom) {
+ Collections.shuffle(positions, random);
+ }
+
+ for (int index : positions) {
+ ByteBuffer expectedByteBuffer = bufferList.get(index);
+ PayloadEntrySpan payloadEntrySpan = payloadReadList.get(index);
+ ByteBuffer readByteBuffer = payloadReader.read(payloadEntrySpan.getStart(), payloadEntrySpan.getSize());
+
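+ // a null input payload is expected to read back as an empty buffer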
+ if (expectedByteBuffer == null) {
+ Assert.assertEquals(StringUtils.format("expected empty buffer %s", index), EMPTY_BYTE_BUFFER, readByteBuffer);
+ } else {
+ Assert.assertEquals(StringUtils.format("failure on buffer %s", index), expectedByteBuffer, readByteBuffer);
+ }
+ }
+
+ return masterByteBuffer;
+ }
+ }
+
+ public ByteBuffer validateBufferWriteAndReadCells(List<ByteBuffer> bufferList, boolean useRandomRead)
+ throws IOException
+ {
+ ByteBuffer masterByteBuffer = writePayloadList(bufferList, new BufferWriterAsBytes());
+
+ return readAndValidateCells(bufferList, useRandomRead, masterByteBuffer);
+ }
+
+ @Nonnull
+ private ByteBuffer readAndValidateCells(
+ List<ByteBuffer> bufferList,
+ boolean useRandomRead,
+ ByteBuffer masterByteBuffer
+ ) throws IOException
+ {
+ try (CellReader cellReader = new CellReader.Builder(masterByteBuffer)
+ .setByteBufferProvider(byteBufferProvider)
+ .setCompressionStrategy(compressionStrategy)
+ .build()) {
+
+ List<Integer> positions = new ArrayList<>(bufferList.size());
+
+ for (int i = 0; i < bufferList.size(); i++) {
+ positions.add(i);
+ }
+
+ Random random = new Random(0);
+
+ if (useRandomRead) {
+ Collections.shuffle(positions, random);
+ }
+
+ for (int index : positions) {
+ ByteBuffer expectedByteBuffer = bufferList.get(index);
+
+ ByteBuffer readByteBuffer = cellReader.getCell(index);
+ if (expectedByteBuffer == null) {
+ Assert.assertEquals(StringUtils.format("failure on buffer %s", index), 0L, readByteBuffer.remaining());
+ } else {
+ Assert.assertEquals(StringUtils.format("failure on buffer %s", index), expectedByteBuffer, readByteBuffer);
+ }
+ }
+
+ return masterByteBuffer;
+ }
+ }
+
+ public ByteWriterTestHelper setUseRandomReadOrder(boolean useReadRandom)
+ {
+ validationFunctionBuilder.setReadRandom(useReadRandom);
+
+ return this;
+ }
+
+ public interface BufferWriter
+ {
+ void writeTo(BytesWriter writer, ByteBuffer payload) throws IOException;
+ }
+
+ public static class BufferWriterAsBytes implements BufferWriter
+ {
+ public static final BufferWriterAsBytes INSTANCE = new BufferWriterAsBytes();
+
+ @Override
+ public void writeTo(BytesWriter writer, ByteBuffer payload) throws IOException
+ {
+ if (payload == null) {
+ writer.write((byte[]) null);
+ } else {
+ writer.write(payload.array());
+ }
+ }
+ }
+
+ public static class BufferWriterAsBuffer implements BufferWriter
+ {
+ public static final BufferWriterAsBuffer INSTANCE = new BufferWriterAsBuffer();
+
+ @Override
+ public void writeTo(BytesWriter writer, ByteBuffer payload) throws IOException
+ {
+ writer.write(payload);
+ }
+ }
+
+ public interface ValidationFunction
+ {
+ ByteBuffer validateBufferList(List<ByteBuffer> byteBufferList) throws Exception;
+ }
+
+ public interface ValidationFunctionBuilder
+ {
+ ValidationFunctionBuilder PAYLOAD_WRITER_VALIDATION_FUNCTION_FACTORY = new PayloadWriterValidationFunctionBuilder();
+
+ ValidationFunctionBuilder CELL_READER_VALIDATION_FUNCTION_FACTORY = new CellReaderValidationFunctionBuilder();
+
+ ValidationFunction build(ByteWriterTestHelper testHelper);
+
+ ValidationFunctionBuilder setReadRandom(boolean useRandomRead);
+ }
+
+ public static class PayloadWriterValidationFunctionBuilder implements ValidationFunctionBuilder
+ {
+ private boolean useRandomRead;
+
+ @Override
+ public ValidationFunctionBuilder setReadRandom(boolean useRandomRead)
+ {
+ this.useRandomRead = useRandomRead;
+
+ return this;
+ }
+
+ @Override
+ public ValidationFunction build(ByteWriterTestHelper testHelper)
+ {
+ return bufferList -> testHelper.validateBufferWriteAndReadBlockCompressed(bufferList, useRandomRead);
+ }
+ }
+
+ public static class CellReaderValidationFunctionBuilder implements ValidationFunctionBuilder
+ {
+ private boolean useRandomRead;
+
+ @Override
+ public ValidationFunction build(ByteWriterTestHelper testHelper)
+ {
+ return bufferList -> testHelper.validateBufferWriteAndReadCells(bufferList, useRandomRead);
+ }
+
+ @Override
+ public ValidationFunctionBuilder setReadRandom(boolean useRandomRead)
+ {
+ this.useRandomRead = useRandomRead;
+ return this;
+ }
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesReadWriteTest.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesReadWriteTest.java
new file mode 100644
index 0000000000..516b913111
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesReadWriteTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+public interface BytesReadWriteTest
+{
+ void testSingleWriteBytes() throws Exception;
+
+ void testSingleMultiBlockWriteBytes() throws Exception;
+
+ void testSingleMultiBlockWriteBytesWithPrelude() throws Exception;
+
+ void testEmptyByteArray() throws Exception;
+
+ void testNull() throws Exception;
+
+ void testSingleLong() throws Exception;
+
+ void testVariableSizedCompressablePayloads() throws Exception;
+
+ void testOutliersInNormalDataUncompressablePayloads() throws Exception;
+
+ void testOutliersInNormalDataCompressablePayloads() throws Exception;
+
+ void testSingleUncompressableBlock() throws Exception;
+
+ void testSingleWriteByteBufferZSTD() throws Exception;
+
+ void testSingleWriteByteBufferAlternateByteBufferProvider() throws Exception;
+
+ void testRandomBlockAccess() throws Exception;
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesReadWriteTestBase.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesReadWriteTestBase.java
new file mode 100644
index 0000000000..f8c79ab606
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesReadWriteTestBase.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+// base class used only for extension
+public abstract class BytesReadWriteTestBase implements BytesReadWriteTest
+{
+ protected final BytesWriterBuilder bytesWriterBuilder;
+
+ private final TestCasesConfig<BytesReadWriteTest> testCases;
+ private final ByteWriterTestHelper.ValidationFunctionBuilder validationFunctionBuilder;
+
+ private ByteWriterTestHelper testHelper;
+
+ protected BytesReadWriteTestBase(
+ BytesWriterBuilder bytesWriterBuilder,
+ ByteWriterTestHelper.ValidationFunctionBuilder validationFunctionBuilder,
+ TestCasesConfig<BytesReadWriteTest> testCases
+ )
+ {
+ this.testCases = testCases;
+ this.bytesWriterBuilder = bytesWriterBuilder;
+ this.validationFunctionBuilder = validationFunctionBuilder;
+ }
+
+ protected ByteWriterTestHelper getTestHelper()
+ {
+ return testHelper;
+ }
+
+ @Before
+ public void setup()
+ {
+ testHelper = new ByteWriterTestHelper(bytesWriterBuilder, validationFunctionBuilder);
+ }
+
+ @Test
+ @Override
+ public void testSingleWriteBytes() throws Exception
+ {
+ Assume.assumeTrue(testCases.isCurrentTestEnabled());
+
+ ByteBuffer payload = testHelper.generateBufferWithLongs(1024);
+
+ runTestWithExceptionHandling(Collections.singletonList(payload), testCases.currentTestValue());
+ }
+
+ @Test
+ @Override
+ public void testSingleMultiBlockWriteBytes() throws Exception
+ {
+ Assume.assumeTrue(testCases.isCurrentTestEnabled());
+
+ ByteBuffer payload = testHelper.generateBufferWithLongs(256 * 1024); // 2mb
+
+ runTestWithExceptionHandling(Collections.singletonList(payload), testCases.currentTestValue());
+ }
+
+ @Test
+ @Override
+ public void testSingleMultiBlockWriteBytesWithPrelude() throws Exception
+ {
+ Assume.assumeTrue(testCases.isCurrentTestEnabled());
+ ByteBuffer payload1 = testHelper.generateBufferWithLongs(1024); // 8 kb
+ ByteBuffer payload2 = testHelper.generateBufferWithLongs(256 * 1024); // 256kb * 8 = 2mb
+
+ runTestWithExceptionHandling(Arrays.asList(payload1, payload2), testCases.currentTestValue());
+ }
+
+ @Test
+ @Override
+ public void testEmptyByteArray() throws Exception
+ {
+ Assume.assumeTrue(testCases.isCurrentTestEnabled());
+ // no-op
+ ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
+ // block index size: "8" : 4 bytes
+ // block index entry 0: "0": 4 bytes
+ // block index entry 1: "1": 4 bytes
+ // data stream size : "0" : 4 bytes
+ runTestWithExceptionHandling(Collections.singletonList(payload), testCases.currentTestValue());
+ }
+
+ @Test
+ @Override
+ public void testNull() throws Exception
+ {
+ Assume.assumeTrue(testCases.isCurrentTestEnabled());
+ TestCaseResult testCaseResult = testCases.currentTestValue();
+
+ runTestWithExceptionHandling(Collections.singletonList(null), testCaseResult);
+ }
+
+ @Test
+ @Override
+ public void testSingleLong() throws Exception
+ {
+ Assume.assumeTrue(testCases.isCurrentTestEnabled());
+ ByteBuffer payload = testHelper.generateBufferWithLongs(1);
+ // block index size: "8" : 4 bytes
+ // block index entry 0: "0": 4 bytes
+ // block index entry 1: "0": 4 bytes
+ // data stream size : "1" : 4 bytes
+ // compressed single 8 bytes: 9 bytes (compressed: "0")
+ runTestWithExceptionHandling(Collections.singletonList(payload), testCases.currentTestValue());
+ }
+
+ @Test
+ @Override
+ public void testVariableSizedCompressablePayloads() throws Exception
+ {
+ Assume.assumeTrue(testCases.isCurrentTestEnabled());
+
+ List<ByteBuffer> bufferList = testHelper.generateRaggedPayloadBuffer(100, 1024, 10, 0, 0, 10);
+
+ runTestWithExceptionHandling(bufferList, testCases.currentTestValue());
+ }
+
+ @Test
+ @Override
+ public void testOutliersInNormalDataUncompressablePayloads() throws Exception
+ {
+ Assume.assumeTrue(testCases.isCurrentTestEnabled());
+
+ // every integer within a payload is unique
+ List<ByteBuffer> bufferList = testHelper.generateRaggedPayloadBuffer(100, 1024, 10, 64 * 1024, 2);
+
+ runTestWithExceptionHandling(bufferList, testCases.currentTestValue());
+ }
+
+ @Test
+ @Override
+ public void testOutliersInNormalDataCompressablePayloads() throws Exception
+ {
+ Assume.assumeTrue(testCases.isCurrentTestEnabled());
+
+ // same # of payloads and size of payloads as testOutliersInNormalDataUncompressablePayloads()
+ // integer values range 0-9
+ List<ByteBuffer> bufferList = testHelper.generateRaggedPayloadBuffer(100, 1024, 10, 64 * 1024, 2, 10);
+
+ runTestWithExceptionHandling(bufferList, testCases.currentTestValue());
+ }
+
+ @Test
+ @Override
+ public void testSingleUncompressableBlock() throws Exception
+ {
+ Assume.assumeTrue(testCases.isCurrentTestEnabled());
+
+ // every integer within a payload is unique
+ ByteBuffer byteBuffer = testHelper.generateIntPayloads(16 * 1024);
+
+ Assert.assertEquals(64 * 1024, byteBuffer.limit());
+ // uncompressable 64k block size
+ runTestWithExceptionHandling(Collections.singletonList(byteBuffer), testCases.currentTestValue());
+
+ }
+
+ @Test
+ @Override
+ public void testSingleWriteByteBufferZSTD() throws Exception
+ {
+ Assume.assumeTrue(testCases.isCurrentTestEnabled());
+
+ ByteBuffer sourcePayLoad = testHelper.generateBufferWithLongs(1024); // 8k
+
+ testHelper.setCompressionStrategy(CompressionStrategy.ZSTD);
+ runTestWithExceptionHandling(Collections.singletonList(sourcePayLoad), testCases.currentTestValue());
+ }
+
+
+ @Test
+ @Override
+ public void testSingleWriteByteBufferAlternateByteBufferProvider() throws Exception
+ {
+ Assume.assumeTrue(testCases.isCurrentTestEnabled());
+
+ List<ByteBuffer> bufferList = testHelper.generateRaggedPayloadBuffer(100, 1024, 10, 0, 0, 10);
+
+ testHelper.setByteBufferProvider(() -> new ResourceHolder<ByteBuffer>()
+ {
+ @Override
+ public ByteBuffer get()
+ {
+ return ByteBuffer.allocate(128 * 1024);
+ }
+
+ @Override
+ public void close()
+ {
+ }
+ });
+ runTestWithExceptionHandling(bufferList, testCases.currentTestValue());
+ }
+
+ @Test
+ @Override
+ public void testRandomBlockAccess() throws Exception
+ {
+ Assume.assumeTrue(testCases.isCurrentTestEnabled());
+ // verifies that blocks are accessed in random order and that the same block can be returned to more than once
+ List<ByteBuffer> bufferList = testHelper.generateRaggedPayloadBuffer(8192, 32 * 1024, 256, 256 * 1024, 3, 1024);
+
+ testHelper.setUseRandomReadOrder(true);
+ runTestWithExceptionHandling(bufferList, testCases.currentTestValue());
+ }
+
+ private void runTestWithExceptionHandling(List<ByteBuffer> bufferList, TestCaseResult testCaseResult) throws Exception
+ {
+ try {
+ testHelper.validateReadAndSize(bufferList, testCaseResult.size);
+
+ if (testCaseResult.exception != null) {
+ Assert.fail("expected exception " + testCaseResult.exception.getClass().getName());
+ }
+ }
+ catch (Exception e) {
+ if (testCaseResult.exception != null) {
+ Assert.assertTrue(testCaseResult.exception.getClass().isAssignableFrom(e.getClass()));
+ } else {
+ throw e;
+ }
+ }
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesReadWriteTestCases.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesReadWriteTestCases.java
new file mode 100644
index 0000000000..2bedcfb821
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesReadWriteTestCases.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+public class BytesReadWriteTestCases extends TestCasesConfig<BytesReadWriteTest>
+{
+ public BytesReadWriteTestCases()
+ {
+ super(BytesReadWriteTest.class, BytesReadWriteTestBase.class);
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesWriter.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesWriter.java
new file mode 100644
index 0000000000..13d3c595fe
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesWriter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * This interface is used so that both CellWriter[.Builder] and BlockCompressedPayloadWriter[.Builder] may use the
+ * same test code.
+ */
+public interface BytesWriter extends Closeable
+{
+ void write(byte[] cellBytes) throws IOException;
+
+ void write(ByteBuffer cellByteBuffer) throws IOException;
+
+ @Override
+ void close() throws IOException;
+
+ void transferTo(WritableByteChannel channel) throws IOException;
+
+ long getSerializedSize();
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesWriterBuilder.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesWriterBuilder.java
new file mode 100644
index 0000000000..7b2703bb7b
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/BytesWriterBuilder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.segment.data.CompressionStrategy;
+
+import java.io.IOException;
+
+/**
+ * This interface is used so that both CellWriter[.Builder] and BlockCompressedPayloadWriter[.Builder] may use the
+ * same test code. Production code should not use this interface; use the classes directly.
+ */
+
+public interface BytesWriterBuilder
+{
+ BytesWriter build() throws IOException;
+
+ BytesWriterBuilder setCompressionStrategy(CompressionStrategy compressionStrategy);
+
+ BytesWriterBuilder setByteBufferProvider(ByteBufferProvider byteBufferProvider);
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/CellWriterReaderTest.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/CellWriterReaderTest.java
new file mode 100644
index 0000000000..ddc338dfdb
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/CellWriterReaderTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import com.google.common.primitives.Ints;
+import org.apache.druid.segment.writeout.HeapByteBufferWriteOutBytes;
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * tests both {@link CellWriter} and {@link CellReader}
+ */
+public class CellWriterReaderTest extends BytesReadWriteTestBase
+{
+ public CellWriterReaderTest()
+ {
+ super(
+ new CellWriterToBytesWriter.Builder(
+ new CellWriter.Builder(new OnHeapMemorySegmentWriteOutMedium())
+ ),
+ ByteWriterTestHelper.ValidationFunctionBuilder.CELL_READER_VALIDATION_FUNCTION_FACTORY,
+ new BytesReadWriteTestCases()
+ .setTestCaseValue(BytesReadWriteTest::testSingleLong, TestCaseResult.of(62))
+ .setTestCaseValue(BytesReadWriteTest::testEmptyByteArray, TestCaseResult.of(46))
+ .setTestCaseValue(BytesReadWriteTest::testNull, TestCaseResult.of(46))
+ .setTestCaseValue(BytesReadWriteTest::testSingleWriteBytes, TestCaseResult.of(4151))
+ .setTestCaseValue(BytesReadWriteTest::testSingleMultiBlockWriteBytes, TestCaseResult.of(1049204))
+ .setTestCaseValue(BytesReadWriteTest::testSingleMultiBlockWriteBytesWithPrelude, TestCaseResult.of(1053277))
+ .setTestCaseValue(BytesReadWriteTest::testVariableSizedCompressablePayloads, TestCaseResult.of(1655))
+ .setTestCaseValue(BytesReadWriteTest::testOutliersInNormalDataCompressablePayloads, TestCaseResult.of(7368))
+ .setTestCaseValue(
+ BytesReadWriteTest::testOutliersInNormalDataUncompressablePayloads,
+ TestCaseResult.of(575673)
+ )
+ .setTestCaseValue(BytesReadWriteTest::testSingleUncompressableBlock, TestCaseResult.of(65750))
+ .setTestCaseValue(BytesReadWriteTest::testSingleWriteByteBufferZSTD, TestCaseResult.of(845))
+ .setTestCaseValue(
+ BytesReadWriteTest::testSingleWriteByteBufferAlternateByteBufferProvider,
+ TestCaseResult.of(1552)
+ )
+ .setTestCaseValue(BytesReadWriteTest::testRandomBlockAccess, TestCaseResult.of(3126618))
+ );
+ }
+
+ @Test
+ public void testBasic() throws Exception
+ {
+ // generates a list of payloads with random sizes within a given range
+ List<ByteBuffer> byteBufferList = getTestHelper().generateRaggedPayloadBuffer(
+ 500,
+ 2048,
+ 25,
+ 64 * 1024,
+ 2,
+ 10 * 1024
+ );
+ // for test only, we store the bytes in heap buffers
+ HeapByteBufferWriteOutBytes writableChannel = new HeapByteBufferWriteOutBytes();
+ long size;
+
+ try (CellWriter cellWriter = CellWriter.builder(new OnHeapMemorySegmentWriteOutMedium()).build()) {
+ // write our payloads
+ for (ByteBuffer byteBuffer : byteBufferList) {
+ cellWriter.write(byteBuffer);
+ }
+ // finalize the internal buffer in the CellWriter
+ cellWriter.close();
+ // transfer the buffer to our WritableByteChannel
+ cellWriter.writeTo(writableChannel, null);
+ size = cellWriter.getSerializedSize();
+ }
+
+ // transfer the bytes into a ByteBuffer. Normally the WritableByteChannel would be backed by a file that could be
+ // memory-mapped into a ByteBuffer
+ ByteBuffer storedByteBuffer = ByteBuffer.allocate(Ints.checkedCast(size));
+
+ writableChannel.readFully(0, storedByteBuffer);
+ storedByteBuffer.flip();
+
+ try (CellReader cellReader = CellReader.builder(storedByteBuffer).build()) {
+ for (int i = 0; i < byteBufferList.size(); i++) {
+ ByteBuffer buffer = byteBufferList.get(i);
+ ByteBuffer readCell = cellReader.getCell(i);
+
+ Assert.assertEquals(buffer, readCell);
+ }
+ }
+ }
+}
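
For reference, the comment in testBasic notes that the WritableByteChannel would normally target a file that is then memory-mapped. Below is a minimal sketch of that file-backed flow using the same CellWriter/CellReader calls exercised by the test; the temporary-file handling and payload strings are illustrative only and not part of this patch.

import org.apache.druid.segment.serde.cell.CellReader;
import org.apache.druid.segment.serde.cell.CellWriter;
import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;

import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class CellFileRoundTripSketch
{
  public static void main(String[] args) throws Exception
  {
    Path path = Files.createTempFile("cells", ".bin");
    long size;

    // write payloads, close to finalize the internal buffer, then transfer the serialized bytes to the file
    try (CellWriter cellWriter = CellWriter.builder(new OnHeapMemorySegmentWriteOutMedium()).build()) {
      cellWriter.write("hello".getBytes(StandardCharsets.UTF_8));
      cellWriter.write("world".getBytes(StandardCharsets.UTF_8));
      cellWriter.close();

      try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.WRITE)) {
        cellWriter.writeTo(fileChannel, null);
      }

      size = cellWriter.getSerializedSize();
    }

    // memory-map the stored bytes and read cells back by element number
    try (FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.READ)) {
      MappedByteBuffer mapped = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, size);

      try (CellReader cellReader = CellReader.builder(mapped).build()) {
        System.out.println(StandardCharsets.UTF_8.decode(cellReader.getCell(0))); // hello
        System.out.println(StandardCharsets.UTF_8.decode(cellReader.getCell(1))); // world
      }
    }
  }
}
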
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/CellWriterToBytesWriter.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/CellWriterToBytesWriter.java
new file mode 100644
index 0000000000..d60eec3fae
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/CellWriterToBytesWriter.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import org.apache.druid.segment.data.CompressionStrategy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+public class CellWriterToBytesWriter implements BytesWriter
+{
+ private final CellWriter cellWriter;
+
+ public CellWriterToBytesWriter(CellWriter cellWriter)
+ {
+ this.cellWriter = cellWriter;
+ }
+
+ @Override
+ public void write(byte[] cellBytes) throws IOException
+ {
+ cellWriter.write(cellBytes);
+ }
+
+ @Override
+ public void write(ByteBuffer cellByteBuffer) throws IOException
+ {
+ cellWriter.write(cellByteBuffer);
+ }
+
+ @Override
+ public void transferTo(WritableByteChannel channel) throws IOException
+ {
+ cellWriter.writeTo(channel, null);
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ cellWriter.close();
+ }
+
+ @Override
+ public long getSerializedSize()
+ {
+ return cellWriter.getSerializedSize();
+ }
+
+ public static class Builder implements BytesWriterBuilder
+ {
+ private final CellWriter.Builder builder;
+
+ public Builder(CellWriter.Builder builder)
+ {
+ this.builder = builder;
+ }
+
+ @Override
+ public BytesWriterBuilder setCompressionStrategy(CompressionStrategy compressionStrategy)
+ {
+ builder.setCompressionStrategy(compressionStrategy);
+
+ return this;
+ }
+
+ @Override
+ public BytesWriterBuilder setByteBufferProvider(ByteBufferProvider byteBufferProvider)
+ {
+ builder.setByteBufferProvider(byteBufferProvider);
+
+ return this;
+ }
+
+ @Override
+ public BytesWriter build() throws IOException
+ {
+ return new CellWriterToBytesWriter(builder.build());
+ }
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/RandomStringUtils.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/RandomStringUtils.java
new file mode 100644
index 0000000000..a7b148fcda
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/RandomStringUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import java.util.Random;
+
+public class RandomStringUtils
+{
+ private final Random random;
+
+ public RandomStringUtils()
+ {
+ random = new Random(0);
+ }
+
+ public RandomStringUtils(Random random)
+ {
+ this.random = random;
+ }
+
+ public String randomAlphanumeric(int length)
+ {
+ return org.apache.commons.lang3.RandomStringUtils.random(length, 0, 0, true, true, null, random);
+ }
+
+ public String randomAlphanumeric(int minLength, int maxLength)
+ {
+ int length = random.nextInt(maxLength - minLength) + minLength;
+
+ return org.apache.commons.lang3.RandomStringUtils.random(length, 0, 0, true, true, null, random);
+ }
+}
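
A short usage note on the seeded helper above: the no-arg constructor fixes the seed to 0, so generated payload strings are reproducible across runs and across instances. A minimal sketch (illustrative only, not part of this patch):

import org.apache.druid.segment.serde.cell.RandomStringUtils;

public class RandomStringUtilsSketch
{
  public static void main(String[] args)
  {
    // same seed (0) => identical sequences from two separately constructed instances
    RandomStringUtils a = new RandomStringUtils();
    RandomStringUtils b = new RandomStringUtils();

    System.out.println(a.randomAlphanumeric(16).equals(b.randomAlphanumeric(16))); // true
    System.out.println(a.randomAlphanumeric(8, 32)); // length in [8, 32)
  }
}
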
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/TestCaseResult.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/TestCaseResult.java
new file mode 100644
index 0000000000..a75b29fd0e
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/TestCaseResult.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+public class TestCaseResult
+{
+ public final byte[] bytes;
+ public final int size;
+ public final Exception exception;
+
+ private TestCaseResult(byte[] bytes, int size, Exception exception)
+ {
+ this.bytes = bytes;
+ this.size = size;
+ this.exception = exception;
+ }
+
+ public static TestCaseResult of(Exception exception)
+ {
+ return new TestCaseResult(null, -1, exception);
+ }
+
+ public static TestCaseResult of(int sizeBytes)
+ {
+ return new TestCaseResult(null, sizeBytes, null);
+ }
+
+ public static TestCaseResult of(byte[] bytes)
+ {
+ return new TestCaseResult(bytes, bytes.length, null);
+ }
+
+ public static TestCaseResult of(byte[] bytes, int sizeBytes)
+ {
+ return new TestCaseResult(bytes, sizeBytes, null);
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/serde/cell/TestCasesConfig.java b/processing/src/test/java/org/apache/druid/segment/serde/cell/TestCasesConfig.java
new file mode 100644
index 0000000000..078113cc72
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/serde/cell/TestCasesConfig.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.serde.cell;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class TestCasesConfig<T>
+{
+ private final MethodCallCapturer<T> methodCallCapturer;
+ private final Class<T> testCasesInterface;
+ private final Class<? extends T> testClassImpl;
+ private final Map<TestMethodHandle, TestCaseResult> testCasesToRun = new LinkedHashMap<>();
+
+ public TestCasesConfig(Class<T> testCasesInterface, Class<? extends T> testClassImpl)
+ {
+ methodCallCapturer = new MethodCallCapturer<>(testCasesInterface);
+ this.testCasesInterface = testCasesInterface;
+ this.testClassImpl = testClassImpl;
+ }
+
+ public TestCasesConfig<T> setTestCaseValue(TestMethodHandle testMethodHandle, TestCaseResult expectedResult)
+ {
+ testCasesToRun.put(testMethodHandle, expectedResult);
+
+ return this;
+ }
+
+ public TestCasesConfig<T> setTestCaseValue(MethodAccess<T, Exception> methodAccess, TestCaseResult expectedResult)
+ {
+ TestMethodHandle testMethodHandle = capture(methodAccess);
+ testCasesToRun.put(testMethodHandle, expectedResult);
+
+ return this;
+ }
+
+ public TestCasesConfig<T> setTestCaseValue(MethodAccess<T, Exception> methodAccess, int sizeBytes)
+ {
+ TestMethodHandle testMethodHandle = capture(methodAccess);
+ testCasesToRun.put(testMethodHandle, TestCaseResult.of(sizeBytes));
+
+ return this;
+ }
+
+ public TestCasesConfig<T> setTestCaseValue(MethodAccess<T, Exception> methodAccess, byte[] bytes)
+ {
+ TestMethodHandle testMethodHandle = capture(methodAccess);
+ testCasesToRun.put(testMethodHandle, TestCaseResult.of(bytes));
+
+ return this;
+ }
+
+ public TestCasesConfig<T> enableTestCase(MethodAccess<T, Exception> methodAccess)
+ {
+ TestMethodHandle testMethodHandle = capture(methodAccess);
+ testCasesToRun.put(testMethodHandle, TestCaseResult.of(-1));
+
+ return this;
+ }
+
+ public TestCaseResult currentTestValue()
+ {
+ TestMethodHandle currentTestMethodHandle = getCurrentTestMethod();
+ return testCasesToRun.get(currentTestMethodHandle);
+ }
+
+ public boolean isCurrentTestEnabled()
+ {
+ TestMethodHandle currentTestMethodHandle = getCurrentTestMethod();
+ return testCasesToRun.containsKey(currentTestMethodHandle);
+ }
+
+ private TestMethodHandle capture(MethodAccess<T, Exception> access)
+ {
+ try {
+ Method method = methodCallCapturer.captureMethod(access);
+ TestMethodHandle testMethodHandle = new TestMethodHandle(method.getName());
+
+ return testMethodHandle;
+ }
+ catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private TestMethodHandle getCurrentTestMethod()
+ {
+ StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+ String thisMethodName = stackTrace[3].getMethodName();
+
+ return new TestMethodHandle(thisMethodName);
+ }
+
+ public class TestMethodHandle
+ {
+ private final String name;
+
+ public TestMethodHandle(String name)
+ {
+ this.name = name;
+ try {
+ // validate method exists
+ MethodHandles.lookup()
+ .findVirtual(testCasesInterface, name, MethodType.methodType(void.class));
+ // validate method exists
+ MethodHandles.lookup()
+ .findVirtual(testClassImpl, name, MethodType.methodType(void.class));
+ }
+ catch (NoSuchMethodException | IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String getName()
+ {
+ return testCasesInterface.getName() + "::void " + name + "()";
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return getName().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (obj != null && this.getClass().equals(obj.getClass())) {
+ return getName().equals(((TestMethodHandle) obj).getName());
+ }
+
+ return false;
+ }
+
+
+ @Override
+ public String toString()
+ {
+ return getName();
+ }
+ }
+
+ public interface MethodAccess<I, T extends Throwable>
+ {
+ void access(I input) throws T;
+ }
+
+ private static class MethodCallCapturer<T> implements InvocationHandler
+ {
+ private volatile Method lastMethod = null;
+ private final T wrapper;
+
+ @SuppressWarnings("unchecked")
+ public MethodCallCapturer(Class<T> clazz)
+ {
+ wrapper = (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
+ }
+
+ public <E extends Throwable> Method captureMethod(MethodAccess<T, E> access) throws Throwable
+ {
+ access.access(wrapper);
+
+ return lastMethod;
+ }
+
+
+ @SuppressWarnings("ReturnOfNull")
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args)
+ {
+ lastMethod = method;
+
+ // unused
+ return null;
+ }
+ }
+}
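
The MethodCallCapturer above resolves a method reference such as BytesReadWriteTest::testSingleLong to its java.lang.reflect.Method by invoking it on a dynamic proxy and recording which method was dispatched. A standalone sketch of that capture pattern follows; the Greeter interface is purely illustrative and not part of this patch.

import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.function.Consumer;

public class MethodCaptureSketch
{
  interface Greeter
  {
    void greet();

    void wave();
  }

  public static void main(String[] args)
  {
    Method[] captured = new Method[1];

    // the proxy runs no real logic; its handler only records which interface method was dispatched
    Greeter proxy = (Greeter) Proxy.newProxyInstance(
        Greeter.class.getClassLoader(),
        new Class[]{Greeter.class},
        (p, method, methodArgs) -> {
          captured[0] = method;
          return null;
        }
    );

    // invoking a method reference against the proxy captures the corresponding Method
    Consumer<Greeter> access = Greeter::wave;
    access.accept(proxy);

    System.out.println(captured[0].getName()); // prints "wave"
  }
}
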
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org