You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/06/06 20:27:58 UTC

[2/2] hbase git commit: HBASE-9393 HBase does not close a closed socket, resulting in many CLOSE_WAIT sockets

HBASE-9393 HBase does not close a closed socket, resulting in many CLOSE_WAIT sockets

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/356d4e91
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/356d4e91
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/356d4e91

Branch: refs/heads/branch-1
Commit: 356d4e9187fc6748169d3aaebe516fb2257d8835
Parents: 39e8e2f
Author: Ashish Singhi <as...@apache.org>
Authored: Tue Jun 6 17:49:08 2017 +0530
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Jun 6 12:59:19 2017 -0700

----------------------------------------------------------------------
 .../hbase/io/FSDataInputStreamWrapper.java      | 71 +++++++++++++++++++-
 .../hadoop/hbase/io/HalfStoreFileReader.java    |  4 ++
 .../hbase/io/hfile/AbstractHFileReader.java     |  8 +++
 .../org/apache/hadoop/hbase/io/hfile/HFile.java | 26 +++++--
 .../hadoop/hbase/io/hfile/HFileBlock.java       | 21 +++++-
 .../hadoop/hbase/io/hfile/HFileReaderV2.java    |  5 ++
 .../hadoop/hbase/io/hfile/HFileScanner.java     |  5 ++
 .../hbase/regionserver/StoreFileScanner.java    |  1 +
 8 files changed, 133 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/356d4e91/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
index b06be6b..dc168da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
@@ -18,7 +18,12 @@
 package org.apache.hadoop.hbase.io;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -32,6 +37,8 @@ import com.google.common.annotations.VisibleForTesting;
  * see method comments.
  */
 public class FSDataInputStreamWrapper {
+  private static final Log LOG = LogFactory.getLog(FSDataInputStreamWrapper.class);
+
   private final HFileSystem hfs;
   private final Path path;
   private final FileLink link;
@@ -74,6 +81,11 @@ public class FSDataInputStreamWrapper {
   // reads without hbase checksum verification.
   private volatile int hbaseChecksumOffCount = -1;
 
+  private Boolean instanceOfCanUnbuffer = null;
+  // Using reflection to get org.apache.hadoop.fs.CanUnbuffer#unbuffer method to avoid compilation
+  // errors against Hadoop pre 2.6.4 and 2.7.1 versions.
+  private Method unbuffer = null;
+
   public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
     this(fs, null, path, false);
   }
@@ -219,4 +231,61 @@ public class FSDataInputStreamWrapper {
   public HFileSystem getHfs() {
     return this.hfs;
   }
-}
+
+  /**
+   * Frees the sockets and file descriptors held by the stream, but only when the wrapped stream
+   * implements org.apache.hadoop.fs.CanUnbuffer; otherwise a warning is logged and nothing is
+   * released. NOT THREAD SAFE. Must be called only when all the clients using this stream to read
+   * the blocks have finished reading. If by chance the stream is unbuffered while clients still
+   * hold it for reads, the next read request is served over a new socket that is opened without
+   * the client knowing about it. Note: if this socket stays idle for some time, the DataNode
+   * closes it and it moves into CLOSE_WAIT state; on the next client request on this stream the
+   * current socket is closed and a new one is opened to serve the request.
+   */
+  public void unbuffer() {
+    FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum());
+    if (stream == null) {
+      return;
+    }
+    InputStream wrappedStream = stream.getWrappedStream();
+    final Class<? extends InputStream> streamClass = wrappedStream.getClass();
+    if (this.instanceOfCanUnbuffer == null) {
+      // To ensure we compute whether the stream is instance of CanUnbuffer only once.
+      this.instanceOfCanUnbuffer = false;
+      try {
+        // CanUnbuffer was added by HDFS-7694 and is only available in Hadoop 2.6.4+ and 2.7.1+,
+        // so it is resolved by name at runtime. isInstance also matches streams that inherit
+        // the interface from a superclass, and taking the Method from the interface keeps it
+        // invocable even when the implementing class itself is not public.
+        Class<?> canUnbuffer = Class.forName("org.apache.hadoop.fs.CanUnbuffer", false,
+          streamClass.getClassLoader());
+        if (canUnbuffer.isInstance(wrappedStream)) {
+          this.unbuffer = canUnbuffer.getMethod("unbuffer");
+          this.instanceOfCanUnbuffer = true;
+        }
+      } catch (ClassNotFoundException ignored) {
+        // This Hadoop version has no CanUnbuffer at all; the warning below reports it.
+      } catch (NoSuchMethodException | SecurityException e) {
+        LOG.warn("Failed to find 'unbuffer' method in class " + streamClass
+            + " . So there may be a TCP socket connection "
+            + "left open in CLOSE_WAIT state.",
+          e);
+        return;
+      }
+    }
+    if (!this.instanceOfCanUnbuffer) {
+      LOG.warn("Failed to find 'unbuffer' method in class " + streamClass
+          + " . So there may be a TCP socket connection "
+          + "left open in CLOSE_WAIT state. For more details check "
+          + "https://issues.apache.org/jira/browse/HBASE-9393");
+      return;
+    }
+    try {
+      this.unbuffer.invoke(wrappedStream);
+    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+      LOG.warn("Failed to invoke 'unbuffer' method in class " + streamClass
+          + " . So there may be a TCP socket connection left open in CLOSE_WAIT state.",
+        e);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/356d4e91/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
index ed2e925..c259fc2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
@@ -322,6 +322,10 @@ public class HalfStoreFileReader extends StoreFile.Reader {
       public Cell getNextIndexedKey() {
         return null;
       }
+
+      @Override
+      public void close() { // NOTE(review): intentionally a no-op? Nothing is released here - confirm
+      }
     };
   }
   

http://git-wip-us.apache.org/repos/asf/hbase/blob/356d4e91/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
index 7d8b572..5039427 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
@@ -332,6 +332,14 @@ public abstract class AbstractHFileReader
     public HFile.Reader getReader() {
       return reader;
     }
+
+    @Override
+    public void close() {
+      if (!pread) {
+        // Only non-pread scanners release the stream's socket when they are closed. HBASE-9393
+        reader.unbufferStream();
+      }
+    }
   }
 
   /** For testing */

