You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/06/06 20:27:58 UTC
[2/2] hbase git commit: HBASE-9393 Hbase does not closing a closed
socket resulting in many CLOSE_WAIT
HBASE-9393 Hbase does not closing a closed socket resulting in many CLOSE_WAIT
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/356d4e91
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/356d4e91
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/356d4e91
Branch: refs/heads/branch-1
Commit: 356d4e9187fc6748169d3aaebe516fb2257d8835
Parents: 39e8e2f
Author: Ashish Singhi <as...@apache.org>
Authored: Tue Jun 6 17:49:08 2017 +0530
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Jun 6 12:59:19 2017 -0700
----------------------------------------------------------------------
.../hbase/io/FSDataInputStreamWrapper.java | 71 +++++++++++++++++++-
.../hadoop/hbase/io/HalfStoreFileReader.java | 4 ++
.../hbase/io/hfile/AbstractHFileReader.java | 8 +++
.../org/apache/hadoop/hbase/io/hfile/HFile.java | 26 +++++--
.../hadoop/hbase/io/hfile/HFileBlock.java | 21 +++++-
.../hadoop/hbase/io/hfile/HFileReaderV2.java | 5 ++
.../hadoop/hbase/io/hfile/HFileScanner.java | 5 ++
.../hbase/regionserver/StoreFileScanner.java | 1 +
8 files changed, 133 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/356d4e91/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
index b06be6b..dc168da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
@@ -18,7 +18,12 @@
package org.apache.hadoop.hbase.io;
import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -32,6 +37,8 @@ import com.google.common.annotations.VisibleForTesting;
* see method comments.
*/
public class FSDataInputStreamWrapper {
+ private static final Log LOG = LogFactory.getLog(FSDataInputStreamWrapper.class);
+
private final HFileSystem hfs;
private final Path path;
private final FileLink link;
@@ -74,6 +81,11 @@ public class FSDataInputStreamWrapper {
// reads without hbase checksum verification.
private volatile int hbaseChecksumOffCount = -1;
+ private Boolean instanceOfCanUnbuffer = null;
+ // Using reflection to get org.apache.hadoop.fs.CanUnbuffer#unbuffer method to avoid compilation
+ // errors against Hadoop pre 2.6.4 and 2.7.1 versions.
+ private Method unbuffer = null;
+
public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
this(fs, null, path, false);
}
@@ -219,4 +231,61 @@ public class FSDataInputStreamWrapper {
public HFileSystem getHfs() {
return this.hfs;
}
-}
+
+ /**
+ * This will free sockets and file descriptors held by the stream only when the stream implements
+ * org.apache.hadoop.fs.CanUnbuffer. NOT THREAD SAFE. Must be called only when all the clients
+ * using this stream to read the blocks have finished reading. If by chance the stream is
+ * unbuffered and there are clients still holding this stream for read then on next client read
+ * request a new socket will be opened by Datanode without client knowing about it and will serve
+ * its read request. Note: If this socket is idle for some time then the DataNode will close the
+ * socket and the socket will move into CLOSE_WAIT state and on the next client request on this
+ * stream, the current socket will be closed and a new socket will be opened to serve the
+ * requests.
+ */
+ @SuppressWarnings({ "rawtypes" })
+ public void unbuffer() {
+ FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum());
+ if (stream != null) {
+ InputStream wrappedStream = stream.getWrappedStream();
+ // CanUnbuffer interface was added as part of HDFS-7694 and the fix is available in Hadoop
+ // 2.6.4+ and 2.7.1+ versions only so check whether the stream object implements the
+ // CanUnbuffer interface or not and based on that call the unbuffer api.
+ final Class<? extends InputStream> streamClass = wrappedStream.getClass();
+ if (this.instanceOfCanUnbuffer == null) {
+ // To ensure we compute whether the stream is instance of CanUnbuffer only once.
+ this.instanceOfCanUnbuffer = false;
+ Class<?>[] streamInterfaces = streamClass.getInterfaces();
+ for (Class c : streamInterfaces) {
+ if (c.getCanonicalName().toString().equals("org.apache.hadoop.fs.CanUnbuffer")) {
+ try {
+ this.unbuffer = streamClass.getDeclaredMethod("unbuffer");
+ } catch (NoSuchMethodException | SecurityException e) {
+ LOG.warn("Failed to find 'unbuffer' method in class " + streamClass
+ + " . So there may be a TCP socket connection "
+ + "left open in CLOSE_WAIT state.",
+ e);
+ return;
+ }
+ this.instanceOfCanUnbuffer = true;
+ break;
+ }
+ }
+ }
+ if (this.instanceOfCanUnbuffer) {
+ try {
+ this.unbuffer.invoke(wrappedStream);
+ } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+ LOG.warn("Failed to invoke 'unbuffer' method in class " + streamClass
+ + " . So there may be a TCP socket connection left open in CLOSE_WAIT state.",
+ e);
+ }
+ } else {
+ LOG.warn("Failed to find 'unbuffer' method in class " + streamClass
+ + " . So there may be a TCP socket connection "
+ + "left open in CLOSE_WAIT state. For more details check "
+ + "https://issues.apache.org/jira/browse/HBASE-9393");
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/356d4e91/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
index ed2e925..c259fc2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
@@ -322,6 +322,10 @@ public class HalfStoreFileReader extends StoreFile.Reader {
public Cell getNextIndexedKey() {
return null;
}
+
+ @Override
+ public void close() {
+ }
};
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/356d4e91/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
index 7d8b572..5039427 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
@@ -332,6 +332,14 @@ public abstract class AbstractHFileReader
public HFile.Reader getReader() {
return reader;
}
+
+ @Override
+ public void close() {
+ if (!pread) {
+ // Only when pread is off: ask the reader to unbuffer its stream so the socket is freed. HBASE-9393
+ reader.unbufferStream();
+ }
+ }
}
/** For testing */
http://git-wip-us.apache.org/repos/asf/hbase/blob/356d4e91/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 3ee120f..16c5f34 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -457,6 +457,12 @@ public class HFile {
boolean isPrimaryReplicaReader();
void setPrimaryReplicaReader(boolean isPrimaryReplicaReader);
+
+ /**
+ * Closes the socket held by this reader's stream. Note: this can be called concurrently from
+ * multiple threads, so implementations must take care of thread safety.
+ */
+ void unbufferStream();
}
/**
@@ -473,8 +479,8 @@ public class HFile {
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH",
justification="Intentional")
- private static Reader pickReaderVersion(Path path, FSDataInputStreamWrapper fsdis,
- long size, CacheConfig cacheConf, HFileSystem hfs, Configuration conf) throws IOException {
+ private static Reader openReader(Path path, FSDataInputStreamWrapper fsdis, long size,
+ CacheConfig cacheConf, HFileSystem hfs, Configuration conf) throws IOException {
FixedFileTrailer trailer = null;
try {
boolean isHBaseChecksum = fsdis.shouldUseHBaseChecksum();
@@ -495,10 +501,15 @@ public class HFile {
LOG.warn("Error closing fsdis FSDataInputStreamWrapper", t2);
}
throw new CorruptHFileException("Problem reading HFile Trailer from file " + path, t);
+ } finally {
+ fsdis.unbuffer();
}
}
/**
+ * The sockets and the file descriptors held by the method parameter
+ * {@code FSDataInputStreamWrapper} passed will be freed after its usage so caller needs to ensure
+ * that no other threads have access to the same passed reference.
* @param fs A file system
* @param path Path to HFile
* @param fsdis a stream of path's file
@@ -522,7 +533,7 @@ public class HFile {
} else {
hfs = (HFileSystem)fs;
}
- return pickReaderVersion(path, fsdis, size, cacheConf, hfs, conf);
+ return openReader(path, fsdis, size, cacheConf, hfs, conf);
}
/**
@@ -537,18 +548,21 @@ public class HFile {
FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf) throws IOException {
Preconditions.checkNotNull(cacheConf, "Cannot create Reader with null CacheConf");
FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fs, path);
- return pickReaderVersion(path, stream, fs.getFileStatus(path).getLen(),
+ return openReader(path, stream, fs.getFileStatus(path).getLen(),
cacheConf, stream.getHfs(), conf);
}
/**
- * This factory method is used only by unit tests
+ * This factory method is used only by unit tests. <br/>
+ * The sockets and the file descriptors held by the passed {@code FSDataInputStream} (which is
+ * wrapped in an {@code FSDataInputStreamWrapper} here) will be freed after its usage, so the
+ * caller needs to ensure that no other threads have access to the same passed reference.
*/
static Reader createReaderFromStream(Path path,
FSDataInputStream fsdis, long size, CacheConfig cacheConf, Configuration conf)
throws IOException {
FSDataInputStreamWrapper wrapper = new FSDataInputStreamWrapper(fsdis);
- return pickReaderVersion(path, wrapper, size, cacheConf, null, conf);
+ return openReader(path, wrapper, size, cacheConf, null, conf);
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/356d4e91/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 3b014b9..d30ca19 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -1361,6 +1361,12 @@ public class HFileBlock implements Cacheable {
/** Get the default decoder for blocks from this file. */
HFileBlockDecodingContext getDefaultBlockDecodingContext();
+
+ /**
+ * Closes the stream's socket. Note: this can be called concurrently from multiple threads, so
+ * implementations must take care of thread safety.
+ */
+ void unbufferStream();
}
/**
@@ -1379,7 +1385,7 @@ public class HFileBlock implements Cacheable {
/** The filesystem used to access data */
protected HFileSystem hfs;
- private final Lock streamLock = new ReentrantLock();
+ protected final Lock streamLock = new ReentrantLock();
/** The default buffer size for our buffered streams */
public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
@@ -1835,6 +1841,19 @@ public class HFileBlock implements Cacheable {
}
@Override
+ public void unbufferStream() {
+ // Unbuffer only if streamLock can be taken right now, i.e. no other client is reading through
+ // the stream; tryLock() does not wait, so when the lock is busy the unbuffer is skipped.
+ if (streamLock.tryLock()) {
+ try {
+ this.streamWrapper.unbuffer();
+ } finally {
+ streamLock.unlock();
+ }
+ }
+ }
+
+ @Override
public String toString() {
return "hfs=" + hfs + ", path=" + pathName + ", fileContext=" + fileContext;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/356d4e91/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
index 0bca8e5..291a671 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
@@ -1429,4 +1429,9 @@ public class HFileReaderV2 extends AbstractHFileReader {
boolean prefetchComplete() {
return PrefetchExecutor.isCompleted(path);
}
+
+ @Override
+ public void unbufferStream() {
+ fsBlockReader.unbufferStream();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/356d4e91/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
index 3e0f91f..b5f207c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
@@ -161,4 +161,9 @@ public interface HFileScanner {
* @return the next key in the index (the key to seek to the next block)
*/
Cell getNextIndexedKey();
+
+ /**
+ * Close the stream socket to handle RS CLOSE_WAIT. HBASE-9393
+ */
+ void close();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/356d4e91/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index 975d3c7..f5eb74f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -270,6 +270,7 @@ public class StoreFileScanner implements KeyValueScanner {
this.reader.decrementRefCount();
}
closed = true;
+ this.hfs.close();
}
/**