You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2021/07/19 21:19:46 UTC

[hive] branch storage-branch-2.8 updated: HIVE-25190: Fix many small allocations in BytesColumnVector

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch storage-branch-2.8
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/storage-branch-2.8 by this push:
     new b304acd  HIVE-25190: Fix many small allocations in BytesColumnVector
b304acd is described below

commit b304acd36334e54d276e9ded5851ae24d2f23595
Author: Owen O'Malley <oo...@linkedin.com>
AuthorDate: Fri Jun 18 16:30:13 2021 -0700

    HIVE-25190: Fix many small allocations in BytesColumnVector
    
    Fixes #2408
    
    Signed-off-by: Owen O'Malley <oo...@linkedin.com>
---
 storage-api/pom.xml                                |   2 +-
 .../hive/ql/exec/vector/BytesColumnVector.java     | 163 ++++++++++-----------
 .../hive/ql/exec/vector/TestBytesColumnVector.java | 124 ++++++++++++++--
 3 files changed, 187 insertions(+), 102 deletions(-)

diff --git a/storage-api/pom.xml b/storage-api/pom.xml
index 53fa3c0..c87aed7 100644
--- a/storage-api/pom.xml
+++ b/storage-api/pom.xml
@@ -185,7 +185,7 @@
         <version>3.0.0-M4</version>
         <configuration>
           <reuseForks>false</reuseForks>
-          <argLine>-Xmx2048m</argLine>
+          <argLine>-Xmx3g</argLine>
           <failIfNoTests>false</failIfNoTests>
           <systemPropertyVariables>
             <test.tmp.dir>${project.build.directory}/tmp</test.tmp.dir>
diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
index 6618807..a8c58ac 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
@@ -49,14 +49,15 @@ public class BytesColumnVector extends ColumnVector {
    */
   public int[] length;
 
-  // A call to increaseBufferSpace() or ensureValPreallocated() will ensure that buffer[] points to
-  // a byte[] with sufficient space for the specified size.
-  private byte[] buffer;   // optional buffer to use when actually copying in data
-  private int nextFree;    // next free position in buffer
+  // Calls to ensureValPreallocated() ensure that currentValue and currentOffset
+  // point to enough space for the next value.
+  private byte[] currentValue;   // bytes for the next value
+  private int currentOffset;    // starting position in the current buffer
 
-  // Hang onto a byte array for holding smaller byte values
-  private byte[] smallBuffer;
-  private int smallBufferNextFree;
+  // A shared buffer allocation that we use for the small values
+  private byte[] sharedBuffer;
+  // The next unused offset in the sharedBuffer.
+  private int sharedBufferOffset;
 
   private int bufferAllocationCount;
 
@@ -66,8 +67,11 @@ public class BytesColumnVector extends ColumnVector {
   // Proportion of extra space to provide when allocating more buffer space.
   static final float EXTRA_SPACE_FACTOR = (float) 1.2;
 
-  // Largest size allowed in smallBuffer
-  static final int MAX_SIZE_FOR_SMALL_BUFFER = 1024 * 1024;
+  // Largest item size allowed in sharedBuffer
+  static final int MAX_SIZE_FOR_SMALL_ITEM = 1024 * 1024;
+
+  // Largest size allowed for sharedBuffer
+  static final int MAX_SIZE_FOR_SHARED_BUFFER = 1024 * 1024 * 1024;
 
   /**
    * Use this constructor for normal operation.
@@ -121,30 +125,30 @@ public class BytesColumnVector extends ColumnVector {
    * Provide the estimated number of bytes needed to hold
    * a full column vector worth of byte string data.
    *
-   * @param estimatedValueSize  Estimated size of buffer space needed
+   * @param estimatedValueSize  Estimated size of buffer space needed per row
    */
   public void initBuffer(int estimatedValueSize) {
-    nextFree = 0;
-    smallBufferNextFree = 0;
+    sharedBufferOffset = 0;
 
     // if buffer is already allocated, keep using it, don't re-allocate
-    if (buffer != null) {
+    if (sharedBuffer != null) {
       // Free up any previously allocated buffers that are referenced by vector
       if (bufferAllocationCount > 0) {
         for (int idx = 0; idx < vector.length; ++idx) {
           vector[idx] = null;
           length[idx] = 0;
         }
-        buffer = smallBuffer; // In case last row was a large bytes value
       }
     } else {
       // allocate a little extra space to limit need to re-allocate
-      int bufferSize = this.vector.length * (int)(estimatedValueSize * EXTRA_SPACE_FACTOR);
+      long bufferSize = (long) (this.vector.length * estimatedValueSize * EXTRA_SPACE_FACTOR);
       if (bufferSize < DEFAULT_BUFFER_SIZE) {
         bufferSize = DEFAULT_BUFFER_SIZE;
       }
-      buffer = new byte[bufferSize];
-      smallBuffer = buffer;
+      if (bufferSize > MAX_SIZE_FOR_SHARED_BUFFER) {
+        bufferSize = MAX_SIZE_FOR_SHARED_BUFFER;
+      }
+      sharedBuffer = new byte[(int) bufferSize];
     }
     bufferAllocationCount = 0;
   }
@@ -160,10 +164,7 @@ public class BytesColumnVector extends ColumnVector {
    * @return amount of buffer space currently allocated
    */
   public int bufferSize() {
-    if (buffer == null) {
-      return 0;
-    }
-    return buffer.length;
+    return sharedBuffer == null ? 0 : sharedBuffer.length;
   }
 
   /**
@@ -182,16 +183,13 @@ public class BytesColumnVector extends ColumnVector {
    * @param length  length of source byte sequence
    */
   public void setVal(int elementNum, byte[] sourceBuf, int start, int length) {
-    if ((nextFree + length) > buffer.length) {
-      increaseBufferSpace(length);
-    }
+    ensureValPreallocated(length);
     if (length > 0) {
-      System.arraycopy(sourceBuf, start, buffer, nextFree, length);
+      System.arraycopy(sourceBuf, start, currentValue, currentOffset, length);
     }
-    vector[elementNum] = buffer;
-    this.start[elementNum] = nextFree;
+    vector[elementNum] = currentValue;
+    this.start[elementNum] = currentOffset;
     this.length[elementNum] = length;
-    nextFree += length;
   }
 
   /**
@@ -212,23 +210,31 @@ public class BytesColumnVector extends ColumnVector {
   }
 
   /**
-   * Preallocate space in the local buffer so the caller can fill in the value bytes themselves.
+   * Ensures that we have space allocated for the next value, which is
+   * {@code length} bytes long.
+   *
+   * Updates currentValue, currentOffset, and sharedBufferOffset for this value.
    *
-   * Always use with getValPreallocatedBytes, getValPreallocatedStart, and setValPreallocated.
+   * Always use before getValPreallocatedBytes, getValPreallocatedStart,
+   * and setValPreallocated.
    */
   public void ensureValPreallocated(int length) {
-    if ((nextFree + length) > buffer.length) {
-      increaseBufferSpace(length);
+    if ((sharedBufferOffset + length) > sharedBuffer.length) {
+      currentValue = allocateBuffer(length);
+    } else {
+      currentValue = sharedBuffer;
+      currentOffset = sharedBufferOffset;
+      sharedBufferOffset += length;
     }
   }
 
   @SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "Expose internal rep for efficiency")
   public byte[] getValPreallocatedBytes() {
-    return buffer;
+    return currentValue;
   }
 
   public int getValPreallocatedStart() {
-    return nextFree;
+    return currentOffset;
   }
 
   /**
@@ -237,10 +243,9 @@ public class BytesColumnVector extends ColumnVector {
    * @param length
    */
   public void setValPreallocated(int elementNum, int length) {
-    vector[elementNum] = buffer;
-    this.start[elementNum] = nextFree;
+    vector[elementNum] = currentValue;
+    this.start[elementNum] = currentOffset;
     this.length[elementNum] = length;
-    nextFree += length;
   }
 
   /**
@@ -258,73 +263,55 @@ public class BytesColumnVector extends ColumnVector {
   public void setConcat(int elementNum, byte[] leftSourceBuf, int leftStart, int leftLen,
       byte[] rightSourceBuf, int rightStart, int rightLen) {
     int newLen = leftLen + rightLen;
-    if ((nextFree + newLen) > buffer.length) {
-      increaseBufferSpace(newLen);
-    }
-    vector[elementNum] = buffer;
-    this.start[elementNum] = nextFree;
+    ensureValPreallocated(newLen);
+    vector[elementNum] = currentValue;
+    this.start[elementNum] = currentOffset;
     this.length[elementNum] = newLen;
 
-    System.arraycopy(leftSourceBuf, leftStart, buffer, nextFree, leftLen);
-    nextFree += leftLen;
-    System.arraycopy(rightSourceBuf, rightStart, buffer, nextFree, rightLen);
-    nextFree += rightLen;
+    System.arraycopy(leftSourceBuf, leftStart, currentValue, currentOffset, leftLen);
+    System.arraycopy(rightSourceBuf, rightStart, currentValue,
+        currentOffset + leftLen, rightLen);
   }
 
   /**
-   * Increase buffer space enough to accommodate next element.
+   * Allocate/reuse enough buffer space to accommodate next element.
+   * currentOffset is set to the first available byte in the returned array.
+   * If sharedBuffer is used, sharedBufferOffset is updated to point after the
+   * current record.
+   *
    * This uses an exponential increase mechanism to rapidly
    * increase buffer size to enough to hold all data.
    * As batches get re-loaded, buffer space allocated will quickly
    * stabilize.
    *
    * @param nextElemLength size of next element to be added
+   * @return the buffer to use for the next element
    */
-  public void increaseBufferSpace(int nextElemLength) {
-    // A call to increaseBufferSpace() or ensureValPreallocated() will ensure that buffer[] points to
-    // a byte[] with sufficient space for the specified size.
-    // This will either point to smallBuffer, or to a newly allocated byte array for larger values.
-
-    if (nextElemLength > MAX_SIZE_FOR_SMALL_BUFFER) {
-      // Larger allocations will be special-cased and will not use the normal buffer.
-      // buffer/nextFree will be set to a newly allocated array just for the current row.
-      // The next row will require another call to increaseBufferSpace() since this new buffer should be used up.
-      byte[] newBuffer = new byte[nextElemLength];
+  private byte[] allocateBuffer(int nextElemLength) {
+    // If this is a large value or the shared buffer is maxed out, allocate a
+    // single-use buffer. Assumes sharedBuffer.length and MAX_SIZE_FOR_SHARED_BUFFER
+    // are powers of 2 (NOTE(review): initBuffer() does not round up to one).
+    if (nextElemLength > MAX_SIZE_FOR_SMALL_ITEM ||
+        sharedBufferOffset + nextElemLength >= MAX_SIZE_FOR_SHARED_BUFFER) {
+      // allocate a dedicated buffer for the next value
       ++bufferAllocationCount;
-      // If the buffer was pointing to smallBuffer, then nextFree keeps track of the current state
-      // of the free index for smallBuffer. We now need to save this value to smallBufferNextFree
-      // so we don't lose this. A bit of a weird dance here.
-      if (smallBuffer == buffer) {
-        smallBufferNextFree = nextFree;
-      }
-      buffer = newBuffer;
-      nextFree = 0;
+      currentOffset = 0;
+      return new byte[nextElemLength];
     } else {
-      // This value should go into smallBuffer.
-      if (smallBuffer != buffer) {
-        // Previous row was for a large bytes value ( > MAX_SIZE_FOR_SMALL_BUFFER).
-        // Use smallBuffer if possible.
-        buffer = smallBuffer;
-        nextFree = smallBufferNextFree;
-      }
 
-      // smallBuffer might still be out of space
-      if ((nextFree + nextElemLength) > buffer.length) {
-        int newLength = smallBuffer.length * 2;
+      // sharedBuffer might still be out of space
+      if ((sharedBufferOffset + nextElemLength) > sharedBuffer.length) {
+        int newLength = sharedBuffer.length * 2;
         while (newLength < nextElemLength) {
-          if (newLength > 0) {
-            newLength *= 2;
-          } else { // integer overflow happened; maximize size of next smallBuffer
-            newLength = Integer.MAX_VALUE;
-          }
+          newLength *= 2;
         }
-        smallBuffer = new byte[newLength];
+        sharedBuffer = new byte[newLength];
         ++bufferAllocationCount;
-        smallBufferNextFree = 0;
-        // Update buffer
-        buffer = smallBuffer;
-        nextFree = 0;
+        sharedBufferOffset = 0;
       }
+      currentOffset = sharedBufferOffset;
+      sharedBufferOffset += nextElemLength;
+      return sharedBuffer;
     }
   }
 
@@ -575,10 +562,12 @@ public class BytesColumnVector extends ColumnVector {
   public void shallowCopyTo(ColumnVector otherCv) {
     BytesColumnVector other = (BytesColumnVector)otherCv;
     super.shallowCopyTo(other);
-    other.nextFree = nextFree;
+    other.currentOffset = currentOffset;
     other.vector = vector;
     other.start = start;
     other.length = length;
-    other.buffer = buffer;
+    other.currentValue = currentValue;
+    other.sharedBuffer = sharedBuffer;
+    other.sharedBufferOffset = sharedBufferOffset;
   }
 }
diff --git a/storage-api/src/test/org/apache/hadoop/hive/ql/exec/vector/TestBytesColumnVector.java b/storage-api/src/test/org/apache/hadoop/hive/ql/exec/vector/TestBytesColumnVector.java
index da89122..3b42d17 100644
--- a/storage-api/src/test/org/apache/hadoop/hive/ql/exec/vector/TestBytesColumnVector.java
+++ b/storage-api/src/test/org/apache/hadoop/hive/ql/exec/vector/TestBytesColumnVector.java
@@ -18,10 +18,16 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
+import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
 import org.junit.Test;
-import static org.junit.Assert.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 
 public class TestBytesColumnVector {
   @Test
@@ -35,33 +41,41 @@ public class TestBytesColumnVector {
     col.reset();
 
     // Initial write (small value)
-    byte[] bytes1 = writeToBytesColumnVector(rowIdx, col, smallWriteSize, (byte) 1);
+    byte[] bytes1 = writeToBytesColumnVector(rowIdx, col, smallWriteSize, 1);
     bytesWrittenToBytes1 += smallWriteSize;
+    assertEquals(0, col.start[0]);
+    assertEquals(smallWriteSize, col.length[0]);
 
     // Write a large value. This should use a different byte buffer
     rowIdx++;
-    byte[] bytes2 = writeToBytesColumnVector(rowIdx, col, largeWriteSize, (byte) 2);
-    assertFalse(bytes1 == bytes2);
+    byte[] bytes2 = writeToBytesColumnVector(rowIdx, col, largeWriteSize, -1);
+    assertNotSame(bytes1, bytes2);
+    assertEquals(0, col.start[1]);
+    assertEquals(largeWriteSize, col.length[1]);
 
     // Another small write. smallBuffer should be re-used for this write
     rowIdx++;
-    byte[] bytes3 = writeToBytesColumnVector(rowIdx, col, smallWriteSize, (byte) 1);
+    byte[] bytes3 = writeToBytesColumnVector(rowIdx, col, smallWriteSize, 2);
     bytesWrittenToBytes1 += smallWriteSize;
-    assertTrue(bytes1 == bytes3);
+    assertSame(bytes1, bytes3);
+    assertEquals(smallWriteSize, col.start[2]);
+    assertEquals(smallWriteSize, col.length[2]);
 
     // Write another large value. This should use a different byte buffer
     rowIdx++;
-    byte[] bytes4 = writeToBytesColumnVector(rowIdx, col, largeWriteSize, (byte) 3);
-    assertFalse(bytes1 == bytes4);
-    assertFalse(bytes2 == bytes4);
+    byte[] bytes4 = writeToBytesColumnVector(rowIdx, col, largeWriteSize, -2);
+    assertNotSame(bytes1, bytes4);
+    assertNotSame(bytes2, bytes4);
+    assertEquals(0, col.start[3]);
+    assertEquals(largeWriteSize, col.length[3]);
 
     // Eventually enough small writes should result in another buffer getting created
     boolean gotNewBuffer = false;
     // Test is dependent on getting a new buffer within 1MB.
     // This may need to change as the implementation changes.
-    for (int i = 0; i < 1024; ++i) {
+    for (int i = 3; i < 1024; ++i) {
       rowIdx++;
-      byte[] currBytes = writeToBytesColumnVector(rowIdx, col, smallWriteSize, (byte) 1);
+      byte[] currBytes = writeToBytesColumnVector(rowIdx, col, smallWriteSize, i);
       if (currBytes == bytes1) {
         bytesWrittenToBytes1 += smallWriteSize;
       } else {
@@ -74,16 +88,98 @@ public class TestBytesColumnVector {
 
     // All small writes to the first buffer should be in contiguous memory
     for (int i = 0; i < bytesWrittenToBytes1; ++i) {
-      assertEquals((byte) 1, bytes1[i]);
+      assertEquals((byte) (i / smallWriteSize + 1), bytes1[i]);
+    }
+  }
+
+  /**
+   * Test the setVal, setConcat, and StringExpr.padRight methods.
+   */
+  @Test
+  public void testConcatAndPadding() {
+    BytesColumnVector col = new BytesColumnVector();
+    col.reset();
+    byte[] prefix = "緑".getBytes(StandardCharsets.UTF_8);
+
+    // fill the column with '緑'
+    for(int row=0; row < col.vector.length; ++row) {
+      col.setVal(row, prefix, 0, prefix.length);
+    }
+    for(int row=0; row < col.vector.length; ++row) {
+      assertEquals("row " + row, "緑", col.toString(row));
+    }
+
+    // pad out to 6 characters
+    for(int row=0; row < col.vector.length; ++row) {
+      StringExpr.padRight(col, row, col.vector[row], col.start[row],
+          col.length[row], 6);
+    }
+    for(int row=0; row < col.vector.length; ++row) {
+      assertEquals("row " + row, "緑     ", col.toString(row));
+    }
+
+    // concat the row digits
+    for(int row=0; row < col.vector.length; ++row) {
+      byte[] rowStr = Integer.toString(row).getBytes(StandardCharsets.UTF_8);
+      col.setConcat(row, col.vector[row], col.start[row], col.length[row],
+          rowStr, 0, rowStr.length);
+    }
+    for(int row=0; row < col.vector.length; ++row) {
+      assertEquals("row " + row, "緑     " + row, col.toString(row));
+    }
+
+    // We end up allocating roughly 20k, so we should have expanded the shared buffer
+    assertEquals(32 * 1024, col.bufferSize());
+  }
+
+  @Test
+  public void testBufferOverflow() {
+    BytesColumnVector col = new BytesColumnVector(2048);
+    col.reset();
+    assertEquals(BytesColumnVector.DEFAULT_BUFFER_SIZE, col.bufferSize());
+
+    // pick a size below 1MB (MAX_SIZE_FOR_SMALL_ITEM) so that we use the shared buffer
+    final int size = BytesColumnVector.MAX_SIZE_FOR_SMALL_ITEM - 1024;
+
+    // run through once to expand the shared buffer
+    for(int row=0; row < col.vector.length; ++row) {
+      writeToBytesColumnVector(row, col, size, row);
+    }
+    // it should have resized a bunch of times
+    byte[] sharedBuffer = col.getValPreallocatedBytes();
+    assertNotSame(sharedBuffer, col.vector[0]);
+    assertSame(sharedBuffer, col.vector[1024]);
+
+    // reset the column, but make sure the buffer isn't reallocated
+    col.reset();
+    assertEquals(BytesColumnVector.MAX_SIZE_FOR_SHARED_BUFFER, col.bufferSize());
+
+    // fill up the vector now with the large buffer
+    for(int row=0; row < col.vector.length; ++row) {
+      writeToBytesColumnVector(row, col, size, row);
+    }
+    assertEquals(BytesColumnVector.MAX_SIZE_FOR_SHARED_BUFFER, col.bufferSize());
+    // Now the first 1025 rows should all be the shared buffer,
+    // because 1025 * size < MAX_SIZE_FOR_SHARED_BUFFER
+    for(int row=0; row < 1025; ++row) {
+      assertSame("row " + row, sharedBuffer, col.vector[row]);
+      assertEquals("row " + row, row * size, col.start[row]);
+      assertEquals("row " + row, size, col.length[row]);
+    }
+    // the rest should be custom buffers
+    for(int row=1025; row < col.vector.length; ++row) {
+      assertNotSame("row " + row, sharedBuffer, col.vector[row]);
+      assertEquals("row " + row, 0, col.start[row]);
+      assertEquals("row " + row, size, col.length[row]);
     }
   }
 
   // Write a value to the column vector, and return back the byte buffer used.
-  private static byte[] writeToBytesColumnVector(int rowIdx, BytesColumnVector col, int writeSize, byte val) {
+  private static byte[] writeToBytesColumnVector(int rowIdx, BytesColumnVector col, int writeSize, int val) {
     col.ensureValPreallocated(writeSize);
     byte[] bytes = col.getValPreallocatedBytes();
     int startIdx = col.getValPreallocatedStart();
-    Arrays.fill(bytes, startIdx, startIdx + writeSize, val);
+    Arrays.fill(bytes, startIdx, startIdx + writeSize, (byte) val);
     col.setValPreallocated(rowIdx, writeSize);
     return bytes;
   }