You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/06/03 10:39:02 UTC
[hadoop] branch branch-3.3 updated: HADOOP-14566. Add seek support for SFTP FileSystem. (#1999)
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new cf84bec HADOOP-14566. Add seek support for SFTP FileSystem. (#1999)
cf84bec is described below
commit cf84bec6e3bb07f22cc13a858853e8ffe9326fb1
Author: Mike <m....@gmail.com>
AuthorDate: Wed Jun 3 13:37:40 2020 +0300
HADOOP-14566. Add seek support for SFTP FileSystem. (#1999)
Contributed by Mikhail Pryakhin
---
.../org/apache/hadoop/fs/sftp/SFTPFileSystem.java | 14 +--
.../org/apache/hadoop/fs/sftp/SFTPInputStream.java | 110 +++++++++++++-------
.../hadoop/fs/contract/AbstractFSContract.java | 10 +-
.../fs/contract/AbstractFSContractTestBase.java | 3 +
.../hadoop/fs/contract/sftp/SFTPContract.java | 111 +++++++++++++++++++++
.../fs/contract/sftp/TestSFTPContractSeek.java | 31 ++++++
.../src/test/resources/contract/sftp.xml | 79 +++++++++++++++
7 files changed, 313 insertions(+), 45 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java
index ed33357..a91b50f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.fs.sftp;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URLDecoder;
@@ -516,20 +515,21 @@ public class SFTPFileSystem extends FileSystem {
disconnect(channel);
throw new IOException(String.format(E_PATH_DIR, f));
}
- InputStream is;
try {
// the path could be a symbolic link, so get the real path
absolute = new Path("/", channel.realpath(absolute.toUri().getPath()));
-
- is = channel.get(absolute.toUri().getPath());
} catch (SftpException e) {
throw new IOException(e);
}
- return new FSDataInputStream(new SFTPInputStream(is, statistics)){
+ return new FSDataInputStream(
+ new SFTPInputStream(channel, absolute, statistics)){
@Override
public void close() throws IOException {
- super.close();
- disconnect(channel);
+ try {
+ super.close();
+ } finally {
+ disconnect(channel);
+ }
}
};
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java
index 7af299b..d0f9a8d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java
@@ -15,62 +15,107 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.fs.sftp;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import com.jcraft.jsch.ChannelSftp;
+import com.jcraft.jsch.SftpATTRS;
+import com.jcraft.jsch.SftpException;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
/** SFTP FileSystem input stream. */
class SFTPInputStream extends FSInputStream {
- public static final String E_SEEK_NOTSUPPORTED = "Seek not supported";
- public static final String E_NULL_INPUTSTREAM = "Null InputStream";
- public static final String E_STREAM_CLOSED = "Stream closed";
-
+ private final ChannelSftp channel;
+ private final Path path;
private InputStream wrappedStream;
private FileSystem.Statistics stats;
private boolean closed;
private long pos;
+ private long nextPos;
+ private long contentLength;
- SFTPInputStream(InputStream stream, FileSystem.Statistics stats) {
-
- if (stream == null) {
- throw new IllegalArgumentException(E_NULL_INPUTSTREAM);
+ SFTPInputStream(ChannelSftp channel, Path path, FileSystem.Statistics stats)
+ throws IOException {
+ try {
+ this.channel = channel;
+ this.path = path;
+ this.stats = stats;
+ this.wrappedStream = channel.get(path.toUri().getPath());
+ SftpATTRS stat = channel.lstat(path.toString());
+ this.contentLength = stat.getSize();
+ } catch (SftpException e) {
+ throw new IOException(e);
}
- this.wrappedStream = stream;
- this.stats = stats;
+ }
- this.pos = 0;
- this.closed = false;
+ @Override
+ public synchronized void seek(long position) throws IOException {
+ checkNotClosed();
+ if (position < 0) {
+ throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+ }
+ nextPos = position;
}
@Override
- public void seek(long position) throws IOException {
- throw new IOException(E_SEEK_NOTSUPPORTED);
+ public synchronized int available() throws IOException {
+ checkNotClosed();
+ long remaining = contentLength - nextPos;
+ if (remaining > Integer.MAX_VALUE) {
+ return Integer.MAX_VALUE;
+ }
+ return (int) remaining;
+ }
+
+ private void seekInternal() throws IOException {
+ if (pos == nextPos) {
+ return;
+ }
+ if (nextPos > pos) {
+ long skipped = wrappedStream.skip(nextPos - pos);
+ pos = pos + skipped;
+ }
+ if (nextPos < pos) {
+ wrappedStream.close();
+ try {
+ wrappedStream = channel.get(path.toUri().getPath());
+ pos = wrappedStream.skip(nextPos);
+ } catch (SftpException e) {
+ throw new IOException(e);
+ }
+ }
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
- throw new IOException(E_SEEK_NOTSUPPORTED);
+ return false;
}
@Override
- public long getPos() throws IOException {
- return pos;
+ public synchronized long getPos() throws IOException {
+ return nextPos;
}
@Override
public synchronized int read() throws IOException {
- if (closed) {
- throw new IOException(E_STREAM_CLOSED);
+ checkNotClosed();
+ if (this.contentLength == 0 || (nextPos >= contentLength)) {
+ return -1;
}
-
+ seekInternal();
int byteRead = wrappedStream.read();
if (byteRead >= 0) {
pos++;
+ nextPos++;
}
if (stats != null & byteRead >= 0) {
stats.incrementBytesRead(1);
@@ -78,23 +123,6 @@ class SFTPInputStream extends FSInputStream {
return byteRead;
}
- public synchronized int read(byte[] buf, int off, int len)
- throws IOException {
- if (closed) {
- throw new IOException(E_STREAM_CLOSED);
- }
-
- int result = wrappedStream.read(buf, off, len);
- if (result > 0) {
- pos += result;
- }
- if (stats != null & result > 0) {
- stats.incrementBytesRead(result);
- }
-
- return result;
- }
-
public synchronized void close() throws IOException {
if (closed) {
return;
@@ -103,4 +131,12 @@ class SFTPInputStream extends FSInputStream {
wrappedStream.close();
closed = true;
}
+
+ private void checkNotClosed() throws IOException {
+ if (closed) {
+ throw new IOException(
+ path.toUri() + ": " + FSExceptionMessages.STREAM_IS_CLOSED
+ );
+ }
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java
index f09496a..76d3116 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java
@@ -70,6 +70,14 @@ public abstract class AbstractFSContract extends Configured {
}
/**
+ * Any teardown logic can go here.
+ * @throws IOException IO problems
+ */
+ public void teardown() throws IOException {
+
+ }
+
+ /**
* Add a configuration resource to this instance's configuration
* @param resource resource reference
* @throws AssertionError if the resource was not found.
@@ -113,7 +121,7 @@ public abstract class AbstractFSContract extends Configured {
public abstract FileSystem getTestFileSystem() throws IOException;
/**
- * Get the scheme of this FS
+ * Get the scheme of this FS.
* @return the scheme this FS supports
*/
public abstract String getScheme();
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java
index 60373f6..ac9de6d 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java
@@ -213,6 +213,9 @@ public abstract class AbstractFSContractTestBase extends Assert
Thread.currentThread().setName("teardown");
LOG.debug("== Teardown ==");
deleteTestDirInTeardown();
+ if (contract != null) {
+ contract.teardown();
+ }
LOG.debug("== Teardown complete ==");
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java
new file mode 100644
index 0000000..f72a2ae
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.sftp;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.sftp.SFTPFileSystem;
+import org.apache.sshd.common.NamedFactory;
+import org.apache.sshd.server.SshServer;
+import org.apache.sshd.server.auth.UserAuth;
+import org.apache.sshd.server.auth.password.UserAuthPasswordFactory;
+import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
+import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory;
+
+public class SFTPContract extends AbstractFSContract {
+
+ private static final String CONTRACT_XML = "contract/sftp.xml";
+ private static final URI TEST_URI =
+ URI.create("sftp://user:password@localhost");
+ private final String testDataDir =
+ new FileSystemTestHelper().getTestRootDir();
+ private final Configuration conf;
+ private SshServer sshd;
+
+ public SFTPContract(Configuration conf) {
+ super(conf);
+ addConfResource(CONTRACT_XML);
+ this.conf = conf;
+ }
+
+ @Override
+ public void init() throws IOException {
+ sshd = SshServer.setUpDefaultServer();
+ // ask OS to assign a port
+ sshd.setPort(0);
+ sshd.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());
+
+ List<NamedFactory<UserAuth>> userAuthFactories = new ArrayList<>();
+ userAuthFactories.add(new UserAuthPasswordFactory());
+
+ sshd.setUserAuthFactories(userAuthFactories);
+ sshd.setPasswordAuthenticator((username, password, session) ->
+ username.equals("user") && password.equals("password")
+ );
+
+ sshd.setSubsystemFactories(
+ Collections.singletonList(new SftpSubsystemFactory()));
+
+ sshd.start();
+ int port = sshd.getPort();
+
+ conf.setClass("fs.sftp.impl", SFTPFileSystem.class, FileSystem.class);
+ conf.setInt("fs.sftp.host.port", port);
+ conf.setBoolean("fs.sftp.impl.disable.cache", true);
+ }
+
+ @Override
+ public void teardown() throws IOException {
+ if (sshd != null) {
+ sshd.stop();
+ }
+ }
+
+ @Override
+ public FileSystem getTestFileSystem() throws IOException {
+ return FileSystem.get(TEST_URI, conf);
+ }
+
+ @Override
+ public String getScheme() {
+ return "sftp";
+ }
+
+ @Override
+ public Path getTestPath() {
+ try {
+ FileSystem fs = FileSystem.get(
+ URI.create("sftp://user:password@localhost"), conf
+ );
+ return fs.makeQualified(new Path(testDataDir));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/TestSFTPContractSeek.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/TestSFTPContractSeek.java
new file mode 100644
index 0000000..20f4116
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/TestSFTPContractSeek.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.sftp;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+public class TestSFTPContractSeek extends AbstractContractSeekTest {
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new SFTPContract(conf);
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/resources/contract/sftp.xml b/hadoop-common-project/hadoop-common/src/test/resources/contract/sftp.xml
new file mode 100644
index 0000000..20a24b7
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/resources/contract/sftp.xml
@@ -0,0 +1,79 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <!--
+ SFTP -these options are for testing against a remote unix filesystem.
+ -->
+
+ <property>
+ <name>fs.contract.test.root-tests-enabled</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>fs.contract.is-case-sensitive</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-append</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-atomic-directory-delete</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-atomic-rename</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-block-locality</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-concat</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-seek</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.rejects-seek-past-eof</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-strict-exceptions</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>fs.contract.supports-unix-permissions</name>
+ <value>false</value>
+ </property>
+
+</configuration>
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org