You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/07/15 18:29:47 UTC

[incubator-pinot] branch master updated: #4401 Reuse a ThreadLocal byte[] when reading String elements from the variable length value reader. (#4432)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 12e77bd  #4401 Reuse a ThreadLocal byte[] when reading String elements from the variable length value reader. (#4432)
12e77bd is described below

commit 12e77bd2aa6a6264a772ac98ea20663a9baf8f28
Author: Buchi Reddy Busi Reddy <ma...@gmail.com>
AuthorDate: Mon Jul 15 11:29:40 2019 -0700

    #4401 Reuse a ThreadLocal byte[] when reading String elements from the variable length value reader. (#4432)
    
    Reuse a ThreadLocal byte[] when reading String elements from the variable length value reader.
---
 .../org/apache/pinot/common/utils/StringUtil.java  |  4 +-
 .../io/util/VarLengthBytesValueReaderWriter.java   | 40 ++++++++++++++++-
 .../util/VarLengthBytesValueReaderWriterTest.java  | 52 ++++++++++++++++++++++
 3 files changed, 93 insertions(+), 3 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/StringUtil.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/StringUtil.java
index e871b32..efc4b59 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/StringUtil.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/StringUtil.java
@@ -72,9 +72,9 @@ public class StringUtil {
     return decodeUtf8(bytes, 0, bytes.length);
   }
 
