Posted to commits@druid.apache.org by ji...@apache.org on 2018/07/31 01:35:23 UTC

[incubator-druid] branch master updated: Fix 'auto' encoded longs + compression serializer (#6045)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 20ae8aa  Fix 'auto' encoded longs + compression serializer (#6045)
20ae8aa is described below

commit 20ae8aa6263e623564fc6bc0f3081554d12f44d5
Author: Clint Wylie <cj...@gmail.com>
AuthorDate: Mon Jul 30 18:35:20 2018 -0700

    Fix 'auto' encoded longs + compression serializer (#6045)
    
    * Fix 'auto' encoded longs + compression serializer
    Fixes #6044
    
    changes:
    * Fixes `VSizeLongSerde` serializers to treat 'close' as 'flush' when used with `BlockLayoutColumnarLongsSerializer`, so that unwritten values are flushed to the buffer before the block is compressed (a sketch of the failure mode follows this change list)
    * Adds an exhaustive unit test that exercises a variety of value sizes, row counts, and compression strategies to catch issues like this one
    
    * Refactor LongSerializer 'close' to be named 'flush' instead

    * Revert that and just make new serializers per block
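
To make the first bullet concrete, below is a minimal, self-contained sketch of the failure mode. All names in it (PackedWriter, pending, pendingBits) are hypothetical simplifications, not Druid's actual API: the point is that a packed-long writer keeps partially packed bits in its own state, so if the enclosing block serializer compresses and clears the shared buffer without resetting the writer, that leftover state still describes the old block, and values near the block boundary are silently dropped or mispositioned.

    import java.nio.ByteBuffer;

    // Hypothetical, simplified stand-in for a VSizeLongSerde-style writer.
    class PackedWriter
    {
      private ByteBuffer buffer;
      private byte pending;    // partially packed byte not yet written to 'buffer'
      private int pendingBits; // number of bits of 'pending' currently in use

      PackedWriter(ByteBuffer buffer)
      {
        setBuffer(buffer);
      }

      // Re-points the writer at a (new or freshly cleared) buffer and resets
      // all packing state; the one-line fix in the diff below relies on this.
      void setBuffer(ByteBuffer buffer)
      {
        this.buffer = buffer;
        this.pending = 0;
        this.pendingBits = 0;
      }

      // Packs a 4-bit value; bytes reach 'buffer' only once fully packed, so
      // a partially packed value can be sitting in 'pending' when the block
      // serializer decides the current block is full.
      void write(long value)
      {
        pending |= (byte) ((value & 0xFL) << pendingBits);
        pendingBits += 4;
        if (pendingBits == Byte.SIZE) {
          buffer.put(pending);
          pending = 0;
          pendingBits = 0;
        }
      }

      // Flushes any partially packed bits to the buffer. Treating 'close' as
      // 'flush', per the first bullet, means this runs before a block is
      // handed off for compression.
      void flush()
      {
        if (pendingBits > 0) {
          buffer.put(pending);
          pending = 0;
          pendingBits = 0;
        }
      }
    }

A block boundary that lands mid-byte (or mid-word) is exactly what triggers the bug, which is plausibly why the new test sweeps odd bit widths such as 7, 11, and 23 and randomizes the row count around multiples of the block size.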
---
 .../data/BlockLayoutColumnarLongsSerializer.java   |   1 +
 .../data/CompressedLongsAutoEncodingSerdeTest.java | 138 +++++++++++++++++++++
 2 files changed, 139 insertions(+)

diff --git a/processing/src/main/java/io/druid/segment/data/BlockLayoutColumnarLongsSerializer.java b/processing/src/main/java/io/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
index f9b5cff..1031bc2 100644
--- a/processing/src/main/java/io/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
+++ b/processing/src/main/java/io/druid/segment/data/BlockLayoutColumnarLongsSerializer.java
@@ -92,6 +92,7 @@ public class BlockLayoutColumnarLongsSerializer implements ColumnarLongsSerializ
       endBuffer.flip();
       flattener.write(endBuffer);
       endBuffer.clear();
+      writer.setBuffer(endBuffer);
     }
 
     writer.write(value);
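
For context, the hunk above sits in the serializer's per-value write path. In outline it looks something like the sketch below; this is a reading aid, not the actual Druid source, and only endBuffer, flattener, writer, and setBuffer are taken from the hunk, with everything else assumed for illustration.

    // Sketch of the write path around the fix (assumed structure).
    public void add(long value) throws IOException
    {
      if (blockIsFull()) {            // assumed capacity check
        endBuffer.flip();             // expose the packed block for reading
        flattener.write(endBuffer);   // compress and write out the block
        endBuffer.clear();            // recycle the buffer for the next block
        writer.setBuffer(endBuffer);  // the fix: reset the writer's packing
                                      // state and offset for the fresh block
      }
      writer.write(value);
    }

Without the added setBuffer call, the writer keeps packing against offsets and pending bits left over from the previous block; with it, each block effectively starts with a fresh serializer, matching the commit note "revert and just make new serializers per block".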
diff --git a/processing/src/test/java/io/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java b/processing/src/test/java/io/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java
new file mode 100644
index 0000000..a969b34
--- /dev/null
+++ b/processing/src/test/java/io/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.data;
+
+import io.druid.java.util.common.StringUtils;
+import io.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.Channels;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+@RunWith(Parameterized.class)
+public class CompressedLongsAutoEncodingSerdeTest
+{
+  @Parameterized.Parameters(name = "{0} {1} {2}")
+  public static Iterable<Object[]> compressionStrategies()
+  {
+    List<Object[]> data = new ArrayList<>();
+    for (long bpv : bitsPerValueParameters) {
+      for (CompressionStrategy strategy : CompressionStrategy.values()) {
+        data.add(new Object[]{bpv, strategy, ByteOrder.BIG_ENDIAN});
+        data.add(new Object[]{bpv, strategy, ByteOrder.LITTLE_ENDIAN});
+      }
+    }
+    return data;
+  }
+
+  private static final long[] bitsPerValueParameters = new long[]{1, 2, 4, 7, 11, 14, 18, 23, 31, 39, 46, 55, 62};
+
+  protected final CompressionFactory.LongEncodingStrategy encodingStrategy = CompressionFactory.LongEncodingStrategy.AUTO;
+  protected final CompressionStrategy compressionStrategy;
+  protected final ByteOrder order;
+  protected final long bitsPerValue;
+
+  public CompressedLongsAutoEncodingSerdeTest(
+      long bitsPerValue,
+      CompressionStrategy compressionStrategy,
+      ByteOrder order
+  )
+  {
+    this.bitsPerValue = bitsPerValue;
+    this.compressionStrategy = compressionStrategy;
+    this.order = order;
+  }
+
+  @Test
+  public void testFidelity() throws Exception
+  {
+    final long bound = 1L << bitsPerValue;
+    // big enough to have at least 2 blocks, and a handful of sizes offset by 1 from each other
+    int blockSize = 1 << 16;
+    int numBits = (Long.SIZE - Long.numberOfLeadingZeros(1L << (bitsPerValue - 1))); // 1L: an int shift would truncate the shift distance for bitsPerValue > 32
+    double numValuesPerByte = 8.0 / (double) numBits;
+
+    int numRows = (int) (blockSize * numValuesPerByte) * 2 + ThreadLocalRandom.current().nextInt(1, 101);
+    long[] chunk = new long[numRows];
+    for (int i = 0; i < numRows; i++) {
+      chunk[i] = ThreadLocalRandom.current().nextLong(bound);
+    }
+    testValues(chunk);
+
+    numRows++;
+    chunk = new long[numRows];
+    for (int i = 0; i < numRows; i++) {
+      chunk[i] = ThreadLocalRandom.current().nextLong(bound);
+    }
+    testValues(chunk);
+  }
+
+  public void testValues(long[] values) throws Exception
+  {
+    ColumnarLongsSerializer serializer = CompressionFactory.getLongSerializer(
+        new OffHeapMemorySegmentWriteOutMedium(),
+        "test",
+        order,
+        encodingStrategy,
+        compressionStrategy
+    );
+    serializer.open();
+
+    for (long value : values) {
+      serializer.add(value);
+    }
+    Assert.assertEquals(values.length, serializer.size());
+
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    serializer.writeTo(Channels.newChannel(baos), null);
+    Assert.assertEquals(baos.size(), serializer.getSerializedSize());
+    CompressedColumnarLongsSupplier supplier =
+        CompressedColumnarLongsSupplier.fromByteBuffer(ByteBuffer.wrap(baos.toByteArray()), order);
+    ColumnarLongs longs = supplier.get();
+
+    assertIndexMatchesVals(longs, values);
+    longs.close();
+  }
+
+  private void assertIndexMatchesVals(ColumnarLongs indexed, long[] vals)
+  {
+    Assert.assertEquals(vals.length, indexed.size());
+    for (int i = 0; i < indexed.size(); ++i) {
+      Assert.assertEquals(
+          StringUtils.format(
+              "Value [%d] at row '%d' does not match [%d]",
+              indexed.get(i),
+              i,
+              vals[i]
+          ),
+          vals[i],
+          indexed.get(i)
+      );
+    }
+  }
+}

