You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/06/03 10:39:02 UTC

[hadoop] branch branch-3.3 updated: HADOOP-14566. Add seek support for SFTP FileSystem. (#1999)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new cf84bec  HADOOP-14566. Add seek support for SFTP FileSystem. (#1999)
cf84bec is described below

commit cf84bec6e3bb07f22cc13a858853e8ffe9326fb1
Author: Mike <m....@gmail.com>
AuthorDate: Wed Jun 3 13:37:40 2020 +0300

    HADOOP-14566. Add seek support for SFTP FileSystem. (#1999)
    
    
    Contributed by Mikhail Pryakhin
---
 .../org/apache/hadoop/fs/sftp/SFTPFileSystem.java  |  14 +--
 .../org/apache/hadoop/fs/sftp/SFTPInputStream.java | 110 +++++++++++++-------
 .../hadoop/fs/contract/AbstractFSContract.java     |  10 +-
 .../fs/contract/AbstractFSContractTestBase.java    |   3 +
 .../hadoop/fs/contract/sftp/SFTPContract.java      | 111 +++++++++++++++++++++
 .../fs/contract/sftp/TestSFTPContractSeek.java     |  31 ++++++
 .../src/test/resources/contract/sftp.xml           |  79 +++++++++++++++
 7 files changed, 313 insertions(+), 45 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java
index ed33357..a91b50f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.fs.sftp;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URI;
 import java.net.URLDecoder;
@@ -516,20 +515,21 @@ public class SFTPFileSystem extends FileSystem {
       disconnect(channel);
       throw new IOException(String.format(E_PATH_DIR, f));
     }
-    InputStream is;
     try {
       // the path could be a symbolic link, so get the real path
       absolute = new Path("/", channel.realpath(absolute.toUri().getPath()));
-
-      is = channel.get(absolute.toUri().getPath());
     } catch (SftpException e) {
       throw new IOException(e);
     }
-    return new FSDataInputStream(new SFTPInputStream(is, statistics)){
+    return new FSDataInputStream(
+        new SFTPInputStream(channel, absolute, statistics)){
       @Override
       public void close() throws IOException {
-        super.close();
-        disconnect(channel);
+        try {
+          super.close();
+        } finally {
+          disconnect(channel);
+        }
       }
     };
   }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java
index 7af299b..d0f9a8d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java
@@ -15,62 +15,107 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.fs.sftp;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 
+import com.jcraft.jsch.ChannelSftp;
+import com.jcraft.jsch.SftpATTRS;
+import com.jcraft.jsch.SftpException;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 /** SFTP FileSystem input stream. */
 class SFTPInputStream extends FSInputStream {
 
-  public static final String E_SEEK_NOTSUPPORTED = "Seek not supported";
-  public static final String E_NULL_INPUTSTREAM = "Null InputStream";
-  public static final String E_STREAM_CLOSED = "Stream closed";
-
+  private final ChannelSftp channel;
+  private final Path path;
   private InputStream wrappedStream;
   private FileSystem.Statistics stats;
   private boolean closed;
   private long pos;
+  private long nextPos;
+  private long contentLength;
 
-  SFTPInputStream(InputStream stream,  FileSystem.Statistics stats) {
-
-    if (stream == null) {
-      throw new IllegalArgumentException(E_NULL_INPUTSTREAM);
+  SFTPInputStream(ChannelSftp channel, Path path, FileSystem.Statistics stats)
+      throws IOException {
+    try {
+      this.channel = channel;
+      this.path = path;
+      this.stats = stats;
+      this.wrappedStream = channel.get(path.toUri().getPath());
+      SftpATTRS stat = channel.lstat(path.toString());
+      this.contentLength = stat.getSize();
+    } catch (SftpException e) {
+      throw new IOException(e);
     }
-    this.wrappedStream = stream;
-    this.stats = stats;
+  }
 
-    this.pos = 0;
-    this.closed = false;
+  @Override
+  public synchronized void seek(long position) throws IOException {
+    checkNotClosed();
+    if (position < 0) {
+      throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+    }
+    nextPos = position;
   }
 
   @Override
-  public void seek(long position) throws IOException {
-    throw new IOException(E_SEEK_NOTSUPPORTED);
+  public synchronized int available() throws IOException {
+    checkNotClosed();
+    long remaining = contentLength - nextPos;
+    if (remaining > Integer.MAX_VALUE) {
+      return Integer.MAX_VALUE;
+    }
+    return (int) remaining;
+  }
+
+  private void seekInternal() throws IOException {
+    if (pos == nextPos) {
+      return;
+    }
+    if (nextPos > pos) {
+      long skipped = wrappedStream.skip(nextPos - pos);
+      pos = pos + skipped;
+    }
+    if (nextPos < pos) {
+      wrappedStream.close();
+      try {
+        wrappedStream = channel.get(path.toUri().getPath());
+        pos = wrappedStream.skip(nextPos);
+      } catch (SftpException e) {
+        throw new IOException(e);
+      }
+    }
   }
 
   @Override
   public boolean seekToNewSource(long targetPos) throws IOException {
-    throw new IOException(E_SEEK_NOTSUPPORTED);
+    return false;
   }
 
   @Override
-  public long getPos() throws IOException {
-    return pos;
+  public synchronized long getPos() throws IOException {
+    return nextPos;
   }
 
   @Override
   public synchronized int read() throws IOException {
-    if (closed) {
-      throw new IOException(E_STREAM_CLOSED);
+    checkNotClosed();
+    if (this.contentLength == 0 || (nextPos >= contentLength)) {
+      return -1;
     }
-
+    seekInternal();
     int byteRead = wrappedStream.read();
     if (byteRead >= 0) {
       pos++;
+      nextPos++;
     }
     if (stats != null & byteRead >= 0) {
       stats.incrementBytesRead(1);
@@ -78,23 +123,6 @@ class SFTPInputStream extends FSInputStream {
     return byteRead;
   }
 
-  public synchronized int read(byte[] buf, int off, int len)
-      throws IOException {
-    if (closed) {
-      throw new IOException(E_STREAM_CLOSED);
-    }
-
-    int result = wrappedStream.read(buf, off, len);
-    if (result > 0) {
-      pos += result;
-    }
-    if (stats != null & result > 0) {
-      stats.incrementBytesRead(result);
-    }
-
-    return result;
-  }
-
   public synchronized void close() throws IOException {
     if (closed) {
       return;
@@ -103,4 +131,12 @@ class SFTPInputStream extends FSInputStream {
     wrappedStream.close();
     closed = true;
   }
+
+  private void checkNotClosed() throws IOException {
+    if (closed) {
+      throw new IOException(
+          path.toUri() + ": " + FSExceptionMessages.STREAM_IS_CLOSED
+      );
+    }
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java
index f09496a..76d3116 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java
@@ -70,6 +70,14 @@ public abstract class AbstractFSContract extends Configured {
   }
 
   /**
+   * Any teardown logic can go here.
+   * @throws IOException IO problems
+   */
+  public void teardown() throws IOException {
+
+  }
+
+  /**
    * Add a configuration resource to this instance's configuration
    * @param resource resource reference
    * @throws AssertionError if the resource was not found.
@@ -113,7 +121,7 @@ public abstract class AbstractFSContract extends Configured {
   public abstract FileSystem getTestFileSystem() throws IOException;
 
   /**
-   * Get the scheme of this FS
+   * Get the scheme of this FS.
    * @return the scheme this FS supports
    */
   public abstract String getScheme();
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java
index 60373f6..ac9de6d 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java
@@ -213,6 +213,9 @@ public abstract class AbstractFSContractTestBase extends Assert
     Thread.currentThread().setName("teardown");
     LOG.debug("== Teardown ==");
     deleteTestDirInTeardown();
+    if (contract != null) {
+      contract.teardown();
+    }
     LOG.debug("== Teardown complete ==");
   }
 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java
new file mode 100644
index 0000000..f72a2ae
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.sftp;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.sftp.SFTPFileSystem;
+import org.apache.sshd.common.NamedFactory;
+import org.apache.sshd.server.SshServer;
+import org.apache.sshd.server.auth.UserAuth;
+import org.apache.sshd.server.auth.password.UserAuthPasswordFactory;
+import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
+import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory;
+
+public class SFTPContract extends AbstractFSContract {
+
+  private static final String CONTRACT_XML = "contract/sftp.xml";
+  private static final URI TEST_URI =
+      URI.create("sftp://user:password@localhost");
+  private final String testDataDir =
+      new FileSystemTestHelper().getTestRootDir();
+  private final Configuration conf;
+  private SshServer sshd;
+
+  public SFTPContract(Configuration conf) {
+    super(conf);
+    addConfResource(CONTRACT_XML);
+    this.conf = conf;
+  }
+
+  @Override
+  public void init() throws IOException {
+    sshd = SshServer.setUpDefaultServer();
+    // ask OS to assign a port
+    sshd.setPort(0);
+    sshd.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());
+
+    List<NamedFactory<UserAuth>> userAuthFactories = new ArrayList<>();
+    userAuthFactories.add(new UserAuthPasswordFactory());
+
+    sshd.setUserAuthFactories(userAuthFactories);
+    sshd.setPasswordAuthenticator((username, password, session) ->
+        username.equals("user") && password.equals("password")
+    );
+
+    sshd.setSubsystemFactories(
+        Collections.singletonList(new SftpSubsystemFactory()));
+
+    sshd.start();
+    int port = sshd.getPort();
+
+    conf.setClass("fs.sftp.impl", SFTPFileSystem.class, FileSystem.class);
+    conf.setInt("fs.sftp.host.port", port);
+    conf.setBoolean("fs.sftp.impl.disable.cache", true);
+  }
+
+  @Override
+  public void teardown() throws IOException {
+    if (sshd != null) {
+      sshd.stop();
+    }
+  }
+
+  @Override
+  public FileSystem getTestFileSystem() throws IOException {
+    return FileSystem.get(TEST_URI, conf);
+  }
+
+  @Override
+  public String getScheme() {
+    return "sftp";
+  }
+
+  @Override
+  public Path getTestPath() {
+    try {
+      FileSystem fs = FileSystem.get(
+          URI.create("sftp://user:password@localhost"), conf
+      );
+      return fs.makeQualified(new Path(testDataDir));
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/TestSFTPContractSeek.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/TestSFTPContractSeek.java
new file mode 100644
index 0000000..20f4116
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/TestSFTPContractSeek.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.sftp;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+public class TestSFTPContractSeek extends AbstractContractSeekTest {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new SFTPContract(conf);
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/resources/contract/sftp.xml b/hadoop-common-project/hadoop-common/src/test/resources/contract/sftp.xml
new file mode 100644
index 0000000..20a24b7
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/resources/contract/sftp.xml
@@ -0,0 +1,79 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~  or more contributor license agreements.  See the NOTICE file
+  ~  distributed with this work for additional information
+  ~  regarding copyright ownership.  The ASF licenses this file
+  ~  to you under the Apache License, Version 2.0 (the
+  ~  "License"); you may not use this file except in compliance
+  ~  with the License.  You may obtain a copy of the License at
+  ~
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<configuration>
+  <!--
+  SFTP - these options are for testing against a remote unix filesystem.
+  -->
+
+  <property>
+    <name>fs.contract.test.root-tests-enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.is-case-sensitive</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-append</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-atomic-directory-delete</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-atomic-rename</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-block-locality</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-concat</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-seek</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rejects-seek-past-eof</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-strict-exceptions</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-unix-permissions</name>
+    <value>false</value>
+  </property>
+
+</configuration>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org