-  public static String decodeUtf8(byte[] bytes, int startIndex, int endIndex) {
+  public static String decodeUtf8(byte[] bytes, int startIndex, int length) {
     try {
-      return new String(bytes, startIndex, endIndex, charSet);
+      return new String(bytes, startIndex, length, charSet);
     } catch (UnsupportedEncodingException e) {
       throw new RuntimeException(e);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java
index 45e840b..7d15eba 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java
@@ -93,6 +93,12 @@ public class VarLengthBytesValueReaderWriter implements Closeable, ValueReader {
   private final int _numElements;
 
   /**
+   * ThreadLocal buffer to reuse the byte[] wherever we can. The byte[] starts at 16 bytes and is
+   * replaced by a larger one (sized to a power of 2) whenever an element does not fit.
+   */
+  private final ThreadLocal<byte[]> _reusableBytes = ThreadLocal.withInitial(() -> new byte[16]);
+
+  /**
    * Constructor to create a VarLengthBytesValueReaderWriter from a previously written buffer.
    */
   public VarLengthBytesValueReaderWriter(PinotDataBuffer dataBuffer) {
@@ -146,6 +152,19 @@ public class VarLengthBytesValueReaderWriter implements Closeable, ValueReader {
     return false;
   }
 
+  /**
+   * Returns the smallest power of 2 that is >= n (valid for 1 <= n <= 2^30).
+   */
+  private static int nextPowerOf2(int n) {
+    n -= 1;
+    n |= n >>> 1;
+    n |= n >>> 2;
+    n |= n >>> 4;
+    n |= n >>> 8;
+    n |= n >>> 16;
+    return n + 1;
+  }
+
   private void writeHeader() {
     for (int offset = 0; offset < MAGIC_BYTES.length; offset++) {
       _dataBuffer.putByte(offset, MAGIC_BYTES[offset]);
@@ -204,7 +223,26 @@ public class VarLengthBytesValueReaderWriter implements Closeable, ValueReader {
 
   @Override
   public String getUnpaddedString(int index, int numBytesPerValue, byte paddingByte, byte[] buffer) {
-    return StringUtil.decodeUtf8(getBytes(index, numBytesPerValue, buffer));
+    // Read the offset of the byte array first and then read the actual byte array.
+    int offset = _dataBuffer.getInt(_dataSectionStartOffSet + Integer.BYTES * index);
+
+    // To get the length of the byte array, we use the next byte array offset.
+    int length = _dataBuffer.getInt(_dataSectionStartOffSet + Integer.BYTES * (index + 1)) - offset;
+
+    byte[] b;
+    if (buffer != null && buffer.length >= length) {
+      b = buffer;
+    } else {
+      // Check if the current ThreadLocal buffer is big enough. If not, replace it with one of size nextPowerOf2(length).
+      b = _reusableBytes.get();
+      if (b.length < length) {
+        b = new byte[nextPowerOf2(length)];
+        _reusableBytes.set(b);
+      }
+    }
+
+    _dataBuffer.copyTo(offset, b, 0, length);
+    return StringUtil.decodeUtf8(b, 0, length);
   }
 
   @Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriterTest.java b/pinot-core/src/test/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriterTest.java
index 069da08..86966a2 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriterTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriterTest.java
@@ -24,6 +24,8 @@ import java.nio.ByteOrder;
 import java.util.Arrays;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -32,6 +34,8 @@ import org.testng.annotations.Test;
  * Unit test for {@link VarLengthBytesValueReaderWriter}
  */
 public class VarLengthBytesValueReaderWriterTest {
+  private static final int MAX_STRING_LENGTH = 200;
+
   private final Random random = new Random();
 
   @Test
@@ -50,6 +54,7 @@ public class VarLengthBytesValueReaderWriterTest {
     }
 
     try (PinotDataBuffer buffer = PinotDataBuffer.mapFile(tempFile, true, 0, size, ByteOrder.BIG_ENDIAN, null)) {
+      Assert.assertTrue(VarLengthBytesValueReaderWriter.isVarLengthBytesDictBuffer(buffer));
       VarLengthBytesValueReaderWriter readerWriter = new VarLengthBytesValueReaderWriter(buffer);
       Assert.assertEquals(readerWriter.getNumElements(), 0);
     } finally {
@@ -74,6 +79,7 @@ public class VarLengthBytesValueReaderWriterTest {
     }
 
     try (PinotDataBuffer buffer = PinotDataBuffer.mapFile(tempFile, true, 0, size, ByteOrder.BIG_ENDIAN, null)) {
+      Assert.assertTrue(VarLengthBytesValueReaderWriter.isVarLengthBytesDictBuffer(buffer));
       VarLengthBytesValueReaderWriter readerWriter = new VarLengthBytesValueReaderWriter(buffer);
       Assert.assertEquals(readerWriter.getNumElements(), 1);
       byte[] newArray = readerWriter.getBytes(0, -1, null);
@@ -104,6 +110,7 @@ public class VarLengthBytesValueReaderWriterTest {
     }
 
     try (PinotDataBuffer buffer = PinotDataBuffer.mapFile(tempFile, false, 0, size, ByteOrder.BIG_ENDIAN, null)) {
+      Assert.assertTrue(VarLengthBytesValueReaderWriter.isVarLengthBytesDictBuffer(buffer));
       VarLengthBytesValueReaderWriter readerWriter = new VarLengthBytesValueReaderWriter(buffer);
       Assert.assertEquals(byteArrays.length, readerWriter.getNumElements());
       for (int i = 0; i < byteArrays.length; i++) {
@@ -115,4 +122,49 @@ public class VarLengthBytesValueReaderWriterTest {
       FileUtils.forceDelete(tempFile);
     }
   }
+
+  @Test
+  public void testArbitraryLengthStringDictionary()
+      throws IOException {
+    Random random = new Random();
+    int numStrings = random.nextInt(100);
+    String[] strings = new String[numStrings];
+    for (int i = 0; i < numStrings; i++) {
+      strings[i] = RandomStringUtils.randomAlphanumeric(1 + random.nextInt(MAX_STRING_LENGTH));
+    }
+
+    byte[][] byteArrays = new byte[numStrings][];
+    for (int i = 0; i < numStrings; i++) {
+      byteArrays[i] = StringUtil.encodeUtf8(strings[i]);
+    }
+    long size = VarLengthBytesValueReaderWriter.getRequiredSize(byteArrays);
+
+    final File tempFile =
+        new File(FileUtils.getTempDirectory(), VarLengthBytesValueReaderWriterTest.class.getName() + random.nextInt());
+
+    try (PinotDataBuffer buffer = PinotDataBuffer.mapFile(tempFile, false, 0, size, ByteOrder.BIG_ENDIAN, null)) {
+      VarLengthBytesValueReaderWriter readerWriter = new VarLengthBytesValueReaderWriter(buffer, byteArrays);
+      Assert.assertEquals(byteArrays.length, readerWriter.getNumElements());
+    }
+
+    try (PinotDataBuffer buffer = PinotDataBuffer.mapFile(tempFile, false, 0, size, ByteOrder.BIG_ENDIAN, null)) {
+      Assert.assertTrue(VarLengthBytesValueReaderWriter.isVarLengthBytesDictBuffer(buffer));
+      VarLengthBytesValueReaderWriter readerWriter = new VarLengthBytesValueReaderWriter(buffer);
+      Assert.assertEquals(byteArrays.length, readerWriter.getNumElements());
+      for (int i = 0; i < strings.length; i++) {
+        String value = readerWriter.getUnpaddedString(i, -1, (byte) 0, null);
+        Assert.assertEquals(value, strings[i]);
+      }
+
+      // Reading a padded string should fail.
+      try {
+        readerWriter.getPaddedString(0, -1, null);
+        Assert.fail("getPaddedString() should fail on VarLengthBytesValueReader.");
+      } catch (UnsupportedOperationException ignore) {
+        // Expected.
+      }
+    } finally {
+      FileUtils.forceDelete(tempFile);
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org