You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2017/07/20 05:42:58 UTC

[21/26] hbase git commit: HBASE-17738 BucketCache startup is slow (Ram)

HBASE-17738 BucketCache startup is slow (Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d0e4a643
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d0e4a643
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d0e4a643

Branch: refs/heads/HBASE-18147
Commit: d0e4a643a0a1085c98485d37fb433bc8865bc0ad
Parents: f10f819
Author: Ramkrishna <ra...@intel.com>
Authored: Wed Jul 19 21:51:11 2017 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Wed Jul 19 21:51:11 2017 +0530

----------------------------------------------------------------------
 .../hadoop/hbase/util/ByteBufferArray.java      | 79 ++++++++++++++++++--
 .../hadoop/hbase/util/TestByteBufferArray.java  | 25 +++++++
 2 files changed, 97 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d0e4a643/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
index 2bb820e..60f8c79 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
@@ -20,6 +20,13 @@ package org.apache.hadoop.hbase.util;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,6 +36,8 @@ import org.apache.hadoop.hbase.nio.MultiByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This class manages an array of ByteBuffers with a default size 4MB. These
  * buffers are sequential and could be considered as a large buffer.It supports
@@ -39,7 +48,8 @@ public final class ByteBufferArray {
   private static final Log LOG = LogFactory.getLog(ByteBufferArray.class);
 
   public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
-  private ByteBuffer buffers[];
+  @VisibleForTesting
+  ByteBuffer buffers[];
   private int bufferSize;
   private int bufferCount;
 
@@ -62,13 +72,68 @@ public final class ByteBufferArray {
         + ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
         + bufferCount + ", direct=" + directByteBuffer);
     buffers = new ByteBuffer[bufferCount + 1];
-    for (int i = 0; i <= bufferCount; i++) {
-      if (i < bufferCount) {
-        buffers[i] = allocator.allocate(bufferSize, directByteBuffer);
-      } else {
-        // always create on heap
-        buffers[i] = ByteBuffer.allocate(0);
+    createBuffers(directByteBuffer, allocator);
+  }
+
+  /** Allocates the data buffers in parallel and then adds the empty on-heap end buffer. */
+  private void createBuffers(boolean directByteBuffer, ByteBufferAllocator allocator)
+      throws IOException {
+    int threadCount = Runtime.getRuntime().availableProcessors();
+    ExecutorService service = new ThreadPoolExecutor(threadCount, threadCount, 0L,
+        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+    // Floor division keeps lastThreadCount >= 0; the last task also takes the remainder
+    int perThreadCount = bufferCount / threadCount;
+    int lastThreadCount = bufferCount - (perThreadCount * (threadCount - 1));
+    Future<ByteBuffer[]>[] futures = new Future[threadCount];
+    try {
+      for (int i = 0; i < threadCount; i++) {
+        int buffersToCreate = (i == threadCount - 1) ? lastThreadCount : perThreadCount;
+        futures[i] = service.submit(
+          new BufferCreatorCallable(bufferSize, directByteBuffer, buffersToCreate, allocator));
+      }
+      int bufferIndex = 0;
+      for (Future<ByteBuffer[]> future : futures) {
+        ByteBuffer[] created = future.get();
+        System.arraycopy(created, 0, this.buffers, bufferIndex, created.length);
+        bufferIndex += created.length;
+      }
+    } catch (InterruptedException | ExecutionException e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+      }
+      LOG.error("Buffer creation interrupted", e);
+      throw new IOException(e);
+    } finally {
+      service.shutdownNow();
+    }
+    this.buffers[bufferCount] = ByteBuffer.allocate(0);
+  }
+
+  /**
+   * A callable that asks the {@link ByteBufferAllocator} for {@code bufferCount} buffers of
+   * {@code bufferCapacity} each, forwarding {@code directByteBuffer}, and returns them as an array
+   */
+  private static class BufferCreatorCallable implements Callable<ByteBuffer[]> {
+    private final int bufferCapacity;
+    private final boolean directByteBuffer;
+    private final int bufferCount;
+    private final ByteBufferAllocator allocator;
+
+    BufferCreatorCallable(int bufferCapacity, boolean directByteBuffer, int bufferCount,
+        ByteBufferAllocator allocator) {
+      this.bufferCapacity = bufferCapacity;
+      this.directByteBuffer = directByteBuffer;
+      this.bufferCount = bufferCount;
+      this.allocator = allocator;
+    }
+
+    @Override
+    public ByteBuffer[] call() throws Exception {
+      ByteBuffer[] buffers = new ByteBuffer[this.bufferCount];
+      for (int i = 0; i < this.bufferCount; i++) {
+        buffers[i] = allocator.allocate(this.bufferCapacity, this.directByteBuffer);
       }
+      return buffers;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d0e4a643/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java
index f2c8549..c71b86c 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.util;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -54,4 +55,28 @@ public class TestByteBufferArray {
     subBuf.get();
     assertFalse(subBuf.hasRemaining());
   }
+
+  /** Every data buffer must have the default size and the extra last buffer must be empty. */
+  @Test
+  public void testByteBufferCreation() throws Exception {
+    int capacity = 470 * 1021 * 1023;
+    ByteBufferAllocator allocator = new ByteBufferAllocator() {
+      @Override
+      public ByteBuffer allocate(long size, boolean directByteBuffer) throws IOException {
+        if (directByteBuffer) {
+          return ByteBuffer.allocateDirect((int) size);
+        } else {
+          return ByteBuffer.allocate((int) size);
+        }
+      }
+    };
+    ByteBufferArray array = new ByteBufferArray(capacity, false, allocator);
+    assertEquals(119, array.buffers.length);
+    int last = array.buffers.length - 1;
+    for (int i = 0; i < last; i++) {
+      // JUnit's assertEquals takes the expected value first
+      assertEquals(ByteBufferArray.DEFAULT_BUFFER_SIZE, array.buffers[i].capacity());
+    }
+    assertEquals(0, array.buffers[last].capacity());
+  }
 }