You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this text.)
Posted to commits@druid.apache.org by cw...@apache.org on 2020/03/27 19:17:54 UTC
[druid] branch 0.18.0 updated: error on value counter overflow instead of writing sad segments (#9559) (#9572)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch 0.18.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/0.18.0 by this push:
new 0fcdfb0 error on value counter overflow instead of writing sad segments (#9559) (#9572)
0fcdfb0 is described below
commit 0fcdfb083c7b9dcfd8810db0d36582e0ca948e1a
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Fri Mar 27 12:17:41 2020 -0700
error on value counter overflow instead of writing sad segments (#9559) (#9572)
---
.../FloatCompressionBenchmarkFileGenerator.java | 36 ++--
.../LongCompressionBenchmarkFileGenerator.java | 36 ++--
.../druid/segment/DoubleColumnSerializer.java | 7 +-
.../druid/segment/DoubleColumnSerializerV2.java | 6 +
.../druid/segment/FloatColumnSerializer.java | 7 +-
.../druid/segment/FloatColumnSerializerV2.java | 6 +
.../org/apache/druid/segment/IndexMergerV9.java | 6 +
.../apache/druid/segment/LongColumnSerializer.java | 7 +-
.../druid/segment/LongColumnSerializerV2.java | 6 +
.../druid/segment/StringDimensionMergerV9.java | 5 +-
.../data/BlockLayoutColumnarDoublesSerializer.java | 12 ++
.../data/BlockLayoutColumnarFloatsSerializer.java | 6 +
.../data/BlockLayoutColumnarLongsSerializer.java | 6 +
...r.java => ColumnCapacityExceededException.java} | 24 ++-
.../segment/data/ColumnarDoublesSerializer.java | 1 +
.../data/CompressedColumnarIntsSerializer.java | 9 +-
.../data/CompressedColumnarIntsSupplier.java | 20 ++
.../CompressedVSizeColumnarIntsSerializer.java | 10 +
.../data/CompressedVSizeColumnarIntsSupplier.java | 29 +++
.../druid/segment/data/CompressionFactory.java | 38 +++-
.../EntireLayoutColumnarDoublesSerializer.java | 13 +-
.../data/EntireLayoutColumnarFloatsSerializer.java | 9 +-
.../data/EntireLayoutColumnarLongsSerializer.java | 6 +
.../data/IntermediateColumnarLongsSerializer.java | 9 +-
...CompressedVSizeColumnarMultiIntsSerializer.java | 10 +
...V3CompressedVSizeColumnarMultiIntsSupplier.java | 21 ++
.../data/VSizeColumnarMultiIntsSerializer.java | 11 +-
.../data/CompressedColumnarIntsSerializerTest.java | 125 +++++++----
...deTest.java => CompressedDoublesSerdeTest.java} | 140 +++++++-----
.../segment/data/CompressedFloatsSerdeTest.java | 40 ++++
.../data/CompressedLongsAutoEncodingSerdeTest.java | 1 +
.../segment/data/CompressedLongsSerdeTest.java | 45 ++++
.../CompressedVSizeColumnarIntsSerializerTest.java | 52 ++++-
...ressedVSizeColumnarMultiIntsSerializerTest.java | 237 +++++++++++++++------
34 files changed, 779 insertions(+), 217 deletions(-)
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/FloatCompressionBenchmarkFileGenerator.java b/benchmarks/src/test/java/org/apache/druid/benchmark/FloatCompressionBenchmarkFileGenerator.java
index ef3bffc..1570619 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/FloatCompressionBenchmarkFileGenerator.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/FloatCompressionBenchmarkFileGenerator.java
@@ -65,21 +65,26 @@ public class FloatCompressionBenchmarkFileGenerator
dirPath = args[0];
}
- BenchmarkColumnSchema enumeratedSchema = BenchmarkColumnSchema.makeEnumerated("", ValueType.FLOAT, true, 1, 0d,
- ImmutableList.of(
- 0f,
- 1.1f,
- 2.2f,
- 3.3f,
- 4.4f
- ),
- ImmutableList.of(
- 0.95,
- 0.001,
- 0.0189,
- 0.03,
- 0.0001
- )
+ BenchmarkColumnSchema enumeratedSchema = BenchmarkColumnSchema.makeEnumerated(
+ "",
+ ValueType.FLOAT,
+ true,
+ 1,
+ 0d,
+ ImmutableList.of(
+ 0f,
+ 1.1f,
+ 2.2f,
+ 3.3f,
+ 4.4f
+ ),
+ ImmutableList.of(
+ 0.95,
+ 0.001,
+ 0.0189,
+ 0.03,
+ 0.0001
+ )
);
BenchmarkColumnSchema zipfLowSchema = BenchmarkColumnSchema.makeZipf(
"",
@@ -151,6 +156,7 @@ public class FloatCompressionBenchmarkFileGenerator
File dataFile = new File(dir, entry.getKey());
ColumnarFloatsSerializer writer = CompressionFactory.getFloatSerializer(
+ "float-benchmark",
new OffHeapMemorySegmentWriteOutMedium(),
"float",
ByteOrder.nativeOrder(),
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/LongCompressionBenchmarkFileGenerator.java b/benchmarks/src/test/java/org/apache/druid/benchmark/LongCompressionBenchmarkFileGenerator.java
index 31c39fe..ead3dd0 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/LongCompressionBenchmarkFileGenerator.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/LongCompressionBenchmarkFileGenerator.java
@@ -66,21 +66,26 @@ public class LongCompressionBenchmarkFileGenerator
dirPath = args[0];
}
- BenchmarkColumnSchema enumeratedSchema = BenchmarkColumnSchema.makeEnumerated("", ValueType.LONG, true, 1, 0d,
- ImmutableList.of(
- 0,
- 1,
- 2,
- 3,
- 4
- ),
- ImmutableList.of(
- 0.95,
- 0.001,
- 0.0189,
- 0.03,
- 0.0001
- )
+ BenchmarkColumnSchema enumeratedSchema = BenchmarkColumnSchema.makeEnumerated(
+ "",
+ ValueType.LONG,
+ true,
+ 1,
+ 0d,
+ ImmutableList.of(
+ 0,
+ 1,
+ 2,
+ 3,
+ 4
+ ),
+ ImmutableList.of(
+ 0.95,
+ 0.001,
+ 0.0189,
+ 0.03,
+ 0.0001
+ )
);
BenchmarkColumnSchema zipfLowSchema = BenchmarkColumnSchema.makeZipf("", ValueType.LONG, true, 1, 0d, -1, 1000, 1d);
BenchmarkColumnSchema zipfHighSchema = BenchmarkColumnSchema.makeZipf(
@@ -144,6 +149,7 @@ public class LongCompressionBenchmarkFileGenerator
File dataFile = new File(dir, entry.getKey());
ColumnarLongsSerializer writer = CompressionFactory.getLongSerializer(
+ "long-benchmark",
new OffHeapMemorySegmentWriteOutMedium(),
"long",
ByteOrder.nativeOrder(),
diff --git a/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializer.java
index 67b68ed..cc525cb 100644
--- a/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializer.java
@@ -33,14 +33,16 @@ import java.nio.channels.WritableByteChannel;
public class DoubleColumnSerializer implements GenericColumnSerializer<Object>
{
public static DoubleColumnSerializer create(
+ String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
CompressionStrategy compression
)
{
- return new DoubleColumnSerializer(segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, compression);
+ return new DoubleColumnSerializer(columnName, segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, compression);
}
+ private final String columnName;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private final String filenameBase;
private final ByteOrder byteOrder;
@@ -48,12 +50,14 @@ public class DoubleColumnSerializer implements GenericColumnSerializer<Object>
private ColumnarDoublesSerializer writer;
private DoubleColumnSerializer(
+ String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
CompressionStrategy compression
)
{
+ this.columnName = columnName;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.filenameBase = filenameBase;
this.byteOrder = byteOrder;
@@ -64,6 +68,7 @@ public class DoubleColumnSerializer implements GenericColumnSerializer<Object>
public void open() throws IOException
{
writer = CompressionFactory.getDoubleSerializer(
+ columnName,
segmentWriteOutMedium,
StringUtils.format("%s.double_column", filenameBase),
byteOrder,
diff --git a/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializerV2.java b/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializerV2.java
index 3078a6a..02903eb 100644
--- a/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializerV2.java
+++ b/processing/src/main/java/org/apache/druid/segment/DoubleColumnSerializerV2.java
@@ -45,6 +45,7 @@ import java.nio.channels.WritableByteChannel;
public class DoubleColumnSerializerV2 implements GenericColumnSerializer<Object>
{
public static DoubleColumnSerializerV2 create(
+ String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
CompressionStrategy compression,
@@ -52,6 +53,7 @@ public class DoubleColumnSerializerV2 implements GenericColumnSerializer<Object>
)
{
return new DoubleColumnSerializerV2(
+ columnName,
segmentWriteOutMedium,
filenameBase,
IndexIO.BYTE_ORDER,
@@ -60,6 +62,7 @@ public class DoubleColumnSerializerV2 implements GenericColumnSerializer<Object>
);
}
+ private final String columnName;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private final String filenameBase;
private final ByteOrder byteOrder;
@@ -72,6 +75,7 @@ public class DoubleColumnSerializerV2 implements GenericColumnSerializer<Object>
private int rowCount = 0;
private DoubleColumnSerializerV2(
+ String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
@@ -79,6 +83,7 @@ public class DoubleColumnSerializerV2 implements GenericColumnSerializer<Object>
BitmapSerdeFactory bitmapSerdeFactory
)
{
+ this.columnName = columnName;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.filenameBase = filenameBase;
this.byteOrder = byteOrder;
@@ -90,6 +95,7 @@ public class DoubleColumnSerializerV2 implements GenericColumnSerializer<Object>
public void open() throws IOException
{
writer = CompressionFactory.getDoubleSerializer(
+ columnName,
segmentWriteOutMedium,
StringUtils.format("%s.double_column", filenameBase),
byteOrder,
diff --git a/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializer.java
index 8d413d5..b96d520 100644
--- a/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializer.java
@@ -33,14 +33,16 @@ import java.nio.channels.WritableByteChannel;
public class FloatColumnSerializer implements GenericColumnSerializer<Object>
{
public static FloatColumnSerializer create(
+ String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
CompressionStrategy compression
)
{
- return new FloatColumnSerializer(segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, compression);
+ return new FloatColumnSerializer(columnName, segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, compression);
}
+ private final String columnName;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private final String filenameBase;
private final ByteOrder byteOrder;
@@ -48,12 +50,14 @@ public class FloatColumnSerializer implements GenericColumnSerializer<Object>
private ColumnarFloatsSerializer writer;
private FloatColumnSerializer(
+ String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
CompressionStrategy compression
)
{
+ this.columnName = columnName;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.filenameBase = filenameBase;
this.byteOrder = byteOrder;
@@ -64,6 +68,7 @@ public class FloatColumnSerializer implements GenericColumnSerializer<Object>
public void open() throws IOException
{
writer = CompressionFactory.getFloatSerializer(
+ columnName,
segmentWriteOutMedium,
StringUtils.format("%s.float_column", filenameBase),
byteOrder,
diff --git a/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializerV2.java b/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializerV2.java
index 41e7b68..b5371a0 100644
--- a/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializerV2.java
+++ b/processing/src/main/java/org/apache/druid/segment/FloatColumnSerializerV2.java
@@ -45,6 +45,7 @@ import java.nio.channels.WritableByteChannel;
public class FloatColumnSerializerV2 implements GenericColumnSerializer<Object>
{
public static FloatColumnSerializerV2 create(
+ String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
CompressionStrategy compression,
@@ -52,6 +53,7 @@ public class FloatColumnSerializerV2 implements GenericColumnSerializer<Object>
)
{
return new FloatColumnSerializerV2(
+ columnName,
segmentWriteOutMedium,
filenameBase,
IndexIO.BYTE_ORDER,
@@ -60,6 +62,7 @@ public class FloatColumnSerializerV2 implements GenericColumnSerializer<Object>
);
}
+ private final String columnName;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private final String filenameBase;
private final ByteOrder byteOrder;
@@ -72,6 +75,7 @@ public class FloatColumnSerializerV2 implements GenericColumnSerializer<Object>
private int rowCount = 0;
private FloatColumnSerializerV2(
+ String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
@@ -79,6 +83,7 @@ public class FloatColumnSerializerV2 implements GenericColumnSerializer<Object>
BitmapSerdeFactory bitmapSerdeFactory
)
{
+ this.columnName = columnName;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.filenameBase = filenameBase;
this.byteOrder = byteOrder;
@@ -90,6 +95,7 @@ public class FloatColumnSerializerV2 implements GenericColumnSerializer<Object>
public void open() throws IOException
{
writer = CompressionFactory.getFloatSerializer(
+ columnName,
segmentWriteOutMedium,
StringUtils.format("%s.float_column", filenameBase),
byteOrder,
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
index 91e202a..e1a92f7 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java
@@ -621,6 +621,7 @@ public class IndexMergerV9 implements IndexMerger
// If using default values for null use LongColumnSerializer to allow rollback to previous versions.
if (NullHandling.replaceWithDefault()) {
return LongColumnSerializer.create(
+ columnName,
segmentWriteOutMedium,
columnName,
indexSpec.getMetricCompression(),
@@ -628,6 +629,7 @@ public class IndexMergerV9 implements IndexMerger
);
} else {
return LongColumnSerializerV2.create(
+ columnName,
segmentWriteOutMedium,
columnName,
indexSpec.getMetricCompression(),
@@ -646,12 +648,14 @@ public class IndexMergerV9 implements IndexMerger
// If using default values for null use DoubleColumnSerializer to allow rollback to previous versions.
if (NullHandling.replaceWithDefault()) {
return DoubleColumnSerializer.create(
+ columnName,
segmentWriteOutMedium,
columnName,
indexSpec.getMetricCompression()
);
} else {
return DoubleColumnSerializerV2.create(
+ columnName,
segmentWriteOutMedium,
columnName,
indexSpec.getMetricCompression(),
@@ -669,12 +673,14 @@ public class IndexMergerV9 implements IndexMerger
// If using default values for null use FloatColumnSerializer to allow rollback to previous versions.
if (NullHandling.replaceWithDefault()) {
return FloatColumnSerializer.create(
+ columnName,
segmentWriteOutMedium,
columnName,
indexSpec.getMetricCompression()
);
} else {
return FloatColumnSerializerV2.create(
+ columnName,
segmentWriteOutMedium,
columnName,
indexSpec.getMetricCompression(),
diff --git a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializer.java
index 8f4cc58..1f8d03b 100644
--- a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializer.java
@@ -36,15 +36,17 @@ import java.nio.channels.WritableByteChannel;
public class LongColumnSerializer implements GenericColumnSerializer<Object>
{
public static LongColumnSerializer create(
+ String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
CompressionStrategy compression,
CompressionFactory.LongEncodingStrategy encoding
)
{
- return new LongColumnSerializer(segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, compression, encoding);
+ return new LongColumnSerializer(columnName, segmentWriteOutMedium, filenameBase, IndexIO.BYTE_ORDER, compression, encoding);
}
+ private final String columnName;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private final String filenameBase;
private final ByteOrder byteOrder;
@@ -53,6 +55,7 @@ public class LongColumnSerializer implements GenericColumnSerializer<Object>
private ColumnarLongsSerializer writer;
private LongColumnSerializer(
+ String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
@@ -60,6 +63,7 @@ public class LongColumnSerializer implements GenericColumnSerializer<Object>
CompressionFactory.LongEncodingStrategy encoding
)
{
+ this.columnName = columnName;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.filenameBase = filenameBase;
this.byteOrder = byteOrder;
@@ -71,6 +75,7 @@ public class LongColumnSerializer implements GenericColumnSerializer<Object>
public void open() throws IOException
{
writer = CompressionFactory.getLongSerializer(
+ columnName,
segmentWriteOutMedium,
StringUtils.format("%s.long_column", filenameBase),
byteOrder,
diff --git a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java
index cacac59..364a7af 100644
--- a/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java
+++ b/processing/src/main/java/org/apache/druid/segment/LongColumnSerializerV2.java
@@ -45,6 +45,7 @@ import java.nio.channels.WritableByteChannel;
public class LongColumnSerializerV2 implements GenericColumnSerializer<Object>
{
public static LongColumnSerializerV2 create(
+ String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
CompressionStrategy compression,
@@ -53,6 +54,7 @@ public class LongColumnSerializerV2 implements GenericColumnSerializer<Object>
)
{
return new LongColumnSerializerV2(
+ columnName,
segmentWriteOutMedium,
filenameBase,
IndexIO.BYTE_ORDER,
@@ -62,6 +64,7 @@ public class LongColumnSerializerV2 implements GenericColumnSerializer<Object>
);
}
+ private final String columnName;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private final String filenameBase;
private final ByteOrder byteOrder;
@@ -75,6 +78,7 @@ public class LongColumnSerializerV2 implements GenericColumnSerializer<Object>
private int rowCount = 0;
private LongColumnSerializerV2(
+ String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
@@ -83,6 +87,7 @@ public class LongColumnSerializerV2 implements GenericColumnSerializer<Object>
BitmapSerdeFactory bitmapSerdeFactory
)
{
+ this.columnName = columnName;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.filenameBase = filenameBase;
this.byteOrder = byteOrder;
@@ -95,6 +100,7 @@ public class LongColumnSerializerV2 implements GenericColumnSerializer<Object>
public void open() throws IOException
{
writer = CompressionFactory.getLongSerializer(
+ columnName,
segmentWriteOutMedium,
StringUtils.format("%s.long_column", filenameBase),
byteOrder,
diff --git a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java b/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
index 2ccc0dd..cff7ef9 100644
--- a/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
+++ b/processing/src/main/java/org/apache/druid/segment/StringDimensionMergerV9.java
@@ -224,17 +224,20 @@ public class StringDimensionMergerV9 implements DimensionMergerV9
if (capabilities.hasMultipleValues()) {
if (compressionStrategy != CompressionStrategy.UNCOMPRESSED) {
encodedValueSerializer = V3CompressedVSizeColumnarMultiIntsSerializer.create(
+ dimensionName,
segmentWriteOutMedium,
filenameBase,
cardinality,
compressionStrategy
);
} else {
- encodedValueSerializer = new VSizeColumnarMultiIntsSerializer(segmentWriteOutMedium, cardinality);
+ encodedValueSerializer =
+ new VSizeColumnarMultiIntsSerializer(dimensionName, segmentWriteOutMedium, cardinality);
}
} else {
if (compressionStrategy != CompressionStrategy.UNCOMPRESSED) {
encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
+ dimensionName,
segmentWriteOutMedium,
filenameBase,
cardinality,
diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java
index c2247ac..2b4612e 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarDoublesSerializer.java
@@ -43,6 +43,7 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri
.writeInt(x -> CompressedPools.BUFFER_SIZE / Double.BYTES)
.writeByte(x -> x.compression.getId());
+ private final String columnName;
private final GenericIndexedWriter<ByteBuffer> flattener;
private final CompressionStrategy compression;
@@ -51,12 +52,14 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri
private ByteBuffer endBuffer;
BlockLayoutColumnarDoublesSerializer(
+ String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
CompressionStrategy compression
)
{
+ this.columnName = columnName;
this.flattener = GenericIndexedWriter.ofCompressedByteBuffers(
segmentWriteOutMedium,
filenameBase,
@@ -76,6 +79,12 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri
}
@Override
+ public int size()
+ {
+ return numInserted;
+ }
+
+ @Override
public void add(double value) throws IOException
{
if (endBuffer == null) {
@@ -89,6 +98,9 @@ public class BlockLayoutColumnarDoublesSerializer implements ColumnarDoublesSeri
endBuffer.putDouble(value);
++numInserted;
+ if (numInserted < 0) {
+ throw new ColumnCapacityExceededException(columnName);
+ }
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java
index e247252..94a3ef6 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarFloatsSerializer.java
@@ -43,6 +43,7 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerial
.writeInt(x -> CompressedPools.BUFFER_SIZE / Float.BYTES)
.writeByte(x -> x.compression.getId());
+ private final String columnName;
private final GenericIndexedWriter<ByteBuffer> flattener;
private final CompressionStrategy compression;
@@ -51,12 +52,14 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerial
private ByteBuffer endBuffer;
BlockLayoutColumnarFloatsSerializer(
+ String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
CompressionStrategy compression
)
{
+ this.columnName = columnName;
this.flattener = GenericIndexedWriter.ofCompressedByteBuffers(
segmentWriteOutMedium,
filenameBase,
@@ -94,6 +97,9 @@ public class BlockLayoutColumnarFloatsSerializer implements ColumnarFloatsSerial
}
endBuffer.putFloat(value);
++numInserted;
+ if (numInserted < 0) {
+ throw new ColumnCapacityExceededException(columnName);
+ }
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
index cb40402..ff47a34 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
@@ -42,6 +42,7 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ
.writeInt(x -> x.sizePer)
.writeSomething(CompressionFactory.longEncodingWriter(x -> x.writer, x -> x.compression));
+ private final String columnName;
private final int sizePer;
private final CompressionFactory.LongEncodingWriter writer;
private final GenericIndexedWriter<ByteBuffer> flattener;
@@ -53,6 +54,7 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ
private ByteBuffer endBuffer;
BlockLayoutColumnarLongsSerializer(
+ String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
@@ -60,6 +62,7 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ
CompressionStrategy compression
)
{
+ this.columnName = columnName;
this.sizePer = writer.getBlockSize(CompressedPools.BUFFER_SIZE);
int bufferSize = writer.getNumBytes(sizePer);
this.flattener = GenericIndexedWriter.ofCompressedByteBuffers(segmentWriteOutMedium, filenameBase, compression, bufferSize);
@@ -100,6 +103,9 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ
writer.write(value);
++numInserted;
+ if (numInserted < 0) {
+ throw new ColumnCapacityExceededException(columnName);
+ }
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/ColumnCapacityExceededException.java
similarity index 62%
copy from processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java
copy to processing/src/main/java/org/apache/druid/segment/data/ColumnCapacityExceededException.java
index da1b977..f7a2a60 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/ColumnCapacityExceededException.java
@@ -19,15 +19,21 @@
package org.apache.druid.segment.data;
-import org.apache.druid.segment.serde.Serializer;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.java.util.common.StringUtils;
-import java.io.IOException;
-
-/**
- * Serializer that produces {@link ColumnarDoubles}.
- */
-public interface ColumnarDoublesSerializer extends Serializer
+public class ColumnCapacityExceededException extends RuntimeException
{
- void open() throws IOException;
- void add(double value) throws IOException;
+ @VisibleForTesting
+ public static String formatMessage(String columnName)
+ {
+ return StringUtils.format(
+ "Too many values to store for %s column, try reducing maxRowsPerSegment",
+ columnName
+ );
+ }
+ public ColumnCapacityExceededException(String columnName)
+ {
+ super(formatMessage(columnName));
+ }
}
diff --git a/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java
index da1b977..f33cdee 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/ColumnarDoublesSerializer.java
@@ -29,5 +29,6 @@ import java.io.IOException;
public interface ColumnarDoublesSerializer extends Serializer
{
void open() throws IOException;
+ int size();
void add(double value) throws IOException;
}
diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java
index 0d82b4c..a0442d9 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializer.java
@@ -25,7 +25,6 @@ import org.apache.druid.segment.serde.MetaSerdeHelper;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import javax.annotation.Nullable;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -44,6 +43,7 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer
.writeInt(x -> x.chunkFactor)
.writeByte(x -> x.compression.getId());
+ private final String columnName;
private final int chunkFactor;
private final CompressionStrategy compression;
private final GenericIndexedWriter<ByteBuffer> flattener;
@@ -53,6 +53,7 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer
private ByteBuffer endBuffer;
CompressedColumnarIntsSerializer(
+ final String columnName,
final SegmentWriteOutMedium segmentWriteOutMedium,
final String filenameBase,
final int chunkFactor,
@@ -61,6 +62,7 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer
)
{
this(
+ columnName,
segmentWriteOutMedium,
chunkFactor,
byteOrder,
@@ -75,6 +77,7 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer
}
CompressedColumnarIntsSerializer(
+ final String columnName,
final SegmentWriteOutMedium segmentWriteOutMedium,
final int chunkFactor,
final ByteOrder byteOrder,
@@ -82,6 +85,7 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer
final GenericIndexedWriter<ByteBuffer> flattener
)
{
+ this.columnName = columnName;
this.chunkFactor = chunkFactor;
this.compression = compression;
this.flattener = flattener;
@@ -110,6 +114,9 @@ public class CompressedColumnarIntsSerializer extends SingleValueColumnarIntsSer
}
endBuffer.putInt(val);
numInserted++;
+ if (numInserted < 0) {
+ throw new ColumnCapacityExceededException(columnName);
+ }
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java
index f58ea59..d9b3cf9 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedColumnarIntsSupplier.java
@@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.CompressedPools;
import org.apache.druid.segment.serde.MetaSerdeHelper;
@@ -133,6 +134,25 @@ public class CompressedColumnarIntsSupplier implements WritableSupplier<Columnar
throw new IAE("Unknown version[%s]", versionFromBuffer);
}
+ public static CompressedColumnarIntsSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order, SmooshedFileMapper mapper)
+ {
+ byte versionFromBuffer = buffer.get();
+
+ if (versionFromBuffer == VERSION) {
+ final int totalSize = buffer.getInt();
+ final int sizePer = buffer.getInt();
+ final CompressionStrategy compression = CompressionStrategy.forId(buffer.get());
+ return new CompressedColumnarIntsSupplier(
+ totalSize,
+ sizePer,
+ GenericIndexed.read(buffer, new DecompressingByteBufferObjectStrategy(order, compression), mapper),
+ compression
+ );
+ }
+
+ throw new IAE("Unknown version[%s]", versionFromBuffer);
+ }
+
@VisibleForTesting
static CompressedColumnarIntsSupplier fromIntBuffer(
final IntBuffer buffer,
diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java
index 0f6c54a..6060be2 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializer.java
@@ -47,6 +47,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
.writeByte(x -> x.compression.getId());
public static CompressedVSizeColumnarIntsSerializer create(
+ final String columnName,
final SegmentWriteOutMedium segmentWriteOutMedium,
final String filenameBase,
final int maxValue,
@@ -54,6 +55,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
)
{
return new CompressedVSizeColumnarIntsSerializer(
+ columnName,
segmentWriteOutMedium,
filenameBase,
maxValue,
@@ -63,6 +65,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
);
}
+ private final String columnName;
private final int numBytes;
private final int chunkFactor;
private final boolean isBigEndian;
@@ -75,6 +78,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
private ByteBuffer endBuffer;
CompressedVSizeColumnarIntsSerializer(
+ final String columnName,
final SegmentWriteOutMedium segmentWriteOutMedium,
final String filenameBase,
final int maxValue,
@@ -84,6 +88,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
)
{
this(
+ columnName,
segmentWriteOutMedium,
maxValue,
chunkFactor,
@@ -99,6 +104,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
}
CompressedVSizeColumnarIntsSerializer(
+ final String columnName,
final SegmentWriteOutMedium segmentWriteOutMedium,
final int maxValue,
final int chunkFactor,
@@ -107,6 +113,7 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
final GenericIndexedWriter<ByteBuffer> flattener
)
{
+ this.columnName = columnName;
this.numBytes = VSizeColumnarInts.getNumBytesForMax(maxValue);
this.chunkFactor = chunkFactor;
int chunkBytes = chunkFactor * numBytes;
@@ -149,6 +156,9 @@ public class CompressedVSizeColumnarIntsSerializer extends SingleValueColumnarIn
endBuffer.put(intBuffer.array(), 0, numBytes);
}
numInserted++;
+ if (numInserted < 0) {
+ throw new ColumnCapacityExceededException(columnName);
+ }
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
index 323945c..84d8b8d 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.CompressedPools;
import org.apache.druid.segment.serde.MetaSerdeHelper;
@@ -167,6 +168,34 @@ public class CompressedVSizeColumnarIntsSupplier implements WritableSupplier<Col
throw new IAE("Unknown version[%s]", versionFromBuffer);
}
+ /**
+ * Deserializes a {@link CompressedVSizeColumnarIntsSupplier} from {@code buffer}, passing {@code mapper} through to
+ * {@link GenericIndexed#read} for the compressed chunks.
+ *
+ * Reads, in order: a version byte, {@code numBytes} (one byte), {@code totalSize} (int), {@code sizePer} (int),
+ * a compression strategy id (byte), and finally the {@link GenericIndexed} of compressed chunks.
+ *
+ * @param buffer buffer positioned at the start of the serialized column; the header reads advance its position
+ * @param order byte order handed to the decompressing object strategy
+ * @param mapper smoosh file mapper handed to {@link GenericIndexed#read}
+ * @return the deserialized supplier
+ * @throws IAE if the version byte is not {@code VERSION}
+ */
+ public static CompressedVSizeColumnarIntsSupplier fromByteBuffer(
+ ByteBuffer buffer,
+ ByteOrder order,
+ SmooshedFileMapper mapper
+ )
+ {
+ byte versionFromBuffer = buffer.get();
+
+ if (versionFromBuffer == VERSION) {
+ final int numBytes = buffer.get();
+ final int totalSize = buffer.getInt();
+ final int sizePer = buffer.getInt();
+
+ final CompressionStrategy compression = CompressionStrategy.forId(buffer.get());
+
+ // The header stores numBytes first, but the constructor takes (totalSize, sizePer, numBytes, ...).
+ return new CompressedVSizeColumnarIntsSupplier(
+ totalSize,
+ sizePer,
+ numBytes,
+ GenericIndexed.read(buffer, new DecompressingByteBufferObjectStrategy(order, compression), mapper),
+ compression
+ );
+
+ }
+
+ throw new IAE("Unknown version[%s]", versionFromBuffer);
+ }
+
@VisibleForTesting
public static CompressedVSizeColumnarIntsSupplier fromList(
final IntList list,
diff --git a/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java b/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
index ddacb90..7bf647a 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/CompressionFactory.java
@@ -331,6 +331,7 @@ public class CompressionFactory
}
public static ColumnarLongsSerializer getLongSerializer(
+ String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder order,
@@ -339,12 +340,23 @@ public class CompressionFactory
)
{
if (encodingStrategy == LongEncodingStrategy.AUTO) {
- return new IntermediateColumnarLongsSerializer(segmentWriteOutMedium, filenameBase, order, compressionStrategy);
+ return new IntermediateColumnarLongsSerializer(
+ columnName,
+ segmentWriteOutMedium,
+ filenameBase,
+ order,
+ compressionStrategy
+ );
} else if (encodingStrategy == LongEncodingStrategy.LONGS) {
if (compressionStrategy == CompressionStrategy.NONE) {
- return new EntireLayoutColumnarLongsSerializer(segmentWriteOutMedium, new LongsLongEncodingWriter(order));
+ return new EntireLayoutColumnarLongsSerializer(
+ columnName,
+ segmentWriteOutMedium,
+ new LongsLongEncodingWriter(order)
+ );
} else {
return new BlockLayoutColumnarLongsSerializer(
+ columnName,
segmentWriteOutMedium,
filenameBase,
order,
@@ -375,6 +387,7 @@ public class CompressionFactory
}
public static ColumnarFloatsSerializer getFloatSerializer(
+ String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder order,
@@ -382,9 +395,15 @@ public class CompressionFactory
)
{
if (compressionStrategy == CompressionStrategy.NONE) {
- return new EntireLayoutColumnarFloatsSerializer(segmentWriteOutMedium, order);
+ return new EntireLayoutColumnarFloatsSerializer(columnName, segmentWriteOutMedium, order);
} else {
- return new BlockLayoutColumnarFloatsSerializer(segmentWriteOutMedium, filenameBase, order, compressionStrategy);
+ return new BlockLayoutColumnarFloatsSerializer(
+ columnName,
+ segmentWriteOutMedium,
+ filenameBase,
+ order,
+ compressionStrategy
+ );
}
}
@@ -406,6 +425,7 @@ public class CompressionFactory
}
public static ColumnarDoublesSerializer getDoubleSerializer(
+ String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder byteOrder,
@@ -413,9 +433,15 @@ public class CompressionFactory
)
{
if (compression == CompressionStrategy.NONE) {
- return new EntireLayoutColumnarDoublesSerializer(segmentWriteOutMedium, byteOrder);
+ return new EntireLayoutColumnarDoublesSerializer(columnName, segmentWriteOutMedium, byteOrder);
} else {
- return new BlockLayoutColumnarDoublesSerializer(segmentWriteOutMedium, filenameBase, byteOrder, compression);
+ return new BlockLayoutColumnarDoublesSerializer(
+ columnName,
+ segmentWriteOutMedium,
+ filenameBase,
+ byteOrder,
+ compression
+ );
}
}
}
diff --git a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarDoublesSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarDoublesSerializer.java
index 540b0b7..b330319 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarDoublesSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarDoublesSerializer.java
@@ -40,14 +40,16 @@ public class EntireLayoutColumnarDoublesSerializer implements ColumnarDoublesSer
.writeInt(x -> 0)
.writeByte(x -> CompressionStrategy.NONE.getId());
+ private final String columnName;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private final ByteBuffer orderBuffer;
private WriteOutBytes valuesOut;
private int numInserted = 0;
- public EntireLayoutColumnarDoublesSerializer(SegmentWriteOutMedium segmentWriteOutMedium, ByteOrder order)
+ public EntireLayoutColumnarDoublesSerializer(String columnName, SegmentWriteOutMedium segmentWriteOutMedium, ByteOrder order)
{
+ this.columnName = columnName;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.orderBuffer = ByteBuffer.allocate(Double.BYTES);
orderBuffer.order(order);
@@ -60,12 +62,21 @@ public class EntireLayoutColumnarDoublesSerializer implements ColumnarDoublesSer
}
+ /**
+ * Returns the number of values added so far through {@link #add(double)}.
+ */
@Override
+ public int size()
+ {
+ return numInserted;
+ }
+
+ /**
+ * Appends one double to the uncompressed values output, encoded in the byte order chosen at construction.
+ *
+ * @throws ColumnCapacityExceededException if the number of added values no longer fits in an int
+ */
+ @Override
public void add(double value) throws IOException
{
orderBuffer.rewind();
orderBuffer.putDouble(value);
valuesOut.write(orderBuffer.array());
++numInserted;
+ // numInserted is an int: a negative value means the counter wrapped past Integer.MAX_VALUE, so fail fast
+ // instead of continuing with a wrapped count. Note the value has already been written to valuesOut here.
+ if (numInserted < 0) {
+ throw new ColumnCapacityExceededException(columnName);
+ }
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java
index 1209b76..75b7290 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java
@@ -39,16 +39,18 @@ public class EntireLayoutColumnarFloatsSerializer implements ColumnarFloatsSeria
.writeInt(x -> 0)
.writeByte(x -> CompressionStrategy.NONE.getId());
+ private final String columnName;
private final boolean isLittleEndian;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private WriteOutBytes valuesOut;
private int numInserted = 0;
- EntireLayoutColumnarFloatsSerializer(SegmentWriteOutMedium segmentWriteOutMedium, ByteOrder order)
+ /**
+ * @param columnName name of the column, reported in {@link ColumnCapacityExceededException} if too many values
+ * are added
+ * @param segmentWriteOutMedium medium that the values output is created from
+ * @param order byte order of the written floats; only whether it is little-endian is retained
+ */
+ EntireLayoutColumnarFloatsSerializer(String columnName, SegmentWriteOutMedium segmentWriteOutMedium, ByteOrder order)
{
+ this.columnName = columnName;
this.segmentWriteOutMedium = segmentWriteOutMedium;
- isLittleEndian = order.equals(ByteOrder.LITTLE_ENDIAN);
+ this.isLittleEndian = order.equals(ByteOrder.LITTLE_ENDIAN);
}
@Override
@@ -73,6 +75,9 @@ public class EntireLayoutColumnarFloatsSerializer implements ColumnarFloatsSeria
}
valuesOut.writeInt(valueBits);
++numInserted;
+ if (numInserted < 0) {
+ throw new ColumnCapacityExceededException(columnName);
+ }
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarLongsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarLongsSerializer.java
index b29081b..9513836 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarLongsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarLongsSerializer.java
@@ -38,6 +38,7 @@ public class EntireLayoutColumnarLongsSerializer implements ColumnarLongsSeriali
.writeInt(x -> 0)
.writeSomething(CompressionFactory.longEncodingWriter(x -> x.writer, x -> CompressionStrategy.NONE));
+ private final String columnName;
private final CompressionFactory.LongEncodingWriter writer;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private WriteOutBytes valuesOut;
@@ -45,10 +46,12 @@ public class EntireLayoutColumnarLongsSerializer implements ColumnarLongsSeriali
private int numInserted = 0;
+ /**
+ * @param columnName name of the column, reported in {@link ColumnCapacityExceededException} if too many values
+ * are added
+ * @param segmentWriteOutMedium medium that the values output is created from
+ * @param writer encoding writer that each added long is passed to
+ */
EntireLayoutColumnarLongsSerializer(
+ String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
CompressionFactory.LongEncodingWriter writer
)
{
+ this.columnName = columnName;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.writer = writer;
}
@@ -71,6 +74,9 @@ public class EntireLayoutColumnarLongsSerializer implements ColumnarLongsSeriali
{
writer.write(value);
++numInserted;
+ if (numInserted < 0) {
+ throw new ColumnCapacityExceededException(columnName);
+ }
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java
index e08e463..c0f9355 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/IntermediateColumnarLongsSerializer.java
@@ -40,6 +40,7 @@ import java.nio.channels.WritableByteChannel;
*/
public class IntermediateColumnarLongsSerializer implements ColumnarLongsSerializer
{
+ private final String columnName;
private final SegmentWriteOutMedium segmentWriteOutMedium;
private final String filenameBase;
private final ByteOrder order;
@@ -59,12 +60,14 @@ public class IntermediateColumnarLongsSerializer implements ColumnarLongsSeriali
private ColumnarLongsSerializer delegate;
IntermediateColumnarLongsSerializer(
+ String columnName,
SegmentWriteOutMedium segmentWriteOutMedium,
String filenameBase,
ByteOrder order,
CompressionStrategy compression
)
{
+ this.columnName = columnName;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.filenameBase = filenameBase;
this.order = order;
@@ -92,6 +95,9 @@ public class IntermediateColumnarLongsSerializer implements ColumnarLongsSeriali
}
tempOut.add(value);
++numInserted;
+ if (numInserted < 0) {
+ throw new ColumnCapacityExceededException(columnName);
+ }
if (uniqueValues.size() <= CompressionFactory.MAX_TABLE_SIZE && !uniqueValues.containsKey(value)) {
uniqueValues.put(value, uniqueValues.size());
valuesAddedInOrder.add(value);
@@ -127,9 +133,10 @@ public class IntermediateColumnarLongsSerializer implements ColumnarLongsSeriali
}
if (compression == CompressionStrategy.NONE) {
- delegate = new EntireLayoutColumnarLongsSerializer(segmentWriteOutMedium, writer);
+ delegate = new EntireLayoutColumnarLongsSerializer(columnName, segmentWriteOutMedium, writer);
} else {
delegate = new BlockLayoutColumnarLongsSerializer(
+ columnName,
segmentWriteOutMedium,
filenameBase,
order,
diff --git a/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java
index ebf0c4b..f669029 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializer.java
@@ -36,6 +36,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer extends ColumnarMultiI
private static final byte VERSION = V3CompressedVSizeColumnarMultiIntsSupplier.VERSION;
public static V3CompressedVSizeColumnarMultiIntsSerializer create(
+ final String columnName,
final SegmentWriteOutMedium segmentWriteOutMedium,
final String filenameBase,
final int maxValue,
@@ -43,7 +44,9 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer extends ColumnarMultiI
)
{
return new V3CompressedVSizeColumnarMultiIntsSerializer(
+ columnName,
new CompressedColumnarIntsSerializer(
+ columnName,
segmentWriteOutMedium,
filenameBase,
CompressedColumnarIntsSupplier.MAX_INTS_IN_BUFFER,
@@ -51,6 +54,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer extends ColumnarMultiI
compression
),
new CompressedVSizeColumnarIntsSerializer(
+ columnName,
segmentWriteOutMedium,
filenameBase,
maxValue,
@@ -61,16 +65,19 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer extends ColumnarMultiI
);
}
+ private final String columnName;
private final CompressedColumnarIntsSerializer offsetWriter;
private final CompressedVSizeColumnarIntsSerializer valueWriter;
private int offset;
private boolean lastOffsetWritten = false;
V3CompressedVSizeColumnarMultiIntsSerializer(
+ String columnName,
CompressedColumnarIntsSerializer offsetWriter,
CompressedVSizeColumnarIntsSerializer valueWriter
)
{
+ this.columnName = columnName;
this.offsetWriter = offsetWriter;
this.valueWriter = valueWriter;
this.offset = 0;
@@ -95,6 +102,9 @@ public class V3CompressedVSizeColumnarMultiIntsSerializer extends ColumnarMultiI
valueWriter.addValue(ints.get(i));
}
offset += numValues;
+ if (offset < 0) {
+ throw new ColumnCapacityExceededException(columnName);
+ }
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSupplier.java b/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSupplier.java
index 2beb43d..3bb934c 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSupplier.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSupplier.java
@@ -24,6 +24,7 @@ import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -73,6 +74,26 @@ public class V3CompressedVSizeColumnarMultiIntsSupplier implements WritableSuppl
throw new IAE("Unknown version[%s]", versionFromBuffer);
}
+ /**
+ * Deserializes a {@link V3CompressedVSizeColumnarMultiIntsSupplier} from {@code buffer}, passing {@code mapper}
+ * through to the nested offset and value suppliers.
+ *
+ * After this class's own version byte, the buffer holds two nested columns back to back: the offsets
+ * ({@link CompressedColumnarIntsSupplier}) followed by the values ({@link CompressedVSizeColumnarIntsSupplier}).
+ * Each nested column starts with its own version byte.
+ *
+ * @param buffer buffer positioned at the start of the serialized column
+ * @param order byte order handed to both nested suppliers
+ * @param mapper smoosh file mapper handed to both nested suppliers
+ * @return the deserialized supplier
+ * @throws IAE if this class's version byte, or either nested column's version byte, is not recognized
+ */
+ public static V3CompressedVSizeColumnarMultiIntsSupplier fromByteBuffer(ByteBuffer buffer, ByteOrder order, SmooshedFileMapper mapper)
+ {
+ byte versionFromBuffer = buffer.get();
+
+ if (versionFromBuffer == VERSION) {
+ // Order matters: offsets are read first and must leave buffer positioned at the start of the values column.
+ CompressedColumnarIntsSupplier offsetSupplier = CompressedColumnarIntsSupplier.fromByteBuffer(
+ buffer,
+ order,
+ mapper
+ );
+ CompressedVSizeColumnarIntsSupplier valueSupplier = CompressedVSizeColumnarIntsSupplier.fromByteBuffer(
+ buffer,
+ order,
+ mapper
+ );
+ return new V3CompressedVSizeColumnarMultiIntsSupplier(offsetSupplier, valueSupplier);
+ }
+ throw new IAE("Unknown version[%s]", versionFromBuffer);
+ }
+
@VisibleForTesting
public static V3CompressedVSizeColumnarMultiIntsSupplier fromIterable(
final Iterable<IndexedInts> objectsIterable,
diff --git a/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarMultiIntsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarMultiIntsSerializer.java
index 16fe48d..088f2b5 100644
--- a/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarMultiIntsSerializer.java
+++ b/processing/src/main/java/org/apache/druid/segment/data/VSizeColumnarMultiIntsSerializer.java
@@ -81,6 +81,7 @@ public class VSizeColumnarMultiIntsSerializer extends ColumnarMultiIntsSerialize
abstract void write(WriteOutBytes out, int v) throws IOException;
}
+ private final String columnName;
private final int maxId;
private final WriteInt writeInt;
@@ -92,8 +93,13 @@ public class VSizeColumnarMultiIntsSerializer extends ColumnarMultiIntsSerialize
private int numWritten = 0;
private boolean numBytesForMaxWritten = false;
- public VSizeColumnarMultiIntsSerializer(SegmentWriteOutMedium segmentWriteOutMedium, int maxId)
+ public VSizeColumnarMultiIntsSerializer(
+ String columnName,
+ SegmentWriteOutMedium segmentWriteOutMedium,
+ int maxId
+ )
{
+ this.columnName = columnName;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.maxId = maxId;
this.writeInt = WriteInt.values()[VSizeColumnarInts.getNumBytesForMax(maxId) - 1];
@@ -120,6 +126,9 @@ public class VSizeColumnarMultiIntsSerializer extends ColumnarMultiIntsSerialize
headerOut.writeInt(Ints.checkedCast(valuesOut.size()));
++numWritten;
+ if (numWritten < 0) {
+ throw new ColumnCapacityExceededException(columnName);
+ }
}
@Override
diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java
index 9c852ed..43ea5eb 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedColumnarIntsSerializerTest.java
@@ -33,22 +33,27 @@ import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.WriteOutBytes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
@RunWith(Parameterized.class)
public class CompressedColumnarIntsSerializerTest
@@ -64,6 +69,9 @@ public class CompressedColumnarIntsSerializerTest
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
public CompressedColumnarIntsSerializerTest(
CompressionStrategy compressionStrategy,
ByteOrder byteOrder
@@ -99,6 +107,77 @@ public class CompressedColumnarIntsSerializerTest
segmentWriteOutMedium.close();
}
+ /**
+ * For every max value / chunk factor pair, round-trips fewer values than one chunk holds
+ * ({@code rand.nextInt(chunkFactor)}, possibly zero) through checkSerializedSizeAndData.
+ */
+ @Test
+ public void testSmallData() throws Exception
+ {
+ // less than one chunk
+ for (int maxValue : MAX_VALUES) {
+ for (int chunkFactor : CHUNK_FACTORS) {
+ generateVals(rand.nextInt(chunkFactor), maxValue);
+ checkSerializedSizeAndData(chunkFactor);
+ }
+ }
+ }
+
+ /**
+ * For every max value / chunk factor pair, round-trips between 5 and 9 full chunks plus a partial chunk of values
+ * through checkSerializedSizeAndData.
+ */
+ @Test
+ public void testLargeData() throws Exception
+ {
+ // more than one chunk
+ for (int maxValue : MAX_VALUES) {
+ for (int chunkFactor : CHUNK_FACTORS) {
+ // (nextInt(5) + 5) full chunks, i.e. 5..9, plus a possibly empty partial chunk
+ generateVals((rand.nextInt(5) + 5) * chunkFactor + rand.nextInt(chunkFactor), maxValue);
+ checkSerializedSizeAndData(chunkFactor);
+ }
+ }
+ }
+
+ /**
+ * Round-trips a column with no values, using a chunk factor of 2.
+ */
+ @Test
+ public void testWriteEmpty() throws Exception
+ {
+ vals = new int[0];
+ checkSerializedSizeAndData(2);
+ }
+
+ /**
+ * Same data sizes as {@link #testLargeData}, but verified through checkV2SerializedSizeAndData, the variant that
+ * writes through a FileSmoosher and reads back via a SmooshedFileMapper.
+ */
+ @Test
+ public void testMultiValueFileLargeData() throws Exception
+ {
+ // more than one chunk
+ for (int maxValue : MAX_VALUES) {
+ for (int chunkFactor : CHUNK_FACTORS) {
+ generateVals((rand.nextInt(5) + 5) * chunkFactor + rand.nextInt(chunkFactor), maxValue);
+ checkV2SerializedSizeAndData(chunkFactor);
+ }
+ }
+ }
+
+ // this test takes ~30 minutes to run
+ // Adds Integer.MAX_VALUE + 100 values and expects ColumnCapacityExceededException once the int value counter
+ // wraps. Uses a temporary-file-backed write-out medium; presumably this avoids holding all written bytes in
+ // memory (NOTE(review): confirm).
+ @Ignore
+ @Test
+ public void testTooManyValues() throws IOException
+ {
+ expectedException.expect(ColumnCapacityExceededException.class);
+ expectedException.expectMessage(ColumnCapacityExceededException.formatMessage("test"));
+ try (
+ SegmentWriteOutMedium segmentWriteOutMedium =
+ TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder())
+ ) {
+ CompressedColumnarIntsSerializer serializer = new CompressedColumnarIntsSerializer(
+ "test",
+ segmentWriteOutMedium,
+ "test",
+ CompressedColumnarIntsSupplier.MAX_INTS_IN_BUFFER,
+ byteOrder,
+ compressionStrategy
+ );
+ serializer.open();
+
+ // long loop variable so the loop itself can run past Integer.MAX_VALUE
+ final long numRows = Integer.MAX_VALUE + 100L;
+ for (long i = 0L; i < numRows; i++) {
+ serializer.addValue(ThreadLocalRandom.current().nextInt(0, Integer.MAX_VALUE));
+ }
+ }
+ }
+
private void generateVals(final int totalSize, final int maxValue)
{
vals = new int[totalSize];
@@ -112,6 +191,7 @@ public class CompressedColumnarIntsSerializerTest
FileSmoosher smoosher = new FileSmoosher(temporaryFolder.newFolder());
CompressedColumnarIntsSerializer writer = new CompressedColumnarIntsSerializer(
+ "test",
segmentWriteOutMedium,
"test",
chunkFactor,
@@ -149,44 +229,13 @@ public class CompressedColumnarIntsSerializerTest
CloseQuietly.close(columnarInts);
}
- @Test
- public void testSmallData() throws Exception
- {
- // less than one chunk
- for (int maxValue : MAX_VALUES) {
- for (int chunkFactor : CHUNK_FACTORS) {
- generateVals(rand.nextInt(chunkFactor), maxValue);
- checkSerializedSizeAndData(chunkFactor);
- }
- }
- }
-
- @Test
- public void testLargeData() throws Exception
- {
- // more than one chunk
- for (int maxValue : MAX_VALUES) {
- for (int chunkFactor : CHUNK_FACTORS) {
- generateVals((rand.nextInt(5) + 5) * chunkFactor + rand.nextInt(chunkFactor), maxValue);
- checkSerializedSizeAndData(chunkFactor);
- }
- }
- }
-
- @Test
- public void testWriteEmpty() throws Exception
- {
- vals = new int[0];
- checkSerializedSizeAndData(2);
- }
-
-
private void checkV2SerializedSizeAndData(int chunkFactor) throws Exception
{
File tmpDirectory = FileUtils.createTempDir(StringUtils.format("CompressedIntsIndexedWriterTest_%d", chunkFactor));
FileSmoosher smoosher = new FileSmoosher(tmpDirectory);
CompressedColumnarIntsSerializer writer = new CompressedColumnarIntsSerializer(
+ "test",
segmentWriteOutMedium,
chunkFactor,
byteOrder,
@@ -223,16 +272,4 @@ public class CompressedColumnarIntsSerializerTest
CloseQuietly.close(columnarInts);
mapper.close();
}
-
- @Test
- public void testMultiValueFileLargeData() throws Exception
- {
- // more than one chunk
- for (int maxValue : MAX_VALUES) {
- for (int chunkFactor : CHUNK_FACTORS) {
- generateVals((rand.nextInt(5) + 5) * chunkFactor + rand.nextInt(chunkFactor), maxValue);
- checkV2SerializedSizeAndData(chunkFactor);
- }
- }
- }
}
diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
similarity index 64%
copy from processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java
copy to processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
index 4b78b52..284aea3 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedDoublesSerdeTest.java
@@ -20,13 +20,19 @@
package org.apache.druid.segment.data;
import com.google.common.base.Supplier;
-import com.google.common.primitives.Floats;
+import com.google.common.primitives.Doubles;
import it.unimi.dsi.fastutil.ints.IntArrays;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -42,8 +48,15 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+/**
+ * This is a copy of {@link CompressedFloatsSerdeTest}, minus {@link CompressedFloatsSerdeTest#testSupplierSerde},
+ * because doubles have no dedicated supplier class with its own serde (unlike
+ * {@link CompressedColumnarFloatsSupplier} or {@link CompressedColumnarLongsSupplier}).
+ *
+ * The two tests do not need to stay identical; copying the float test was simply the quickest way to cover doubles.
+ */
@RunWith(Parameterized.class)
-public class CompressedFloatsSerdeTest
+public class CompressedDoublesSerdeTest
{
@Parameterized.Parameters(name = "{0} {1} {2}")
public static Iterable<Object[]> compressionStrategies()
@@ -58,22 +71,35 @@ public class CompressedFloatsSerdeTest
private static final double DELTA = 0.00001;
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
protected final CompressionStrategy compressionStrategy;
protected final ByteOrder order;
- private final float[] values0 = {};
- private final float[] values1 = {0f, 1f, 1f, 0f, 1f, 1f, 1f, 1f, 0f, 0f, 1f, 1f};
- private final float[] values2 = {13.2f, 6.1f, 0.001f, 123f, 12572f, 123.1f, 784.4f, 6892.8634f, 8.341111f};
- private final float[] values3 = {0.001f, 0.001f, 0.001f, 0.001f, 0.001f, 100f, 100f, 100f, 100f, 100f};
- private final float[] values4 = {0f, 0f, 0f, 0f, 0.01f, 0f, 0f, 0f, 21.22f, 0f, 0f, 0f, 0f, 0f, 0f};
- private final float[] values5 = {123.16f, 1.12f, 62.00f, 462.12f, 517.71f, 56.54f, 971.32f, 824.22f, 472.12f, 625.26f};
- private final float[] values6 = {1000000f, 1000001f, 1000002f, 1000003f, 1000004f, 1000005f, 1000006f, 1000007f, 1000008f};
- private final float[] values7 = {
- Float.POSITIVE_INFINITY, Float.NEGATIVE_INFINITY, 12378.5734f, -12718243.7496f, -93653653.1f, 12743153.385534f,
- 21431.414538f, 65487435436632.123f, -43734526234564.65f
+ private final double[] values0 = {};
+ private final double[] values1 = {0, 1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1};
+ private final double[] values2 = {13.2, 6.1, 0.001, 123, 12572, 123.1, 784.4, 6892.8634, 8.341111};
+ private final double[] values3 = {0.001, 0.001, 0.001, 0.001, 0.001, 100, 100, 100, 100, 100};
+ private final double[] values4 = {0, 0, 0, 0, 0.01, 0, 0, 0, 21.22, 0, 0, 0, 0, 0, 0};
+ private final double[] values5 = {123.16, 1.12, 62.00, 462.12, 517.71, 56.54, 971.32, 824.22, 472.12, 625.26};
+ private final double[] values6 = {1000000, 1000001, 1000002, 1000003, 1000004, 1000005, 1000006, 1000007, 1000008};
+ private final double[] values7 = {
+ Double.POSITIVE_INFINITY,
+ Double.NEGATIVE_INFINITY,
+ 12378.5734,
+ -12718243.7496,
+ -93653653.1,
+ 12743153.385534,
+ 21431.414538,
+ 65487435436632.123,
+ -43734526234564.65
};
- public CompressedFloatsSerdeTest(
+ public CompressedDoublesSerdeTest(
CompressionStrategy compressionStrategy,
ByteOrder order
)
@@ -98,16 +124,44 @@ public class CompressedFloatsSerdeTest
+ /**
+ * Round-trips 10000 doubles holding the values 0 through 9999 via testWithValues.
+ */
@Test
public void testChunkSerde() throws Exception
{
- float[] chunk = new float[10000];
+ double[] chunk = new double[10000];
for (int i = 0; i < 10000; i++) {
chunk[i] = i;
}
testWithValues(chunk);
}
- public void testWithValues(float[] values) throws Exception
+ // this test takes ~45 minutes to run
+ // Adds Integer.MAX_VALUE + 100 doubles through CompressionFactory.getDoubleSerializer (for the parameterized byte
+ // order and compression strategy) and expects ColumnCapacityExceededException once the int value counter wraps.
+ @Ignore
+ @Test
+ public void testTooManyValues() throws IOException
+ {
+ expectedException.expect(ColumnCapacityExceededException.class);
+ expectedException.expectMessage(ColumnCapacityExceededException.formatMessage("test"));
+ try (
+ SegmentWriteOutMedium segmentWriteOutMedium =
+ TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder())
+ ) {
+ ColumnarDoublesSerializer serializer = CompressionFactory.getDoubleSerializer(
+ "test",
+ segmentWriteOutMedium,
+ "test",
+ order,
+ compressionStrategy
+ );
+ serializer.open();
+
+ // long loop variable so the loop itself can run past Integer.MAX_VALUE
+ final long numRows = Integer.MAX_VALUE + 100L;
+ for (long i = 0L; i < numRows; i++) {
+ serializer.add(ThreadLocalRandom.current().nextDouble());
+ }
+ }
+ }
+
+ public void testWithValues(double[] values) throws Exception
{
- ColumnarFloatsSerializer serializer = CompressionFactory.getFloatSerializer(
+ ColumnarDoublesSerializer serializer = CompressionFactory.getDoubleSerializer(
+ "test",
new OffHeapMemorySegmentWriteOutMedium(),
"test",
order,
@@ -115,7 +169,7 @@ public class CompressedFloatsSerdeTest
);
serializer.open();
- for (float value : values) {
+ for (double value : values) {
serializer.add(value);
}
Assert.assertEquals(values.length, serializer.size());
@@ -123,27 +177,26 @@ public class CompressedFloatsSerdeTest
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
serializer.writeTo(Channels.newChannel(baos), null);
Assert.assertEquals(baos.size(), serializer.getSerializedSize());
- CompressedColumnarFloatsSupplier supplier = CompressedColumnarFloatsSupplier
+ Supplier<ColumnarDoubles> supplier = CompressedColumnarDoublesSuppliers
.fromByteBuffer(ByteBuffer.wrap(baos.toByteArray()), order);
- ColumnarFloats floats = supplier.get();
+ ColumnarDoubles doubles = supplier.get();
- assertIndexMatchesVals(floats, values);
+ assertIndexMatchesVals(doubles, values);
for (int i = 0; i < 10; i++) {
int a = (int) (ThreadLocalRandom.current().nextDouble() * values.length);
int b = (int) (ThreadLocalRandom.current().nextDouble() * values.length);
int start = a < b ? a : b;
int end = a < b ? b : a;
- tryFill(floats, values, start, end - start);
+ tryFill(doubles, values, start, end - start);
}
- testSupplierSerde(supplier, values);
- testConcurrentThreadReads(supplier, floats, values);
+ testConcurrentThreadReads(supplier, doubles, values);
- floats.close();
+ doubles.close();
}
- private void tryFill(ColumnarFloats indexed, float[] vals, final int startIndex, final int size)
+ private void tryFill(ColumnarDoubles indexed, double[] vals, final int startIndex, final int size)
{
- float[] filled = new float[size];
+ double[] filled = new double[size];
indexed.get(filled, startIndex, filled.length);
for (int i = startIndex; i < filled.length; i++) {
@@ -151,7 +204,7 @@ public class CompressedFloatsSerdeTest
}
}
- private void assertIndexMatchesVals(ColumnarFloats indexed, float[] vals)
+ private void assertIndexMatchesVals(ColumnarDoubles indexed, double[] vals)
{
Assert.assertEquals(vals.length, indexed.size());
@@ -171,25 +224,12 @@ public class CompressedFloatsSerdeTest
}
}
- private void testSupplierSerde(CompressedColumnarFloatsSupplier supplier, float[] vals) throws IOException
- {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- supplier.writeTo(Channels.newChannel(baos), null);
-
- final byte[] bytes = baos.toByteArray();
- Assert.assertEquals(supplier.getSerializedSize(), bytes.length);
- CompressedColumnarFloatsSupplier anotherSupplier = CompressedColumnarFloatsSupplier.fromByteBuffer(
- ByteBuffer.wrap(bytes), order
- );
- ColumnarFloats indexed = anotherSupplier.get();
- assertIndexMatchesVals(indexed, vals);
- }
-
// This test attempts to cause a race condition with the DirectByteBuffers, it's non-deterministic in causing it,
// which sucks but I can't think of a way to deterministically cause it...
private void testConcurrentThreadReads(
- final Supplier<ColumnarFloats> supplier,
- final ColumnarFloats indexed, final float[] vals
+ final Supplier<ColumnarDoubles> supplier,
+ final ColumnarDoubles indexed,
+ final double[] vals
) throws Exception
{
final AtomicReference<String> reason = new AtomicReference<String>("none");
@@ -216,9 +256,9 @@ public class CompressedFloatsSerdeTest
try {
for (int i = 0; i < numRuns; ++i) {
for (int j = 0; j < indexed.size(); ++j) {
- final float val = vals[j];
- final float indexedVal = indexed.get(j);
- if (Floats.compare(val, indexedVal) != 0) {
+ final double val = vals[j];
+ final double indexedVal = indexed.get(j);
+ if (Doubles.compare(val, indexedVal) != 0) {
failureHappened.set(true);
reason.set(StringUtils.format("Thread1[%d]: %f != %f", j, val, indexedVal));
stopLatch.countDown();
@@ -237,7 +277,7 @@ public class CompressedFloatsSerdeTest
}
}).start();
- final ColumnarFloats indexed2 = supplier.get();
+ final ColumnarDoubles indexed2 = supplier.get();
try {
new Thread(new Runnable()
{
@@ -255,9 +295,9 @@ public class CompressedFloatsSerdeTest
try {
for (int i = 0; i < numRuns; ++i) {
for (int j = indexed2.size() - 1; j >= 0; --j) {
- final float val = vals[j];
- final float indexedVal = indexed2.get(j);
- if (Floats.compare(val, indexedVal) != 0) {
+ final double val = vals[j];
+ final double indexedVal = indexed2.get(j);
+ if (Doubles.compare(val, indexedVal) != 0) {
failureHappened.set(true);
reason.set(StringUtils.format("Thread2[%d]: %f != %f", j, val, indexedVal));
stopLatch.countDown();
diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java
index 4b78b52..d11c089 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedFloatsSerdeTest.java
@@ -25,8 +25,14 @@ import it.unimi.dsi.fastutil.ints.IntArrays;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -58,6 +64,12 @@ public class CompressedFloatsSerdeTest
private static final double DELTA = 0.00001;
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
protected final CompressionStrategy compressionStrategy;
protected final ByteOrder order;
@@ -105,9 +117,37 @@ public class CompressedFloatsSerdeTest
testWithValues(chunk);
}
+ // this test takes ~30 minutes to run
+ // Writes Integer.MAX_VALUE + 100 floats through one serializer and expects ColumnCapacityExceededException
+ // for column "test" instead of a column whose row count overflows an int.
+ @Ignore
+ @Test
+ public void testTooManyValues() throws IOException
+ {
+ expectedException.expect(ColumnCapacityExceededException.class);
+ expectedException.expectMessage(ColumnCapacityExceededException.formatMessage("test"));
+ // tmp-file backed medium, presumably because this much data should not be buffered in memory
+ try (
+ SegmentWriteOutMedium segmentWriteOutMedium =
+ TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder())
+ ) {
+ ColumnarFloatsSerializer serializer = CompressionFactory.getFloatSerializer(
+ "test",
+ segmentWriteOutMedium,
+ "test",
+ order,
+ compressionStrategy
+ );
+ serializer.open();
+
+ // long loop counter so the number of rows written can exceed Integer.MAX_VALUE
+ final long numRows = Integer.MAX_VALUE + 100L;
+ for (long i = 0L; i < numRows; i++) {
+ serializer.add(ThreadLocalRandom.current().nextFloat());
+ }
+ }
+ }
+
public void testWithValues(float[] values) throws Exception
{
ColumnarFloatsSerializer serializer = CompressionFactory.getFloatSerializer(
+ "test",
new OffHeapMemorySegmentWriteOutMedium(),
"test",
order,
diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java
index ca6dd69..1186d32 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java
@@ -95,6 +95,7 @@ public class CompressedLongsAutoEncodingSerdeTest
public void testValues(long[] values) throws Exception
{
ColumnarLongsSerializer serializer = CompressionFactory.getLongSerializer(
+ "test",
new OffHeapMemorySegmentWriteOutMedium(),
"test",
order,
diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java
index 0fde252..675c494 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedLongsSerdeTest.java
@@ -25,8 +25,14 @@ import it.unimi.dsi.fastutil.ints.IntArrays;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -58,6 +64,12 @@ public class CompressedLongsSerdeTest
return data;
}
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
protected final CompressionFactory.LongEncodingStrategy encodingStrategy;
protected final CompressionStrategy compressionStrategy;
protected final ByteOrder order;
@@ -121,6 +133,38 @@ public class CompressedLongsSerdeTest
testWithValues(chunk);
}
+ // this test takes ~50 minutes to run (even skipping 'auto')
+ // Writes Integer.MAX_VALUE + 100 longs through one serializer and expects ColumnCapacityExceededException
+ // for column "test" instead of a column whose row count overflows an int.
+ @Ignore
+ @Test
+ public void testTooManyValues() throws IOException
+ {
+ // Skip 'auto' encoding (reported as an assumption failure rather than a silent pass) until its unbounded
+ // heap usage is put in check and this can actually pass; remove this assumption then.
+ org.junit.Assume.assumeFalse(
+ "'auto' encoded longs have unbounded heap usage",
+ encodingStrategy.equals(CompressionFactory.LongEncodingStrategy.AUTO)
+ );
+ expectedException.expect(ColumnCapacityExceededException.class);
+ expectedException.expectMessage(ColumnCapacityExceededException.formatMessage("test"));
+ try (
+ SegmentWriteOutMedium segmentWriteOutMedium =
+ TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder())
+ ) {
+ ColumnarLongsSerializer serializer = CompressionFactory.getLongSerializer(
+ "test",
+ segmentWriteOutMedium,
+ "test",
+ order,
+ encodingStrategy,
+ compressionStrategy
+ );
+ serializer.open();
+
+ // long loop counter so the number of rows written can exceed Integer.MAX_VALUE
+ final long numRows = Integer.MAX_VALUE + 100L;
+ for (long i = 0L; i < numRows; i++) {
+ serializer.add(ThreadLocalRandom.current().nextLong());
+ }
+ }
+ }
+
public void testWithValues(long[] values) throws Exception
{
testValues(values);
@@ -130,6 +174,7 @@ public class CompressedLongsSerdeTest
public void testValues(long[] values) throws Exception
{
ColumnarLongsSerializer serializer = CompressionFactory.getLongSerializer(
+ "test",
new OffHeapMemorySegmentWriteOutMedium(),
"test",
order,
diff --git a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java
index 150dd0d..5ba8420 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSerializerTest.java
@@ -32,22 +32,27 @@ import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.WriteOutBytes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
@RunWith(Parameterized.class)
public class CompressedVSizeColumnarIntsSerializerTest
@@ -62,6 +67,9 @@ public class CompressedVSizeColumnarIntsSerializerTest
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
public CompressedVSizeColumnarIntsSerializerTest(
CompressionStrategy compressionStrategy,
ByteOrder byteOrder
@@ -108,8 +116,9 @@ public class CompressedVSizeColumnarIntsSerializerTest
private void checkSerializedSizeAndData(int chunkSize) throws Exception
{
FileSmoosher smoosher = new FileSmoosher(temporaryFolder.newFolder());
-
+ final String columnName = "test";
CompressedVSizeColumnarIntsSerializer writer = new CompressedVSizeColumnarIntsSerializer(
+ columnName,
segmentWriteOutMedium,
"test",
vals.length > 0 ? Ints.max(vals) : 0,
@@ -170,6 +179,44 @@ public class CompressedVSizeColumnarIntsSerializerTest
}
}
+
+ // this test takes ~18 minutes to run
+ // Writes Integer.MAX_VALUE + 100 values (all within the declared maxValue) and expects
+ // ColumnCapacityExceededException for column "test".
+ @Ignore
+ @Test
+ public void testTooManyValues() throws IOException
+ {
+ final int maxValue = 0x0FFFFFFF;
+ final int maxChunkSize = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue);
+ expectedException.expect(ColumnCapacityExceededException.class);
+ expectedException.expectMessage(ColumnCapacityExceededException.formatMessage("test"));
+ try (
+ SegmentWriteOutMedium segmentWriteOutMedium =
+ TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder())
+ ) {
+ GenericIndexedWriter genericIndexed = GenericIndexedWriter.ofCompressedByteBuffers(
+ segmentWriteOutMedium,
+ "test",
+ compressionStrategy,
+ Long.BYTES * 10000
+ );
+ CompressedVSizeColumnarIntsSerializer serializer = new CompressedVSizeColumnarIntsSerializer(
+ "test",
+ segmentWriteOutMedium,
+ maxValue,
+ maxChunkSize,
+ byteOrder,
+ compressionStrategy,
+ genericIndexed
+ );
+ serializer.open();
+
+ final long numRows = Integer.MAX_VALUE + 100L;
+ for (long i = 0L; i < numRows; i++) {
+ // the serializer (and its chunk size) was configured for maxValue, so never feed it anything larger;
+ // only the row count should be under test here
+ serializer.addValue(ThreadLocalRandom.current().nextInt(0, maxValue + 1));
+ }
+ }
+ }
+
@Test
public void testEmpty() throws Exception
{
@@ -181,7 +228,7 @@ public class CompressedVSizeColumnarIntsSerializerTest
{
File tmpDirectory = temporaryFolder.newFolder();
FileSmoosher smoosher = new FileSmoosher(tmpDirectory);
-
+ final String columnName = "test";
GenericIndexedWriter genericIndexed = GenericIndexedWriter.ofCompressedByteBuffers(
segmentWriteOutMedium,
"test",
@@ -189,6 +236,7 @@ public class CompressedVSizeColumnarIntsSerializerTest
Long.BYTES * 10000
);
CompressedVSizeColumnarIntsSerializer writer = new CompressedVSizeColumnarIntsSerializer(
+ columnName,
segmentWriteOutMedium,
vals.length > 0 ? Ints.max(vals) : 0,
chunkSize,
diff --git a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java
index bb9f0b2..de6485f 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/V3CompressedVSizeColumnarMultiIntsSerializerTest.java
@@ -32,11 +32,14 @@ import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.java.util.common.io.smoosh.SmooshedWriter;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.WriteOutBytes;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -54,6 +57,7 @@ import java.util.stream.IntStream;
@RunWith(Parameterized.class)
public class V3CompressedVSizeColumnarMultiIntsSerializerTest
{
+ private static final String TEST_COLUMN_NAME = "test";
private static final int[] OFFSET_CHUNK_FACTORS = new int[]{
1,
2,
@@ -69,6 +73,9 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
public V3CompressedVSizeColumnarMultiIntsSerializerTest(
CompressionStrategy compressionStrategy,
ByteOrder byteOrder
@@ -92,19 +99,101 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
);
}
- private void generateVals(final int totalSize, final int maxValue)
+ // Clears vals before each test.
+ @Before
+ public void setUp()
+ {
+ vals = null;
+ }
+
+ // Round-trips fewer rows than one value chunk holds (1 or 2 values per row) for every offset chunk factor
+ // and max value combination.
+ @Test
+ public void testSmallData() throws Exception
+ {
+ // fewer rows than one value chunk; with up to 2 values per row the values may still span two chunks
+ for (int offsetChunk : OFFSET_CHUNK_FACTORS) {
+ for (int maxValue : MAX_VALUES) {
+ final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue);
+ generateVals(rand.nextInt(valueChunk), maxValue, 2);
+ checkSerializedSizeAndData(offsetChunk, valueChunk);
+ }
+ }
+ }
+
+ // Round-trips between one and three value chunks' worth of rows (1 or 2 values per row) for every offset
+ // chunk factor and max value combination.
+ @Test
+ public void testLargeData() throws Exception
{
- vals = new ArrayList<>(totalSize);
- for (int i = 0; i < totalSize; ++i) {
- int len = rand.nextInt(2) + 1;
- int[] subVals = new int[len];
- for (int j = 0; j < len; ++j) {
- subVals[j] = rand.nextInt(maxValue);
+ // more than one chunk: row count is in [valueChunk, 3 * valueChunk)
+ for (int offsetChunk : OFFSET_CHUNK_FACTORS) {
+ for (int maxValue : MAX_VALUES) {
+ final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue);
+ generateVals((rand.nextInt(2) + 1) * valueChunk + rand.nextInt(valueChunk), maxValue, 2);
+ checkSerializedSizeAndData(offsetChunk, valueChunk);
+ }
+ }
+ }
+
+ // Round-trips a column with no rows, using the smallest offset chunk factor and a value chunk of 2.
+ @Test
+ public void testEmpty() throws Exception
+ {
+ vals = new ArrayList<>();
+ checkSerializedSizeAndData(1, 2);
+ }
+
+ // Same data shape as testLargeData, but written and read back through checkV2SerializedSizeAndData.
+ @Test
+ public void testMultiValueFileLargeData() throws Exception
+ {
+ // more than one chunk: row count is in [valueChunk, 3 * valueChunk)
+ for (int offsetChunk : OFFSET_CHUNK_FACTORS) {
+ for (int maxValue : MAX_VALUES) {
+ final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue);
+ generateVals((rand.nextInt(2) + 1) * valueChunk + rand.nextInt(valueChunk), maxValue, 2);
+ checkV2SerializedSizeAndData(offsetChunk, valueChunk);
}
- vals.add(subVals);
}
}
+ // this test takes ~30 minutes to run
+ // Expects ColumnCapacityExceededException for column "test" once the multi-value column holds more values
+ // than an int can index.
+ @Ignore
+ @Test
+ public void testTooManyValues() throws Exception
+ {
+ expectedException.expect(ColumnCapacityExceededException.class);
+ expectedException.expectMessage(ColumnCapacityExceededException.formatMessage("test"));
+ // 10,000,000 rows of 1..1000 values each (~5 billion values on average) far exceeds Integer.MAX_VALUE;
+ // every writer is named TEST_COLUMN_NAME, so any of them may be the one that reports the overflow
+ final int offsetChunk = CompressedColumnarIntsSupplier.MAX_INTS_IN_BUFFER;
+ final int maxValue = 0x0FFFFFFF;
+ final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue);
+ final int numRows = 10000000;
+ final int maxValuesPerRow = 1000;
+ generateV2SerializedSizeAndData(numRows, maxValue, maxValuesPerRow, offsetChunk, valueChunk);
+ }
+
+ /**
+ * Replaces {@code vals} with {@code rowCount} random rows drawn from the shared {@code rand} field.
+ */
+ private void generateVals(final int rowCount, final int maxValue, final int numValuesPerRow)
+ {
+ vals = new ArrayList<>(rowCount);
+ for (int i = 0; i < rowCount; ++i) {
+ vals.add(generateRow(rand, maxValue, numValuesPerRow));
+ }
+ }
+
+ /**
+ * Builds one row of 1 to {@code numValuesPerRow} values, each in [0, maxValue); both bounds must be positive.
+ * The {@link Random} is passed explicitly so a verifier can replay the same rows from an identically seeded
+ * instance.
+ */
+ private static int[] generateRow(Random random, final int maxValue, final int numValuesPerRow)
+ {
+ int len = random.nextInt(numValuesPerRow) + 1;
+ int[] subVals = new int[len];
+ for (int j = 0; j < len; ++j) {
+ subVals[j] = random.nextInt(maxValue);
+ }
+ return subVals;
+ }
+
+ /**
+ * Returns the largest value across all rows (an empty row counts as 0).
+ *
+ * @throws NoSuchElementException if {@code rows} is empty
+ */
+ private static int getMaxValue(final List<int[]> rows)
+ {
+ return rows
+ .stream()
+ .mapToInt(array -> IntStream.of(array).max().orElse(0))
+ .max()
+ .orElseThrow(NoSuchElementException::new);
+ }
+
private void checkSerializedSizeAndData(int offsetChunkFactor, int valueChunkFactor) throws Exception
{
FileSmoosher smoosher = new FileSmoosher(temporaryFolder.newFolder());
@@ -112,6 +201,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
try (SegmentWriteOutMedium segmentWriteOutMedium = new OffHeapMemorySegmentWriteOutMedium()) {
int maxValue = vals.size() > 0 ? getMaxValue(vals) : 0;
CompressedColumnarIntsSerializer offsetWriter = new CompressedColumnarIntsSerializer(
+ TEST_COLUMN_NAME,
segmentWriteOutMedium,
"offset",
offsetChunkFactor,
@@ -119,6 +209,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
compressionStrategy
);
CompressedVSizeColumnarIntsSerializer valueWriter = new CompressedVSizeColumnarIntsSerializer(
+ TEST_COLUMN_NAME,
segmentWriteOutMedium,
"value",
maxValue,
@@ -127,7 +218,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
compressionStrategy
);
V3CompressedVSizeColumnarMultiIntsSerializer writer =
- new V3CompressedVSizeColumnarMultiIntsSerializer(offsetWriter, valueWriter);
+ new V3CompressedVSizeColumnarMultiIntsSerializer(TEST_COLUMN_NAME, offsetWriter, valueWriter);
V3CompressedVSizeColumnarMultiIntsSupplier supplierFromIterable =
V3CompressedVSizeColumnarMultiIntsSupplier.fromIterable(
Iterables.transform(vals, ArrayBasedIndexedInts::new),
@@ -167,54 +258,6 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
}
}
- private int getMaxValue(final List<int[]> vals)
- {
- return vals
- .stream()
- .mapToInt(array -> IntStream.of(array).max().orElse(0))
- .max()
- .orElseThrow(NoSuchElementException::new);
- }
-
- @Before
- public void setUp()
- {
- vals = null;
- }
-
- @Test
- public void testSmallData() throws Exception
- {
- // less than one chunk
- for (int offsetChunk : OFFSET_CHUNK_FACTORS) {
- for (int maxValue : MAX_VALUES) {
- final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue);
- generateVals(rand.nextInt(valueChunk), maxValue);
- checkSerializedSizeAndData(offsetChunk, valueChunk);
- }
- }
- }
-
- @Test
- public void testLargeData() throws Exception
- {
- // more than one chunk
- for (int offsetChunk : OFFSET_CHUNK_FACTORS) {
- for (int maxValue : MAX_VALUES) {
- final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue);
- generateVals((rand.nextInt(2) + 1) * valueChunk + rand.nextInt(valueChunk), maxValue);
- checkSerializedSizeAndData(offsetChunk, valueChunk);
- }
- }
- }
-
- @Test
- public void testEmpty() throws Exception
- {
- vals = new ArrayList<>();
- checkSerializedSizeAndData(1, 2);
- }
-
private void checkV2SerializedSizeAndData(int offsetChunkFactor, int valueChunkFactor) throws Exception
{
File tmpDirectory = FileUtils.createTempDir(StringUtils.format(
@@ -227,6 +270,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
try (SegmentWriteOutMedium segmentWriteOutMedium = new OffHeapMemorySegmentWriteOutMedium()) {
CompressedColumnarIntsSerializer offsetWriter = new CompressedColumnarIntsSerializer(
+ TEST_COLUMN_NAME,
segmentWriteOutMedium,
offsetChunkFactor,
byteOrder,
@@ -246,6 +290,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
Long.BYTES * 250000
);
CompressedVSizeColumnarIntsSerializer valueWriter = new CompressedVSizeColumnarIntsSerializer(
+ TEST_COLUMN_NAME,
segmentWriteOutMedium,
maxValue,
valueChunkFactor,
@@ -254,7 +299,7 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
genericIndexed
);
V3CompressedVSizeColumnarMultiIntsSerializer writer =
- new V3CompressedVSizeColumnarMultiIntsSerializer(offsetWriter, valueWriter);
+ new V3CompressedVSizeColumnarMultiIntsSerializer(TEST_COLUMN_NAME, offsetWriter, valueWriter);
writer.open();
for (int[] val : vals) {
writer.addValues(new ArrayBasedIndexedInts(val));
@@ -282,16 +327,76 @@ public class V3CompressedVSizeColumnarMultiIntsSerializerTest
}
}
- @Test
- public void testMultiValueFileLargeData() throws Exception
+ /**
+ * Streams {@code numRows} random rows (1 to {@code maxValuesPerRow} values each, every value below
+ * {@code maxValue}) through a V3 multi-value serializer backed by tmp files. It then smooshes the column,
+ * maps it back and verifies every row.
+ */
+ private void generateV2SerializedSizeAndData(
+ long numRows,
+ int maxValue,
+ int maxValuesPerRow,
+ int offsetChunkFactor,
+ int valueChunkFactor
+ ) throws Exception
{
- // more than one chunk
- for (int offsetChunk : OFFSET_CHUNK_FACTORS) {
- for (int maxValue : MAX_VALUES) {
- final int valueChunk = CompressedVSizeColumnarIntsSupplier.maxIntsInBufferForValue(maxValue);
- generateVals((rand.nextInt(2) + 1) * valueChunk + rand.nextInt(valueChunk), maxValue);
- checkV2SerializedSizeAndData(offsetChunk, valueChunk);
+ // created under the TemporaryFolder rule so the (potentially huge) output is cleaned up after the test
+ File tmpDirectory = temporaryFolder.newFolder(StringUtils.format(
+ "CompressedVSizeIndexedV3WriterTest_%d_%d",
+ offsetChunkFactor,
+ valueChunkFactor
+ ));
+ FileSmoosher smoosher = new FileSmoosher(tmpDirectory);
+
+ try (
+ SegmentWriteOutMedium segmentWriteOutMedium =
+ TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(temporaryFolder.newFolder())
+ ) {
+ CompressedColumnarIntsSerializer offsetWriter = new CompressedColumnarIntsSerializer(
+ TEST_COLUMN_NAME,
+ segmentWriteOutMedium,
+ offsetChunkFactor,
+ byteOrder,
+ compressionStrategy,
+ GenericIndexedWriter.ofCompressedByteBuffers(
+ segmentWriteOutMedium,
+ "offset",
+ compressionStrategy,
+ Long.BYTES * 250000
+ )
+ );
+
+ GenericIndexedWriter genericIndexed = GenericIndexedWriter.ofCompressedByteBuffers(
+ segmentWriteOutMedium,
+ "value",
+ compressionStrategy,
+ Long.BYTES * 250000
+ );
+ CompressedVSizeColumnarIntsSerializer valueWriter = new CompressedVSizeColumnarIntsSerializer(
+ TEST_COLUMN_NAME,
+ segmentWriteOutMedium,
+ maxValue,
+ valueChunkFactor,
+ byteOrder,
+ compressionStrategy,
+ genericIndexed
+ );
+ V3CompressedVSizeColumnarMultiIntsSerializer writer =
+ new V3CompressedVSizeColumnarMultiIntsSerializer(TEST_COLUMN_NAME, offsetWriter, valueWriter);
+ writer.open();
+ // rows are generated on the fly rather than held in vals, since there may be billions of values
+ for (long l = 0L; l < numRows; l++) {
+ writer.addValues(new ArrayBasedIndexedInts(generateRow(rand, maxValue, maxValuesPerRow)));
}
+
+ try (SmooshedWriter channel = smoosher.addWithSmooshedWriter("test", writer.getSerializedSize())) {
+ writer.writeTo(channel, smoosher);
+ }
+ smoosher.close();
+
+ // NOTE(review): replaying rows with new Random(0) assumes the rand field is seeded with 0 and was not used
+ // before this call; confirm against the field declaration.
+ try (SmooshedFileMapper mapper = Smoosh.map(tmpDirectory)) {
+ V3CompressedVSizeColumnarMultiIntsSupplier supplierFromByteBuffer =
+ V3CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(mapper.mapFile("test"), byteOrder, mapper);
+ ColumnarMultiInts columnarMultiInts = supplierFromByteBuffer.get();
+ Assert.assertEquals(numRows, columnarMultiInts.size());
+ Random verifier = new Random(0);
+ for (int i = 0; i < numRows; ++i) {
+ IndexedInts subVals = columnarMultiInts.get(i);
+ int[] expected = generateRow(verifier, maxValue, maxValuesPerRow);
+ Assert.assertEquals(expected.length, subVals.size());
+ for (int j = 0, size = subVals.size(); j < size; ++j) {
+ Assert.assertEquals(expected[j], subVals.get(j));
+ }
+ }
+ CloseQuietly.close(columnarMultiInts);
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org