http://git-wip-us.apache.org/repos/asf/hbase/blob/356d4e91/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 3ee120f..16c5f34 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -457,6 +457,12 @@ public class HFile {
     boolean isPrimaryReplicaReader();
 
     void setPrimaryReplicaReader(boolean isPrimaryReplicaReader);
+
+    /**
+     * Closes the stream's socket. Note: this can be called concurrently from multiple threads, so
+     * implementations must take care of thread safety.
+     */
+    void unbufferStream();
   }
 
   /**
@@ -473,8 +479,8 @@ public class HFile {
    */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH",
       justification="Intentional")
-  private static Reader pickReaderVersion(Path path, FSDataInputStreamWrapper fsdis,
-      long size, CacheConfig cacheConf, HFileSystem hfs, Configuration conf) throws IOException {
+  private static Reader openReader(Path path, FSDataInputStreamWrapper fsdis, long size,
+      CacheConfig cacheConf, HFileSystem hfs, Configuration conf) throws IOException {
     FixedFileTrailer trailer = null;
     try {
       boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum();
@@ -495,10 +501,15 @@ public class HFile {
         LOG.warn("Error closing fsdis FSDataInputStreamWrapper", t2);
       }
       throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, t);
+    } finally {
+      fsdis.unbuffer();
     }
   }
 
   /**
+   * The sockets and the file descriptors held by the method parameter
+   * {@code FSDataInputStreamWrapper} passed will be freed after its usage so caller needs to ensure
+   * that no other threads have access to the same passed reference.
    * @param fs A file system
    * @param path Path to HFile
    * @param fsdis a stream of path's file
@@ -522,7 +533,7 @@ public class HFile {
     } else {
       hfs = (HFileSystem)fs;
     }
-    return pickReaderVersion(path, fsdis, size, cacheConf, hfs, conf);
+    return openReader(path, fsdis, size, cacheConf, hfs, conf);
   }
 
   /**
@@ -537,18 +548,21 @@ public class HFile {
       FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf) throws IOException {
     Preconditions.checkNotNull(cacheConf, "Cannot create Reader with null CacheConf");
     FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fs, path);
-    return pickReaderVersion(path, stream, fs.getFileStatus(path).getLen(),
+    return openReader(path, stream, fs.getFileStatus(path).getLen(),
       cacheConf, stream.getHfs(), conf);
   }
 
   /**
-   * This factory method is used only by unit tests
+   * This factory method is used only by unit tests. <br/>
+   * The sockets and the file descriptors held by the {@code FSDataInputStream} passed as
+   * {@code fsdis} will be freed after its usage so caller needs to ensure that no other threads
+   * have access to the same passed reference.
    */
   static Reader createReaderFromStream(Path path,
       FSDataInputStream fsdis, long size, CacheConfig cacheConf, Configuration conf)
       throws IOException {
     FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fsdis);
