You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2021/07/19 21:19:46 UTC
[hive] branch storage-branch-2.8 updated: HIVE-25190: Fix many
small allocations in BytesColumnVector
This is an automated email from the ASF dual-hosted git repository.
omalley pushed a commit to branch storage-branch-2.8
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/storage-branch-2.8 by this push:
new b304acd HIVE-25190: Fix many small allocations in BytesColumnVector
b304acd is described below
commit b304acd36334e54d276e9ded5851ae24d2f23595
Author: Owen O'Malley <oo...@linkedin.com>
AuthorDate: Fri Jun 18 16:30:13 2021 -0700
HIVE-25190: Fix many small allocations in BytesColumnVector
Fixes #2408
Signed-off-by: Owen O'Malley <oo...@linkedin.com>
---
storage-api/pom.xml | 2 +-
.../hive/ql/exec/vector/BytesColumnVector.java | 163 ++++++++++-----------
.../hive/ql/exec/vector/TestBytesColumnVector.java | 124 ++++++++++++++--
3 files changed, 187 insertions(+), 102 deletions(-)
diff --git a/storage-api/pom.xml b/storage-api/pom.xml
index 53fa3c0..c87aed7 100644
--- a/storage-api/pom.xml
+++ b/storage-api/pom.xml
@@ -185,7 +185,7 @@
<version>3.0.0-M4</version>
<configuration>
<reuseForks>false</reuseForks>
- <argLine>-Xmx2048m</argLine>
+ <argLine>-Xmx3g</argLine>
<failIfNoTests>false</failIfNoTests>
<systemPropertyVariables>
<test.tmp.dir>${project.build.directory}/tmp</test.tmp.dir>
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
index 6618807..a8c58ac 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
@@ -49,14 +49,15 @@ public class BytesColumnVector extends ColumnVector {
*/
public int[] length;
- // A call to increaseBufferSpace() or ensureValPreallocated() will ensure that buffer[] points to
- // a byte[] with sufficient space for the specified size.
- private byte[] buffer; // optional buffer to use when actually copying in data
- private int nextFree; // next free position in buffer
+ // Calls to ensureValPreallocated() ensure that currentValue and currentOffset
+ // are set to enough space for the value.
+ private byte[] currentValue; // bytes for the next value
+ private int currentOffset; // starting position in the current buffer
- // Hang onto a byte array for holding smaller byte values
- private byte[] smallBuffer;
- private int smallBufferNextFree;
+ // A shared (per column vector) buffer allocation that we use for the small values
+ private byte[] sharedBuffer;
+ // The next unused offset in the sharedBuffer.
+ private int sharedBufferOffset;
private int bufferAllocationCount;
@@ -66,8 +67,11 @@ public class BytesColumnVector extends ColumnVector {
// Proportion of extra space to provide when allocating more buffer space.
static final float EXTRA_SPACE_FACTOR = (float) 1.2;
- // Largest size allowed in smallBuffer
- static final int MAX_SIZE_FOR_SMALL_BUFFER = 1024 * 1024;
+ // Largest item size allowed in sharedBuffer
+ static final int MAX_SIZE_FOR_SMALL_ITEM = 1024 * 1024;
+
+ // Largest size allowed for sharedBuffer
+ static final int MAX_SIZE_FOR_SHARED_BUFFER = 1024 * 1024 * 1024;
/**
* Use this constructor for normal operation.
@@ -121,30 +125,30 @@ public class BytesColumnVector extends ColumnVector {
* Provide the estimated number of bytes needed to hold
* a full column vector worth of byte string data.
*
- * @param estimatedValueSize Estimated size of buffer space needed
+ * @param estimatedValueSize Estimated size of buffer space needed per row
*/
public void initBuffer(int estimatedValueSize) {
- nextFree = 0;
- smallBufferNextFree = 0;
+ sharedBufferOffset = 0;
// if buffer is already allocated, keep using it, don't re-allocate
- if (buffer != null) {
+ if (sharedBuffer != null) {
// Free up any previously allocated buffers that are referenced by vector
if (bufferAllocationCount > 0) {
for (int idx = 0; idx < vector.length; ++idx) {
vector[idx] = null;
length[idx] = 0;
}
- buffer = smallBuffer; // In case last row was a large bytes value
}
} else {
// allocate a little extra space to limit need to re-allocate
- int bufferSize = this.vector.length * (int)(estimatedValueSize * EXTRA_SPACE_FACTOR);
+ long bufferSize = (long) (this.vector.length * estimatedValueSize * EXTRA_SPACE_FACTOR);
if (bufferSize < DEFAULT_BUFFER_SIZE) {
bufferSize = DEFAULT_BUFFER_SIZE;
}
- buffer = new byte[bufferSize];
- smallBuffer = buffer;
+ if (bufferSize > MAX_SIZE_FOR_SHARED_BUFFER) {
+ bufferSize = MAX_SIZE_FOR_SHARED_BUFFER;
+ }
+ sharedBuffer = new byte[(int) bufferSize];
}
bufferAllocationCount = 0;
}
@@ -160,10 +164,7 @@ public class BytesColumnVector extends ColumnVector {
* @return amount of buffer space currently allocated
*/
public int bufferSize() {
- if (buffer == null) {
- return 0;
- }
- return buffer.length;
+ return sharedBuffer == null ? 0 : sharedBuffer.length;
}
/**
@@ -182,16 +183,13 @@ public class BytesColumnVector extends ColumnVector {
* @param length length of source byte sequence
*/
public void setVal(int elementNum, byte[] sourceBuf, int start, int length) {
- if ((nextFree + length) > buffer.length) {
- increaseBufferSpace(length);
- }
+ ensureValPreallocated(length);
if (length > 0) {
- System.arraycopy(sourceBuf, start, buffer, nextFree, length);
+ System.arraycopy(sourceBuf, start, currentValue, currentOffset, length);
}
- vector[elementNum] = buffer;
- this.start[elementNum] = nextFree;
+ vector[elementNum] = currentValue;
+ this.start[elementNum] = currentOffset;
this.length[elementNum] = length;
- nextFree += length;
}
/**
@@ -212,23 +210,31 @@ public class BytesColumnVector extends ColumnVector {
}
/**
- * Preallocate space in the local buffer so the caller can fill in the value bytes themselves.
+ * Ensures that we have space allocated for the next value, which has size
+ * length bytes.
+ *
+ * Updates currentValue, currentOffset, and sharedBufferOffset for this value.
*
- * Always use with getValPreallocatedBytes, getValPreallocatedStart, and setValPreallocated.
+ * Always use before getValPreallocatedBytes, getValPreallocatedStart,
+ * and setValPreallocated.
*/
public void ensureValPreallocated(int length) {
- if ((nextFree + length) > buffer.length) {
- increaseBufferSpace(length);
+ if ((sharedBufferOffset + length) > sharedBuffer.length) {
+ currentValue = allocateBuffer(length);
+ } else {
+ currentValue = sharedBuffer;
+ currentOffset = sharedBufferOffset;
+ sharedBufferOffset += length;
}
}
@SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "Expose internal rep for efficiency")
public byte[] getValPreallocatedBytes() {
- return buffer;
+ return currentValue;
}
public int getValPreallocatedStart() {
- return nextFree;
+ return currentOffset;
}
/**
@@ -237,10 +243,9 @@ public class BytesColumnVector extends ColumnVector {
* @param length
*/
public void setValPreallocated(int elementNum, int length) {
- vector[elementNum] = buffer;
- this.start[elementNum] = nextFree;
+ vector[elementNum] = currentValue;
+ this.start[elementNum] = currentOffset;
this.length[elementNum] = length;
- nextFree += length;
}
/**
@@ -258,73 +263,55 @@ public class BytesColumnVector extends ColumnVector {
public void setConcat(int elementNum, byte[] leftSourceBuf, int leftStart, int leftLen,
byte[] rightSourceBuf, int rightStart, int rightLen) {
int newLen = leftLen + rightLen;
- if ((nextFree + newLen) > buffer.length) {
- increaseBufferSpace(newLen);
- }
- vector[elementNum] = buffer;
- this.start[elementNum] = nextFree;
+ ensureValPreallocated(newLen);
+ vector[elementNum] = currentValue;
+ this.start[elementNum] = currentOffset;
this.length[elementNum] = newLen;
- System.arraycopy(leftSourceBuf, leftStart, buffer, nextFree, leftLen);
- nextFree += leftLen;
- System.arraycopy(rightSourceBuf, rightStart, buffer, nextFree, rightLen);
- nextFree += rightLen;
+ System.arraycopy(leftSourceBuf, leftStart, currentValue, currentOffset, leftLen);
+ System.arraycopy(rightSourceBuf, rightStart, currentValue,
+ currentOffset + leftLen, rightLen);
}
/**
- * Increase buffer space enough to accommodate next element.
+ * Allocate/reuse enough buffer space to accommodate next element.
+ * currentOffset is set to the first available byte in the returned array.
+ * If sharedBuffer is used, sharedBufferOffset is updated to point after the
+ * current record.
+ *
* This uses an exponential increase mechanism to rapidly
* increase buffer size to enough to hold all data.
* As batches get re-loaded, buffer space allocated will quickly
* stabilize.
*
* @param nextElemLength size of next element to be added
+ * @return the buffer to use for the next element
*/
- public void increaseBufferSpace(int nextElemLength) {
- // A call to increaseBufferSpace() or ensureValPreallocated() will ensure that buffer[] points to
- // a byte[] with sufficient space for the specified size.
- // This will either point to smallBuffer, or to a newly allocated byte array for larger values.
-
- if (nextElemLength > MAX_SIZE_FOR_SMALL_BUFFER) {
- // Larger allocations will be special-cased and will not use the normal buffer.
- // buffer/nextFree will be set to a newly allocated array just for the current row.
- // The next row will require another call to increaseBufferSpace() since this new buffer should be used up.
- byte[] newBuffer = new byte[nextElemLength];
+ private byte[] allocateBuffer(int nextElemLength) {
+ // If this is a large value or shared buffer is maxed out, allocate a
+ // single use buffer. Assumes that sharedBuffer length and
+ // MAX_SIZE_FOR_SHARED_BUFFER are powers of 2.
+ if (nextElemLength > MAX_SIZE_FOR_SMALL_ITEM ||
+ sharedBufferOffset + nextElemLength >= MAX_SIZE_FOR_SHARED_BUFFER) {
+ // allocate a single-use buffer for the next value
++bufferAllocationCount;
- // If the buffer was pointing to smallBuffer, then nextFree keeps track of the current state
- // of the free index for smallBuffer. We now need to save this value to smallBufferNextFree
- // so we don't lose this. A bit of a weird dance here.
- if (smallBuffer == buffer) {
- smallBufferNextFree = nextFree;
- }
- buffer = newBuffer;
- nextFree = 0;
+ currentOffset = 0;
+ return new byte[nextElemLength];
} else {
- // This value should go into smallBuffer.
- if (smallBuffer != buffer) {
- // Previous row was for a large bytes value ( > MAX_SIZE_FOR_SMALL_BUFFER).
- // Use smallBuffer if possible.
- buffer = smallBuffer;
- nextFree = smallBufferNextFree;
- }
- // smallBuffer might still be out of space
- if ((nextFree + nextElemLength) > buffer.length) {
- int newLength = smallBuffer.length * 2;
+ // sharedBuffer might still be out of space
+ if ((sharedBufferOffset + nextElemLength) > sharedBuffer.length) {
+ int newLength = sharedBuffer.length * 2;
while (newLength < nextElemLength) {
- if (newLength > 0) {
- newLength *= 2;
- } else { // integer overflow happened; maximize size of next smallBuffer
- newLength = Integer.MAX_VALUE;
- }
+ newLength *= 2;
}
- smallBuffer = new byte[newLength];
+ sharedBuffer = new byte[newLength];
++bufferAllocationCount;
- smallBufferNextFree = 0;
- // Update buffer
- buffer = smallBuffer;
- nextFree = 0;
+ sharedBufferOffset = 0;
}
+ currentOffset = sharedBufferOffset;
+ sharedBufferOffset += nextElemLength;
+ return sharedBuffer;
}
}
@@ -575,10 +562,12 @@ public class BytesColumnVector extends ColumnVector {
public void shallowCopyTo(ColumnVector otherCv) {
BytesColumnVector other = (BytesColumnVector)otherCv;
super.shallowCopyTo(other);
- other.nextFree = nextFree;
+ other.currentOffset = currentOffset;
other.vector = vector;
other.start = start;
other.length = length;
- other.buffer = buffer;
+ other.currentValue = currentValue;
+ other.sharedBuffer = sharedBuffer;
+ other.sharedBufferOffset = sharedBufferOffset;
}
}
diff --git a/storage-api/src/test/org/apache/hadoop/hive/ql/exec/vector/TestBytesColumnVector.java b/storage-api/src/test/org/apache/hadoop/hive/ql/exec/vector/TestBytesColumnVector.java
index da89122..3b42d17 100644
--- a/storage-api/src/test/org/apache/hadoop/hive/ql/exec/vector/TestBytesColumnVector.java
+++ b/storage-api/src/test/org/apache/hadoop/hive/ql/exec/vector/TestBytesColumnVector.java
@@ -18,10 +18,16 @@
package org.apache.hadoop.hive.ql.exec.vector;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
import org.junit.Test;
-import static org.junit.Assert.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
public class TestBytesColumnVector {
@Test
@@ -35,33 +41,41 @@ public class TestBytesColumnVector {
col.reset();
// Initial write (small value)
- byte[] bytes1 = writeToBytesColumnVector(rowIdx, col, smallWriteSize, (byte) 1);
+ byte[] bytes1 = writeToBytesColumnVector(rowIdx, col, smallWriteSize, 1);
bytesWrittenToBytes1 += smallWriteSize;
+ assertEquals(0, col.start[0]);
+ assertEquals(smallWriteSize, col.length[0]);
// Write a large value. This should use a different byte buffer
rowIdx++;
- byte[] bytes2 = writeToBytesColumnVector(rowIdx, col, largeWriteSize, (byte) 2);
- assertFalse(bytes1 == bytes2);
+ byte[] bytes2 = writeToBytesColumnVector(rowIdx, col, largeWriteSize, -1);
+ assertNotSame(bytes1, bytes2);
+ assertEquals(0, col.start[1]);
+ assertEquals(largeWriteSize, col.length[1]);
// Another small write. smallBuffer should be re-used for this write
rowIdx++;
- byte[] bytes3 = writeToBytesColumnVector(rowIdx, col, smallWriteSize, (byte) 1);
+ byte[] bytes3 = writeToBytesColumnVector(rowIdx, col, smallWriteSize, 2);
bytesWrittenToBytes1 += smallWriteSize;
- assertTrue(bytes1 == bytes3);
+ assertSame(bytes1, bytes3);
+ assertEquals(smallWriteSize, col.start[2]);
+ assertEquals(smallWriteSize, col.length[2]);
// Write another large value. This should use a different byte buffer
rowIdx++;
- byte[] bytes4 = writeToBytesColumnVector(rowIdx, col, largeWriteSize, (byte) 3);
- assertFalse(bytes1 == bytes4);
- assertFalse(bytes2 == bytes4);
+ byte[] bytes4 = writeToBytesColumnVector(rowIdx, col, largeWriteSize, -2);
+ assertNotSame(bytes1, bytes4);
+ assertNotSame(bytes2, bytes4);
+ assertEquals(0, col.start[3]);
+ assertEquals(largeWriteSize, col.length[3]);
// Eventually enough small writes should result in another buffer getting created
boolean gotNewBuffer = false;
// Test is dependent on getting a new buffer within 1MB.
// This may need to change as the implementation changes.
- for (int i = 0; i < 1024; ++i) {
+ for (int i = 3; i < 1024; ++i) {
rowIdx++;
- byte[] currBytes = writeToBytesColumnVector(rowIdx, col, smallWriteSize, (byte) 1);
+ byte[] currBytes = writeToBytesColumnVector(rowIdx, col, smallWriteSize, i);
if (currBytes == bytes1) {
bytesWrittenToBytes1 += smallWriteSize;
} else {
@@ -74,16 +88,98 @@ public class TestBytesColumnVector {
// All small writes to the first buffer should be in contiguous memory
for (int i = 0; i < bytesWrittenToBytes1; ++i) {
- assertEquals((byte) 1, bytes1[i]);
+ assertEquals((byte) (i / smallWriteSize + 1), bytes1[i]);
+ }
+ }
+
+ /**
+ * Test the setVal, setConcat, and StringExpr.padRight methods.
+ */
+ @Test
+ public void testConcatAndPadding() {
+ BytesColumnVector col = new BytesColumnVector();
+ col.reset();
+ byte[] prefix = "緑".getBytes(StandardCharsets.UTF_8);
+
+ // fill the column with the prefix '緑'
+ for(int row=0; row < col.vector.length; ++row) {
+ col.setVal(row, prefix, 0, prefix.length);
+ }
+ for(int row=0; row < col.vector.length; ++row) {
+ assertEquals("row " + row, "緑", col.toString(row));
+ }
+
+ // pad out to 6 characters
+ for(int row=0; row < col.vector.length; ++row) {
+ StringExpr.padRight(col, row, col.vector[row], col.start[row],
+ col.length[row], 6);
+ }
+ for(int row=0; row < col.vector.length; ++row) {
+ assertEquals("row " + row, "緑 ", col.toString(row));
+ }
+
+ // concat the row digits
+ for(int row=0; row < col.vector.length; ++row) {
+ byte[] rowStr = Integer.toString(row).getBytes(StandardCharsets.UTF_8);
+ col.setConcat(row, col.vector[row], col.start[row], col.length[row],
+ rowStr, 0, rowStr.length);
+ }
+ for(int row=0; row < col.vector.length; ++row) {
+ assertEquals("row " + row, "緑 " + row, col.toString(row));
+ }
+
+ // We end up allocating 20k, so we should have expanded the small buffer
+ assertEquals(32 * 1024, col.bufferSize());
+ }
+
+ @Test
+ public void testBufferOverflow() {
+ BytesColumnVector col = new BytesColumnVector(2048);
+ col.reset();
+ assertEquals(BytesColumnVector.DEFAULT_BUFFER_SIZE, col.bufferSize());
+
+ // pick a size below 1m so that we use the small buffer;
+ final int size = BytesColumnVector.MAX_SIZE_FOR_SMALL_ITEM - 1024;
+
+ // run through once to expand the small value buffer
+ for(int row=0; row < col.vector.length; ++row) {
+ writeToBytesColumnVector(row, col, size, row);
+ }
+ // it should have resized a bunch of times
+ byte[] sharedBuffer = col.getValPreallocatedBytes();
+ assertNotSame(sharedBuffer, col.vector[0]);
+ assertSame(sharedBuffer, col.vector[1024]);
+
+ // reset the column, but make sure the buffer isn't reallocated
+ col.reset();
+ assertEquals(BytesColumnVector.MAX_SIZE_FOR_SHARED_BUFFER, col.bufferSize());
+
+ // fill up the vector now with the large buffer
+ for(int row=0; row < col.vector.length; ++row) {
+ writeToBytesColumnVector(row, col, size, row);
+ }
+ assertEquals(BytesColumnVector.MAX_SIZE_FOR_SHARED_BUFFER, col.bufferSize());
+ // Now the first 1025 rows should all be the shared buffer,
+ // because 1025 * size < MAX_SIZE_FOR_SHARED_BUFFER
+ for(int row=0; row < 1025; ++row) {
+ assertSame("row " + row, sharedBuffer, col.vector[row]);
+ assertEquals("row " + row, row * size, col.start[row]);
+ assertEquals("row " + row, size, col.length[row]);
+ }
+ // the rest should be custom buffers
+ for(int row=1025; row < col.vector.length; ++row) {
+ assertNotSame("row " + row, sharedBuffer, col.vector[row]);
+ assertEquals("row " + row, 0, col.start[row]);
+ assertEquals("row " + row, size, col.length[row]);
}
}
// Write a value to the column vector, and return back the byte buffer used.
- private static byte[] writeToBytesColumnVector(int rowIdx, BytesColumnVector col, int writeSize, byte val) {
+ private static byte[] writeToBytesColumnVector(int rowIdx, BytesColumnVector col, int writeSize, int val) {
col.ensureValPreallocated(writeSize);
byte[] bytes = col.getValPreallocatedBytes();
int startIdx = col.getValPreallocatedStart();
- Arrays.fill(bytes, startIdx, startIdx + writeSize, val);
+ Arrays.fill(bytes, startIdx, startIdx + writeSize, (byte) val);
col.setValPreallocated(rowIdx, writeSize);
return bytes;
}