You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2021/02/23 17:04:00 UTC
[hadoop] branch trunk updated: HADOOP-17528. SFTP File System:
close the connection pool when closing a FileSystem (#2701)
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7b7c001 HADOOP-17528. SFTP File System: close the connection pool when closing a FileSystem (#2701)
7b7c001 is described below
commit 7b7c0019f4232055acd51880d6461f5cf14b54cc
Author: Mike <m....@gmail.com>
AuthorDate: Tue Feb 23 20:03:27 2021 +0300
HADOOP-17528. SFTP File System: close the connection pool when closing a FileSystem (#2701)
Contributed by Mike Pryakhin.
---
.../org/apache/hadoop/fs/sftp/SFTPFileSystem.java | 31 +++++++++++++++++++++-
.../apache/hadoop/fs/sftp/TestSFTPFileSystem.java | 11 ++++++++
2 files changed, 41 insertions(+), 1 deletion(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java
index 898f615..297ec04 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java
@@ -24,6 +24,7 @@ import java.net.URI;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Vector;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -50,6 +51,7 @@ public class SFTPFileSystem extends FileSystem {
private SFTPConnectionPool connectionPool;
private URI uri;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
private static final int DEFAULT_SFTP_PORT = 22;
private static final int DEFAULT_MAX_CONNECTION = 5;
@@ -83,6 +85,7 @@ public class SFTPFileSystem extends FileSystem {
"Destination path %s already exist, cannot rename!";
public static final String E_FAILED_GETHOME = "Failed to get home directory";
public static final String E_FAILED_DISCONNECT = "Failed to disconnect";
+ public static final String E_FS_CLOSED = "FileSystem is closed!";
/**
* Set configuration from UI.
@@ -138,8 +141,9 @@ public class SFTPFileSystem extends FileSystem {
* @throws IOException
*/
private ChannelSftp connect() throws IOException {
- Configuration conf = getConf();
+ checkNotClosed();
+ Configuration conf = getConf();
String host = conf.get(FS_SFTP_HOST, null);
int port = conf.getInt(FS_SFTP_HOST_PORT, DEFAULT_SFTP_PORT);
String user = conf.get(FS_SFTP_USER_PREFIX + host, null);
@@ -703,6 +707,31 @@ public class SFTPFileSystem extends FileSystem {
}
}
+ @Override
+ public void close() throws IOException {
+ if (closed.getAndSet(true)) {
+ return;
+ }
+ try {
+ super.close();
+ } finally {
+ if (connectionPool != null) {
+ connectionPool.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Verify that the input stream is open. Non blocking; this gives
+ * the last state of the volatile {@link #closed} field.
+ * @throws IOException if the connection is closed.
+ */
+ private void checkNotClosed() throws IOException {
+ if (closed.get()) {
+ throw new IOException(uri + ": " + E_FS_CLOSED);
+ }
+ }
+
@VisibleForTesting
SFTPConnectionPool getConnectionPool() {
return connectionPool;
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/sftp/TestSFTPFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/sftp/TestSFTPFileSystem.java
index 6939262..58452f8 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/sftp/TestSFTPFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/sftp/TestSFTPFileSystem.java
@@ -374,4 +374,15 @@ public class TestSFTPFileSystem {
assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(),
is(1));
}
+
+ @Test
+ public void testCloseFileSystemClosesConnectionPool() throws Exception {
+ SFTPFileSystem fs = (SFTPFileSystem) sftpFs;
+ fs.getHomeDirectory();
+ assertThat(fs.getConnectionPool().getLiveConnCount(), is(1));
+ fs.close();
+ assertThat(fs.getConnectionPool().getLiveConnCount(), is(0));
+ ///making sure that re-entrant close calls are safe
+ fs.close();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org