You are viewing a plain-text version of this message; the canonical version is available in the mailing-list archive (the original hyperlink is not preserved in this rendering).
Posted to commits@hive.apache.org by jd...@apache.org on 2017/02/02 21:20:44 UTC
hive git commit: HIVE-15700: BytesColumnVector can get stuck trying to resize byte buffer (Jason Dere, reviewed by Matt McCline)
Repository: hive
Updated Branches:
refs/heads/master f766b8fe1 -> ed3b05185
HIVE-15700: BytesColumnVector can get stuck trying to resize byte buffer (Jason Dere, reviewed by Matt McCline)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ed3b0518
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ed3b0518
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ed3b0518
Branch: refs/heads/master
Commit: ed3b0518536ed902cb49ed41b97fa6d1ecafcdb1
Parents: f766b8f
Author: Jason Dere <jd...@hortonworks.com>
Authored: Thu Feb 2 13:20:11 2017 -0800
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Thu Feb 2 13:20:11 2017 -0800
----------------------------------------------------------------------
.../TestVectorStringExpressions.java | 5 +-
.../hive/ql/exec/vector/BytesColumnVector.java | 89 +++++++++++++++----
.../ql/exec/vector/TestBytesColumnVector.java | 90 ++++++++++++++++++++
3 files changed, 166 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/ed3b0518/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestVectorStringExpressions.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestVectorStringExpressions.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestVectorStringExpressions.java
index 5c323ba..d97152f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestVectorStringExpressions.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestVectorStringExpressions.java
@@ -3178,6 +3178,8 @@ public class TestVectorStringExpressions {
public void testLoadBytesColumnVectorByValueLargeData() {
BytesColumnVector bcv = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
bcv.initBuffer(10); // initialize with estimated element size 10
+ // Record initial buffer size
+ int initialBufferSize = bcv.bufferSize();
String s = "0123456789";
while (s.length() < 500) {
s += s;
@@ -3191,7 +3193,8 @@ public class TestVectorStringExpressions {
for (int i = 0; i != VectorizedRowBatch.DEFAULT_SIZE; i++) {
bcv.setVal(i, b, 0, b.length);
}
- Assert.assertTrue(bcv.bufferSize() >= b.length * VectorizedRowBatch.DEFAULT_SIZE);
+ // Current buffer size should be larger than initial size
+ Assert.assertTrue(bcv.bufferSize() > initialBufferSize);
}
@Test
http://git-wip-us.apache.org/repos/asf/hive/blob/ed3b0518/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
index 552982c..bbd9ca6 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
@@ -43,15 +43,27 @@ public class BytesColumnVector extends ColumnVector {
* in vector[0] and isRepeating from the superclass is set to true.
*/
public int[] length;
+
+ // A call to increaseBufferSpace() or ensureValPreallocated() will ensure that buffer[] points to
+ // a byte[] with sufficient space for the specified size.
private byte[] buffer; // optional buffer to use when actually copying in data
private int nextFree; // next free position in buffer
+ // Hang onto a byte array for holding smaller byte values
+ private byte[] smallBuffer;
+ private int smallBufferNextFree;
+
+ private int bufferAllocationCount;
+
// Estimate that there will be 16 bytes per entry
static final int DEFAULT_BUFFER_SIZE = 16 * VectorizedRowBatch.DEFAULT_SIZE;
// Proportion of extra space to provide when allocating more buffer space.
static final float EXTRA_SPACE_FACTOR = (float) 1.2;
+ // Largest size allowed in smallBuffer
+ static final int MAX_SIZE_FOR_SMALL_BUFFER = 1024 * 1024;
+
/**
* Use this constructor for normal operation.
* All column vectors should be the default size normally.
@@ -103,18 +115,27 @@ public class BytesColumnVector extends ColumnVector {
*/
public void initBuffer(int estimatedValueSize) {
nextFree = 0;
+ smallBufferNextFree = 0;
// if buffer is already allocated, keep using it, don't re-allocate
if (buffer != null) {
- return;
- }
-
- // allocate a little extra space to limit need to re-allocate
- int bufferSize = this.vector.length * (int)(estimatedValueSize * EXTRA_SPACE_FACTOR);
- if (bufferSize < DEFAULT_BUFFER_SIZE) {
- bufferSize = DEFAULT_BUFFER_SIZE;
+ // Free up any previously allocated buffers that are referenced by vector
+ if (bufferAllocationCount > 0) {
+ for (int idx = 0; idx < vector.length; ++idx) {
+ vector[idx] = null;
+ }
+ buffer = smallBuffer; // In case last row was a large bytes value
+ }
+ } else {
+ // allocate a little extra space to limit need to re-allocate
+ int bufferSize = this.vector.length * (int)(estimatedValueSize * EXTRA_SPACE_FACTOR);
+ if (bufferSize < DEFAULT_BUFFER_SIZE) {
+ bufferSize = DEFAULT_BUFFER_SIZE;
+ }
+ buffer = new byte[bufferSize];
+ smallBuffer = buffer;
}
- buffer = new byte[bufferSize];
+ bufferAllocationCount = 0;
}
/**
@@ -238,17 +259,51 @@ public class BytesColumnVector extends ColumnVector {
* @param nextElemLength size of next element to be added
*/
public void increaseBufferSpace(int nextElemLength) {
+ // A call to increaseBufferSpace() or ensureValPreallocated() will ensure that buffer[] points to
+ // a byte[] with sufficient space for the specified size.
+ // This will either point to smallBuffer, or to a newly allocated byte array for larger values.
+
+ if (nextElemLength > MAX_SIZE_FOR_SMALL_BUFFER) {
+ // Larger allocations will be special-cased and will not use the normal buffer.
+ // buffer/nextFree will be set to a newly allocated array just for the current row.
+ // The next row will require another call to increaseBufferSpace() since this new buffer should be used up.
+ byte[] newBuffer = new byte[nextElemLength];
+ ++bufferAllocationCount;
+ // If the buffer was pointing to smallBuffer, then nextFree keeps track of the current state
+ // of the free index for smallBuffer. We now need to save this value to smallBufferNextFree
+ // so we don't lose this. A bit of a weird dance here.
+ if (smallBuffer == buffer) {
+ smallBufferNextFree = nextFree;
+ }
+ buffer = newBuffer;
+ nextFree = 0;
+ } else {
+ // This value should go into smallBuffer.
+ if (smallBuffer != buffer) {
+ // Previous row was for a large bytes value ( > MAX_SIZE_FOR_SMALL_BUFFER).
+ // Use smallBuffer if possible.
+ buffer = smallBuffer;
+ nextFree = smallBufferNextFree;
+ }
- // Keep doubling buffer size until there will be enough space for next element.
- int newLength = 2 * buffer.length;
- while((nextFree + nextElemLength) > newLength) {
- newLength *= 2;
+ // smallBuffer might still be out of space
+ if ((nextFree + nextElemLength) > buffer.length) {
+ int newLength = smallBuffer.length * 2;
+ while (newLength < nextElemLength) {
+ if (newLength < 0) {
+ throw new RuntimeException("Overflow of newLength. smallBuffer.length="
+ + smallBuffer.length + ", nextElemLength=" + nextElemLength);
+ }
+ newLength *= 2;
+ }
+ smallBuffer = new byte[newLength];
+ ++bufferAllocationCount;
+ smallBufferNextFree = 0;
+ // Update buffer
+ buffer = smallBuffer;
+ nextFree = 0;
+ }
}
-
- // Allocate new buffer, copy data to it, and set buffer to new buffer.
- byte[] newBuffer = new byte[newLength];
- System.arraycopy(buffer, 0, newBuffer, 0, nextFree);
- buffer = newBuffer;
}
/** Copy the current object contents into the output. Only copy selected entries,
http://git-wip-us.apache.org/repos/asf/hive/blob/ed3b0518/storage-api/src/test/org/apache/hadoop/hive/ql/exec/vector/TestBytesColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/test/org/apache/hadoop/hive/ql/exec/vector/TestBytesColumnVector.java b/storage-api/src/test/org/apache/hadoop/hive/ql/exec/vector/TestBytesColumnVector.java
new file mode 100644
index 0000000..e14abf1
--- /dev/null
+++ b/storage-api/src/test/org/apache/hadoop/hive/ql/exec/vector/TestBytesColumnVector.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import java.util.Arrays;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestBytesColumnVector {
+ @Test
+ public void testSmallBufferReuse() {
+ BytesColumnVector col = new BytesColumnVector();
+ int smallWriteSize = 1024;
+ int largeWriteSize = 1024 * 1024 * 2;
+
+ int rowIdx = 0;
+ int bytesWrittenToBytes1 = 0;
+ col.reset();
+
+ // Initial write (small value)
+ byte[] bytes1 = writeToBytesColumnVector(rowIdx, col, smallWriteSize, (byte) 1);
+ bytesWrittenToBytes1 += smallWriteSize;
+
+ // Write a large value. This should use a different byte buffer
+ rowIdx++;
+ byte[] bytes2 = writeToBytesColumnVector(rowIdx, col, largeWriteSize, (byte) 2);
+ assertFalse(bytes1 == bytes2);
+
+ // Another small write. smallBuffer should be re-used for this write
+ rowIdx++;
+ byte[] bytes3 = writeToBytesColumnVector(rowIdx, col, smallWriteSize, (byte) 1);
+ bytesWrittenToBytes1 += smallWriteSize;
+ assertTrue(bytes1 == bytes3);
+
+ // Write another large value. This should use a different byte buffer
+ rowIdx++;
+ byte[] bytes4 = writeToBytesColumnVector(rowIdx, col, largeWriteSize, (byte) 3);
+ assertFalse(bytes1 == bytes4);
+ assertFalse(bytes2 == bytes4);
+
+ // Eventually enough small writes should result in another buffer getting created
+ boolean gotNewBuffer = false;
+ // Test is dependent on getting a new buffer within 1MB.
+ // This may need to change as the implementation changes.
+ for (int i = 0; i < 1024; ++i) {
+ rowIdx++;
+ byte[] currBytes = writeToBytesColumnVector(rowIdx, col, smallWriteSize, (byte) 1);
+ if (currBytes == bytes1) {
+ bytesWrittenToBytes1 += smallWriteSize;
+ } else {
+ gotNewBuffer = true;
+ break;
+ }
+ }
+
+ assertTrue(gotNewBuffer);
+
+ // All small writes to the first buffer should be in contiguous memory
+ for (int i = 0; i < bytesWrittenToBytes1; ++i) {
+ assertEquals((byte) 1, bytes1[i]);
+ }
+ }
+
+ // Write a value to the column vector, and return back the byte buffer used.
+ private static byte[] writeToBytesColumnVector(int rowIdx, BytesColumnVector col, int writeSize, byte val) {
+ col.ensureValPreallocated(writeSize);
+ byte[] bytes = col.getValPreallocatedBytes();
+ int startIdx = col.getValPreallocatedStart();
+ Arrays.fill(bytes, startIdx, startIdx + writeSize, val);
+ col.setValPreallocated(rowIdx, writeSize);
+ return bytes;
+ }
+}