You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/02/27 00:09:25 UTC
[04/37] hbase git commit: HBASE-13259 mmap() based BucketCache
IOEngine (Zee Chen & Ram)
HBASE-13259 mmap() based BucketCache IOEngine (Zee Chen & Ram)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3ba1a7fd
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3ba1a7fd
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3ba1a7fd
Branch: refs/heads/hbase-12439
Commit: 3ba1a7fd23f0b0ca06cf7a9a04cb45975e1c7d91
Parents: a8073c4
Author: ramkrishna <ra...@gmail.com>
Authored: Tue Feb 23 17:03:38 2016 +0530
Committer: ramkrishna <ra...@gmail.com>
Committed: Tue Feb 23 17:03:38 2016 +0530
----------------------------------------------------------------------
.../hadoop/hbase/util/ByteBufferAllocator.java | 39 +++++
.../hadoop/hbase/util/ByteBufferArray.java | 16 +-
.../hadoop/hbase/util/TestByteBufferArray.java | 16 +-
.../hbase/io/hfile/bucket/BucketCache.java | 11 +-
.../io/hfile/bucket/ByteBufferIOEngine.java | 18 +-
.../hbase/io/hfile/bucket/FileMmapEngine.java | 166 +++++++++++++++++++
.../io/hfile/bucket/TestFileMmapEngine.java | 68 ++++++++
7 files changed, 320 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/3ba1a7fd/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferAllocator.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferAllocator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferAllocator.java
new file mode 100644
index 0000000..b19a0a7
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferAllocator.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Defines the way the ByteBuffers are created.
+ * <p>
+ * {@link ByteBufferArray} takes an instance of this interface in its constructor and calls
+ * {@link #allocate(long, boolean)} once for every data buffer it creates, so the caller decides
+ * where the backing memory comes from (for example heap or direct memory, or a memory mapped
+ * region of a file).
+ */
+@InterfaceAudience.Private
+public interface ByteBufferAllocator {
+
+ /**
+ * Allocates a bytebuffer of the requested size.
+ * @param size the size of the bytebuffer in bytes. The type is long, but implementations
+ *          that delegate to {@link ByteBuffer#allocate(int)} or
+ *          {@link ByteBuffer#allocateDirect(int)} have to narrow it to an int
+ * @param directByteBuffer indicator to create a direct bytebuffer; an implementation that
+ *          cannot honour the requested kind may reject the call
+ * @return the bytebuffer that is created
+ * @throws IOException exception thrown if there is an error while creating the ByteBuffer
+ */
+ ByteBuffer allocate(long size, boolean directByteBuffer) throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/3ba1a7fd/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
index 2334cf7..b09dc9a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.util;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -39,20 +40,23 @@ import org.apache.hadoop.util.StringUtils;
public final class ByteBufferArray {
private static final Log LOG = LogFactory.getLog(ByteBufferArray.class);
- static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
+ public static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
private ByteBuffer buffers[];
private Lock locks[];
private int bufferSize;
private int bufferCount;
-
+ private ByteBufferAllocator allocator;
/**
* We allocate a number of byte buffers as the capacity. In order not to out
* of the array bounds for the last byte(see {@link ByteBufferArray#multiple}),
* we will allocate one additional buffer with capacity 0;
* @param capacity total size of the byte buffer array
* @param directByteBuffer true if we allocate direct buffer
+ * @param allocator the ByteBufferAllocator that will create the buffers
+ * @throws IOException throws IOException if there is an exception thrown by the allocator
*/
- public ByteBufferArray(long capacity, boolean directByteBuffer) {
+ public ByteBufferArray(long capacity, boolean directByteBuffer, ByteBufferAllocator allocator)
+ throws IOException {
this.bufferSize = DEFAULT_BUFFER_SIZE;
if (this.bufferSize > (capacity / 16))
this.bufferSize = (int) roundUp(capacity / 16, 32768);
@@ -62,15 +66,15 @@ public final class ByteBufferArray {
+ bufferCount + ", direct=" + directByteBuffer);
buffers = new ByteBuffer[bufferCount + 1];
locks = new Lock[bufferCount + 1];
+ this.allocator = allocator;
for (int i = 0; i <= bufferCount; i++) {
locks[i] = new ReentrantLock();
if (i < bufferCount) {
- buffers[i] = directByteBuffer ? ByteBuffer.allocateDirect(bufferSize)
- : ByteBuffer.allocate(bufferSize);
+ buffers[i] = allocator.allocate(bufferSize, directByteBuffer);
} else {
+ // always create on heap
buffers[i] = ByteBuffer.allocate(0);
}
-
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3ba1a7fd/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java
index 701601d..f2c8549 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.hbase.util;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -32,7 +35,18 @@ public class TestByteBufferArray {
@Test
public void testAsSubBufferWhenEndOffsetLandInLastBuffer() throws Exception {
int capacity = 4 * 1024 * 1024;
- ByteBufferArray array = new ByteBufferArray(capacity, false);
+ ByteBufferAllocator allocator = new ByteBufferAllocator() {
+ @Override
+ public ByteBuffer allocate(long size, boolean directByteBuffer)
+ throws IOException {
+ if (directByteBuffer) {
+ return ByteBuffer.allocateDirect((int) size);
+ } else {
+ return ByteBuffer.allocate((int) size);
+ }
+ }
+ };
+ ByteBufferArray array = new ByteBufferArray(capacity, false, allocator);
ByteBuff subBuf = array.asSubByteBuff(0, capacity);
subBuf.position(capacity - 1);// Position to the last byte
assertTrue(subBuf.hasRemaining());
http://git-wip-us.apache.org/repos/asf/hbase/blob/3ba1a7fd/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 6024958..7436b71 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -303,15 +303,18 @@ public class BucketCache implements BlockCache, HeapSize {
*/
private IOEngine getIOEngineFromName(String ioEngineName, long capacity)
throws IOException {
- if (ioEngineName.startsWith("file:"))
+ if (ioEngineName.startsWith("file:")) {
return new FileIOEngine(ioEngineName.substring(5), capacity);
- else if (ioEngineName.startsWith("offheap"))
+ } else if (ioEngineName.startsWith("offheap")) {
return new ByteBufferIOEngine(capacity, true);
- else if (ioEngineName.startsWith("heap"))
+ } else if (ioEngineName.startsWith("heap")) {
return new ByteBufferIOEngine(capacity, false);
- else
+ } else if (ioEngineName.startsWith("mmap:")) {
+ return new FileMmapEngine(ioEngineName.substring(5), capacity);
+ } else {
throw new IllegalArgumentException(
"Don't understand io engine name for cache - prefix with file:, heap or offheap");
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/3ba1a7fd/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
index 2227312..45ed1ae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.util.ByteBufferAllocator;
import org.apache.hadoop.hbase.util.ByteBufferArray;
/**
@@ -42,13 +43,24 @@ public class ByteBufferIOEngine implements IOEngine {
* Construct the ByteBufferIOEngine with the given capacity
* @param capacity
* @param direct true if allocate direct buffer
- * @throws IOException
+ * @throws IOException ideally here no exception to be thrown from the allocator
*/
public ByteBufferIOEngine(long capacity, boolean direct)
throws IOException {
this.capacity = capacity;
this.direct = direct;
- bufferArray = new ByteBufferArray(capacity, direct);
+ ByteBufferAllocator allocator = new ByteBufferAllocator() {
+ @Override
+ public ByteBuffer allocate(long size, boolean directByteBuffer)
+ throws IOException {
+ if (directByteBuffer) {
+ return ByteBuffer.allocateDirect((int) size);
+ } else {
+ return ByteBuffer.allocate((int) size);
+ }
+ }
+ };
+ bufferArray = new ByteBufferArray(capacity, direct, allocator);
}
@Override
@@ -85,7 +97,7 @@ public class ByteBufferIOEngine implements IOEngine {
* @param srcBuffer the given byte buffer from which bytes are to be read
* @param offset The offset in the ByteBufferArray of the first byte to be
* written
- * @throws IOException
+ * @throws IOException throws IOException if writing to the array throws exception
*/
@Override
public void write(ByteBuffer srcBuffer, long offset) throws IOException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/3ba1a7fd/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java
new file mode 100644
index 0000000..7a2afe8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java
@@ -0,0 +1,166 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
+import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.hadoop.hbase.util.ByteBufferAllocator;
+import org.apache.hadoop.hbase.util.ByteBufferArray;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * IO engine that stores data to a file on the local file system using memory mapping
+ * mechanism.
+ * <p>
+ * The backing file is sized to the requested capacity rounded up to a multiple of
+ * {@link ByteBufferArray#DEFAULT_BUFFER_SIZE} and is exposed through a {@link ByteBufferArray}
+ * whose buffers are consecutive READ_WRITE mappings of that file.
+ */
+@InterfaceAudience.Private
+public class FileMmapEngine implements IOEngine {
+  static final Log LOG = LogFactory.getLog(FileMmapEngine.class);
+
+  private final String path;
+  private long size;
+  private ByteBufferArray bufferArray;
+  private final FileChannel fileChannel;
+  private RandomAccessFile raf = null;
+
+  /**
+   * Creates (or reuses) the backing file, extends it to the rounded-up capacity and maps it.
+   * @param filePath path of the file that backs the cache
+   * @param capacity requested cache capacity in bytes
+   * @throws IOException if the file cannot be created, extended or mapped; any file handle
+   *           opened so far is closed before the exception is propagated
+   */
+  public FileMmapEngine(String filePath, long capacity) throws IOException {
+    this.path = filePath;
+    this.size = capacity;
+    long fileSize = 0;
+    try {
+      raf = new RandomAccessFile(filePath, "rw");
+      fileSize = roundUp(capacity, ByteBufferArray.DEFAULT_BUFFER_SIZE);
+      raf.setLength(fileSize);
+      fileChannel = raf.getChannel();
+      LOG.info("Allocating " + StringUtils.byteDesc(fileSize) + ", on the path:" + filePath);
+    } catch (java.io.FileNotFoundException fex) {
+      // Nothing was opened, so there is nothing to close here.
+      LOG.error("Can't create bucket cache file " + filePath, fex);
+      throw fex;
+    } catch (IOException ioex) {
+      LOG.error("Can't extend bucket cache file; insufficient space for "
+          + StringUtils.byteDesc(fileSize), ioex);
+      // fileChannel may still be unassigned at this point; closeFile() is null-safe so the
+      // original IOException is what the caller sees.
+      closeFile();
+      throw ioex;
+    }
+    ByteBufferAllocator allocator = new ByteBufferAllocator() {
+      // Index of the next file segment to map; segment i starts at i * size.
+      private long segmentIndex = 0;
+
+      @Override
+      public ByteBuffer allocate(long size, boolean directByteBuffer) throws IOException {
+        if (!directByteBuffer) {
+          throw new IllegalArgumentException(
+              "Only Direct Bytebuffers allowed with FileMMap engine");
+        }
+        ByteBuffer buffer =
+            fileChannel.map(FileChannel.MapMode.READ_WRITE, segmentIndex * size, size);
+        segmentIndex++;
+        return buffer;
+      }
+    };
+    try {
+      bufferArray = new ByteBufferArray(fileSize, true, allocator);
+    } catch (IOException | RuntimeException ex) {
+      // Mapping failed half way: do not leak the open file and channel.
+      closeFile();
+      throw ex;
+    }
+  }
+
+  private long roundUp(long n, long to) {
+    return ((n + to - 1) / to) * to;
+  }
+
+  @Override
+  public String toString() {
+    return "ioengine=" + this.getClass().getSimpleName() + ", path=" + this.path +
+        ", size=" + String.format("%,d", this.size);
+  }
+
+  /**
+   * File IO engine is always able to support persistent storage for the cache
+   * @return true
+   */
+  @Override
+  public boolean isPersistent() {
+    return true;
+  }
+
+  /**
+   * Copies {@code length} bytes starting at {@code offset} out of the mapped file into a new
+   * on-heap array and hands that copy to the deserializer.
+   * @param offset offset in the file of the first byte to read
+   * @param length number of bytes to read
+   * @param deserializer deserializer used to build the returned {@link Cacheable}
+   * @return the deserialized object, backed by the private copy ({@link MemoryType#EXCLUSIVE})
+   * @throws IOException declared by the {@link IOEngine} contract
+   */
+  @Override
+  public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
+      throws IOException {
+    byte[] dst = new byte[length];
+    bufferArray.getMultiple(offset, length, dst);
+    return deserializer.deserialize(new SingleByteBuff(ByteBuffer.wrap(dst)), true,
+        MemoryType.EXCLUSIVE);
+  }
+
+  /**
+   * Transfers data from the given byte buffer to file
+   * @param srcBuffer the given byte buffer from which bytes are to be read; must be array
+   *          backed. Its remaining bytes (from its current position) are written.
+   * @param offset The offset in the file where the first byte to be written
+   * @throws IOException declared by the {@link IOEngine} contract
+   */
+  @Override
+  public void write(ByteBuffer srcBuffer, long offset) throws IOException {
+    assert srcBuffer.hasArray();
+    // remaining() counts from position(), so the source index has to include position() too;
+    // arrayOffset() alone is only correct when the position is 0.
+    bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(),
+        srcBuffer.arrayOffset() + srcBuffer.position());
+  }
+
+  /**
+   * Transfers data from the given {@link ByteBuff} to file
+   * @param srcBuffer the buffer from which bytes are to be read; must be array backed
+   * @param offset The offset in the file where the first byte to be written
+   * @throws IOException declared by the {@link IOEngine} contract
+   */
+  @Override
+  public void write(ByteBuff srcBuffer, long offset) throws IOException {
+    // This singleByteBuff can be considered to be array backed
+    assert srcBuffer.hasArray();
+    // NOTE(review): like the ByteBuffer overload used to, this starts at arrayOffset() and so
+    // presumably expects the buffer position to be 0 - confirm against callers.
+    bufferArray.putMultiple(offset, srcBuffer.remaining(), srcBuffer.array(),
+        srcBuffer.arrayOffset());
+  }
+
+  /**
+   * Sync the data to file after writing
+   * @throws IOException if forcing the channel fails
+   */
+  @Override
+  public void sync() throws IOException {
+    // NOTE(review): FileChannel.force is only guaranteed to flush changes made through the
+    // channel itself; whether it also flushes writes done via the mapped buffers is
+    // platform dependent - confirm this is sufficient for the persistence guarantee.
+    if (fileChannel != null) {
+      fileChannel.force(true);
+    }
+  }
+
+  /**
+   * Close the file
+   */
+  @Override
+  public void shutdown() {
+    closeFile();
+  }
+
+  /**
+   * Closes the channel and the file, logging instead of propagating any failure. Safe to call
+   * when either of them was never opened, which is the case on the constructor's error paths.
+   */
+  private void closeFile() {
+    if (fileChannel != null) {
+      try {
+        fileChannel.close();
+      } catch (IOException ex) {
+        LOG.error("Can't shutdown cleanly", ex);
+      }
+    }
+    if (raf != null) {
+      try {
+        raf.close();
+      } catch (IOException ex) {
+        LOG.error("Can't shutdown cleanly", ex);
+      }
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/3ba1a7fd/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileMmapEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileMmapEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileMmapEngine.java
new file mode 100644
index 0000000..dfc18c7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileMmapEngine.java
@@ -0,0 +1,68 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.BufferGrabbingDeserializer;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Basic test for {@link FileMmapEngine}: writes random byte arrays at random offsets and checks
+ * that reading the same range back returns the same bytes.
+ */
+@Category({IOTests.class, SmallTests.class})
+public class TestFileMmapEngine {
+  @Test
+  public void testFileMmapEngine() throws IOException {
+    int size = 2 * 1024 * 1024; // 2 MB
+    String filePath = "testFileMmapEngine";
+    // Declared outside the try block so that the finally block can release it.
+    FileMmapEngine fileMmapEngine = null;
+    try {
+      fileMmapEngine = new FileMmapEngine(filePath, size);
+      for (int i = 0; i < 50; i++) {
+        int len = (int) Math.floor(Math.random() * 100);
+        long offset = (long) Math.floor(Math.random() * size % (size - len));
+        byte[] data1 = new byte[len];
+        for (int j = 0; j < data1.length; ++j) {
+          data1[j] = (byte) (Math.random() * 255);
+        }
+        fileMmapEngine.write(ByteBuffer.wrap(data1), offset);
+        BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
+        fileMmapEngine.read(offset, len, deserializer);
+        ByteBuff data2 = deserializer.getDeserializedByteBuff();
+        for (int j = 0; j < data1.length; ++j) {
+          assertTrue(data1[j] == data2.get(j));
+        }
+      }
+    } finally {
+      // Close the channel and file before deleting, otherwise the handle stays open for the
+      // rest of the test JVM.
+      if (fileMmapEngine != null) {
+        fileMmapEngine.shutdown();
+      }
+      File file = new File(filePath);
+      if (file.exists()) {
+        file.delete();
+      }
+    }
+
+  }
+}
\ No newline at end of file