Posted to commits@hbase.apache.org by te...@apache.org on 2017/12/07 18:53:06 UTC

hbase git commit: HBASE-19435 Reopen Files for ClosedChannelException in BucketCache

Repository: hbase
Updated Branches:
  refs/heads/master 638433358 -> f55e81e6c


HBASE-19435 Reopen Files for ClosedChannelException in BucketCache

Signed-off-by: tedyu <yu...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f55e81e6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f55e81e6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f55e81e6

Branch: refs/heads/master
Commit: f55e81e6c03f8cf1667340bcd3f7fa6890f1a770
Parents: 6384333
Author: Zach York <zy...@amazon.com>
Authored: Mon Dec 4 12:11:21 2017 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Thu Dec 7 10:52:55 2017 -0800

----------------------------------------------------------------------
 .../hbase/io/hfile/bucket/FileIOEngine.java     | 35 +++++++++++++++++---
 .../hbase/io/hfile/bucket/TestFileIOEngine.java | 17 +++++++++-
 2 files changed, 46 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
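
For context, the pattern the patch applies is: if the cached FileChannel for a bucket cache file has been closed underneath the caller (for example by a thread interrupt), catch the ClosedChannelException, reopen the underlying file, and retry the access. The following is a minimal, JDK-only sketch of that idea; the class and method names (ReopenOnClosedChannel, refresh, read) are illustrative and are not part of this commit.

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;

// Illustrative only: retries a positional read after reopening the file
// when the cached FileChannel has been closed out from under the caller.
public class ReopenOnClosedChannel {
  private final String path;
  private RandomAccessFile raf;
  private FileChannel channel;

  public ReopenOnClosedChannel(String path) throws IOException {
    this.path = path;
    refresh();
  }

  // Reopen the file and refresh the cached channel.
  private void refresh() throws IOException {
    raf = new RandomAccessFile(path, "rw");
    channel = raf.getChannel();
  }

  public int read(ByteBuffer dst, long offset) throws IOException {
    while (true) {
      try {
        return channel.read(dst, offset);
      } catch (ClosedChannelException e) {
        // Channel was closed underneath us; reopen and retry.
        refresh();
      }
    }
  }
}

The actual patch does this inside FileIOEngine's access loop: it catches ClosedChannelException, reopens only the affected file's RandomAccessFile and FileChannel via refreshFileConnection(), and then continues the loop, as shown in the diff below.
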


http://git-wip-us.apache.org/repos/asf/hbase/blob/f55e81e6/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index 5a47c73..ad1c394 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -19,12 +19,15 @@
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
 import java.util.Arrays;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -139,6 +142,17 @@ public class FileIOEngine implements IOEngine {
     return deserializer.deserialize(new SingleByteBuff(dstBuffer), true, MemoryType.EXCLUSIVE);
   }
 
+  @VisibleForTesting
+  void closeFileChannels() {
+    for (FileChannel fileChannel: fileChannels) {
+      try {
+        fileChannel.close();
+      } catch (IOException e) {
+        LOG.warn("Failed to close FileChannel", e);
+      }
+    }
+  }
+
   /**
    * Transfers data from the given byte buffer to file
    * @param srcBuffer the given byte buffer from which bytes are to be read
@@ -208,12 +222,19 @@ public class FileIOEngine implements IOEngine {
     int bufLimit = buffer.limit();
     while (true) {
       FileChannel fileChannel = fileChannels[accessFileNum];
+      int accessLen = 0;
       if (endFileNum > accessFileNum) {
         // short the limit;
         buffer.limit((int) (buffer.limit() - remainingAccessDataLen
             + sizePerFile - accessOffset));
       }
-      int accessLen = accessor.access(fileChannel, buffer, accessOffset);
+      try {
+        accessLen = accessor.access(fileChannel, buffer, accessOffset);
+      } catch (ClosedChannelException e) {
+        LOG.warn("Caught ClosedChannelException accessing BucketCache, reopening file. ", e);
+        refreshFileConnection(accessFileNum);
+        continue;
+      }
       // recover the limit
       buffer.limit(bufLimit);
       if (accessLen < remainingAccessDataLen) {
@@ -224,10 +245,9 @@ public class FileIOEngine implements IOEngine {
         break;
       }
       if (accessFileNum >= fileChannels.length) {
-        throw new IOException("Required data len "
-            + StringUtils.byteDesc(buffer.remaining())
-            + " exceed the engine's capacity " + StringUtils.byteDesc(capacity)
-            + " where offset=" + globalOffset);
+        throw new IOException("Required data len " + StringUtils.byteDesc(buffer.remaining())
+            + " exceed the engine's capacity " + StringUtils.byteDesc(capacity) + " where offset="
+            + globalOffset);
       }
     }
   }
@@ -254,6 +274,11 @@ public class FileIOEngine implements IOEngine {
     return fileNum;
   }
 
+  private void refreshFileConnection(int accessFileNum) throws FileNotFoundException {
+    rafs[accessFileNum] = new RandomAccessFile(filePaths[accessFileNum], "rw");
+    fileChannels[accessFileNum] = rafs[accessFileNum].getChannel();
+  }
+
   private static interface FileAccessor {
     int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
         throws IOException;

http://git-wip-us.apache.org/repos/asf/hbase/blob/f55e81e6/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
index 4451c0c..1bcc026 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
@@ -119,4 +118,20 @@ public class TestFileIOEngine {
     ByteBuff data2 = deserializer.getDeserializedByteBuff();
     assertArrayEquals(data1, data2.array());
   }
+
+  @Test
+  public void testClosedChannelException() throws IOException {
+    fileIOEngine.closeFileChannels();
+    int len = 5;
+    long offset = 0L;
+    byte[] data1 = new byte[len];
+    for (int j = 0; j < data1.length; ++j) {
+      data1[j] = (byte) (Math.random() * 255);
+    }
+    fileIOEngine.write(ByteBuffer.wrap(data1), offset);
+    BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
+    fileIOEngine.read(offset, len, deserializer);
+    ByteBuff data2 = deserializer.getDeserializedByteBuff();
+    assertArrayEquals(data1, data2.array());
+  }
 }