You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2017/03/16 17:12:42 UTC
[14/14] hbase git commit: HBASE-15314 Allow more than one backing file
in bucketcache (Chunhui Shen)
HBASE-15314 Allow more than one backing file in bucketcache (Chunhui Shen)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e67eb6c4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e67eb6c4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e67eb6c4
Branch: refs/heads/hbase-12439
Commit: e67eb6c424d76ee259f5076c277454a73e3a2bf4
Parents: 6a6fff1
Author: Ramkrishna <ra...@intel.com>
Authored: Thu Mar 16 16:11:35 2017 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Thu Mar 16 16:28:58 2017 +0530
----------------------------------------------------------------------
.../hbase/io/hfile/bucket/BucketCache.java | 9 +-
.../hbase/io/hfile/bucket/FileIOEngine.java | 184 +++++++++++++++----
.../hbase/io/hfile/bucket/TestFileIOEngine.java | 47 ++++-
3 files changed, 190 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e67eb6c4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 3e9c376..3c27f14 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -314,8 +314,13 @@ public class BucketCache implements BlockCache, HeapSize {
*/
private IOEngine getIOEngineFromName(String ioEngineName, long capacity)
throws IOException {
- if (ioEngineName.startsWith("file:")) {
- return new FileIOEngine(ioEngineName.substring(5), capacity);
+ if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
+ // To keep usage simple, the documentation only needs to mention the
+ // 'files:' prefix, whether one or multiple files are configured; the
+ // 'file:' prefix is still accepted for backward compatibility.
+ String[] filePaths = ioEngineName.substring(ioEngineName.indexOf(":") + 1)
+ .split(FileIOEngine.FILE_DELIMITER);
+ return new FileIOEngine(capacity, filePaths);
} else if (ioEngineName.startsWith("offheap")) {
return new ByteBufferIOEngine(capacity, true);
} else if (ioEngineName.startsWith("heap")) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/e67eb6c4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index aaf5cf9..7586d57 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -18,10 +18,12 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
+import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -39,38 +41,52 @@ import org.apache.hadoop.util.StringUtils;
@InterfaceAudience.Private
public class FileIOEngine implements IOEngine {
private static final Log LOG = LogFactory.getLog(FileIOEngine.class);
- private final RandomAccessFile raf;
- private final FileChannel fileChannel;
- private final String path;
- private long size;
-
- public FileIOEngine(String filePath, long fileSize) throws IOException {
- this.path = filePath;
- this.size = fileSize;
- try {
- raf = new RandomAccessFile(filePath, "rw");
- } catch (java.io.FileNotFoundException fex) {
- LOG.error("Can't create bucket cache file " + filePath, fex);
- throw fex;
- }
+ public static final String FILE_DELIMITER = ",";
+ private final String[] filePaths;
+ private final FileChannel[] fileChannels;
+ private final RandomAccessFile[] rafs;
- try {
- raf.setLength(fileSize);
- } catch (IOException ioex) {
- LOG.error("Can't extend bucket cache file; insufficient space for "
- + StringUtils.byteDesc(fileSize), ioex);
- raf.close();
- throw ioex;
- }
+ private final long sizePerFile;
+ private final long capacity;
- fileChannel = raf.getChannel();
- LOG.info("Allocating " + StringUtils.byteDesc(fileSize) + ", on the path:" + filePath);
+ private FileReadAccessor readAccessor = new FileReadAccessor();
+ private FileWriteAccessor writeAccessor = new FileWriteAccessor();
+
+ public FileIOEngine(long capacity, String... filePaths) throws IOException {
+ this.sizePerFile = capacity / filePaths.length;
+ this.capacity = this.sizePerFile * filePaths.length;
+ this.filePaths = filePaths;
+ this.fileChannels = new FileChannel[filePaths.length];
+ this.rafs = new RandomAccessFile[filePaths.length];
+ for (int i = 0; i < filePaths.length; i++) {
+ String filePath = filePaths[i];
+ try {
+ rafs[i] = new RandomAccessFile(filePath, "rw");
+ long totalSpace = new File(filePath).getTotalSpace();
+ if (totalSpace < sizePerFile) {
+ // The setLength() call below is expected to throw an exception; this
+ // warning is logged only to record the detailed reason for that failure.
+ String msg = "Only " + StringUtils.byteDesc(totalSpace)
+ + " total space under " + filePath + ", not enough for requested "
+ + StringUtils.byteDesc(sizePerFile);
+ LOG.warn(msg);
+ }
+ rafs[i].setLength(sizePerFile);
+ fileChannels[i] = rafs[i].getChannel();
+ LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile)
+ + ", on the path:" + filePath);
+ } catch (IOException fex) {
+ LOG.error("Failed allocating cache on " + filePath, fex);
+ shutdown();
+ throw fex;
+ }
+ }
}
@Override
public String toString() {
- return "ioengine=" + this.getClass().getSimpleName() + ", path=" + this.path +
- ", size=" + String.format("%,d", this.size);
+ return "ioengine=" + this.getClass().getSimpleName() + ", paths="
+ + Arrays.asList(filePaths) + ", capacity=" + String.format("%,d", this.capacity);
}
/**
@@ -94,7 +110,7 @@ public class FileIOEngine implements IOEngine {
public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
throws IOException {
ByteBuffer dstBuffer = ByteBuffer.allocate(length);
- fileChannel.read(dstBuffer, offset);
+ accessFile(readAccessor, dstBuffer, offset);
// The buffer created out of the fileChannel is formed by copying the data from the file
// Hence in this case there is no shared memory that we point to. Even if the BucketCache evicts
// this buffer from the file the data is already copied and there is no need to ensure that
@@ -114,7 +130,7 @@ public class FileIOEngine implements IOEngine {
*/
@Override
public void write(ByteBuffer srcBuffer, long offset) throws IOException {
- fileChannel.write(srcBuffer, offset);
+ accessFile(writeAccessor, srcBuffer, offset);
}
/**
@@ -123,7 +139,16 @@ public class FileIOEngine implements IOEngine {
*/
@Override
public void sync() throws IOException {
- fileChannel.force(true);
+ for (int i = 0; i < fileChannels.length; i++) {
+ try {
+ if (fileChannels[i] != null) {
+ fileChannels[i].force(true);
+ }
+ } catch (IOException ie) {
+ LOG.warn("Failed syncing data to " + this.filePaths[i]);
+ throw ie;
+ }
+ }
}
/**
@@ -131,15 +156,17 @@ public class FileIOEngine implements IOEngine {
*/
@Override
public void shutdown() {
- try {
- fileChannel.close();
- } catch (IOException ex) {
- LOG.error("Can't shutdown cleanly", ex);
- }
- try {
- raf.close();
- } catch (IOException ex) {
- LOG.error("Can't shutdown cleanly", ex);
+ for (int i = 0; i < filePaths.length; i++) {
+ try {
+ if (fileChannels[i] != null) {
+ fileChannels[i].close();
+ }
+ if (rafs[i] != null) {
+ rafs[i].close();
+ }
+ } catch (IOException ex) {
+ LOG.error("Failed closing " + filePaths[i] + " when shudown the IOEngine", ex);
+ }
}
}
@@ -147,7 +174,84 @@ public class FileIOEngine implements IOEngine {
public void write(ByteBuff srcBuffer, long offset) throws IOException {
// When caching block into BucketCache there will be single buffer backing for this HFileBlock.
assert srcBuffer.hasArray();
- fileChannel.write(
- ByteBuffer.wrap(srcBuffer.array(), srcBuffer.arrayOffset(), srcBuffer.remaining()), offset);
+ write(ByteBuffer.wrap(srcBuffer.array(), srcBuffer.arrayOffset(),
+ srcBuffer.remaining()), offset);
+ }
+
+ private void accessFile(FileAccessor accessor, ByteBuffer buffer,
+ long globalOffset) throws IOException {
+ int startFileNum = getFileNum(globalOffset);
+ int remainingAccessDataLen = buffer.remaining();
+ int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1);
+ int accessFileNum = startFileNum;
+ long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset);
+ int bufLimit = buffer.limit();
+ while (true) {
+ FileChannel fileChannel = fileChannels[accessFileNum];
+ if (endFileNum > accessFileNum) {
+ // shorten the limit so this access stops at the end of the current file
+ buffer.limit((int) (buffer.limit() - remainingAccessDataLen
+ + sizePerFile - accessOffset));
+ }
+ int accessLen = accessor.access(fileChannel, buffer, accessOffset);
+ // restore the original limit
+ buffer.limit(bufLimit);
+ if (accessLen < remainingAccessDataLen) {
+ remainingAccessDataLen -= accessLen;
+ accessFileNum++;
+ accessOffset = 0;
+ } else {
+ break;
+ }
+ if (accessFileNum >= fileChannels.length) {
+ throw new IOException("Required data len "
+ + StringUtils.byteDesc(buffer.remaining())
+ + " exceed the engine's capacity " + StringUtils.byteDesc(capacity)
+ + " where offset=" + globalOffset);
+ }
+ }
+ }
+
+ /**
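+ * Convert a global offset (relative to the whole engine) into the offset within the given file.
+ * @param fileNum index of the backing file
+ * @param globalOffset offset relative to the start of the first backing file
+ * @return the offset inside the file identified by {@code fileNum}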
+ */
+ private long getAbsoluteOffsetInFile(int fileNum, long globalOffset) {
+ return globalOffset - fileNum * sizePerFile;
+ }
+
+ private int getFileNum(long offset) {
+ if (offset < 0) {
+ throw new IllegalArgumentException("Unexpected offset " + offset);
+ }
+ int fileNum = (int) (offset / sizePerFile);
+ if (fileNum >= fileChannels.length) {
+ throw new RuntimeException("Not expected offset " + offset
+ + " where capacity=" + capacity);
+ }
+ return fileNum;
+ }
+
+ private static interface FileAccessor {
+ int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
+ throws IOException;
+ }
+
+ private static class FileReadAccessor implements FileAccessor {
+ @Override
+ public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
+ long accessOffset) throws IOException {
+ return fileChannel.read(byteBuffer, accessOffset);
+ }
+ }
+
+ private static class FileWriteAccessor implements FileAccessor {
+ @Override
+ public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
+ long accessOffset) throws IOException {
+ return fileChannel.write(byteBuffer, accessOffset);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e67eb6c4/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
index 93f4cf7..d1f3dfe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
@@ -23,6 +23,8 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.BufferGrabbingDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;
@@ -38,13 +40,39 @@ import org.junit.experimental.categories.Category;
public class TestFileIOEngine {
@Test
public void testFileIOEngine() throws IOException {
- int size = 2 * 1024 * 1024; // 2 MB
- String filePath = "testFileIOEngine";
+ long totalCapacity = 6 * 1024 * 1024; // 6 MB
+ String[] filePaths = { "testFileIOEngine1", "testFileIOEngine2",
+ "testFileIOEngine3" };
+ long sizePerFile = totalCapacity / filePaths.length; // 2 MB per File
+ List<Long> boundaryStartPositions = new ArrayList<Long>();
+ boundaryStartPositions.add(0L);
+ for (int i = 1; i < filePaths.length; i++) {
+ boundaryStartPositions.add(sizePerFile * i - 1);
+ boundaryStartPositions.add(sizePerFile * i);
+ boundaryStartPositions.add(sizePerFile * i + 1);
+ }
+ List<Long> boundaryStopPositions = new ArrayList<Long>();
+ for (int i = 1; i < filePaths.length; i++) {
+ boundaryStopPositions.add(sizePerFile * i - 1);
+ boundaryStopPositions.add(sizePerFile * i);
+ boundaryStopPositions.add(sizePerFile * i + 1);
+ }
+ boundaryStopPositions.add(sizePerFile * filePaths.length - 1);
+ FileIOEngine fileIOEngine = new FileIOEngine(totalCapacity, filePaths);
try {
- FileIOEngine fileIOEngine = new FileIOEngine(filePath, size);
- for (int i = 0; i < 50; i++) {
+ for (int i = 0; i < 500; i++) {
int len = (int) Math.floor(Math.random() * 100);
- long offset = (long) Math.floor(Math.random() * size % (size - len));
+ long offset = (long) Math.floor(Math.random() * totalCapacity % (totalCapacity - len));
+ if (i < boundaryStartPositions.size()) {
+ // use a boundary start position as the offset
+ offset = boundaryStartPositions.get(i);
+ } else if ((i - boundaryStartPositions.size()) < boundaryStopPositions.size()) {
+ // choose the offset so that the block ends at a boundary stop position
+ offset = boundaryStopPositions.get(i - boundaryStartPositions.size()) - len + 1;
+ } else if (i % 2 == 0) {
+ // position the block around a file boundary to exercise cross-file writing/reading
+ offset = Math.max(1, i % filePaths.length) * sizePerFile - len / 2;
+ }
byte[] data1 = new byte[len];
for (int j = 0; j < data1.length; ++j) {
data1[j] = (byte) (Math.random() * 255);
@@ -58,9 +86,12 @@ public class TestFileIOEngine {
}
}
} finally {
- File file = new File(filePath);
- if (file.exists()) {
- file.delete();
+ fileIOEngine.shutdown();
+ for (String filePath : filePaths) {
+ File file = new File(filePath);
+ if (file.exists()) {
+ file.delete();
+ }
}
}