You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zy...@apache.org on 2018/05/09 21:02:24 UTC
hbase git commit: HBASE-20204 Add locking to RefreshFileConnections in BucketCache
Repository: hbase
Updated Branches:
refs/heads/fixRefresh [created] 9c046c091
HBASE-20204 Add locking to RefreshFileConnections in BucketCache
This is a follow-up to HBASE-20141, where Anoop suggested adding locking
when refreshing file channels.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9c046c09
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9c046c09
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9c046c09
Branch: refs/heads/fixRefresh
Commit: 9c046c091c9b1ca6eb50d8b3e945c24a841dd4f1
Parents: 4f2dfd3
Author: Zach York <zy...@amazon.com>
Authored: Wed Mar 14 15:38:22 2018 -0700
Committer: Zach York <zy...@amazon.com>
Committed: Wed May 9 14:01:51 2018 -0700
----------------------------------------------------------------------
.../hbase/io/hfile/bucket/FileIOEngine.java | 32 +++++++++++++++-----
.../hbase/io/hfile/bucket/TestFileIOEngine.java | 19 +++++++++---
2 files changed, 38 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/9c046c09/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index 29b810f..0710d26 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -26,6 +26,7 @@ import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.Arrays;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
@@ -49,6 +50,7 @@ public class FileIOEngine implements IOEngine {
private final String[] filePaths;
private final FileChannel[] fileChannels;
private final RandomAccessFile[] rafs;
+ private final ReentrantLock[] channelLocks;
private final long sizePerFile;
private final long capacity;
@@ -75,6 +77,7 @@ public class FileIOEngine implements IOEngine {
}
}
this.rafs = new RandomAccessFile[filePaths.length];
+ this.channelLocks = new ReentrantLock[filePaths.length];
for (int i = 0; i < filePaths.length; i++) {
String filePath = filePaths[i];
try {
@@ -90,6 +93,7 @@ public class FileIOEngine implements IOEngine {
}
rafs[i].setLength(sizePerFile);
fileChannels[i] = rafs[i].getChannel();
+ channelLocks[i] = new ReentrantLock();
LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile)
+ ", on the path:" + filePath);
} catch (IOException fex) {
@@ -233,8 +237,7 @@ public class FileIOEngine implements IOEngine {
} catch (ClosedByInterruptException e) {
throw e;
} catch (ClosedChannelException e) {
- LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file. ", e);
- refreshFileConnection(accessFileNum);
+ refreshFileConnection(accessFileNum, e);
continue;
}
// recover the limit
@@ -282,13 +285,26 @@ public class FileIOEngine implements IOEngine {
}
@VisibleForTesting
- void refreshFileConnection(int accessFileNum) throws IOException {
- FileChannel fileChannel = fileChannels[accessFileNum];
- if (fileChannel != null) {
- fileChannel.close();
+ void refreshFileConnection(int accessFileNum, IOException ioe) throws IOException {
+ ReentrantLock channelLock = channelLocks[accessFileNum];
+ channelLock.lock();
+ try {
+ FileChannel fileChannel = fileChannels[accessFileNum];
+ if (fileChannel != null) {
+ // Don't re-open a channel if we were waiting on another
+ // thread to re-open the channel and it is now open.
+ if (fileChannel.isOpen()) {
+ return;
+ }
+ fileChannel.close();
+ }
+ LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file: "
+ + filePaths[accessFileNum], ioe);
+ rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw");
+ fileChannels[accessFileNum] = rafs[accessFileNum].getChannel();
+ } finally{
+ channelLock.unlock();
}
- rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw");
- fileChannels[accessFileNum] = rafs[accessFileNum].getChannel();
}
private static interface FileAccessor {
http://git-wip-us.apache.org/repos/asf/hbase/blob/9c046c09/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
index 6480986..efb8145 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
@@ -18,7 +18,8 @@
package org.apache.hadoop.hbase.io.hfile.bucket;
import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import java.io.File;
@@ -143,10 +144,18 @@ public class TestFileIOEngine {
}
@Test
- public void testRefreshFileConnectionClosesConnections() throws IOException {
- FileChannel fileChannel = fileIOEngine.getFileChannels()[0];
+ public void testRefreshFileConnection() throws IOException {
+ FileChannel[] fileChannels = fileIOEngine.getFileChannels();
+ FileChannel fileChannel = fileChannels[0];
assertNotNull(fileChannel);
- fileIOEngine.refreshFileConnection(0);
- assertFalse(fileChannel.isOpen());
+ fileChannel.close();
+ fileIOEngine.refreshFileConnection(0, new IOException("Test Exception"));
+ FileChannel[] reopenedFileChannels = fileIOEngine.getFileChannels();
+ FileChannel reopenedFileChannel = reopenedFileChannels[0];
+ assertNotEquals(fileChannel, reopenedFileChannel);
+ assertEquals(fileChannels.length, reopenedFileChannels.length);
+ for (int i = 1; i < fileChannels.length; i++) {
+ assertEquals(fileChannels[i], reopenedFileChannels[i]);
+ }
}
}