-    return pickReaderVersion(path, wrapper, size, cacheConf, null, conf);
+    return openReader(path, wrapper, size, cacheConf, null, conf);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/356d4e91/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 3b014b9..d30ca19 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -1361,6 +1361,12 @@ public class HFileBlock implements Cacheable {
 
     /** Get the default decoder for blocks from this file. */
     HFileBlockDecodingContext getDefaultBlockDecodingContext();
+
+    /**
+     * Closes the stream's socket. Note: this can be called concurrently from multiple threads, so
+     * implementations must take care of thread safety.
+     */
+    void unbufferStream();
   }
 
   /**
@@ -1379,7 +1385,7 @@ public class HFileBlock implements Cacheable {
     /** The filesystem used to access data */
     protected HFileSystem hfs;
 
-    private final Lock streamLock = new ReentrantLock();
+    protected final Lock streamLock = new ReentrantLock();
 
     /** The default buffer size for our buffered streams */
     public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
@@ -1835,6 +1841,19 @@ public class HFileBlock implements Cacheable {
     }
 
     @Override
+    public void unbufferStream() {
+      // To handle concurrent reads, unbuffer only if no other client holds the stream lock right
+      // now; when the lock is already taken this call returns without unbuffering.
+      if (streamLock.tryLock()) {
+        try {
+          this.streamWrapper.unbuffer();
+        } finally {
+          streamLock.unlock();
+        }
+      }
+    }
+
+    @Override
     public String toString() {
       return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/356d4e91/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
index 0bca8e5..291a671 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
@@ -1429,4 +1429,9 @@ public class HFileReaderV2 extends AbstractHFileReader {
   boolean prefetchComplete() {
     return PrefetchExecutor.isCompleted(path);
   }
+
+  @Override
+  public void unbufferStream() {
+    fsBlockReader.unbufferStream(); // the block reader does the actual unbuffering
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/356d4e91/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
index 3e0f91f..b5f207c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
@@ -161,4 +161,9 @@ public interface HFileScanner {
    * @return the next key in the index (the key to seek to the next block)
    */
   Cell getNextIndexedKey();
+
+  /**
+   * Closes the stream's socket so that it is not left in CLOSE_WAIT on the RS. HBASE-9393
+   */
+  void close();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/356d4e91/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index 975d3c7..f5eb74f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -270,6 +270,7 @@ public class StoreFileScanner implements KeyValueScanner {
       this.reader.decrementRefCount();
     }
     closed = true;
+    this.hfs.close();
   }
 
   /**