You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2018/07/31 01:35:23 UTC
[incubator-druid] branch master updated: Fix 'auto' encoded longs +
compression serializer (#6045)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 20ae8aa Fix 'auto' encoded longs + compression serializer (#6045)
20ae8aa is described below
commit 20ae8aa6263e623564fc6bc0f3081554d12f44d5
Author: Clint Wylie <cj...@gmail.com>
AuthorDate: Mon Jul 30 18:35:20 2018 -0700
Fix 'auto' encoded longs + compression serializer (#6045)
* Fix 'auto' encoded longs + compression serializer
Fixes #6044
changes:
* Fixes `VSizeLongSerde` serializers to treat 'close' as 'flush' when used with `BlockLayoutColumnarLongsSerializer`, allowing unwritten values to be flushed to the buffer when the block is compressed
* Add exhaustive unit test that flexes a variety of value sizes, row counts, and compression strategies to catch issues such as these
:
* refactor LongSerializer close to be named flush instead
* revert and just make new serializers per block
---
.../data/BlockLayoutColumnarLongsSerializer.java | 1 +
.../data/CompressedLongsAutoEncodingSerdeTest.java | 138 +++++++++++++++++++++
2 files changed, 139 insertions(+)
diff --git a/processing/src/main/java/io/druid/segment/data/BlockLayoutColumnarLongsSerializer.java b/processing/src/main/java/io/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
index f9b5cff..1031bc2 100644
--- a/processing/src/main/java/io/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
+++ b/processing/src/main/java/io/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
@@ -92,6 +92,7 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ
endBuffer.flip();
flattener.write(endBuffer);
endBuffer.clear();
+ writer.setBuffer(endBuffer);
}
writer.write(value);
diff --git a/processing/src/test/java/io/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java b/processing/src/test/java/io/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java
new file mode 100644
index 0000000..a969b34
--- /dev/null
+++ b/processing/src/test/java/io/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.data;
+
+import io.druid.java.util.common.StringUtils;
+import io.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.Channels;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Round-trip test for long columns written with {@link CompressionFactory.LongEncodingStrategy#AUTO}: values
+ * are pushed through the serializer obtained from {@link CompressionFactory#getLongSerializer}, read back with
+ * {@link CompressedColumnarLongsSupplier}, and compared row by row.
+ *
+ * The test is parameterized over bits per value, every {@link CompressionStrategy}, and both byte orders.
+ */
+@RunWith(Parameterized.class)
+public class CompressedLongsAutoEncodingSerdeTest
+{
+  /**
+   * Builds the parameter matrix: each entry of {@link #bitsPerValueParameters} crossed with every
+   * {@link CompressionStrategy} and both byte orders.
+   */
+  @Parameterized.Parameters(name = "{0} {1} {2}")
+  public static Iterable<Object[]> compressionStrategies()
+  {
+    List<Object[]> data = new ArrayList<>();
+    for (long bpv : bitsPerValueParameters) {
+      for (CompressionStrategy strategy : CompressionStrategy.values()) {
+        data.add(new Object[]{bpv, strategy, ByteOrder.BIG_ENDIAN});
+        data.add(new Object[]{bpv, strategy, ByteOrder.LITTLE_ENDIAN});
+      }
+    }
+    return data;
+  }
+
+  private static final long[] bitsPerValueParameters = new long[]{1, 2, 4, 7, 11, 14, 18, 23, 31, 39, 46, 55, 62};
+
+  protected final CompressionFactory.LongEncodingStrategy encodingStrategy = CompressionFactory.LongEncodingStrategy.AUTO;
+  protected final CompressionStrategy compressionStrategy;
+  protected final ByteOrder order;
+  protected final long bitsPerValue;
+
+  public CompressedLongsAutoEncodingSerdeTest(
+      long bitsPerValue,
+      CompressionStrategy compressionStrategy,
+      ByteOrder order
+  )
+  {
+    this.bitsPerValue = bitsPerValue;
+    this.compressionStrategy = compressionStrategy;
+    this.order = order;
+  }
+
+  /**
+   * Serializes and reads back two random data sets whose row counts differ by one, with all values drawn
+   * from {@code [0, 2^bitsPerValue)}.
+   */
+  @Test
+  public void testFidelity() throws Exception
+  {
+    final long bound = 1L << bitsPerValue;
+    // big enough to have at least 2 blocks, and a handful of sizes offset by 1 from each other
+    final int blockSize = 1 << 16;
+    // The shifted literal must be a long: shifting an int only uses the low 5 bits of the distance, which
+    // yields a wrong numBits for any bitsPerValue of 32 or more (e.g. 39 would be treated as 7).
+    final int numBits = Long.SIZE - Long.numberOfLeadingZeros(1L << (bitsPerValue - 1));
+    final double numValuesPerByte = 8.0 / (double) numBits;
+
+    final int numRows = (int) (blockSize * numValuesPerByte) * 2 + ThreadLocalRandom.current().nextInt(1, 101);
+    testValues(randomValues(numRows, bound));
+    // same size plus one row, to exercise an adjacent row count
+    testValues(randomValues(numRows + 1, bound));
+  }
+
+  /**
+   * Writes {@code values} through a freshly created serializer, checks the reported row count and serialized
+   * size, then deserializes the bytes and verifies every row matches the input.
+   *
+   * @param values the rows to round-trip
+   */
+  public void testValues(long[] values) throws Exception
+  {
+    ColumnarLongsSerializer serializer = CompressionFactory.getLongSerializer(
+        new OffHeapMemorySegmentWriteOutMedium(),
+        "test",
+        order,
+        encodingStrategy,
+        compressionStrategy
+    );
+    serializer.open();
+
+    for (long value : values) {
+      serializer.add(value);
+    }
+    Assert.assertEquals(values.length, serializer.size());
+
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    serializer.writeTo(Channels.newChannel(baos), null);
+    Assert.assertEquals(baos.size(), serializer.getSerializedSize());
+    CompressedColumnarLongsSupplier supplier =
+        CompressedColumnarLongsSupplier.fromByteBuffer(ByteBuffer.wrap(baos.toByteArray()), order);
+    ColumnarLongs longs = supplier.get();
+    try {
+      assertIndexMatchesVals(longs, values);
+    }
+    finally {
+      // close even when an assertion above fails, so a failing case does not leave the reader open
+      longs.close();
+    }
+  }
+
+  /**
+   * Returns {@code count} random values, each drawn uniformly from {@code [0, bound)}.
+   */
+  private static long[] randomValues(int count, long bound)
+  {
+    final long[] values = new long[count];
+    for (int i = 0; i < count; i++) {
+      values[i] = ThreadLocalRandom.current().nextLong(bound);
+    }
+    return values;
+  }
+
+  /**
+   * Asserts that {@code indexed} has the same number of rows as {@code vals} and the same value at every row.
+   */
+  private void assertIndexMatchesVals(ColumnarLongs indexed, long[] vals)
+  {
+    Assert.assertEquals(vals.length, indexed.size());
+    for (int i = 0; i < vals.length; ++i) {
+      final long actual = indexed.get(i);
+      // Only build the failure message on a mismatch; formatting it for every matching row is wasted work.
+      if (vals[i] != actual) {
+        Assert.assertEquals(
+            StringUtils.format(
+                "Value [%d] at row '%d' does not match [%d]",
+                actual,
+                i,
+                vals[i]
+            ),
+            vals[i],
+            actual
+        );
+      }
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org