You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/04/27 13:47:07 UTC
[hadoop] branch branch-3.3 updated: HDFS-1820. FTPFileSystem
attempts to close the outputstream even when it is not initialised. (#1952)
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 68d8802 HDFS-1820. FTPFileSystem attempts to close the outputstream even when it is not initialised. (#1952)
68d8802 is described below
commit 68d8802624a33e998c6ce5175bab37f2710fdb0b
Author: Mike <m....@gmail.com>
AuthorDate: Mon Apr 27 16:43:51 2020 +0300
HDFS-1820. FTPFileSystem attempts to close the outputstream even when it is not initialised. (#1952)
Contributed by Mikhail Pryakhin.
---
hadoop-common-project/hadoop-common/pom.xml | 5 ++
.../org/apache/hadoop/fs/ftp/FTPFileSystem.java | 27 ++++--
.../org/apache/hadoop/fs/ftp/FtpTestServer.java | 99 ++++++++++++++++++++++
.../apache/hadoop/fs/ftp/TestFTPFileSystem.java | 84 +++++++++++++++++-
4 files changed, 204 insertions(+), 11 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index 737db05..4765977 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -276,6 +276,11 @@
<artifactId>sshd-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.ftpserver</groupId>
+ <artifactId>ftpserver-core</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.htrace</groupId>
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java
index 4b144bf..28db2c9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.ftp;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.net.ConnectException;
import java.net.URI;
@@ -41,6 +42,7 @@ import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
@@ -110,7 +112,9 @@ public class FTPFileSystem extends FileSystem {
// get port information from uri, (overrides info in conf)
int port = uri.getPort();
- port = (port == -1) ? FTP.DEFAULT_PORT : port;
+ if(port == -1){
+ port = conf.getInt(FS_FTP_HOST_PORT, FTP.DEFAULT_PORT);
+ }
conf.setInt(FS_FTP_HOST_PORT, port);
// get user/password information from URI (overrides info in conf)
@@ -340,8 +344,19 @@ public class FTPFileSystem extends FileSystem {
// file. The FTP client connection is closed when close() is called on the
// FSDataOutputStream.
client.changeWorkingDirectory(parent.toUri().getPath());
- FSDataOutputStream fos = new FSDataOutputStream(client.storeFileStream(file
- .getName()), statistics) {
+ OutputStream outputStream = client.storeFileStream(file.getName());
+
+ if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
+ // The ftpClient is in an inconsistent state. Close the stream if one was
+ // returned, then disconnect the client from the FTP server
+ if (outputStream != null) {
+ IOUtils.closeStream(outputStream);
+ }
+ disconnect(client);
+ throw new IOException("Unable to create file: " + file + ", Aborting");
+ }
+
+ FSDataOutputStream fos = new FSDataOutputStream(outputStream, statistics) {
@Override
public void close() throws IOException {
super.close();
@@ -356,12 +371,6 @@ public class FTPFileSystem extends FileSystem {
}
}
};
- if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
- // The ftpClient is an inconsistent state. Must close the stream
- // which in turn will logout and disconnect from FTP server
- fos.close();
- throw new IOException("Unable to create file: " + file + ", Aborting");
- }
return fos;
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/FtpTestServer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/FtpTestServer.java
new file mode 100644
index 0000000..eca26de
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/FtpTestServer.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.ftp;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+
+import org.apache.ftpserver.FtpServer;
+import org.apache.ftpserver.FtpServerFactory;
+import org.apache.ftpserver.ftplet.Authority;
+import org.apache.ftpserver.ftplet.FtpException;
+import org.apache.ftpserver.ftplet.UserManager;
+import org.apache.ftpserver.impl.DefaultFtpServer;
+import org.apache.ftpserver.listener.Listener;
+import org.apache.ftpserver.listener.ListenerFactory;
+import org.apache.ftpserver.usermanager.PropertiesUserManagerFactory;
+import org.apache.ftpserver.usermanager.impl.BaseUser;
+
+/**
+ * Test helper wrapping an embedded Apache FtpServer instance rooted at a
+ * local directory; intended for unit tests only.
+ */
+public class FtpTestServer {
+
+ private int port; // listener port; assigned in start()
+ private Path ftpRoot; // directory under which user home directories are created
+ private UserManager userManager; // holds the users registered through addUser()
+ private FtpServer server;
+
+ public FtpTestServer(Path ftpRoot) {
+ this.ftpRoot = ftpRoot;
+ this.userManager = new PropertiesUserManagerFactory().createUserManager();
+ FtpServerFactory serverFactory = createServerFactory();
+ serverFactory.setUserManager(userManager);
+ this.server = serverFactory.createServer(); // created here, not started until start()
+ }
+
+ public FtpTestServer start() throws Exception {
+ server.start();
+ Listener listener = ((DefaultFtpServer) server) // cast to the concrete type to access its listeners
+ .getListeners()
+ .get("default"); // name registered in createServerFactory()
+ port = listener.getPort(); // record the port the listener reports after startup
+ return this; // allows chaining: new FtpTestServer(root).start()
+ }
+
+ public Path getFtpRoot() {
+ return ftpRoot;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void stop() {
+ if (!server.isStopped()) { // skip stop() when the server is already stopped
+ server.stop();
+ }
+ }
+
+ public BaseUser addUser(String name, String password,
+ Authority... authorities) throws IOException, FtpException {
+
+ BaseUser user = new BaseUser();
+ user.setName(name);
+ user.setPassword(password);
+ Path userHome = Files.createDirectory(ftpRoot.resolve(name)); // home is <ftpRoot>/<name>; fails if it already exists
+ user.setHomeDirectory(userHome.toString());
+ user.setAuthorities(Arrays.asList(authorities)); // empty when no authorities are passed
+ userManager.save(user);
+ return user;
+ }
+
+ private FtpServerFactory createServerFactory() {
+ FtpServerFactory serverFactory = new FtpServerFactory();
+ ListenerFactory defaultListener = new ListenerFactory();
+ defaultListener.setPort(0); // presumably lets the server pick a free port; start() reads the actual one
+ serverFactory.addListener("default", defaultListener.createListener());
+ return serverFactory;
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java
index 3d41ccb..02d5a48 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java
@@ -17,18 +17,35 @@
*/
package org.apache.hadoop.fs.ftp;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Comparator;
+
import com.google.common.base.Preconditions;
import org.apache.commons.net.ftp.FTP;
-
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
+import org.apache.ftpserver.usermanager.impl.BaseUser;
+import org.apache.ftpserver.usermanager.impl.WritePermission;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
-
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
/**
@@ -37,9 +54,72 @@ import static org.junit.Assert.assertEquals;
*/
public class TestFTPFileSystem {
+ private FtpTestServer server;
+
@Rule
public Timeout testTimeout = new Timeout(180000);
+ @Before
+ public void setUp() throws Exception { // starts a fresh FTP test server rooted at the test directory
+ server = new FtpTestServer(GenericTestUtils.getTestDir().toPath()).start();
+ }
+
+ @After
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ public void tearDown() throws Exception {
+ if (server != null) { // null when setUp() failed before assigning the server
+ server.stop();
+ Files.walk(server.getFtpRoot()) // NOTE(review): this Stream is never closed; consider try-with-resources
+ .sorted(Comparator.reverseOrder()) // reverse order so children are deleted before their parent directories
+ .map(java.nio.file.Path::toFile)
+ .forEach(File::delete); // best-effort cleanup: the boolean result of File.delete() is ignored
+ }
+ }
+
+ @Test
+ public void testCreateWithWritePermissions() throws Exception {
+ BaseUser user = server.addUser("test", "password", new WritePermission()); // user allowed to write
+ Configuration configuration = new Configuration();
+ configuration.set("fs.defaultFS", "ftp:///"); // URI carries no host/port; they are supplied via the keys below
+ configuration.set("fs.ftp.host", "localhost");
+ configuration.setInt("fs.ftp.host.port", server.getPort()); // port the test server reported after start()
+ configuration.set("fs.ftp.user.localhost", user.getName());
+ configuration.set("fs.ftp.password.localhost", user.getPassword());
+ configuration.setBoolean("fs.ftp.impl.disable.cache", true); // presumably bypasses the FileSystem cache — TODO confirm
+
+ FileSystem fs = FileSystem.get(configuration); // NOTE(review): fs is never closed in this test
+ byte[] bytesExpected = "hello world".getBytes(StandardCharsets.UTF_8);
+ try (FSDataOutputStream outputStream = fs.create(new Path("test1.txt"))) {
+ outputStream.write(bytesExpected);
+ }
+ try (FSDataInputStream input = fs.open(new Path("test1.txt"))) { // read back and compare with what was written
+ assertThat(bytesExpected, equalTo(IOUtils.readFullyToByteArray(input)));
+ }
+ }
+
+ @Test
+ public void testCreateWithoutWritePermissions() throws Exception {
+ BaseUser user = server.addUser("test", "password"); // no authorities passed, so no WritePermission
+ Configuration configuration = new Configuration();
+ configuration.set("fs.defaultFS", "ftp:///"); // URI carries no host/port; they are supplied via the keys below
+ configuration.set("fs.ftp.host", "localhost");
+ configuration.setInt("fs.ftp.host.port", server.getPort()); // port the test server reported after start()
+ configuration.set("fs.ftp.user.localhost", user.getName());
+ configuration.set("fs.ftp.password.localhost", user.getPassword());
+ configuration.setBoolean("fs.ftp.impl.disable.cache", true); // presumably bypasses the FileSystem cache — TODO confirm
+
+ FileSystem fs = FileSystem.get(configuration); // NOTE(review): fs is never closed in this test
+ byte[] bytesExpected = "hello world".getBytes(StandardCharsets.UTF_8);
+ LambdaTestUtils.intercept(
+ IOException.class, "Unable to create file: test1.txt, Aborting", // message built in FTPFileSystem's create path
+ () -> {
+ try (FSDataOutputStream out = fs.create(new Path("test1.txt"))) { // expected to throw before any write happens
+ out.write(bytesExpected);
+ }
+ }
+ );
+ }
+
@Test
public void testFTPDefaultPort() throws Exception {
FTPFileSystem ftp = new FTPFileSystem();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org