You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2021/02/23 17:04:00 UTC

[hadoop] branch trunk updated: HADOOP-17528. SFTP File System: close the connection pool when closing a FileSystem (#2701)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7b7c001  HADOOP-17528. SFTP File System: close the connection pool when closing a FileSystem (#2701)
7b7c001 is described below

commit 7b7c0019f4232055acd51880d6461f5cf14b54cc
Author: Mike <m....@gmail.com>
AuthorDate: Tue Feb 23 20:03:27 2021 +0300

    HADOOP-17528. SFTP File System: close the connection pool when closing a FileSystem (#2701)
    
    
    Contributed by Mike Pryakhin.
---
 .../org/apache/hadoop/fs/sftp/SFTPFileSystem.java  | 31 +++++++++++++++++++++-
 .../apache/hadoop/fs/sftp/TestSFTPFileSystem.java  | 11 ++++++++
 2 files changed, 41 insertions(+), 1 deletion(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java
index 898f615..297ec04 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java
@@ -24,6 +24,7 @@ import java.net.URI;
 import java.net.URLDecoder;
 import java.util.ArrayList;
 import java.util.Vector;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -50,6 +51,7 @@ public class SFTPFileSystem extends FileSystem {
 
   private SFTPConnectionPool connectionPool;
   private URI uri;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
 
   private static final int DEFAULT_SFTP_PORT = 22;
   private static final int DEFAULT_MAX_CONNECTION = 5;
@@ -83,6 +85,7 @@ public class SFTPFileSystem extends FileSystem {
       "Destination path %s already exist, cannot rename!";
   public static final String E_FAILED_GETHOME = "Failed to get home directory";
   public static final String E_FAILED_DISCONNECT = "Failed to disconnect";
+  public static final String E_FS_CLOSED = "FileSystem is closed!";
 
   /**
    * Set configuration from UI.
@@ -138,8 +141,9 @@ public class SFTPFileSystem extends FileSystem {
    * @throws IOException
    */
   private ChannelSftp connect() throws IOException {
-    Configuration conf = getConf();
+    checkNotClosed();
 
+    Configuration conf = getConf();
     String host = conf.get(FS_SFTP_HOST, null);
     int port = conf.getInt(FS_SFTP_HOST_PORT, DEFAULT_SFTP_PORT);
     String user = conf.get(FS_SFTP_USER_PREFIX + host, null);
@@ -703,6 +707,31 @@ public class SFTPFileSystem extends FileSystem {
     }
   }
 
+  @Override
+  public void close() throws IOException {
+    if (closed.getAndSet(true)) {
+      return;
+    }
+    try {
+      super.close();
+    } finally {
+      if (connectionPool != null) {
+        connectionPool.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Verify that the input stream is open. Non blocking; this gives
+   * the last state of the volatile {@link #closed} field.
+   * @throws IOException if the connection is closed.
+   */
+  private void checkNotClosed() throws IOException {
+    if (closed.get()) {
+      throw new IOException(uri + ": " + E_FS_CLOSED);
+    }
+  }
+
   @VisibleForTesting
   SFTPConnectionPool getConnectionPool() {
     return connectionPool;
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/sftp/TestSFTPFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/sftp/TestSFTPFileSystem.java
index 6939262..58452f8 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/sftp/TestSFTPFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/sftp/TestSFTPFileSystem.java
@@ -374,4 +374,15 @@ public class TestSFTPFileSystem {
     assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(),
         is(1));
   }
+
+  @Test
+  public void testCloseFileSystemClosesConnectionPool() throws Exception {
+    SFTPFileSystem fs = (SFTPFileSystem) sftpFs;
+    fs.getHomeDirectory();
+    assertThat(fs.getConnectionPool().getLiveConnCount(), is(1));
+    fs.close();
+    assertThat(fs.getConnectionPool().getLiveConnCount(), is(0));
+    ///making sure that re-entrant close calls are safe
+    fs.close();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org