You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/04/27 13:44:05 UTC

[hadoop] branch trunk updated: HDFS-1820. FTPFileSystem attempts to close the outputstream even when it is not initialised. (#1952)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 18d7dfb  HDFS-1820. FTPFileSystem attempts to close the outputstream even when it is not initialised. (#1952)
18d7dfb is described below

commit 18d7dfbf35564694e24bf2b7c99fea1bee1c790e
Author: Mike <m....@gmail.com>
AuthorDate: Mon Apr 27 16:43:51 2020 +0300

    HDFS-1820. FTPFileSystem attempts to close the outputstream even when it is not initialised. (#1952)
    
    
    Contributed by Mikhail Pryakhin.
---
 hadoop-common-project/hadoop-common/pom.xml        |  5 ++
 .../org/apache/hadoop/fs/ftp/FTPFileSystem.java    | 27 ++++--
 .../org/apache/hadoop/fs/ftp/FtpTestServer.java    | 99 ++++++++++++++++++++++
 .../apache/hadoop/fs/ftp/TestFTPFileSystem.java    | 84 +++++++++++++++++-
 4 files changed, 204 insertions(+), 11 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index 777e001..d768907 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -276,6 +276,11 @@
       <artifactId>sshd-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.ftpserver</groupId>
+      <artifactId>ftpserver-core</artifactId>
+      <scope>test</scope>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.htrace</groupId>
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java
index 4b144bf..28db2c9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.ftp;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.ConnectException;
 import java.net.URI;
 
@@ -41,6 +42,7 @@ import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progressable;
 import org.slf4j.Logger;
@@ -110,7 +112,9 @@ public class FTPFileSystem extends FileSystem {
 
     // get port information from uri, (overrides info in conf)
     int port = uri.getPort();
-    port = (port == -1) ? FTP.DEFAULT_PORT : port;
+    if(port == -1){
+      port = conf.getInt(FS_FTP_HOST_PORT, FTP.DEFAULT_PORT);
+    }
     conf.setInt(FS_FTP_HOST_PORT, port);
 
     // get user/password information from URI (overrides info in conf)
@@ -340,8 +344,19 @@ public class FTPFileSystem extends FileSystem {
     // file. The FTP client connection is closed when close() is called on the
     // FSDataOutputStream.
     client.changeWorkingDirectory(parent.toUri().getPath());
-    FSDataOutputStream fos = new FSDataOutputStream(client.storeFileStream(file
-        .getName()), statistics) {
+    OutputStream outputStream = client.storeFileStream(file.getName());
+
+    if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
+      // The ftpClient is an inconsistent state. Must close the stream
+      // which in turn will logout and disconnect from FTP server
+      if (outputStream != null) {
+        IOUtils.closeStream(outputStream);
+      }
+      disconnect(client);
+      throw new IOException("Unable to create file: " + file + ", Aborting");
+    }
+
+    FSDataOutputStream fos = new FSDataOutputStream(outputStream, statistics) {
       @Override
       public void close() throws IOException {
         super.close();
@@ -356,12 +371,6 @@ public class FTPFileSystem extends FileSystem {
         }
       }
     };
-    if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
-      // The ftpClient is an inconsistent state. Must close the stream
-      // which in turn will logout and disconnect from FTP server
-      fos.close();
-      throw new IOException("Unable to create file: " + file + ", Aborting");
-    }
     return fos;
   }
 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/FtpTestServer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/FtpTestServer.java
new file mode 100644
index 0000000..eca26de
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/FtpTestServer.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.ftp;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+
+import org.apache.ftpserver.FtpServer;
+import org.apache.ftpserver.FtpServerFactory;
+import org.apache.ftpserver.ftplet.Authority;
+import org.apache.ftpserver.ftplet.FtpException;
+import org.apache.ftpserver.ftplet.UserManager;
+import org.apache.ftpserver.impl.DefaultFtpServer;
+import org.apache.ftpserver.listener.Listener;
+import org.apache.ftpserver.listener.ListenerFactory;
+import org.apache.ftpserver.usermanager.PropertiesUserManagerFactory;
+import org.apache.ftpserver.usermanager.impl.BaseUser;
+
+/**
+ * Helper class facilitating to manage a local ftp
+ * server for unit tests purposes only.
+ */
+public class FtpTestServer {
+
+  private int port;
+  private Path ftpRoot;
+  private UserManager userManager;
+  private FtpServer server;
+
+  public FtpTestServer(Path ftpRoot) {
+    this.ftpRoot = ftpRoot;
+    this.userManager = new PropertiesUserManagerFactory().createUserManager();
+    FtpServerFactory serverFactory = createServerFactory();
+    serverFactory.setUserManager(userManager);
+    this.server = serverFactory.createServer();
+  }
+
+  public FtpTestServer start() throws Exception {
+    server.start();
+    Listener listener = ((DefaultFtpServer) server)
+        .getListeners()
+        .get("default");
+    port = listener.getPort();
+    return this;
+  }
+
+  public Path getFtpRoot() {
+    return ftpRoot;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public void stop() {
+    if (!server.isStopped()) {
+      server.stop();
+    }
+  }
+
+  public BaseUser addUser(String name, String password,
+      Authority... authorities) throws IOException, FtpException {
+
+    BaseUser user = new BaseUser();
+    user.setName(name);
+    user.setPassword(password);
+    Path userHome = Files.createDirectory(ftpRoot.resolve(name));
+    user.setHomeDirectory(userHome.toString());
+    user.setAuthorities(Arrays.asList(authorities));
+    userManager.save(user);
+    return user;
+  }
+
+  private FtpServerFactory createServerFactory() {
+    FtpServerFactory serverFactory = new FtpServerFactory();
+    ListenerFactory defaultListener = new ListenerFactory();
+    defaultListener.setPort(0);
+    serverFactory.addListener("default", defaultListener.createListener());
+    return serverFactory;
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java
index 3d41ccb..02d5a48 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java
@@ -17,18 +17,35 @@
  */
 package org.apache.hadoop.fs.ftp;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Comparator;
+
 import com.google.common.base.Preconditions;
 import org.apache.commons.net.ftp.FTP;
-
 import org.apache.commons.net.ftp.FTPClient;
 import org.apache.commons.net.ftp.FTPFile;
+import org.apache.ftpserver.usermanager.impl.BaseUser;
+import org.apache.ftpserver.usermanager.impl.WritePermission;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
-
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -37,9 +54,72 @@ import static org.junit.Assert.assertEquals;
  */
 public class TestFTPFileSystem {
 
+  private FtpTestServer server;
+
   @Rule
   public Timeout testTimeout = new Timeout(180000);
 
+  @Before
+  public void setUp() throws Exception {
+    server = new FtpTestServer(GenericTestUtils.getTestDir().toPath()).start();
+  }
+
+  @After
+  @SuppressWarnings("ResultOfMethodCallIgnored")
+  public void tearDown() throws Exception {
+    if (server != null) {
+      server.stop();
+      Files.walk(server.getFtpRoot())
+          .sorted(Comparator.reverseOrder())
+          .map(java.nio.file.Path::toFile)
+          .forEach(File::delete);
+    }
+  }
+
+  @Test
+  public void testCreateWithWritePermissions() throws Exception {
+    BaseUser user = server.addUser("test", "password", new WritePermission());
+    Configuration configuration = new Configuration();
+    configuration.set("fs.defaultFS", "ftp:///");
+    configuration.set("fs.ftp.host", "localhost");
+    configuration.setInt("fs.ftp.host.port", server.getPort());
+    configuration.set("fs.ftp.user.localhost", user.getName());
+    configuration.set("fs.ftp.password.localhost", user.getPassword());
+    configuration.setBoolean("fs.ftp.impl.disable.cache", true);
+
+    FileSystem fs = FileSystem.get(configuration);
+    byte[] bytesExpected = "hello world".getBytes(StandardCharsets.UTF_8);
+    try (FSDataOutputStream outputStream = fs.create(new Path("test1.txt"))) {
+      outputStream.write(bytesExpected);
+    }
+    try (FSDataInputStream input = fs.open(new Path("test1.txt"))) {
+      assertThat(bytesExpected, equalTo(IOUtils.readFullyToByteArray(input)));
+    }
+  }
+
+  @Test
+  public void testCreateWithoutWritePermissions() throws Exception {
+    BaseUser user = server.addUser("test", "password");
+    Configuration configuration = new Configuration();
+    configuration.set("fs.defaultFS", "ftp:///");
+    configuration.set("fs.ftp.host", "localhost");
+    configuration.setInt("fs.ftp.host.port", server.getPort());
+    configuration.set("fs.ftp.user.localhost", user.getName());
+    configuration.set("fs.ftp.password.localhost", user.getPassword());
+    configuration.setBoolean("fs.ftp.impl.disable.cache", true);
+
+    FileSystem fs = FileSystem.get(configuration);
+    byte[] bytesExpected = "hello world".getBytes(StandardCharsets.UTF_8);
+    LambdaTestUtils.intercept(
+        IOException.class, "Unable to create file: test1.txt, Aborting",
+        () -> {
+          try (FSDataOutputStream out = fs.create(new Path("test1.txt"))) {
+            out.write(bytesExpected);
+          }
+        }
+    );
+  }
+
   @Test
   public void testFTPDefaultPort() throws Exception {
     FTPFileSystem ftp = new FTPFileSystem();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org