You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/07/15 18:29:47 UTC
[incubator-pinot] branch master updated: #4401 Reuse a ThreadLocal byte[] when reading String elements from the variable length value reader. (#4432)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 12e77bd #4401 Reuse a ThreadLocal byte[] when reading String elements from the variable length value reader. (#4432)
12e77bd is described below
commit 12e77bd2aa6a6264a772ac98ea20663a9baf8f28
Author: Buchi Reddy Busi Reddy <ma...@gmail.com>
AuthorDate: Mon Jul 15 11:29:40 2019 -0700
#4401 Reuse a ThreadLocal byte[] when reading String elements from the variable length value reader. (#4432)
Reuse a ThreadLocal byte[] when reading String elements from the variable length value reader.
---
.../org/apache/pinot/common/utils/StringUtil.java | 4 +-
.../io/util/VarLengthBytesValueReaderWriter.java | 40 ++++++++++++++++-
.../util/VarLengthBytesValueReaderWriterTest.java | 52 ++++++++++++++++++++++
3 files changed, 93 insertions(+), 3 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/StringUtil.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/StringUtil.java
index e871b32..efc4b59 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/StringUtil.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/StringUtil.java
@@ -72,9 +72,9 @@ public class StringUtil {
return decodeUtf8(bytes, 0, bytes.length);
}
- public static String decodeUtf8(byte[] bytes, int startIndex, int endIndex) {
+ public static String decodeUtf8(byte[] bytes, int startIndex, int length) {
try {
- return new String(bytes, startIndex, endIndex, charSet);
+ return new String(bytes, startIndex, length, charSet);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java
index 45e840b..7d15eba 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java
@@ -93,6 +93,12 @@ public class VarLengthBytesValueReaderWriter implements Closeable, ValueReader {
private final int _numElements;
/**
+ * Per-thread scratch buffer used by getUnpaddedString() when the caller's buffer is absent or too small.
+ * Starts at 16 bytes and is replaced by a larger power-of-2 array on demand; it is never shrunk there.
+ */
+ private final ThreadLocal<byte[]> _reusableBytes = ThreadLocal.withInitial(() -> new byte[16]);
+
+ /**
* Constructor to create a VarLengthBytesValueReaderWriter from a previously written buffer.
*/
public VarLengthBytesValueReaderWriter(PinotDataBuffer dataBuffer) {
@@ -146,6 +152,19 @@ public class VarLengthBytesValueReaderWriter implements Closeable, ValueReader {
return false;
}
+ /**
+ * Returns the smallest power of 2 >= n. Only valid for 1 <= n <= 2^30: n <= 0 generally yields 0, and n > 2^30 overflows to Integer.MIN_VALUE.
+ */
+ private static int nextPowerOf2(int n) {
+ n -= 1; // so that an exact power of 2 maps to itself
+ n |= n >>> 1;
+ n |= n >>> 2;
+ n |= n >>> 4;
+ n |= n >>> 8;
+ n |= n >>> 16; // every bit below the highest set bit is now 1
+ return n + 1;
+ }
+
private void writeHeader() {
for (int offset = 0; offset < MAGIC_BYTES.length; offset++) {
_dataBuffer.putByte(offset, MAGIC_BYTES[offset]);
@@ -204,7 +223,26 @@ public class VarLengthBytesValueReaderWriter implements Closeable, ValueReader {
@Override
public String getUnpaddedString(int index, int numBytesPerValue, byte paddingByte, byte[] buffer) {
- return StringUtil.decodeUtf8(getBytes(index, numBytesPerValue, buffer));
+ // Look up this element's start offset in the int offset table at _dataSectionStartOffSet.
+ int offset = _dataBuffer.getInt(_dataSectionStartOffSet + Integer.BYTES * index);
+
+ // The element's length is the gap between its offset and the next element's offset.
+ int length = _dataBuffer.getInt(_dataSectionStartOffSet + Integer.BYTES * (index + 1)) - offset;
+
+ byte[] b;
+ if (buffer != null && buffer.length >= length) { // prefer the caller-supplied buffer when it fits
+ b = buffer;
+ } else {
+ // Fall back to this thread's scratch buffer; if it is too small, replace it with one sized to the next power of 2 >= length.
+ b = _reusableBytes.get();
+ if (b.length < length) {
+ b = new byte[nextPowerOf2(length)];
+ _reusableBytes.set(b);
+ }
+ }
+
+ _dataBuffer.copyTo(offset, b, 0, length);
+ return StringUtil.decodeUtf8(b, 0, length); // decode only [0, length): b may be reused and hold stale bytes past that
}
@Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriterTest.java b/pinot-core/src/test/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriterTest.java
index 069da08..86966a2 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriterTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriterTest.java
@@ -24,6 +24,8 @@ import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Random;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.core.segment.memory.PinotDataBuffer;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -32,6 +34,8 @@ import org.testng.annotations.Test;
* Unit test for {@link VarLengthBytesValueReaderWriter}
*/
public class VarLengthBytesValueReaderWriterTest {
+ private static final int MAX_STRING_LENGTH = 200; // generated test strings have 1..MAX_STRING_LENGTH characters (inclusive)
+
private final Random random = new Random();
@Test
@@ -50,6 +54,7 @@ public class VarLengthBytesValueReaderWriterTest {
}
try (PinotDataBuffer buffer = PinotDataBuffer.mapFile(tempFile, true, 0, size, ByteOrder.BIG_ENDIAN, null)) {
+ Assert.assertTrue(VarLengthBytesValueReaderWriter.isVarLengthBytesDictBuffer(buffer));
VarLengthBytesValueReaderWriter readerWriter = new VarLengthBytesValueReaderWriter(buffer);
Assert.assertEquals(readerWriter.getNumElements(), 0);
} finally {
@@ -74,6 +79,7 @@ public class VarLengthBytesValueReaderWriterTest {
}
try (PinotDataBuffer buffer = PinotDataBuffer.mapFile(tempFile, true, 0, size, ByteOrder.BIG_ENDIAN, null)) {
+ Assert.assertTrue(VarLengthBytesValueReaderWriter.isVarLengthBytesDictBuffer(buffer));
VarLengthBytesValueReaderWriter readerWriter = new VarLengthBytesValueReaderWriter(buffer);
Assert.assertEquals(readerWriter.getNumElements(), 1);
byte[] newArray = readerWriter.getBytes(0, -1, null);
@@ -104,6 +110,7 @@ public class VarLengthBytesValueReaderWriterTest {
}
try (PinotDataBuffer buffer = PinotDataBuffer.mapFile(tempFile, false, 0, size, ByteOrder.BIG_ENDIAN, null)) {
+ Assert.assertTrue(VarLengthBytesValueReaderWriter.isVarLengthBytesDictBuffer(buffer));
VarLengthBytesValueReaderWriter readerWriter = new VarLengthBytesValueReaderWriter(buffer);
Assert.assertEquals(byteArrays.length, readerWriter.getNumElements());
for (int i = 0; i < byteArrays.length; i++) {
@@ -115,4 +122,49 @@ public class VarLengthBytesValueReaderWriterTest {
FileUtils.forceDelete(tempFile);
}
}
+
+ @Test // round-trips random-length alphanumeric strings through the writer and getUnpaddedString()
+ public void testArbitraryLengthStringDictionary()
+ throws IOException {
+ Random random = new Random(); // NOTE(review): shadows the class's random field; consider reusing the field
+ int numStrings = random.nextInt(100); // NOTE(review): may be 0, leaving the read loop below with nothing to check
+ String[] strings = new String[numStrings];
+ for (int i = 0; i < numStrings; i++) {
+ strings[i] = RandomStringUtils.randomAlphanumeric(1 + random.nextInt(MAX_STRING_LENGTH));
+ }
+
+ byte[][] byteArrays = new byte[numStrings][];
+ for (int i = 0; i < numStrings; i++) {
+ byteArrays[i] = StringUtil.encodeUtf8(strings[i]);
+ }
+ long size = VarLengthBytesValueReaderWriter.getRequiredSize(byteArrays);
+
+ final File tempFile =
+ new File(FileUtils.getTempDirectory(), VarLengthBytesValueReaderWriterTest.class.getName() + random.nextInt());
+
+ try (PinotDataBuffer buffer = PinotDataBuffer.mapFile(tempFile, false, 0, size, ByteOrder.BIG_ENDIAN, null)) {
+ VarLengthBytesValueReaderWriter readerWriter = new VarLengthBytesValueReaderWriter(buffer, byteArrays);
+ Assert.assertEquals(byteArrays.length, readerWriter.getNumElements()); // NOTE(review): TestNG expects (actual, expected); arguments look swapped
+ } // NOTE(review): no finally on this try, so tempFile is not deleted if writing fails
+
+ try (PinotDataBuffer buffer = PinotDataBuffer.mapFile(tempFile, false, 0, size, ByteOrder.BIG_ENDIAN, null)) {
+ Assert.assertTrue(VarLengthBytesValueReaderWriter.isVarLengthBytesDictBuffer(buffer));
+ VarLengthBytesValueReaderWriter readerWriter = new VarLengthBytesValueReaderWriter(buffer);
+ Assert.assertEquals(byteArrays.length, readerWriter.getNumElements());
+ for (int i = 0; i < strings.length; i++) {
+ String value = readerWriter.getUnpaddedString(i, -1, (byte) 0, null); // numBytesPerValue/paddingByte are unused by the var-length reader; null buffer exercises the ThreadLocal path
+ Assert.assertEquals(value, strings[i]);
+ }
+
+ // Reading a padded string should fail.
+ try {
+ readerWriter.getPaddedString(0, -1, null);
+ Assert.fail("getPaddedString() should fail on VarLengthBytesValueReader.");
+ } catch (UnsupportedOperationException ignore) {
+ // Expected.
+ }
+ } finally {
+ FileUtils.forceDelete(tempFile);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org