You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/02/27 00:09:25 UTC

[04/37] hbase git commit: HBASE-13259 mmap() based BucketCache IOEngine (Zee Chen & Ram)

HBASE-13259 mmap() based BucketCache IOEngine (Zee Chen & Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3ba1a7fd
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3ba1a7fd
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3ba1a7fd

Branch: refs/heads/hbase-12439
Commit: 3ba1a7fd23f0b0ca06cf7a9a04cb45975e1c7d91
Parents: a8073c4
Author: ramkrishna <ra...@gmail.com>
Authored: Tue Feb 23 17:03:38 2016 +0530
Committer: ramkrishna <ra...@gmail.com>
Committed: Tue Feb 23 17:03:38 2016 +0530

----------------------------------------------------------------------
 .../hadoop/hbase/util/ByteBufferAllocator.java  |  39 +++++
 .../hadoop/hbase/util/ByteBufferArray.java      |  16 +-
 .../hadoop/hbase/util/TestByteBufferArray.java  |  16 +-
 .../hbase/io/hfile/bucket/BucketCache.java      |  11 +-
 .../io/hfile/bucket/ByteBufferIOEngine.java     |  18 +-
 .../hbase/io/hfile/bucket/FileMmapEngine.java   | 166 +++++++++++++++++++
 .../io/hfile/bucket/TestFileMmapEngine.java     |  68 ++++++++
 7 files changed, 320 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3ba1a7fd/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferAllocator.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferAllocator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferAllocator.java
new file mode 100644
index 0000000..b19a0a7
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferAllocator.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Defines the way the ByteBuffers are created
+ */
+@InterfaceAudience.Private
+public interface ByteBufferAllocator {
+
+  /**
+   * Allocates a bytebuffer
+   * @param size the size of the bytebuffer
+   * @param directByteBuffer indicator to create a direct bytebuffer
+   * @return the bytebuffer that is created
+   * @throws IOException exception thrown if there is an error while creating the ByteBuffer
+   */
+  ByteBuffer allocate(long size, boolean directByteBuffer) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/3ba1a7fd/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
index 2334cf7..b09dc9a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.util;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -39,20 +40,23 @@ import org.apache.hadoop.util.StringUtils;
 public final class ByteBufferArray {
   private static final Log LOG = LogFactory.getLog(ByteBufferArray.class);
 
-  static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
+  public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
   private ByteBuffer buffers[];
   private Lock locks[];
   private int bufferSize;
   private int bufferCount;
-
+  private ByteBufferAllocator allocator;
   /**
    * We allocate a number of byte buffers as the capacity. In order not to out
    * of the array bounds for the last byte(see {@link ByteBufferArray#multiple}),
    * we will allocate one additional buffer with capacity 0;
    * @param capacity total size of the byte buffer array
    * @param directByteBuffer true if we allocate direct buffer
+   * @param allocator the ByteBufferAllocator that will create the buffers
+   * @throws IOException throws IOException if there is an exception thrown by the allocator
    */
-  public ByteBufferArray(long capacity, boolean directByteBuffer) {
+  public ByteBufferArray(long capacity, boolean directByteBuffer, ByteBufferAllocator allocator)
+      throws IOException {
     this.bufferSize = DEFAULT_BUFFER_SIZE;
     if (this.bufferSize > (capacity / 16))
       this.bufferSize = (int) roundUp(capacity / 16, 32768);
@@ -62,15 +66,15 @@ public final class ByteBufferArray {
         + bufferCount + ", direct=" + directByteBuffer);
     buffers = new ByteBuffer[bufferCount + 1];
     locks = new Lock[bufferCount + 1];
+    this.allocator = allocator;
     for (int i = 0; i <= bufferCount; i++) {
       locks[i] = new ReentrantLock();
       if (i < bufferCount) {
-        buffers[i] = directByteBuffer ? ByteBuffer.allocateDirect(bufferSize)
-            : ByteBuffer.allocate(bufferSize);
+        buffers[i] = allocator.allocate(bufferSize, directByteBuffer);
       } else {
+        // always create on heap
         buffers[i] = ByteBuffer.allocate(0);
       }
-
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/3ba1a7fd/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java
index 701601d..f2c8549 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.hbase.util;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -32,7 +35,18 @@ public class TestByteBufferArray {
   @Test
   public void testAsSubBufferWhenEndOffsetLandInLastBuffer() throws Exception {
     int capacity = 4 * 1024 * 1024;
-    ByteBufferArray array = new ByteBufferArray(capacity, false);
+    ByteBufferAllocator allocator = new ByteBufferAllocator() {
+      @Override
+      public ByteBuffer allocate(long size, boolean directByteBuffer)
+          throws IOException {
+        if (directByteBuffer) {
+          return ByteBuffer.allocateDirect((int) size);
+        } else {
+          return ByteBuffer.allocate((int) size);
+        }
+      }
+    };
+    ByteBufferArray array = new ByteBufferArray(capacity, false, allocator);
     ByteBuff subBuf = array.asSubByteBuff(0, capacity);
     subBuf.position(capacity - 1);// Position to the last byte
     assertTrue(subBuf.hasRemaining());

http://git-wip-us.apache.org/repos/asf/hbase/blob/3ba1a7fd/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 6024958..7436b71 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -303,15 +303,18 @@ public class BucketCache implements BlockCache, HeapSize {
    */
   private IOEngine getIOEngineFromName(String ioEngineName, long capacity)
       throws IOException {
-    if (ioEngineName.startsWith("file:"))
+    if (ioEngineName.startsWith("file:")) {
       return new FileIOEngine(ioEngineName.substring(5), capacity);
-    else if (ioEngineName.startsWith("offheap"))
+    } else if (ioEngineName.startsWith("offheap")) {
       return new ByteBufferIOEngine(capacity, true);
-    else if (ioEngineName.startsWith("heap"))
+    } else if (ioEngineName.startsWith("heap")) {
       return new ByteBufferIOEngine(capacity, false);
-    else
+    } else if (ioEngineName.startsWith("mmap:")) {
+      return new FileMmapEngine(ioEngineName.substring(5), capacity);
+    } else {
       throw new IllegalArgumentException(
           "Don't understand io engine name for cache - prefix with file:, heap or offheap");
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/3ba1a7fd/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
index 2227312..45ed1ae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
 import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.util.ByteBufferAllocator;
 import org.apache.hadoop.hbase.util.ByteBufferArray;
 
 /**
@@ -42,13 +43,24 @@ public class ByteBufferIOEngine implements IOEngine {
    * Construct the ByteBufferIOEngine with the given capacity
    * @param capacity
    * @param direct true if allocate direct buffer
-   * @throws IOException
+   * @throws IOException ideally here no exception to be thrown from the allocator
    */
   public ByteBufferIOEngine(long capacity, boolean direct)
       throws IOException {
     this.capacity = capacity;
     this.direct = direct;
-    bufferArray = new ByteBufferArray(capacity, direct);
+    ByteBufferAllocator allocator = new ByteBufferAllocator() {
+      @Override
+      public ByteBuffer allocate(long size, boolean directByteBuffer)
+          throws IOException {
+        if (directByteBuffer) {
+          return ByteBuffer.allocateDirect((int) size);
+        } else {
+          return ByteBuffer.allocate((int) size);
+        }
+      }
+    };
+    bufferArray = new ByteBufferArray(capacity, direct, allocator);
   }
 
   @Override
@@ -85,7 +97,7 @@ public class ByteBufferIOEngine implements IOEngine {
    * @param srcBuffer the given byte buffer from which bytes are to be read
    * @param offset The offset in the ByteBufferArray of the first byte to be
    *          written
-   * @throws IOException
+   * @throws IOException throws IOException if writing to the array throws exception
    */
   @Override
   public void write(ByteBuffer srcBuffer, long offset) throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/3ba1a7fd/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java
new file mode 100644
index 0000000..7a2afe8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java
@@ -0,0 +1,166 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
+import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.hadoop.hbase.util.ByteBufferAllocator;
+import org.apache.hadoop.hbase.util.ByteBufferArray;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * IO engine that stores data to a file on the local file system using memory mapping
+ * mechanism
+ */
+@InterfaceAudience.Private
+public class FileMmapEngine implements IOEngine {
+  static final Log LOG = LogFactory.getLog(FileMmapEngine.class);
+
+  private final String path;
+  private long size;
+  private ByteBufferArray bufferArray;
+  private final FileChannel fileChannel;
+  private RandomAccessFile raf = null;
+
+  public FileMmapEngine(String filePath, long capacity) throws IOException {
+    this.path = filePath;
+    this.size = capacity;
+    long fileSize = 0;
+    try {
+      raf = new RandomAccessFile(filePath, "rw");
+      fileSize = roundUp(capacity, ByteBufferArray.DEFAULT_BUFFER_SIZE);
+      raf.setLength(fileSize);
+      fileChannel = raf.getChannel();
+      LOG.info("Allocating " + StringUtils.byteDesc(fileSize) + ", on the path:" + filePath);
+    } catch (java.io.FileNotFoundException fex) {
+      LOG.error("Can't create bucket cache file " + filePath, fex);
+      throw fex;
+    } catch (IOException ioex) {
+      LOG.error("Can't extend bucket cache file; insufficient space for "
+          + StringUtils.byteDesc(fileSize), ioex);
+      shutdown();
+      throw ioex;
+    }
+    ByteBufferAllocator allocator = new ByteBufferAllocator() {
+      int pos = 0;
+      @Override
+      public ByteBuffer allocate(long size, boolean directByteBuffer) throws IOException {
+        ByteBuffer buffer = null;
+        if (directByteBuffer) {
+          buffer = fileChannel.map(java.nio.channels.FileChannel.MapMode.READ_WRITE, pos * size,
+              size);
+        } else {
+          throw new IllegalArgumentException(
+              "Only Direct Bytebuffers allowed with FileMMap engine");
+        }
+        pos++;
+        return buffer;
+      }
+    };
+    bufferArray = new ByteBufferArray(fileSize, true, allocator);
+  }
+
+  private long roundUp(long n, long to) {
+    return ((n + to - 1) / to) * to;
+  }
+
+  @Override
+  public String toString() {
+    return "ioengine=" + this.getClass().getSimpleName() + ", path=" + this.path +
+      ", size=" + String.format("%,d", this.size);
+  }
+
+  /**
+   * File IO engine is always able to support persistent storage for the cache
+   * @return true
+   */
+  @Override
+  public boolean isPersistent() {
+    return true;
+  }
+
+  @Override
+  public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
+      throws IOException {
+    byte[] dst = new byte[length];
+    bufferArray.getMultiple(offset, length, dst);
+    return deserializer.deserialize(new SingleByteBuff(ByteBuffer.wrap(dst)), true,
+        MemoryType.EXCLUSIVE);
+  }
+
+  /**
+   * Transfers data from the given byte buffer to file
+   * @param srcBuffer the given byte buffer from which bytes are to be read
+   * @param offset The offset in the file where the first byte to be written
+   * @throws IOException
+   */
+  @Override
+  public void write(ByteBuffer srcBuffer, long offset) throws IOException {
+    assert srcBuffer.hasArray();
+    bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(),
+        srcBuffer.arrayOffset());
+  }
+
+  @Override
+  public void write(ByteBuff srcBuffer, long offset) throws IOException {
+    // This singleByteBuff can be considered to be array backed
+    assert srcBuffer.hasArray();
+    bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(),
+        srcBuffer.arrayOffset());
+
+  }
+  /**
+   * Sync the data to file after writing
+   * @throws IOException
+   */
+  @Override
+  public void sync() throws IOException {
+    if (fileChannel != null) {
+      fileChannel.force(true);
+    }
+  }
+
+  /**
+   * Close the file
+   */
+  @Override
+  public void shutdown() {
+    try {
+      fileChannel.close();
+    } catch (IOException ex) {
+      LOG.error("Can't shutdown cleanly", ex);
+    }
+    try {
+      raf.close();
+    } catch (IOException ex) {
+      LOG.error("Can't shutdown cleanly", ex);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/3ba1a7fd/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileMmapEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileMmapEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileMmapEngine.java
new file mode 100644
index 0000000..dfc18c7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileMmapEngine.java
@@ -0,0 +1,68 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.BufferGrabbingDeserializer;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Basic test for {@link FileMmapEngine}
+ */
+@Category({IOTests.class, SmallTests.class})
+public class TestFileMmapEngine {
+  @Test
+  public void testFileMmapEngine() throws IOException {
+    int size = 2 * 1024 * 1024; // 2 MB
+    String filePath = "testFileMmapEngine";
+    try {
+      FileMmapEngine fileMmapEngine = new FileMmapEngine(filePath, size);
+      for (int i = 0; i < 50; i++) {
+        int len = (int) Math.floor(Math.random() * 100);
+        long offset = (long) Math.floor(Math.random() * size % (size - len));
+        byte[] data1 = new byte[len];
+        for (int j = 0; j < data1.length; ++j) {
+          data1[j] = (byte) (Math.random() * 255);
+        }
+        fileMmapEngine.write(ByteBuffer.wrap(data1), offset);
+        BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
+        fileMmapEngine.read(offset, len, deserializer);
+        ByteBuff data2 = deserializer.getDeserializedByteBuff();
+        for (int j = 0; j < data1.length; ++j) {
+          assertTrue(data1[j] == data2.get(j));
+        }
+      }
+    } finally {
+      File file = new File(filePath);
+      if (file.exists()) {
+        file.delete();
+      }
+    }
+
+  }
+}
\ No newline at end of file