You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@hive.apache.org by jd...@apache.org on 2017/02/02 21:20:44 UTC

hive git commit: HIVE-15700: BytesColumnVector can get stuck trying to resize byte buffer (Jason Dere, reviewed by Matt McCline)

Repository: hive
Updated Branches:
  refs/heads/master f766b8fe1 -> ed3b05185


HIVE-15700: BytesColumnVector can get stuck trying to resize byte buffer (Jason Dere, reviewed by Matt McCline)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ed3b0518
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ed3b0518
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ed3b0518

Branch: refs/heads/master
Commit: ed3b0518536ed902cb49ed41b97fa6d1ecafcdb1
Parents: f766b8f
Author: Jason Dere <jd...@hortonworks.com>
Authored: Thu Feb 2 13:20:11 2017 -0800
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Thu Feb 2 13:20:11 2017 -0800

----------------------------------------------------------------------
 .../TestVectorStringExpressions.java            |  5 +-
 .../hive/ql/exec/vector/BytesColumnVector.java  | 89 +++++++++++++++----
 .../ql/exec/vector/TestBytesColumnVector.java   | 90 ++++++++++++++++++++
 3 files changed, 166 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ed3b0518/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestVectorStringExpressions.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestVectorStringExpressions.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestVectorStringExpressions.java
index 5c323ba..d97152f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestVectorStringExpressions.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestVectorStringExpressions.java
@@ -3178,6 +3178,8 @@ public class TestVectorStringExpressions {
   public void testLoadBytesColumnVectorByValueLargeData()  {
     BytesColumnVector bcv = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
     bcv.initBuffer(10); // initialize with estimated element size 10
+    // Record initial buffer size
+    int initialBufferSize = bcv.bufferSize();
     String s = "0123456789";
     while (s.length() < 500) {
       s += s;
@@ -3191,7 +3193,8 @@ public class TestVectorStringExpressions {
     for (int i = 0; i != VectorizedRowBatch.DEFAULT_SIZE; i++) {
       bcv.setVal(i, b, 0, b.length);
     }
-    Assert.assertTrue(bcv.bufferSize() >= b.length * VectorizedRowBatch.DEFAULT_SIZE);
+    // Current buffer size should be larger than initial size
+    Assert.assertTrue(bcv.bufferSize() > initialBufferSize);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hive/blob/ed3b0518/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
index 552982c..bbd9ca6 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
@@ -43,15 +43,27 @@ public class BytesColumnVector extends ColumnVector {
    * in vector[0] and isRepeating from the superclass is set to true.
    */
   public int[] length;
+
+  // buffer[] is the current write target. After increaseBufferSpace() or ensureValPreallocated() it
+  // has room for the requested size; it is either smallBuffer or a one-off array for a large value.
   private byte[] buffer;   // optional buffer to use when actually copying in data
   private int nextFree;    // next free position in buffer
+
+  // Shared array that packs values of up to MAX_SIZE_FOR_SMALL_BUFFER bytes each
+  private byte[] smallBuffer;
+  private int smallBufferNextFree; // saved fill position of smallBuffer while buffer points elsewhere
+
+  private int bufferAllocationCount; // arrays allocated by increaseBufferSpace() since the last initBuffer()
+
   // Estimate that there will be 16 bytes per entry
   static final int DEFAULT_BUFFER_SIZE = 16 * VectorizedRowBatch.DEFAULT_SIZE;
 
   // Proportion of extra space to provide when allocating more buffer space.
   static final float EXTRA_SPACE_FACTOR = (float) 1.2;
 
+  // Values longer than this many bytes (1 MiB) bypass smallBuffer and get their own array
+  static final int MAX_SIZE_FOR_SMALL_BUFFER = 1024 * 1024;
+
   /**
    * Use this constructor for normal operation.
    * All column vectors should be the default size normally.
@@ -103,18 +115,27 @@ public class BytesColumnVector extends ColumnVector {
    */
   public void initBuffer(int estimatedValueSize) {
     nextFree = 0;
+    smallBufferNextFree = 0;
 
     // if buffer is already allocated, keep using it, don't re-allocate
     if (buffer != null) {
-      return;
-    }
-
-    // allocate a little extra space to limit need to re-allocate
-    int bufferSize = this.vector.length * (int)(estimatedValueSize * EXTRA_SPACE_FACTOR);
-    if (bufferSize < DEFAULT_BUFFER_SIZE) {
-      bufferSize = DEFAULT_BUFFER_SIZE;
+      // New arrays were allocated since the last reset: clear vector[] so its references to them are dropped
+      if (bufferAllocationCount > 0) {
+        for (int idx = 0; idx < vector.length; ++idx) {
+          vector[idx] = null;
+        }
+        buffer = smallBuffer; // the last value may have been written to a one-off large-value array
+      }
+    } else {
+      // No buffer yet: allocate a little extra space to limit the need to re-allocate
+      int bufferSize = this.vector.length * (int)(estimatedValueSize * EXTRA_SPACE_FACTOR);
+      if (bufferSize < DEFAULT_BUFFER_SIZE) {
+        bufferSize = DEFAULT_BUFFER_SIZE;
+      }
+      buffer = new byte[bufferSize];
+      smallBuffer = buffer; // the initial buffer also serves as smallBuffer
     }
-    buffer = new byte[bufferSize];
+    bufferAllocationCount = 0; // incremented by increaseBufferSpace() on each new allocation
   }
 
   /**
@@ -238,17 +259,51 @@ public class BytesColumnVector extends ColumnVector {
    * @param nextElemLength size of next element to be added
    */
   public void increaseBufferSpace(int nextElemLength) {
+    // Ensure buffer[] points to a byte[] with room for nextElemLength bytes starting at nextFree.
+    // Afterwards buffer[] is either smallBuffer or, for a value larger than
+    // MAX_SIZE_FOR_SMALL_BUFFER, a newly allocated array used only for that value.
+
+    if (nextElemLength > MAX_SIZE_FOR_SMALL_BUFFER) {
+      // Large values bypass smallBuffer: allocate an array sized exactly for this value
+      // and point buffer/nextFree at it for the current row only. Once the value is written
+      // the array is full, so the next row will need another call to increaseBufferSpace().
+      byte[] newBuffer = new byte[nextElemLength];
+      ++bufferAllocationCount;
+      // While buffer == smallBuffer, nextFree is smallBuffer's fill position. Save it in
+      // smallBufferNextFree before repointing buffer, so the next small value can resume
+      // appending to smallBuffer where it left off (see the else branch).
+      if (smallBuffer == buffer) {
+        smallBufferNextFree = nextFree;
+      }
+      buffer = newBuffer;
+      nextFree = 0;
+    } else {
+      // Small value: it goes into smallBuffer.
+      if (smallBuffer != buffer) {
+        // The previous value was large and used its own array; switch back to
+        // smallBuffer and restore its saved fill position.
+        buffer = smallBuffer;
+        nextFree = smallBufferNextFree;
+      }
 
-    // Keep doubling buffer size until there will be enough space for next element.
-    int newLength = 2 * buffer.length;
-    while((nextFree + nextElemLength) > newLength) {
-      newLength *= 2;
+      // smallBuffer may still lack room; if so, replace it with a new, larger array
+      if ((nextFree + nextElemLength) > buffer.length) {
+        int newLength = smallBuffer.length * 2;
+        while (newLength < nextElemLength) {
+          if (newLength < 0) { // doubling overflowed int
+            throw new RuntimeException("Overflow of newLength. smallBuffer.length="
+                + smallBuffer.length + ", nextElemLength=" + nextElemLength);
+          }
+          newLength *= 2;
+        }
+        smallBuffer = new byte[newLength];
+        ++bufferAllocationCount;
+        smallBufferNextFree = 0; // old contents are not copied (presumably still referenced via vector[])
+        // Make the new smallBuffer the current write target
+        buffer = smallBuffer;
+        nextFree = 0;
+      }
     }
-
-    // Allocate new buffer, copy data to it, and set buffer to new buffer.
-    byte[] newBuffer = new byte[newLength];
-    System.arraycopy(buffer, 0, newBuffer, 0, nextFree);
-    buffer = newBuffer;
   }
 
   /** Copy the current object contents into the output. Only copy selected entries,

http://git-wip-us.apache.org/repos/asf/hive/blob/ed3b0518/storage-api/src/test/org/apache/hadoop/hive/ql/exec/vector/TestBytesColumnVector.java
----------------------------------------------------------------------
diff --git a/storage-api/src/test/org/apache/hadoop/hive/ql/exec/vector/TestBytesColumnVector.java b/storage-api/src/test/org/apache/hadoop/hive/ql/exec/vector/TestBytesColumnVector.java
new file mode 100644
index 0000000..e14abf1
--- /dev/null
+++ b/storage-api/src/test/org/apache/hadoop/hive/ql/exec/vector/TestBytesColumnVector.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import java.util.Arrays;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestBytesColumnVector {
+  @Test
+  public void testSmallBufferReuse() {
+    BytesColumnVector col = new BytesColumnVector();
+    int smallWriteSize = 1024;
+    int largeWriteSize = 1024 * 1024 * 2;
+
+    int rowIdx = 0;
+    int bytesWrittenToBytes1 = 0;
+    col.reset();
+
+    // Initial write (small value); its backing array, bytes1, is the small buffer
+    byte[] bytes1 = writeToBytesColumnVector(rowIdx, col, smallWriteSize, (byte) 1);
+    bytesWrittenToBytes1 += smallWriteSize;
+
+    // Write a large value (2MB, above the 1MB small-buffer limit); it must get its own byte array
+    rowIdx++;
+    byte[] bytes2 = writeToBytesColumnVector(rowIdx, col, largeWriteSize, (byte) 2);
+    assertFalse(bytes1 == bytes2);
+
+    // Another small write: the small buffer (bytes1) must be reused, not replaced
+    rowIdx++;
+    byte[] bytes3 = writeToBytesColumnVector(rowIdx, col, smallWriteSize, (byte) 1);
+    bytesWrittenToBytes1 += smallWriteSize;
+    assertTrue(bytes1 == bytes3);
+
+    // Write another large value; it must get yet another dedicated byte array
+    rowIdx++;
+    byte[] bytes4 = writeToBytesColumnVector(rowIdx, col, largeWriteSize, (byte) 3);
+    assertFalse(bytes1 == bytes4);
+    assertFalse(bytes2 == bytes4);
+
+    // Keep writing small values until the small buffer fills up and a new one is created
+    boolean gotNewBuffer = false;
+    // The loop writes at most 1024 * smallWriteSize = 1MB, so this relies on the small buffer
+    // filling up within 1MB; adjust if the initial buffer size or growth policy changes.
+    for (int i = 0; i < 1024; ++i) {
+      rowIdx++;
+      byte[] currBytes = writeToBytesColumnVector(rowIdx, col, smallWriteSize, (byte) 1);
+      if (currBytes == bytes1) {
+        bytesWrittenToBytes1 += smallWriteSize;
+      } else {
+        gotNewBuffer = true;
+        break;
+      }
+    }
+
+    assertTrue(gotNewBuffer);
+
+    // Every small value written to bytes1 must sit contiguously from offset 0, not clobbered by large writes
+    for (int i = 0; i < bytesWrittenToBytes1; ++i) {
+      assertEquals((byte) 1, bytes1[i]);
+    }
+  }
+
+  // Writes writeSize copies of val into row rowIdx and returns the byte array that was written to.
+  private static byte[] writeToBytesColumnVector(int rowIdx, BytesColumnVector col, int writeSize, byte val) {
+    col.ensureValPreallocated(writeSize);
+    byte[] bytes = col.getValPreallocatedBytes();
+    int startIdx = col.getValPreallocatedStart();
+    Arrays.fill(bytes, startIdx, startIdx + writeSize, val);
+    col.setValPreallocated(rowIdx, writeSize);
+    return bytes;
+  }
+}