You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2018/04/16 11:47:53 UTC
[06/30] mina-sshd git commit: [SSHD-815] Extract SFTP in its own
module
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/251db9b9/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
----------------------------------------------------------------------
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
new file mode 100644
index 0000000..3743477
--- /dev/null
+++ b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java
@@ -0,0 +1,1069 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.server.subsystem.sftp;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.UnknownServiceException;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystemLoopException;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.NotDirectoryException;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.sshd.common.Factory;
+import org.apache.sshd.common.FactoryManager;
+import org.apache.sshd.common.digest.BuiltinDigests;
+import org.apache.sshd.common.digest.DigestFactory;
+import org.apache.sshd.common.file.FileSystemAware;
+import org.apache.sshd.common.random.Random;
+import org.apache.sshd.common.subsystem.sftp.SftpConstants;
+import org.apache.sshd.common.subsystem.sftp.SftpHelper;
+import org.apache.sshd.common.subsystem.sftp.extensions.openssh.FsyncExtensionParser;
+import org.apache.sshd.common.subsystem.sftp.extensions.openssh.HardLinkExtensionParser;
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ValidateUtils;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.BufferUtils;
+import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.io.IoUtils;
+import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
+import org.apache.sshd.common.util.threads.ThreadUtils;
+import org.apache.sshd.server.Command;
+import org.apache.sshd.server.Environment;
+import org.apache.sshd.server.ExitCallback;
+import org.apache.sshd.server.SessionAware;
+import org.apache.sshd.server.session.ServerSession;
+
+/**
+ * SFTP subsystem
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class SftpSubsystem
+ extends AbstractSftpSubsystemHelper
+ implements Command, Runnable, SessionAware, FileSystemAware, ExecutorServiceCarrier {
+
+ /**
+ * Property key for the maximum number of open handles allowed per session.
+ */
+ public static final String MAX_OPEN_HANDLES_PER_SESSION = "max-open-handles-per-session";
+ public static final int DEFAULT_MAX_OPEN_HANDLES = Integer.MAX_VALUE;
+
+ /**
+ * Size in bytes of the opaque handle value
+ *
+ * @see #DEFAULT_FILE_HANDLE_SIZE
+ */
+ public static final String FILE_HANDLE_SIZE = "sftp-handle-size";
+ public static final int MIN_FILE_HANDLE_SIZE = 4; // ~uint32
+ public static final int DEFAULT_FILE_HANDLE_SIZE = 16;
+ public static final int MAX_FILE_HANDLE_SIZE = 64; // ~sha512
+
+ /**
+ * Max. number of rounds to attempt generating a unique file handle - if no
+ * unused handle has been found after this many rounds, an exception is thrown
+ *
+ * @see #generateFileHandle(Path)
+ * @see #DEFAULT_FILE_HANDLE_ROUNDS
+ */
+ public static final String MAX_FILE_HANDLE_RAND_ROUNDS = "sftp-handle-rand-max-rounds";
+ public static final int MIN_FILE_HANDLE_ROUNDS = 1;
+ public static final int DEFAULT_FILE_HANDLE_ROUNDS = MIN_FILE_HANDLE_SIZE;
+ public static final int MAX_FILE_HANDLE_ROUNDS = MAX_FILE_HANDLE_SIZE;
+
+ /**
+ * Maximum amount of data allocated for listing the contents of a directory
+ * in any single invocation of {@link #doReadDir(Buffer, int)}
+ *
+ * @see #DEFAULT_MAX_READDIR_DATA_SIZE
+ */
+ public static final String MAX_READDIR_DATA_SIZE_PROP = "sftp-max-readdir-data-size";
+ public static final int DEFAULT_MAX_READDIR_DATA_SIZE = 16 * 1024;
+
+ protected ExitCallback callback;
+ protected InputStream in;
+ protected OutputStream out;
+ protected OutputStream err;
+ protected Environment env;
+ protected Random randomizer;
+ protected int fileHandleSize = DEFAULT_FILE_HANDLE_SIZE;
+ protected int maxFileHandleRounds = DEFAULT_FILE_HANDLE_ROUNDS;
+ protected Future<?> pendingFuture;
+ protected byte[] workBuf = new byte[Math.max(DEFAULT_FILE_HANDLE_SIZE, Integer.BYTES)];
+ protected FileSystem fileSystem = FileSystems.getDefault();
+ protected Path defaultDir = fileSystem.getPath(System.getProperty("user.dir"));
+ protected long requestsCount;
+ protected int version;
+ protected final Map<String, byte[]> extensions = new TreeMap<>(Comparator.naturalOrder());
+ protected final Map<String, Handle> handles = new HashMap<>();
+
+ private ServerSession serverSession;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private ExecutorService executorService;
+ private boolean shutdownOnExit;
+
+    /**
+     * Creates the subsystem and decides which executor is going to run it.
+     *
+     * @param executorService The {@link ExecutorService} to be used by
+     * the {@link SftpSubsystem} command when starting execution. If
+     * {@code null} then a single-threaded ad-hoc service is created
+     * @param shutdownOnExit If {@code true} the {@link ExecutorService#shutdownNow()}
+     * will be called when subsystem terminates - the ad-hoc service is
+     * always marked for shutdown, regardless of this value
+     * @param policy The {@link UnsupportedAttributePolicy} to use if failed to access
+     * some local file attributes
+     * @param accessor The {@link SftpFileSystemAccessor} to use for opening files and directories
+     * @param errorStatusDataHandler The (never {@code null}) {@link SftpErrorStatusDataHandler} to
+     * use when generating failed commands error messages
+     * @see ThreadUtils#newSingleThreadExecutor(String)
+     */
+    public SftpSubsystem(ExecutorService executorService, boolean shutdownOnExit, UnsupportedAttributePolicy policy,
+            SftpFileSystemAccessor accessor, SftpErrorStatusDataHandler errorStatusDataHandler) {
+        super(policy, accessor, errorStatusDataHandler);
+
+        boolean adHocService = executorService == null;
+        this.executorService = adHocService
+                ? ThreadUtils.newSingleThreadExecutor(getClass().getSimpleName())
+                : executorService;
+        // we always close the ad-hoc executor service, whatever the caller requested
+        this.shutdownOnExit = adHocService || shutdownOnExit;
+    }
+
+ /**
+ * @return The SFTP protocol version currently recorded by this subsystem -
+ * updated (among others) by a successful &quot;version-select&quot; request
+ */
+ @Override
+ public int getVersion() {
+ return version;
+ }
+
+ /**
+ * @return The current default directory - initially the JVM &quot;user.dir&quot; location
+ * in the default file system, replaced by the first available root whenever
+ * a different {@link FileSystem} is set
+ */
+ @Override
+ public Path getDefaultDirectory() {
+ return defaultDir;
+ }
+
+ /**
+ * @return The {@link ExecutorService} selected at construction time - either
+ * the one provided by the caller or the ad-hoc single-threaded one
+ */
+ @Override
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ /**
+ * @return The shutdown flag selected at construction time - always
+ * {@code true} if the ad-hoc executor service is used
+ */
+ @Override
+ public boolean isShutdownOnExit() {
+ return shutdownOnExit;
+ }
+
+ @Override
+ public void setSession(ServerSession session) {
+ this.serverSession = Objects.requireNonNull(session, "No session");
+
+ FactoryManager manager = session.getFactoryManager();
+ Factory<? extends Random> factory = manager.getRandomFactory();
+ this.randomizer = factory.create();
+
+ this.fileHandleSize = session.getIntProperty(FILE_HANDLE_SIZE, DEFAULT_FILE_HANDLE_SIZE);
+ ValidateUtils.checkTrue(this.fileHandleSize >= MIN_FILE_HANDLE_SIZE, "File handle size too small: %d", this.fileHandleSize);
+ ValidateUtils.checkTrue(this.fileHandleSize <= MAX_FILE_HANDLE_SIZE, "File handle size too big: %d", this.fileHandleSize);
+
+ this.maxFileHandleRounds = session.getIntProperty(MAX_FILE_HANDLE_RAND_ROUNDS, DEFAULT_FILE_HANDLE_ROUNDS);
+ ValidateUtils.checkTrue(this.maxFileHandleRounds >= MIN_FILE_HANDLE_ROUNDS, "File handle rounds too small: %d", this.maxFileHandleRounds);
+ ValidateUtils.checkTrue(this.maxFileHandleRounds <= MAX_FILE_HANDLE_ROUNDS, "File handle rounds too big: %d", this.maxFileHandleRounds);
+
+ if (workBuf.length < this.fileHandleSize) {
+ workBuf = new byte[this.fileHandleSize];
+ }
+ }
+
+ /**
+ * @return The {@link ServerSession} provided via {@link #setSession(ServerSession)} -
+ * {@code null} if none set yet
+ */
+ @Override
+ public ServerSession getServerSession() {
+ return serverSession;
+ }
+
+    /**
+     * Replaces the file system in use. If the provided instance is the one
+     * already in use nothing happens, otherwise the default directory is
+     * re-pointed to the first root directory reported by the new file system.
+     *
+     * @param fileSystem The {@link FileSystem} to use - must expose at least one root
+     */
+    @Override
+    public void setFileSystem(FileSystem fileSystem) {
+        if (fileSystem == this.fileSystem) {
+            return; // same instance - nothing to update
+        }
+
+        this.fileSystem = fileSystem;
+
+        Iterable<Path> rootDirs = Objects.requireNonNull(fileSystem.getRootDirectories(), "No root directories");
+        Iterator<Path> rootsIterator = Objects.requireNonNull(rootDirs.iterator(), "No roots iterator");
+        ValidateUtils.checkTrue(rootsIterator.hasNext(), "No available root");
+        this.defaultDir = rootsIterator.next();
+    }
+
+ /**
+ * @param callback The {@link ExitCallback} to be invoked (with exit value zero)
+ * once the {@link #run()} loop terminates
+ */
+ @Override
+ public void setExitCallback(ExitCallback callback) {
+ this.callback = callback;
+ }
+
+ /**
+ * @param in The {@link InputStream} from which the {@link #run()} loop
+ * reads the incoming SFTP packets
+ */
+ @Override
+ public void setInputStream(InputStream in) {
+ this.in = in;
+ }
+
+ /**
+ * @param out The command's {@link OutputStream} - recorded for later use
+ */
+ @Override
+ public void setOutputStream(OutputStream out) {
+ this.out = out;
+ }
+
+ /**
+ * @param err The command's error {@link OutputStream} - recorded for later use
+ */
+ @Override
+ public void setErrorStream(OutputStream err) {
+ this.err = err;
+ }
+
+    /**
+     * Records the environment and hands this instance over to the executor
+     * service so that {@link #run()} is invoked on one of its threads.
+     *
+     * @param env The command {@link Environment}
+     * @throws IOException If the executor refused the task (the original
+     * runtime exception is attached as the cause)
+     */
+    @Override
+    public void start(Environment env) throws IOException {
+        this.env = env;
+        try {
+            pendingFuture = getExecutorService().submit(this);
+        } catch (RuntimeException e) { // e.g., RejectedExecutionException
+            log.error("Failed (" + e.getClass().getSimpleName() + ") to start command: " + e.toString(), e);
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Main loop of the subsystem: repeatedly reads a length-prefixed packet
+     * from the input stream and hands it over to {@link #process(Buffer)} until
+     * reading or processing fails. On termination every handle still registered
+     * is closed and the exit callback is invoked with a zero exit value.
+     * A failure to close one handle (checked or unchecked) is logged and does
+     * not prevent closing the remaining ones nor signalling the exit callback.
+     */
+    @Override
+    public void run() {
+        try {
+            for (long count = 1L;; count++) {
+                int length = BufferUtils.readInt(in, workBuf, 0, workBuf.length);
+                ValidateUtils.checkTrue(length >= (Integer.BYTES + 1 /* command */), "Bad length to read: %d", length);
+
+                Buffer buffer = new ByteArrayBuffer(length + Integer.BYTES + Long.SIZE /* a bit extra */, false);
+                buffer.putInt(length);
+                // a single read may return less than requested - keep going until the packet is complete
+                for (int remainLen = length; remainLen > 0;) {
+                    int l = in.read(buffer.array(), buffer.wpos(), remainLen);
+                    if (l < 0) {
+                        throw new IllegalArgumentException("Premature EOF at buffer #" + count + " while read length=" + length + " and remain=" + remainLen);
+                    }
+                    buffer.wpos(buffer.wpos() + l);
+                    remainLen -= l;
+                }
+
+                process(buffer);
+            }
+        } catch (Throwable t) {
+            if ((!closed.get()) && (!(t instanceof EOFException))) { // Ignore
+                log.error("run({}) {} caught in SFTP subsystem: {}",
+                          getServerSession(), t.getClass().getSimpleName(), t.getMessage());
+                if (log.isDebugEnabled()) {
+                    log.debug("run(" + getServerSession() + ") caught exception details", t);
+                }
+            }
+        } finally {
+            try {
+                boolean debugEnabled = log.isDebugEnabled();
+                handles.forEach((id, handle) -> {
+                    try {
+                        handle.close();
+                        if (debugEnabled) {
+                            log.debug("run({}) closed pending handle {} [{}]", getServerSession(), id, handle);
+                        }
+                    } catch (IOException | RuntimeException e) {
+                        // do not let one misbehaving handle prevent closing the others
+                        log.error("run({}) failed ({}) to close handle={}[{}]: {}",
+                                  getServerSession(), e.getClass().getSimpleName(), id, handle, e.getMessage());
+                    }
+                });
+            } finally {
+                // always signal termination - otherwise the channel is never notified
+                callback.onExit(0);
+            }
+        }
+    }
+
+ /**
+ * Decodes the header of a single SFTP packet - length, command type (unsigned
+ * byte) and request id - and dispatches the rest of the buffer to the handler
+ * matching the command type. Unknown command types are answered with an
+ * {@code SSH_FX_OP_UNSUPPORTED} status. Every command other than
+ * {@code SSH_FXP_INIT} that completes without throwing increments the
+ * requests counter.
+ *
+ * @param buffer The {@link Buffer} positioned at the start of the packet
+ * @throws IOException If the invoked handler failed
+ */
+ @Override
+ protected void process(Buffer buffer) throws IOException {
+ int length = buffer.getInt();
+ int type = buffer.getUByte();
+ int id = buffer.getInt();
+ if (log.isDebugEnabled()) {
+ log.debug("process({})[length={}, type={}, id={}] processing",
+ getServerSession(), length, SftpConstants.getCommandMessageName(type), id);
+ }
+
+ switch (type) {
+ case SftpConstants.SSH_FXP_INIT:
+ doInit(buffer, id);
+ break;
+ case SftpConstants.SSH_FXP_OPEN:
+ doOpen(buffer, id);
+ break;
+ case SftpConstants.SSH_FXP_CLOSE:
+ doClose(buffer, id);
+ break;
+ case SftpConstants.SSH_FXP_READ:
+ doRead(buffer, id);
+ break;
+ case SftpConstants.SSH_FXP_WRITE:
+ doWrite(buffer, id);
+ break;
+ case SftpConstants.SSH_FXP_LSTAT:
+ doLStat(buffer, id);
+ break;
+ case SftpConstants.SSH_FXP_FSTAT:
+ doFStat(buffer, id);
+ break;
+ case SftpConstants.SSH_FXP_SETSTAT:
+ doSetStat(buffer, id);
+ break;
+ case SftpConstants.SSH_FXP_FSETSTAT:
+ doFSetStat(buffer, id);
+ break;
+ case SftpConstants.SSH_FXP_OPENDIR:
+ doOpenDir(buffer, id);
+ break;
+ case SftpConstants.SSH_FXP_READDIR:
+ doReadDir(buffer, id);
+ break;
+ case SftpConstants.SSH_FXP_REMOVE:
+ doRemove(buffer, id);
+ break;
+ case SftpConstants.SSH_FXP_MKDIR:
+ doMakeDirectory(buffer, id);
+ break;
+ case SftpConstants.SSH_FXP_RMDIR:
+ doRemoveDirectory(buffer, id);
+ break;
+ case SftpConstants.SSH_FXP_REALPATH:
+ doRealPath(buffer, id);
+ break;
+ case SftpConstants.SSH_FXP_STAT:
+ doStat(buffer, id);
+ break;
+ case SftpConstants.SSH_FXP_RENAME:
+ doRename(buffer, id);
+ break;
+ case SftpConstants.SSH_FXP_READLINK:
+ doReadLink(buffer, id);
+ break;
+ case SftpConstants.SSH_FXP_SYMLINK:
+ doSymLink(buffer, id);
+ break;
+ case SftpConstants.SSH_FXP_LINK:
+ doLink(buffer, id);
+ break;
+ case SftpConstants.SSH_FXP_BLOCK:
+ doBlock(buffer, id);
+ break;
+ case SftpConstants.SSH_FXP_UNBLOCK:
+ doUnblock(buffer, id);
+ break;
+ case SftpConstants.SSH_FXP_EXTENDED:
+ doExtended(buffer, id);
+ break;
+ default:
+ {
+ // unknown command - report it as unsupported, re-using the request buffer for the reply
+ String name = SftpConstants.getCommandMessageName(type);
+ log.warn("process({})[length={}, type={}, id={}] unknown command",
+ getServerSession(), length, name, id);
+ sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OP_UNSUPPORTED, "Command " + name + " is unsupported or not implemented");
+ }
+ }
+
+ // the count is consulted by version-select, which must be the 1st request
+ if (type != SftpConstants.SSH_FXP_INIT) {
+ requestsCount++;
+ }
+ }
+
+ /**
+ * Dispatches an {@code SSH_FXP_EXTENDED} request to the handler matching the
+ * extension name. Unrecognized extensions are answered with an
+ * {@code SSH_FX_OP_UNSUPPORTED} status.
+ *
+ * @param buffer The {@link Buffer} holding the extension specific data
+ * @param id The request id
+ * @param extension The extension name
+ * @throws IOException If the invoked handler failed
+ */
+ @Override
+ protected void executeExtendedCommand(Buffer buffer, int id, String extension) throws IOException {
+ switch (extension) {
+ case SftpConstants.EXT_TEXT_SEEK:
+ doTextSeek(buffer, id);
+ break;
+ case SftpConstants.EXT_VERSION_SELECT:
+ doVersionSelect(buffer, id);
+ break;
+ case SftpConstants.EXT_COPY_FILE:
+ doCopyFile(buffer, id);
+ break;
+ case SftpConstants.EXT_COPY_DATA:
+ doCopyData(buffer, id);
+ break;
+ case SftpConstants.EXT_MD5_HASH:
+ case SftpConstants.EXT_MD5_HASH_HANDLE:
+ doMD5Hash(buffer, id, extension);
+ break;
+ case SftpConstants.EXT_CHECK_FILE_HANDLE:
+ case SftpConstants.EXT_CHECK_FILE_NAME:
+ doCheckFileHash(buffer, id, extension);
+ break;
+ case FsyncExtensionParser.NAME:
+ doOpenSSHFsync(buffer, id);
+ break;
+ case SftpConstants.EXT_SPACE_AVAILABLE:
+ doSpaceAvailable(buffer, id);
+ break;
+ case HardLinkExtensionParser.NAME:
+ doOpenSSHHardLink(buffer, id);
+ break;
+ default:
+ if (log.isDebugEnabled()) {
+ log.debug("executeExtendedCommand({}) received unsupported SSH_FXP_EXTENDED({})", getServerSession(), extension);
+ }
+ // re-use the request buffer for the failure reply
+ sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OP_UNSUPPORTED, "Command SSH_FXP_EXTENDED(" + extension + ") is unsupported or not implemented");
+ break;
+ }
+ }
+
+    /**
+     * Creates a symbolic or hard link and reports the attempt (and its outcome)
+     * to the registered {@link SftpEventListener}.
+     *
+     * @param id The request id
+     * @param existingPath The link <U>target</U> - used as-is to build a path
+     * in the current file system
+     * @param linkPath The path of the link to create - resolved via {@code resolveFile}
+     * @param symLink {@code true} for a symbolic link, {@code false} for a hard one
+     * @throws IOException If failed to create the link
+     */
+    @Override
+    protected void createLink(int id, String existingPath, String linkPath, boolean symLink) throws IOException {
+        Path link = resolveFile(linkPath);
+        Path existing = fileSystem.getPath(existingPath);
+        if (log.isDebugEnabled()) {
+            // NOTE: arguments order must match the existing=/link= labels
+            log.debug("createLink({})[id={}], existing={}[{}], link={}[{}], symlink={})",
+                      getServerSession(), id, existingPath, existing, linkPath, link, symLink);
+        }
+
+        SftpEventListener listener = getSftpEventListenerProxy();
+        ServerSession session = getServerSession();
+        listener.linking(session, link, existing, symLink);
+        try {
+            if (symLink) {
+                Files.createSymbolicLink(link, existing);
+            } else {
+                Files.createLink(link, existing);
+            }
+        } catch (IOException | RuntimeException e) {
+            // let the listener know about the failure before propagating it
+            listener.linked(session, link, existing, symLink, e);
+            throw e;
+        }
+        listener.linked(session, link, existing, symLink, null);
+    }
+
+    /**
+     * Handles the &quot;text-seek&quot; extension - the handle is validated as being
+     * a file handle, but the operation itself is not implemented and therefore
+     * always fails.
+     *
+     * @param id The request id
+     * @param handle The opaque handle value
+     * @param line The requested line number
+     * @throws IOException Always - an {@link UnknownServiceException} if the handle is valid
+     */
+    @Override
+    protected void doTextSeek(int id, String handle, long line) throws IOException {
+        Handle target = handles.get(handle);
+        if (log.isDebugEnabled()) {
+            log.debug("doTextSeek({})[id={}] SSH_FXP_EXTENDED(text-seek) (handle={}[{}], line={})",
+                      getServerSession(), id, handle, target, line);
+        }
+
+        FileHandle textFile = validateHandle(handle, target, FileHandle.class);
+        throw new UnknownServiceException("doTextSeek(" + textFile + ")");
+    }
+
+    /**
+     * Handles the OpenSSH &quot;fsync&quot; extension by delegating the actual
+     * synchronization to the {@link SftpFileSystemAccessor}.
+     *
+     * @param id The request id
+     * @param handle The opaque handle value - must refer to a file handle
+     * @throws IOException If bad handle or failed to synchronize
+     */
+    @Override
+    protected void doOpenSSHFsync(int id, String handle) throws IOException {
+        Handle target = handles.get(handle);
+        if (log.isDebugEnabled()) {
+            log.debug("doOpenSSHFsync({})[id={}] {}[{}]", getServerSession(), id, handle, target);
+        }
+
+        FileHandle syncedFile = validateHandle(handle, target, FileHandle.class);
+        getFileSystemAccessor().syncFileData(
+            getServerSession(), this, syncedFile.getFile(), syncedFile.getFileHandle(), syncedFile.getFileChannel());
+    }
+
+    /**
+     * Resolves the file targeted by a &quot;check-file-handle&quot; / &quot;check-file-name&quot;
+     * request, selects the 1st requested digest algorithm that is both known
+     * and supported and delegates the hash calculation.
+     *
+     * @param id The request id
+     * @param targetType The extension name - used to decide whether {@code target}
+     * is a handle or a path
+     * @param target The opaque handle value or the file path
+     * @param algos The requested hash algorithms in preference order - may
+     * not be {@code null}/empty
+     * @param startOffset Start offset of the hashed data
+     * @param length Length of the hashed data
+     * @param blockSize The hash block size
+     * @param buffer The {@link Buffer} passed along to the hash calculation
+     * @throws Exception If bad handle/path, no read access or no usable digest found
+     */
+    @Override
+    protected void doCheckFileHash(
+            int id, String targetType, String target, Collection<String> algos,
+            long startOffset, long length, int blockSize, Buffer buffer)
+                throws Exception {
+        Path path;
+        if (SftpConstants.EXT_CHECK_FILE_HANDLE.equalsIgnoreCase(targetType)) {
+            Handle h = handles.get(target);
+            FileHandle fileHandle = validateHandle(target, h, FileHandle.class);
+            path = fileHandle.getFile();
+
+            /*
+             * To quote http://tools.ietf.org/wg/secsh/draft-ietf-secsh-filexfer/draft-ietf-secsh-filexfer-09.txt section 9.1.2:
+             *
+             *       If ACE4_READ_DATA was not included when the file was opened,
+             *       the server MUST return STATUS_PERMISSION_DENIED.
+             */
+            int access = fileHandle.getAccessMask();
+            if ((access & SftpConstants.ACE4_READ_DATA) == 0) {
+                throw new AccessDeniedException(path.toString(), path.toString(), "File not opened for read");
+            }
+        } else {
+            path = resolveFile(target);
+
+            /*
+             * To quote http://tools.ietf.org/wg/secsh/draft-ietf-secsh-filexfer/draft-ietf-secsh-filexfer-09.txt section 9.1.2:
+             *
+             *      If 'check-file-name' refers to a SSH_FILEXFER_TYPE_SYMLINK, the
+             *      target should be opened.
+             */
+            // NOTE(review): a relative link target is used as-is (not resolved against the link's parent) - confirm
+            for (int index = 0; Files.isSymbolicLink(path) && (index < Byte.MAX_VALUE /* TODO make this configurable */); index++) {
+                path = Files.readSymbolicLink(path);
+            }
+
+            // still a link after exhausting all the allowed hops
+            if (Files.isSymbolicLink(path)) {
+                throw new FileSystemLoopException(target);
+            }
+
+            if (Files.isDirectory(path, IoUtils.getLinkOptions(false))) {
+                throw new NotDirectoryException(path.toString());
+            }
+        }
+
+        ValidateUtils.checkNotNullAndNotEmpty(algos, "No hash algorithms specified");
+
+        // pick the 1st known AND supported digest - a known but unsupported one must not be retained
+        DigestFactory factory = null;
+        for (String a : algos) {
+            DigestFactory candidate = BuiltinDigests.fromFactoryName(a);
+            if ((candidate != null) && candidate.isSupported()) {
+                factory = candidate;
+                break;
+            }
+        }
+        ValidateUtils.checkNotNull(factory, "No matching digest factory found for %s", algos);
+
+        doCheckFileHash(id, path, factory, startOffset, length, blockSize, buffer);
+    }
+
+    /**
+     * Resolves the file targeted by an &quot;md5-hash&quot; / &quot;md5-hash-handle&quot;
+     * request, calculates the effective number of bytes to hash and delegates
+     * the actual calculation.
+     *
+     * @param id The request id
+     * @param targetType The extension name - used to decide whether {@code target}
+     * is a handle or a path
+     * @param target The opaque handle value or the file path
+     * @param startOffset Start offset of the hashed data
+     * @param length Length of the hashed data - if zero together with the
+     * start offset, then the entire file is hashed
+     * @param quickCheckHash The quick-check hash value provided by the client
+     * @return The result of the delegated hash calculation
+     * @throws Exception If bad handle/path, no read access or failed to calculate the hash
+     */
+    @Override
+    protected byte[] doMD5Hash(
+            int id, String targetType, String target, long startOffset, long length, byte[] quickCheckHash)
+                throws Exception {
+        if (log.isDebugEnabled()) {
+            log.debug("doMD5Hash({})({})[{}] offset={}, length={}, quick-hash={}",
+                      getServerSession(), targetType, target, startOffset, length,
+                      BufferUtils.toHex(':', quickCheckHash));
+        }
+
+        boolean handleTarget = SftpConstants.EXT_MD5_HASH_HANDLE.equalsIgnoreCase(targetType);
+        Path hashedFile;
+        if (!handleTarget) {
+            hashedFile = resolveFile(target);
+            if (Files.isDirectory(hashedFile, IoUtils.getLinkOptions(true))) {
+                throw new NotDirectoryException(hashedFile.toString());
+            }
+        } else {
+            FileHandle opened = validateHandle(target, handles.get(target), FileHandle.class);
+            hashedFile = opened.getFile();
+
+            /*
+             * To quote http://tools.ietf.org/wg/secsh/draft-ietf-secsh-filexfer/draft-ietf-secsh-filexfer-09.txt section 9.1.1:
+             *
+             *      The handle MUST be a file handle, and ACE4_READ_DATA MUST
+             *      have been included in the desired-access when the file
+             *      was opened
+             */
+            if ((opened.getAccessMask() & SftpConstants.ACE4_READ_DATA) == 0) {
+                throw new AccessDeniedException(hashedFile.toString(), hashedFile.toString(), "File not opened for read");
+            }
+        }
+
+        /*
+         * To quote http://tools.ietf.org/wg/secsh/draft-ietf-secsh-filexfer/draft-ietf-secsh-filexfer-09.txt section 9.1.1:
+         *
+         *      If both start-offset and length are zero, the entire file should be included
+         */
+        long fileSize = Files.size(hashedFile);
+        long hashedLength;
+        if ((startOffset == 0L) && (length == 0L)) {
+            hashedLength = fileSize;
+        } else if ((startOffset + length) > fileSize) {
+            // requested range exceeds the file - hash only up to its end
+            hashedLength = fileSize - startOffset;
+        } else {
+            hashedLength = length;
+        }
+
+        return doMD5Hash(id, hashedFile, startOffset, hashedLength, quickCheckHash);
+    }
+
+    /**
+     * Handles the &quot;version-select&quot; extension. The request is honored only
+     * if it is the 1st one and the proposed version is acceptable - in which case
+     * the version is updated and an OK status sent. In all other cases the
+     * session is closed (after sending a failure status where applicable).
+     *
+     * @param buffer The {@link Buffer} holding the proposed version string -
+     * also re-used for the response
+     * @param id The request id
+     * @throws IOException If failed to send the response
+     */
+    protected void doVersionSelect(Buffer buffer, int id) throws IOException {
+        String proposed = buffer.getString();
+        ServerSession session = getServerSession();
+        /*
+         * The 'version-select' MUST be the first request from the client to the
+         * server; if it is not, the server MUST fail the request and close the
+         * channel.
+         */
+        if (requestsCount > 0L) {
+            sendStatus(BufferUtils.clear(buffer), id,
+                       SftpConstants.SSH_FX_FAILURE,
+                       "Version selection not the 1st request for proposal = " + proposed);
+            session.close(true);
+            return;
+        }
+
+        Boolean accepted = validateProposedVersion(buffer, id, proposed);
+        /*
+         * "MUST then close the channel without processing any further requests"
+         */
+        if (accepted == null) {   // response sent internally
+            session.close(true);
+            return;
+        }
+
+        if (!accepted) {
+            sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_FAILURE, "Unsupported version " + proposed);
+            session.close(true);
+            return;
+        }
+
+        version = Integer.parseInt(proposed);
+        sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_OK, "");
+    }
+
+    /**
+     * Handles {@code SSH_FXP_BLOCK} by locking the requested range of the file
+     * and reporting the attempt (and its outcome) to the {@link SftpEventListener}.
+     *
+     * @param id The request id
+     * @param handle The opaque handle value - must refer to a file handle
+     * @param offset Start offset of the locked range
+     * @param length Length of the locked range
+     * @param mask The lock mask
+     * @throws IOException If bad handle or failed to lock
+     */
+    @Override
+    protected void doBlock(int id, String handle, long offset, long length, int mask) throws IOException {
+        Handle target = handles.get(handle);
+        if (log.isDebugEnabled()) {
+            log.debug("doBlock({})[id={}] SSH_FXP_BLOCK (handle={}[{}], offset={}, length={}, mask=0x{})",
+                      getServerSession(), id, handle, target, offset, length, Integer.toHexString(mask));
+        }
+
+        FileHandle lockedFile = validateHandle(handle, target, FileHandle.class);
+        SftpEventListener eventListener = getSftpEventListenerProxy();
+        ServerSession session = getServerSession();
+        eventListener.blocking(session, handle, lockedFile, offset, length, mask);
+        try {
+            lockedFile.lock(offset, length, mask);
+        } catch (IOException | RuntimeException failure) {
+            // report the failure before propagating it
+            eventListener.blocked(session, handle, lockedFile, offset, length, mask, failure);
+            throw failure;
+        }
+        eventListener.blocked(session, handle, lockedFile, offset, length, mask, null);
+    }
+
+    /**
+     * Handles {@code SSH_FXP_UNBLOCK} by unlocking the requested range of the file
+     * and reporting the attempt (and its outcome) to the {@link SftpEventListener}.
+     *
+     * @param id The request id
+     * @param handle The opaque handle value - must refer to a file handle
+     * @param offset Start offset of the unlocked range
+     * @param length Length of the unlocked range
+     * @throws IOException If bad handle or failed to unlock
+     */
+    @Override
+    protected void doUnblock(int id, String handle, long offset, long length) throws IOException {
+        Handle target = handles.get(handle);
+        if (log.isDebugEnabled()) {
+            log.debug("doUnblock({})[id={}] SSH_FXP_UNBLOCK (handle={}[{}], offset={}, length={})",
+                      getServerSession(), id, handle, target, offset, length);
+        }
+
+        FileHandle unlockedFile = validateHandle(handle, target, FileHandle.class);
+        SftpEventListener eventListener = getSftpEventListenerProxy();
+        ServerSession session = getServerSession();
+        eventListener.unblocking(session, handle, unlockedFile, offset, length);
+        try {
+            unlockedFile.unlock(offset, length);
+        } catch (IOException | RuntimeException failure) {
+            // report the failure before propagating it
+            eventListener.unblocked(session, handle, unlockedFile, offset, length, failure);
+            throw failure;
+        }
+        eventListener.unblocked(session, handle, unlockedFile, offset, length, null);
+    }
+
+    /**
+     * Handles the &quot;copy-data&quot; extension: copies a range of bytes from one
+     * open file handle to another (or to a different, non-overlapping, location
+     * of the same handle).
+     *
+     * @param id The request id
+     * @param readHandle The opaque handle of the source - must refer to a file
+     * handle opened with {@code ACE4_READ_DATA}
+     * @param readOffset Offset in the source to start reading from (non-negative)
+     * @param readLength Number of bytes to copy (non-negative) - zero means
+     * &quot;up to the end of the source file&quot;. A range exceeding the source size
+     * is truncated to the available data
+     * @param writeHandle The opaque handle of the destination - must refer to a
+     * file handle opened with {@code ACE4_WRITE_DATA}
+     * @param writeOffset Offset in the destination to start writing to (non-negative)
+     * @throws IOException If bad handle(s), missing access rights or failed to read/write
+     * @throws IllegalArgumentException If invalid offsets/length, nothing to copy
+     * or the source and destination are the same handle and the ranges overlap
+     */
+    @Override
+    @SuppressWarnings("resource")
+    protected void doCopyData(int id, String readHandle, long readOffset, long readLength, String writeHandle, long writeOffset) throws IOException {
+        boolean inPlaceCopy = readHandle.equals(writeHandle);
+        Handle rh = handles.get(readHandle);
+        Handle wh = inPlaceCopy ? rh : handles.get(writeHandle);
+        if (log.isDebugEnabled()) {
+            log.debug("doCopyData({})[id={}] SSH_FXP_EXTENDED[{}] read={}[{}], read-offset={}, read-length={}, write={}[{}], write-offset={})",
+                      getServerSession(), id, SftpConstants.EXT_COPY_DATA,
+                      readHandle, rh, readOffset, readLength,
+                      writeHandle, wh, writeOffset);
+        }
+
+        FileHandle srcHandle = validateHandle(readHandle, rh, FileHandle.class);
+        Path srcPath = srcHandle.getFile();
+        int srcAccess = srcHandle.getAccessMask();
+        if ((srcAccess & SftpConstants.ACE4_READ_DATA) != SftpConstants.ACE4_READ_DATA) {
+            throw new AccessDeniedException(srcPath.toString(), srcPath.toString(), "Source file not opened for read");
+        }
+
+        ValidateUtils.checkTrue(readLength >= 0L, "Invalid read length: %d", readLength);
+        ValidateUtils.checkTrue(readOffset >= 0L, "Invalid read offset: %d", readOffset);
+
+        long totalSize = Files.size(srcPath);
+        long effectiveLength = readLength;
+        if (effectiveLength == 0L) {
+            effectiveLength = totalSize - readOffset;
+        } else {
+            long maxRead = readOffset + effectiveLength;
+            if (maxRead > totalSize) {
+                effectiveLength = totalSize - readOffset;
+            }
+        }
+        ValidateUtils.checkTrue(effectiveLength > 0L, "Non-positive effective copy data length: %d", effectiveLength);
+
+        FileHandle dstHandle = inPlaceCopy ? srcHandle : validateHandle(writeHandle, wh, FileHandle.class);
+        int dstAccess = dstHandle.getAccessMask();
+        if ((dstAccess & SftpConstants.ACE4_WRITE_DATA) != SftpConstants.ACE4_WRITE_DATA) {
+            // report the destination - it is the one lacking the required access
+            Path dstPath = dstHandle.getFile();
+            throw new AccessDeniedException(dstPath.toString(), dstPath.toString(), "Destination file not opened for write");
+        }
+
+        ValidateUtils.checkTrue(writeOffset >= 0L, "Invalid write offset: %d", writeOffset);
+        // check if overlapping ranges as per the draft
+        if (inPlaceCopy) {
+            // effective length has already been truncated to the file size
+            long maxRead = readOffset + effectiveLength;
+            long maxWrite = writeOffset + effectiveLength;
+            // half-open ranges overlap only if each one starts before the other one ends
+            if ((writeOffset < maxRead) && (readOffset < maxWrite)) {
+                throw new IllegalArgumentException("Write range [" + writeOffset + "-" + maxWrite + "]"
+                        + " overlaps with read range [" + readOffset + "-" + maxRead + "]");
+            }
+        }
+
+        // compare as long(s) BEFORE narrowing - the effective length may exceed Integer.MAX_VALUE
+        byte[] copyBuf = new byte[(int) Math.min(IoUtils.DEFAULT_COPY_SIZE, effectiveLength)];
+        while (effectiveLength > 0L) {
+            int remainLength = (int) Math.min(copyBuf.length, effectiveLength);
+            int readLen = srcHandle.read(copyBuf, 0, remainLength, readOffset);
+            if (readLen < 0) {
+                throw new EOFException("Premature EOF while still remaining " + effectiveLength + " bytes");
+            }
+            dstHandle.write(copyBuf, 0, readLen, writeOffset);
+
+            effectiveLength -= readLen;
+            readOffset += readLen;
+            writeOffset += readLen;
+        }
+    }
+
+ /**
+ * Handles {@code SSH_FXP_READDIR}: validates the directory handle and the
+ * directory it refers to, and then sends either an {@code SSH_FXP_NAME}
+ * reply with the next batch of entries or an {@code SSH_FX_EOF} status if
+ * the listing is done (or the directory is empty). Any failure is reported
+ * to the client as a status response instead of being propagated.
+ *
+ * @param buffer The {@link Buffer} holding the handle value - also re-used
+ * for building the response
+ * @param id The request id
+ * @throws IOException If failed to send the response
+ */
+ @Override
+ protected void doReadDir(Buffer buffer, int id) throws IOException {
+ String handle = buffer.getString();
+ Handle h = handles.get(handle);
+ boolean debugEnabled = log.isDebugEnabled();
+ if (debugEnabled) {
+ log.debug("doReadDir({})[id={}] SSH_FXP_READDIR (handle={}[{}])",
+ getServerSession(), id, handle, h);
+ }
+
+ Buffer reply = null;
+ try {
+ DirectoryHandle dh = validateHandle(handle, h, DirectoryHandle.class);
+ if (dh.isDone()) {
+ sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_EOF, "Directory reading is done");
+ return;
+ }
+
+ // re-validate the directory on every call
+ Path file = dh.getFile();
+ LinkOption[] options =
+ getPathResolutionLinkOption(SftpConstants.SSH_FXP_READDIR, "", file);
+ Boolean status = IoUtils.checkFileExists(file, options);
+ if (status == null) {
+ throw new AccessDeniedException(file.toString(), file.toString(), "Cannot determine existence of read-dir");
+ }
+
+ if (!status) {
+ throw new NoSuchFileException(file.toString(), file.toString(), "Non-existent directory");
+ } else if (!Files.isDirectory(file, options)) {
+ throw new NotDirectoryException(file.toString());
+ } else if (!Files.isReadable(file)) {
+ throw new AccessDeniedException(file.toString(), file.toString(), "Not readable");
+ }
+
+ if (dh.isSendDot() || dh.isSendDotDot() || dh.hasNext()) {
+ // There is at least one file in the directory or we need to send the "..".
+ // Send only a few files at a time to not create packets of a too
+ // large size or have a timeout to occur.
+
+ reply = BufferUtils.clear(buffer);
+ reply.putByte((byte) SftpConstants.SSH_FXP_NAME);
+ reply.putInt(id);
+
+ // entries count is not known yet - reserve its place and update it afterwards
+ int lenPos = reply.wpos();
+ reply.putInt(0);
+
+ ServerSession session = getServerSession();
+ int maxDataSize = session.getIntProperty(MAX_READDIR_DATA_SIZE_PROP, DEFAULT_MAX_READDIR_DATA_SIZE);
+ int count = doReadDir(id, handle, dh, reply, maxDataSize, IoUtils.getLinkOptions(false));
+ BufferUtils.updateLengthPlaceholder(reply, lenPos, count);
+ // nothing more to send - next request for this handle is answered with EOF
+ if ((!dh.isSendDot()) && (!dh.isSendDotDot()) && (!dh.hasNext())) {
+ dh.markDone();
+ }
+
+ Boolean indicator =
+ SftpHelper.indicateEndOfNamesList(reply, getVersion(), session, dh.isDone());
+ if (debugEnabled) {
+ log.debug("doReadDir({})({})[{}] - seding {} entries - eol={}", session, handle, h, count, indicator);
+ }
+ } else {
+ // empty directory
+ dh.markDone();
+ sendStatus(BufferUtils.clear(buffer), id, SftpConstants.SSH_FX_EOF, "Empty directory");
+ return;
+ }
+
+ Objects.requireNonNull(reply, "No reply buffer created");
+ } catch (IOException | RuntimeException e) {
+ // failures are reported to the client rather than propagated
+ sendStatus(BufferUtils.clear(buffer), id, e, SftpConstants.SSH_FXP_READDIR, handle);
+ return;
+ }
+
+ send(reply);
+ }
+
+    /**
+     * Validates that the target is an existing, readable directory and
+     * registers a new {@link DirectoryHandle} for it.
+     *
+     * @param id The request id
+     * @param path The directory path as requested by the client
+     * @param p The resolved directory {@link Path}
+     * @param options The {@link LinkOption}-s to use when checking the path
+     * @return The opaque handle value assigned to the opened directory
+     * @throws IOException If the target does not exist, is not a directory,
+     * is not readable or its existence cannot be determined
+     */
+    @Override
+    protected String doOpenDir(int id, String path, Path p, LinkOption... options) throws IOException {
+        Boolean exists = IoUtils.checkFileExists(p, options);
+        if (exists == null) {
+            throw new AccessDeniedException(p.toString(), p.toString(), "Cannot determine open-dir existence");
+        }
+
+        if (!exists) {
+            throw new NoSuchFileException(path, path, "Referenced target directory N/A");
+        }
+
+        if (!Files.isDirectory(p, options)) {
+            throw new NotDirectoryException(path);
+        }
+
+        if (!Files.isReadable(p)) {
+            throw new AccessDeniedException(p.toString(), p.toString(), "Not readable");
+        }
+
+        String dirHandleValue = generateFileHandle(p);
+        handles.put(dirHandleValue, new DirectoryHandle(this, p, dirHandleValue));
+        return dirHandleValue;
+    }
+
+    /**
+     * Handles {@code SSH_FXP_FSETSTAT} by applying the attributes to the
+     * file/directory referenced by the handle.
+     *
+     * @param id The request id
+     * @param handle The opaque handle value - any kind of handle is accepted
+     * @param attrs The attributes to set
+     * @throws IOException If bad handle or failed to set the attributes
+     */
+    @Override
+    protected void doFSetStat(int id, String handle, Map<String, ?> attrs) throws IOException {
+        Handle target = handles.get(handle);
+        if (log.isDebugEnabled()) {
+            log.debug("doFsetStat({})[id={}] SSH_FXP_FSETSTAT (handle={}[{}], attrs={})",
+                      getServerSession(), id, handle, target, attrs);
+        }
+
+        Handle validated = validateHandle(handle, target, Handle.class);
+        doSetAttributes(validated.getFile(), attrs);
+    }
+
+    /**
+     * Handles {@code SSH_FXP_FSTAT} by resolving the attributes of the
+     * file/directory referenced by the handle.
+     *
+     * @param id The request id
+     * @param handle The opaque handle value - any kind of handle is accepted
+     * @param flags The requested attributes flags
+     * @return The resolved attributes
+     * @throws IOException If bad handle or failed to resolve the attributes
+     */
+    @Override
+    protected Map<String, Object> doFStat(int id, String handle, int flags) throws IOException {
+        Handle target = handles.get(handle);
+        if (log.isDebugEnabled()) {
+            log.debug("doFStat({})[id={}] SSH_FXP_FSTAT (handle={}[{}], flags=0x{})",
+                      getServerSession(), id, handle, target, Integer.toHexString(flags));
+        }
+
+        Path file = validateHandle(handle, target, Handle.class).getFile();
+        LinkOption[] options = IoUtils.getLinkOptions(true);
+        return resolveFileAttributes(file, flags, options);
+    }
+
+    /**
+     * Handles {@code SSH_FXP_WRITE}: writes (or appends - if the file was opened
+     * for append) the data to the file referenced by the handle, reporting the
+     * attempt (and its outcome) to the {@link SftpEventListener}.
+     *
+     * @param id The request id
+     * @param handle The opaque handle value - must refer to a file handle
+     * @param offset The file offset to write to - ignored when appending
+     * @param length Number of bytes to write (non-negative)
+     * @param data The array holding the data
+     * @param doff Offset of the data in the array
+     * @param remaining Number of available bytes in the array - must be at
+     * least the requested length
+     * @throws IOException If bad handle or failed to write
+     * @throws IllegalStateException If bad length or not enough available data
+     */
+    @Override
+    protected void doWrite(int id, String handle, long offset, int length, byte[] data, int doff, int remaining) throws IOException {
+        Handle target = handles.get(handle);
+        if (log.isTraceEnabled()) {
+            log.trace("doWrite({})[id={}] SSH_FXP_WRITE (handle={}[{}], offset={}, data=byte[{}])",
+                      getServerSession(), id, handle, target, offset, length);
+        }
+
+        FileHandle writtenFile = validateHandle(handle, target, FileHandle.class);
+        if (length < 0) {
+            throw new IllegalStateException("Bad length (" + length + ") for writing to " + writtenFile);
+        }
+
+        if (remaining < length) {
+            throw new IllegalStateException("Not enough buffer data for writing to " + writtenFile + ": required=" + length + ", available=" + remaining);
+        }
+
+        SftpEventListener eventListener = getSftpEventListenerProxy();
+        ServerSession session = getServerSession();
+        eventListener.writing(session, handle, writtenFile, offset, data, doff, length);
+        try {
+            if (!writtenFile.isOpenAppend()) {
+                writtenFile.write(data, doff, length, offset);
+            } else {
+                writtenFile.append(data, doff, length);
+            }
+        } catch (IOException | RuntimeException failure) {
+            // report the failure before propagating it
+            eventListener.written(session, handle, writtenFile, offset, data, doff, length, failure);
+            throw failure;
+        }
+        eventListener.written(session, handle, writtenFile, offset, data, doff, length, null);
+    }
+
+ /**
+  * Reads data from the file referenced by an open handle and reports the
+  * outcome to the registered {@link SftpEventListener}.
+  *
+  * @param id     The request identifier
+  * @param handle The handle identifier sent by the client
+  * @param offset The file offset to read from
+  * @param length The number of bytes requested - must be positive
+  * @param data   The array to read into
+  * @param doff   The offset inside the array where the read data is placed
+  * @return The value returned by the file handle read
+  * @throws IOException If the handle fails validation or the read fails
+  */
+ @Override
+ protected int doRead(int id, String handle, long offset, int length, byte[] data, int doff) throws IOException {
+     Handle h = handles.get(handle);
+     if (log.isTraceEnabled()) {
+         log.trace("doRead({})[id={}] SSH_FXP_READ (handle={}[{}], offset={}, length={})",
+                 getServerSession(), id, handle, h, offset, length);
+     }
+
+     ValidateUtils.checkTrue(length > 0L, "Invalid read length: %d", length);
+     FileHandle source = validateHandle(handle, h, FileHandle.class);
+     SftpEventListener listener = getSftpEventListenerProxy();
+     ServerSession session = getServerSession();
+
+     listener.reading(session, handle, source, offset, data, doff, length);
+     int count;
+     try {
+         count = source.read(data, doff, length, offset);
+     } catch (IOException | RuntimeException e) {
+         // a failed read is reported with -1 as the read length
+         listener.read(session, handle, source, offset, data, doff, length, -1, e);
+         throw e;
+     }
+
+     listener.read(session, handle, source, offset, data, doff, length, count, null);
+     return count;
+ }
+
+ /**
+  * Un-registers and closes an open handle, then notifies the registered
+  * {@link SftpEventListener}.
+  *
+  * @param id     The request identifier
+  * @param handle The handle identifier sent by the client
+  * @throws IOException If the handle fails validation or cannot be closed
+  */
+ @Override
+ protected void doClose(int id, String handle) throws IOException {
+     // the handle is removed from the map even if the validation below fails
+     Handle h = handles.remove(handle);
+     if (log.isDebugEnabled()) {
+         log.debug("doClose({})[id={}] SSH_FXP_CLOSE (handle={}[{}])",
+                 getServerSession(), id, handle, h);
+     }
+
+     Handle validated = validateHandle(handle, h, Handle.class);
+     validated.close();
+
+     getSftpEventListenerProxy().close(getServerSession(), handle, h);
+ }
+
+ /**
+  * Opens a file and registers a new {@link FileHandle} for it, provided the
+  * session has not yet reached its limit of open handles.
+  *
+  * @param id     The request identifier
+  * @param path   The path as it was sent by the client
+  * @param pflags The open flags
+  * @param access The requested access mask
+  * @param attrs  The attributes to use for the opened file
+  * @return The generated handle under which the file was registered
+  * @throws IOException If the file cannot be resolved or opened
+  * @throws IllegalStateException If the session already holds the maximum
+  * allowed number of open handles
+  */
+ @Override
+ protected String doOpen(int id, String path, int pflags, int access, Map<String, Object> attrs) throws IOException {
+     if (log.isDebugEnabled()) {
+         log.debug("doOpen({})[id={}] SSH_FXP_OPEN (path={}, access=0x{}, pflags=0x{}, attrs={})",
+                 getServerSession(), id, path, Integer.toHexString(access), Integer.toHexString(pflags), attrs);
+     }
+
+     int curHandleCount = handles.size();
+     int maxHandleCount = getServerSession().getIntProperty(MAX_OPEN_HANDLES_PER_SESSION, DEFAULT_MAX_OPEN_HANDLES);
+     // this request is about to add one more handle, so reaching the limit (and
+     // not only exceeding it) must be rejected - otherwise max + 1 handles can be opened
+     if (curHandleCount >= maxHandleCount) {
+         throw new IllegalStateException("Too many open handles: current=" + curHandleCount + ", max.=" + maxHandleCount);
+     }
+
+     Path file = resolveFile(path);
+     String handle = generateFileHandle(file);
+     FileHandle fileHandle = new FileHandle(this, file, handle, pflags, access, attrs);
+     handles.put(handle, fileHandle);
+     return handle;
+ }
+
+ /**
+  * Generates a random handle identifier that is not currently in use. The
+  * handles are stringified (hex) and treated as such on decoding as well,
+  * since a string is easier to use as a map key.
+  *
+  * @param file The {@link Path} the handle is generated for - used only for reporting
+  * @return The generated (unused) handle identifier
+  * @throws IllegalStateException If no unused value was found within the allowed number of rounds
+  */
+ protected String generateFileHandle(Path file) {
+     // several rounds are attempted since a relatively small handle size might yield conflicts
+     for (int round = 0; round < maxFileHandleRounds; round++) {
+         randomizer.fill(workBuf, 0, fileHandleSize);
+         String candidate = BufferUtils.toHex(workBuf, 0, fileHandleSize, BufferUtils.EMPTY_HEX_SEPARATOR);
+         if (!handles.containsKey(candidate)) {
+             if (log.isTraceEnabled()) {
+                 log.trace("generateFileHandle({})[{}] {}", getServerSession(), file, candidate);
+             }
+             return candidate;
+         }
+
+         if (log.isTraceEnabled()) {
+             log.trace("generateFileHandle({})[{}] handle={} in use at round {}",
+                     getServerSession(), file, candidate, round);
+         }
+     }
+
+     throw new IllegalStateException("Failed to generate a unique file handle for " + file);
+ }
+
+ /**
+  * Handles the {@code SSH_FXP_INIT} request: checks the requested version,
+  * records the extensions sent by the client and replies with
+  * {@code SSH_FXP_VERSION}.
+  *
+  * @param buffer The request {@link Buffer} - re-used for building the reply
+  * @param id     The value carried by the request - for this request it is the
+  * client's protocol version
+  * @throws IOException If failed to process the request or send the reply
+  */
+ protected void doInit(Buffer buffer, int id) throws IOException {
+     if (log.isDebugEnabled()) {
+         log.debug("doInit({})[id={}] SSH_FXP_INIT (version={})", getServerSession(), id, id);
+     }
+
+     String all = checkVersionCompatibility(buffer, id, id, SftpConstants.SSH_FX_OP_UNSUPPORTED);
+     if (GenericUtils.isEmpty(all)) { // i.e. validation failed
+         return;
+     }
+
+     version = id;
+     // whatever data is left in the request is consumed as (name, data) extension pairs
+     while (buffer.available() > 0) {
+         String extensionName = buffer.getString();
+         extensions.put(extensionName, buffer.getBytes());
+     }
+
+     // build the reply in the same buffer
+     buffer.clear();
+     buffer.putByte((byte) SftpConstants.SSH_FXP_VERSION);
+     buffer.putInt(version);
+     appendExtensions(buffer, all);
+
+     getSftpEventListenerProxy().initialized(getServerSession(), version);
+
+     send(buffer);
+ }
+
+ /**
+  * Writes the available buffer data to the output stream, preceded by its
+  * length, and flushes the stream.
+  *
+  * @param buffer The {@link Buffer} holding the data to send
+  * @throws IOException If failed to write or flush
+  */
+ @Override
+ protected void send(Buffer buffer) throws IOException {
+     int payloadLength = buffer.available();
+     // length prefix first
+     BufferUtils.writeInt(out, payloadLength, workBuf, 0, workBuf.length);
+
+     byte[] payload = buffer.array();
+     out.write(payload, buffer.rpos(), payloadLength);
+     out.flush();
+ }
+
+ /**
+  * Releases the subsystem resources - only the first invocation has any
+  * effect. The steps are: announce the destruction to the registered
+  * {@link SftpEventListener}, cancel the pending future (if not done), shut
+  * down the executor service (if so configured) and close the file system.
+  * Failures of the individual steps are logged and do not stop the others.
+  */
+ @Override
+ public void destroy() {
+     if (closed.getAndSet(true)) {
+         return; // ignore if already closed
+     }
+
+     ServerSession session = getServerSession();
+     boolean verbose = log.isDebugEnabled();
+     if (verbose) {
+         log.debug("destroy({}) - mark as closed", session);
+     }
+
+     try {
+         getSftpEventListenerProxy().destroying(session);
+     } catch (Exception e) {
+         log.warn("destroy({}) Failed ({}) to announce destruction event: {}",
+                 session, e.getClass().getSimpleName(), e.getMessage());
+         if (verbose) {
+             log.debug("destroy(" + session + ") destruction announcement failure details", e);
+         }
+     }
+
+     // if thread has not completed, cancel it
+     if ((pendingFuture != null) && (!pendingFuture.isDone())) {
+         boolean cancelled = pendingFuture.cancel(true);
+         // TODO consider waiting some reasonable (?) amount of time for cancellation
+         if (verbose) {
+             log.debug("destroy(" + session + ") - cancel pending future=" + cancelled);
+         }
+     }
+     pendingFuture = null;
+
+     ExecutorService service = getExecutorService();
+     if ((service != null) && (!service.isShutdown()) && isShutdownOnExit()) {
+         Collection<Runnable> pendingTasks = service.shutdownNow();
+         if (verbose) {
+             log.debug("destroy(" + session + ") - shutdown executor service - runners count=" + pendingTasks.size());
+         }
+     }
+     this.executorService = null;
+
+     try {
+         fileSystem.close();
+     } catch (UnsupportedOperationException e) {
+         // not every file system can be closed
+         if (verbose) {
+             log.debug("destroy(" + session + ") closing the file system is not supported");
+         }
+     } catch (IOException e) {
+         if (verbose) {
+             log.debug("destroy(" + session + ")"
+                     + " failed (" + e.getClass().getSimpleName() + ")"
+                     + " to close file system: " + e.getMessage(), e);
+         }
+     }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/251db9b9/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystemEnvironment.java
----------------------------------------------------------------------
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystemEnvironment.java b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystemEnvironment.java
new file mode 100644
index 0000000..493a450
--- /dev/null
+++ b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystemEnvironment.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.server.subsystem.sftp;
+
+import java.nio.file.Path;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.sshd.common.subsystem.sftp.SftpConstants;
+import org.apache.sshd.server.session.ServerSessionHolder;
+
+/**
+ * Exposes the settings of an SFTP subsystem instance: the negotiated protocol
+ * version, the file system accessor, the unsupported attributes policy and
+ * the default directory. Also declares the range of protocol versions covered
+ * by the implementation.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface SftpSubsystemEnvironment extends ServerSessionHolder {
+ /**
+ * Force the use of a given sftp version
+ * (NOTE: presumably the name of a session/server property - where and how
+ * it is consulted is not visible here, confirm against the subsystem code)
+ */
+ String SFTP_VERSION = "sftp-version";
+
+ // Lowest protocol version covered by the implementation
+ int LOWER_SFTP_IMPL = SftpConstants.SFTP_V3; // Working implementation from v3
+
+ // Highest protocol version covered by the implementation (inclusive)
+ int HIGHER_SFTP_IMPL = SftpConstants.SFTP_V6; // .. up to and including
+
+ /**
+ * Comma separated list of all the version numbers in the
+ * {@link #LOWER_SFTP_IMPL} - {@link #HIGHER_SFTP_IMPL} range (inclusive)
+ */
+ String ALL_SFTP_IMPL = IntStream.rangeClosed(LOWER_SFTP_IMPL, HIGHER_SFTP_IMPL)
+ .mapToObj(Integer::toString)
+ .collect(Collectors.joining(","));
+
+ /**
+ * @return The negotiated version
+ */
+ int getVersion();
+
+ /**
+ * @return The {@link SftpFileSystemAccessor} used to access effective
+ * server-side paths
+ */
+ SftpFileSystemAccessor getFileSystemAccessor();
+
+ /**
+ * @return The selected behavior in case some unsupported attributes are requested
+ */
+ UnsupportedAttributePolicy getUnsupportedAttributePolicy();
+
+ /**
+ * @return The default root directory used to resolve relative paths
+ * - a.k.a. the {@code chroot} location
+ */
+ Path getDefaultDirectory();
+}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/251db9b9/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystemFactory.java
----------------------------------------------------------------------
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystemFactory.java b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystemFactory.java
new file mode 100644
index 0000000..4e4aa77
--- /dev/null
+++ b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystemFactory.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.server.subsystem.sftp;
+
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.sshd.common.subsystem.sftp.SftpConstants;
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.ObjectBuilder;
+import org.apache.sshd.common.util.threads.ExecutorServiceConfigurer;
+import org.apache.sshd.server.Command;
+import org.apache.sshd.server.subsystem.SubsystemFactory;
+
+/**
+ * Creates {@link SftpSubsystem} instances that are configured with this
+ * factory's executor service, shutdown-on-exit flag, unsupported attributes
+ * policy, file system accessor, error status data handler and registered
+ * event listeners.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class SftpSubsystemFactory
+        extends AbstractSftpEventListenerManager
+        implements SubsystemFactory, ExecutorServiceConfigurer, SftpEventListenerManager, SftpFileSystemAccessorManager {
+    public static final String NAME = SftpConstants.SFTP_SUBSYSTEM_NAME;
+    public static final UnsupportedAttributePolicy DEFAULT_POLICY = UnsupportedAttributePolicy.Warn;
+
+    /**
+     * Fluent builder that collects the settings (and event listeners) and
+     * applies them to a new {@link SftpSubsystemFactory}.
+     */
+    public static class Builder extends AbstractSftpEventListenerManager implements ObjectBuilder<SftpSubsystemFactory> {
+        private ExecutorService executors;
+        private boolean shutdownExecutor;
+        private UnsupportedAttributePolicy policy = DEFAULT_POLICY;
+        private SftpFileSystemAccessor fileSystemAccessor = SftpFileSystemAccessor.DEFAULT;
+        private SftpErrorStatusDataHandler errorStatusDataHandler = SftpErrorStatusDataHandler.DEFAULT;
+
+        public Builder() {
+            super();
+        }
+
+        /**
+         * @param service The {@link ExecutorService} to configure - may be {@code null}
+         * @return Self reference
+         */
+        public Builder withExecutorService(ExecutorService service) {
+            this.executors = service;
+            return this;
+        }
+
+        /**
+         * @param shutdown Whether to shut down the executor service on exit
+         * @return Self reference
+         */
+        public Builder withShutdownOnExit(boolean shutdown) {
+            this.shutdownExecutor = shutdown;
+            return this;
+        }
+
+        /**
+         * @param p The {@link UnsupportedAttributePolicy} - never {@code null}
+         * @return Self reference
+         */
+        public Builder withUnsupportedAttributePolicy(UnsupportedAttributePolicy p) {
+            this.policy = Objects.requireNonNull(p, "No policy");
+            return this;
+        }
+
+        /**
+         * @param accessor The {@link SftpFileSystemAccessor} - never {@code null}
+         * @return Self reference
+         */
+        public Builder withFileSystemAccessor(SftpFileSystemAccessor accessor) {
+            this.fileSystemAccessor = Objects.requireNonNull(accessor, "No accessor");
+            return this;
+        }
+
+        /**
+         * @param handler The {@link SftpErrorStatusDataHandler} - never {@code null}
+         * @return Self reference
+         */
+        public Builder withSftpErrorStatusDataHandler(SftpErrorStatusDataHandler handler) {
+            this.errorStatusDataHandler = Objects.requireNonNull(handler, "No error status handler");
+            return this;
+        }
+
+        /**
+         * @return A new {@link SftpSubsystemFactory} carrying the collected
+         * settings and the listeners registered with this builder
+         */
+        @Override
+        public SftpSubsystemFactory build() {
+            SftpSubsystemFactory result = new SftpSubsystemFactory();
+            result.setExecutorService(this.executors);
+            result.setShutdownOnExit(this.shutdownExecutor);
+            result.setUnsupportedAttributePolicy(this.policy);
+            result.setFileSystemAccessor(this.fileSystemAccessor);
+            result.setErrorStatusDataHandler(this.errorStatusDataHandler);
+            GenericUtils.forEach(getRegisteredListeners(), result::addSftpEventListener);
+            return result;
+        }
+    }
+
+    private ExecutorService executors;
+    private boolean shutdownExecutor;
+    private UnsupportedAttributePolicy policy = DEFAULT_POLICY;
+    private SftpFileSystemAccessor fileSystemAccessor = SftpFileSystemAccessor.DEFAULT;
+    private SftpErrorStatusDataHandler errorStatusDataHandler = SftpErrorStatusDataHandler.DEFAULT;
+
+    public SftpSubsystemFactory() {
+        super();
+    }
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+    @Override
+    public ExecutorService getExecutorService() {
+        return this.executors;
+    }
+
+    /**
+     * @param service The {@link ExecutorService} to be used by the {@link SftpSubsystem}
+     * command when starting execution. If {@code null} then a single-threaded ad-hoc service is used.
+     */
+    @Override
+    public void setExecutorService(ExecutorService service) {
+        this.executors = service;
+    }
+
+    @Override
+    public boolean isShutdownOnExit() {
+        return this.shutdownExecutor;
+    }
+
+    /**
+     * @param shutdownOnExit If {@code true} the {@link ExecutorService#shutdownNow()}
+     * will be called when subsystem terminates - unless it is the ad-hoc service, which
+     * will be shutdown regardless
+     */
+    @Override
+    public void setShutdownOnExit(boolean shutdownOnExit) {
+        this.shutdownExecutor = shutdownOnExit;
+    }
+
+    /**
+     * @return The configured {@link UnsupportedAttributePolicy}
+     */
+    public UnsupportedAttributePolicy getUnsupportedAttributePolicy() {
+        return this.policy;
+    }
+
+    /**
+     * @param p The {@link UnsupportedAttributePolicy} to use if failed to access
+     * some local file attributes - never {@code null}
+     */
+    public void setUnsupportedAttributePolicy(UnsupportedAttributePolicy p) {
+        this.policy = Objects.requireNonNull(p, "No policy");
+    }
+
+    @Override
+    public SftpFileSystemAccessor getFileSystemAccessor() {
+        return this.fileSystemAccessor;
+    }
+
+    @Override
+    public void setFileSystemAccessor(SftpFileSystemAccessor accessor) {
+        this.fileSystemAccessor = Objects.requireNonNull(accessor, "No accessor");
+    }
+
+    /**
+     * @return The configured {@link SftpErrorStatusDataHandler}
+     */
+    public SftpErrorStatusDataHandler getErrorStatusDataHandler() {
+        return this.errorStatusDataHandler;
+    }
+
+    /**
+     * @param handler The {@link SftpErrorStatusDataHandler} to use - never {@code null}
+     */
+    public void setErrorStatusDataHandler(SftpErrorStatusDataHandler handler) {
+        this.errorStatusDataHandler = Objects.requireNonNull(handler, "No error status data handler provided");
+    }
+
+    /**
+     * @return A new {@link SftpSubsystem} configured with the current settings
+     * and with all the currently registered listeners added to it
+     */
+    @Override
+    public Command create() {
+        SftpSubsystem created =
+            new SftpSubsystem(getExecutorService(), isShutdownOnExit(),
+                getUnsupportedAttributePolicy(), getFileSystemAccessor(),
+                getErrorStatusDataHandler());
+        GenericUtils.forEach(getRegisteredListeners(), created::addSftpEventListener);
+        return created;
+    }
+}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/251db9b9/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/TreeLockExecutor.java
----------------------------------------------------------------------
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/TreeLockExecutor.java b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/TreeLockExecutor.java
new file mode 100644
index 0000000..b0ed061
--- /dev/null
+++ b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/TreeLockExecutor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.server.subsystem.sftp;
+
+import java.io.Closeable;
+import java.nio.file.Path;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+/**
+ * Runs the submitted tasks one at a time, in submission order, using a single
+ * worker loop that is submitted to the provided {@link ExecutorService} on
+ * construction. The loop ends when the executor is {@link #close() closed}
+ * or when the worker thread is interrupted.
+ * <P>
+ * Note: the {@code paths} given to {@link #submit(Runnable, String...)} and
+ * the resolver given to the constructor are currently not used by the
+ * implementation.
+ * </P>
+ */
+public class TreeLockExecutor implements Closeable {
+
+    /** Marker task used to tell the worker loop to stop */
+    private static final Runnable CLOSE = () -> { };
+
+    private final ExecutorService executor;
+    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+    private final Future<?> future;
+    private final Function<String, Path> resolver;
+
+    /**
+     * @param executor The {@link ExecutorService} that runs the worker loop
+     * @param resolver Path resolver - stored, but currently unused
+     */
+    public TreeLockExecutor(ExecutorService executor, Function<String, Path> resolver) {
+        this.executor = executor;
+        this.resolver = resolver;
+        this.future = executor.submit(this::run);
+    }
+
+    /**
+     * Queues a task for execution by the worker loop.
+     *
+     * @param work  The task to run
+     * @param paths The paths the task is related to - currently ignored
+     */
+    public void submit(Runnable work, String... paths) {
+        queue.add(work);
+    }
+
+    /**
+     * The worker loop: takes tasks off the queue and runs them until the
+     * close marker is received or the thread is interrupted while waiting.
+     * A failure of an individual task is ignored so that the following
+     * tasks still get their chance to run.
+     */
+    protected void run() {
+        while (true) {
+            Runnable work;
+            try {
+                work = queue.take();
+            } catch (InterruptedException e) {
+                // An interrupt is a request to stop (future cancellation or executor
+                // shutdown) - swallowing it would block this thread on the queue forever
+                Thread.currentThread().interrupt();
+                break;
+            }
+
+            if (work == CLOSE) {
+                break;
+            }
+
+            try {
+                work.run();
+            } catch (Throwable t) {
+                // ignore - best effort, a failed task must not stop the loop
+            }
+        }
+    }
+
+    /**
+     * Discards the tasks that were not started yet, asks the worker loop to
+     * stop and waits up to 5 seconds for it to do so. If the loop is still
+     * running after that, it is cancelled (interrupted).
+     */
+    @Override
+    public void close() {
+        queue.clear();
+        queue.add(CLOSE);
+        try {
+            future.get(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            // preserve the interruption status for the caller
+            Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            // Ignore - timeout or task failure, the cancellation below takes care of it
+        }
+        future.cancel(true);
+    }
+}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/251db9b9/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/UnixDateFormat.java
----------------------------------------------------------------------
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/UnixDateFormat.java b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/UnixDateFormat.java
new file mode 100644
index 0000000..3ce474a
--- /dev/null
+++ b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/UnixDateFormat.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.server.subsystem.sftp;
+
+import java.nio.file.attribute.FileTime;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.GregorianCalendar;
+import java.util.List;
+
+/**
+ * Formats timestamps the way they appear in Unix-style directory listings:
+ * <code>MMM dd HH:mm</code> for timestamps within six months of the current
+ * time and <code>MMM dd  yyyy</code> for the others.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public final class UnixDateFormat {
+
+    /**
+     * A {@link List} of <U>short</U> months names where Jan=0, Feb=1, etc.
+     */
+    public static final List<String> MONTHS =
+        Collections.unmodifiableList(Arrays.asList(
+            "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"
+        ));
+
+    /**
+     * Six months duration in msec.
+     */
+    public static final long SIX_MONTHS = 183L * 24L * 60L * 60L * 1000L;
+
+    private UnixDateFormat() {
+        throw new UnsupportedOperationException("No instance allowed");
+    }
+
+    /**
+     * Get unix style date string.
+     *
+     * @param time The {@link FileTime} to format - ignored if {@code null}
+     * @return The formatted date string
+     * @see #getUnixDate(long)
+     */
+    public static String getUnixDate(FileTime time) {
+        if (time == null) {
+            return getUnixDate(-1L);
+        }
+
+        return getUnixDate(time.toMillis());
+    }
+
+    /**
+     * Get unix style date string.
+     *
+     * @param millis The timestamp to format in msec. since the epoch
+     * @return The formatted date string - a placeholder made of dashes if the
+     * value is negative
+     */
+    public static String getUnixDate(long millis) {
+        if (millis < 0L) {
+            return "------------";
+        }
+
+        Calendar cal = new GregorianCalendar();
+        cal.setTimeInMillis(millis);
+
+        StringBuilder sb = new StringBuilder(16);
+        // month + space padded day
+        sb.append(MONTHS.get(cal.get(Calendar.MONTH))).append(' ');
+        appendPadded(sb, cal.get(Calendar.DATE), ' ').append(' ');
+
+        long distance = Math.abs(System.currentTimeMillis() - millis);
+        if (distance > SIX_MONTHS) {
+            // far from now - show the year instead of the time of day
+            sb.append(' ').append(cal.get(Calendar.YEAR));
+        } else {
+            // zero padded HH:mm
+            appendPadded(sb, cal.get(Calendar.HOUR_OF_DAY), '0').append(':');
+            appendPadded(sb, cal.get(Calendar.MINUTE), '0');
+        }
+
+        return sb.toString();
+    }
+
+    // Appends the value, preceded by the padding character if it is less than 10
+    private static StringBuilder appendPadded(StringBuilder sb, int value, char padding) {
+        if (value < 10) {
+            sb.append(padding);
+        }
+        return sb.append(value);
+    }
+}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/251db9b9/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/UnsupportedAttributePolicy.java
----------------------------------------------------------------------
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/UnsupportedAttributePolicy.java b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/UnsupportedAttributePolicy.java
new file mode 100644
index 0000000..ca763e3
--- /dev/null
+++ b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/UnsupportedAttributePolicy.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.server.subsystem.sftp;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
+
+/**
+ * The available behaviors regarding unsupported attributes.
+ * NOTE(review): the actual handling of each value is implemented elsewhere -
+ * the per-value remarks below are inferred from the names, confirm against
+ * the code that consults the policy.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public enum UnsupportedAttributePolicy {
+ // presumably: silently skip the unsupported attribute
+ Ignore,
+ // presumably: log a warning and continue
+ Warn,
+ // presumably: fail the request with an exception
+ ThrowException;
+
+ /**
+ * An un-modifiable {@link Set} of all the available policies
+ */
+ public static final Set<UnsupportedAttributePolicy> VALUES =
+ Collections.unmodifiableSet(EnumSet.allOf(UnsupportedAttributePolicy.class));
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/251db9b9/sshd-sftp/src/test/java/org/apache/sshd/client/ClientTest.java
----------------------------------------------------------------------
diff --git a/sshd-sftp/src/test/java/org/apache/sshd/client/ClientTest.java b/sshd-sftp/src/test/java/org/apache/sshd/client/ClientTest.java
new file mode 100644
index 0000000..594c756
--- /dev/null
+++ b/sshd-sftp/src/test/java/org/apache/sshd/client/ClientTest.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.sshd.client.channel.ChannelExec;
+import org.apache.sshd.client.channel.ChannelShell;
+import org.apache.sshd.client.channel.ChannelSubsystem;
+import org.apache.sshd.client.channel.ClientChannel;
+import org.apache.sshd.client.future.OpenFuture;
+import org.apache.sshd.client.session.ClientSession;
+import org.apache.sshd.client.subsystem.SubsystemClient;
+import org.apache.sshd.client.subsystem.sftp.SftpClient;
+import org.apache.sshd.client.subsystem.sftp.SftpClientFactory;
+import org.apache.sshd.common.Factory;
+import org.apache.sshd.common.NamedResource;
+import org.apache.sshd.common.PropertyResolverUtils;
+import org.apache.sshd.common.RuntimeSshException;
+import org.apache.sshd.common.Service;
+import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.ChannelListener;
+import org.apache.sshd.common.channel.ChannelListenerManager;
+import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.session.SessionListener;
+import org.apache.sshd.common.subsystem.sftp.SftpConstants;
+import org.apache.sshd.common.util.GenericUtils;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.net.SshdSocketAddress;
+import org.apache.sshd.server.Command;
+import org.apache.sshd.server.SshServer;
+import org.apache.sshd.server.channel.ChannelSession;
+import org.apache.sshd.server.channel.ChannelSessionFactory;
+import org.apache.sshd.server.forward.DirectTcpipFactory;
+import org.apache.sshd.server.session.ServerConnectionServiceFactory;
+import org.apache.sshd.server.session.ServerUserAuthService;
+import org.apache.sshd.server.session.ServerUserAuthServiceFactory;
+import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory;
+import org.apache.sshd.util.test.BaseTestSupport;
+import org.apache.sshd.util.test.EchoShell;
+import org.apache.sshd.util.test.EchoShellFactory;
+import org.apache.sshd.util.test.TestChannelListener;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+/**
+ * TODO Add javadoc
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class ClientTest extends BaseTestSupport {
+ private SshServer sshd;
+ private SshClient client;
+ private int port;
+ private CountDownLatch authLatch;
+ private CountDownLatch channelLatch;
+
+ private final AtomicReference<ClientSession> clientSessionHolder = new AtomicReference<>(null);
+ @SuppressWarnings("synthetic-access")
+ private final SessionListener clientSessionListener = new SessionListener() {
+ @Override
+ public void sessionCreated(Session session) {
+ assertObjectInstanceOf("Non client session creation notification", ClientSession.class, session);
+ assertNull("Multiple creation notifications", clientSessionHolder.getAndSet((ClientSession) session));
+ }
+
+ @Override
+ public void sessionEvent(Session session, Event event) {
+ assertObjectInstanceOf("Non client session event notification: " + event, ClientSession.class, session);
+ assertSame("Mismatched client session event instance: " + event, clientSessionHolder.get(), session);
+ }
+
+ @Override
+ public void sessionException(Session session, Throwable t) {
+ assertObjectInstanceOf("Non client session exception notification", ClientSession.class, session);
+ assertNotNull("No session exception data", t);
+ }
+
+ @Override
+ public void sessionClosed(Session session) {
+ assertObjectInstanceOf("Non client session closure notification", ClientSession.class, session);
+ assertSame("Mismatched client session closure instance", clientSessionHolder.getAndSet(null), session);
+ }
+ };
+
+ public ClientTest() {
+ super();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ authLatch = new CountDownLatch(0);
+ channelLatch = new CountDownLatch(0);
+
+ sshd = setupTestServer();
+ sshd.setShellFactory(new TestEchoShellFactory());
+ sshd.setServiceFactories(Arrays.asList(
+ new ServerUserAuthServiceFactory() {
+ @Override
+ public Service create(Session session) throws IOException {
+ return new ServerUserAuthService(session) {
+ @SuppressWarnings("synthetic-access")
+ @Override
+ public void process(int cmd, Buffer buffer) throws Exception {
+ authLatch.await();
+ super.process(cmd, buffer);
+ }
+ };
+ }
+ },
+ ServerConnectionServiceFactory.INSTANCE
+ ));
+ sshd.setChannelFactories(Arrays.asList(
+ new ChannelSessionFactory() {
+ @Override
+ public Channel create() {
+ return new ChannelSession() {
+ @SuppressWarnings("synthetic-access")
+ @Override
+ public OpenFuture open(int recipient, long rwsize, long rmpsize, Buffer buffer) {
+ try {
+ channelLatch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeSshException(e);
+ }
+ return super.open(recipient, rwsize, rmpsize, buffer);
+ }
+
+ @Override
+ public String toString() {
+ return "ChannelSession" + "[id=" + getId() + ", recipient=" + getRecipient() + "]";
+ }
+ };
+ }
+ },
+ DirectTcpipFactory.INSTANCE));
+ sshd.start();
+ port = sshd.getPort();
+
+ client = setupTestClient();
+ clientSessionHolder.set(null); // just making sure
+ client.addSessionListener(clientSessionListener);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (sshd != null) {
+ sshd.stop(true);
+ }
+ if (client != null) {
+ client.stop();
+ }
+ clientSessionHolder.set(null); // just making sure
+ }
+
+ @Test
+ public void testSimpleClientListener() throws Exception {
+ AtomicReference<Channel> channelHolder = new AtomicReference<>(null);
+ client.addChannelListener(new ChannelListener() {
+ @Override
+ public void channelOpenSuccess(Channel channel) {
+ assertSame("Mismatched opened channel instances", channel, channelHolder.get());
+ }
+
+ @Override
+ public void channelOpenFailure(Channel channel, Throwable reason) {
+ assertSame("Mismatched failed open channel instances", channel, channelHolder.get());
+ }
+
+ @Override
+ public void channelInitialized(Channel channel) {
+ assertNull("Multiple channel initialization notifications", channelHolder.getAndSet(channel));
+ }
+
+ @Override
+ public void channelStateChanged(Channel channel, String hint) {
+ outputDebugMessage("channelStateChanged(%s): %s", channel, hint);
+ }
+
+ @Override
+ public void channelClosed(Channel channel, Throwable reason) {
+ assertSame("Mismatched closed channel instances", channel, channelHolder.getAndSet(null));
+ }
+ });
+ sshd.setSubsystemFactories(Collections.singletonList(new SftpSubsystemFactory()));
+
+ client.start();
+
+ try (ClientSession session = createTestClientSession()) {
+ testClientListener(channelHolder, ChannelShell.class, () -> {
+ try {
+ return session.createShellChannel();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ testClientListener(channelHolder, ChannelExec.class, () -> {
+ try {
+ return session.createExecChannel(getCurrentTestName());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ testClientListener(channelHolder, SftpClient.class, () -> {
+ try {
+ return SftpClientFactory.instance().createSftpClient(session);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } finally {
+ client.stop();
+ }
+ }
+
+ /**
+  * Creates one instance through the given factory and checks that the channel
+  * recorded in the holder is the instance's own channel while it is alive, and
+  * that the holder is empty again once the instance has been closed.
+  *
+  * @param <C> Type of closeable instance being created
+  * @param channelHolder Holder expected to contain the currently tracked channel -
+  * must be empty on entry
+  * @param channelType The instance type - used only to build assertion messages
+  * @param factory Creates the instance under test
+  * @throws Exception If failed to create or close the instance
+  * @throws UnsupportedOperationException If the created instance is neither a
+  * {@link Channel} nor a {@link SubsystemClient}
+  */
+ private <C extends Closeable> void testClientListener(AtomicReference<Channel> channelHolder, Class<C> channelType, Factory<? extends C> factory) throws Exception {
+     assertNull(channelType.getSimpleName() + ": Unexpected currently active channel", channelHolder.get());
+
+     try (C instance = factory.create()) {
+         // resolve the underlying channel: either the instance itself or the one backing a subsystem client
+         Channel expectedChannel;
+         if (instance instanceof Channel) {
+             expectedChannel = (Channel) instance;
+         } else if (instance instanceof SubsystemClient) {
+             expectedChannel = ((SubsystemClient) instance).getClientChannel();
+         } else {
+             throw new UnsupportedOperationException("Unknown test instance type: " + instance.getClass().getSimpleName());
+         }
+
+         Channel actualChannel = channelHolder.get();
+         assertSame("Mismatched listener " + channelType.getSimpleName() + " instances", expectedChannel, actualChannel);
+     }
+
+     // leaving the try-with-resources block closed the instance
+     assertNull(channelType.getSimpleName() + ": Active channel closure not signalled", channelHolder.get());
+ }
+
+ @Test
+ public void testCreateChannelByType() throws Exception {
+ client.start();
+
+ Collection<ClientChannel> channels = new LinkedList<>();
+ try (ClientSession session = createTestClientSession()) {
+ // required since we do not use an SFTP subsystem
+ PropertyResolverUtils.updateProperty(session, ChannelSubsystem.REQUEST_SUBSYSTEM_REPLY, false);
+ channels.add(session.createChannel(Channel.CHANNEL_SUBSYSTEM, SftpConstants.SFTP_SUBSYSTEM_NAME));
+ channels.add(session.createChannel(Channel.CHANNEL_EXEC, getCurrentTestName()));
+ channels.add(session.createChannel(Channel.CHANNEL_SHELL, getClass().getSimpleName()));
+
+ Set<Integer> ids = new HashSet<>(channels.size());
+ for (ClientChannel c : channels) {
+ int id = c.getId();
+ assertTrue("Channel ID repeated: " + id, ids.add(id));
+ }
+ } finally {
+ for (Closeable c : channels) {
+ try {
+ c.close();
+ } catch (IOException e) {
+ // ignored
+ }
+ }
+ client.stop();
+ }
+
+ assertNull("Session closure not signalled", clientSessionHolder.get());
+ }
+
+ /**
+ * Makes sure that the {@link ChannelListener}s added to the client, session
+ * and channel are <U>cumulative</U> - i.e., all of them invoked.
+ * The listeners registered on the client and on the session are collected in a
+ * map and their active/open/failed channel counts are asserted at each phase:
+ * after the session is created, after a subsystem channel is opened, after
+ * that channel is closed and after the client is stopped.
+ *
+ * @throws Exception If failed
+ */
+ @Test
+ public void testChannelListenersPropagation() throws Exception {
+ // listeners keyed by name - the helper rejects duplicate names
+ Map<String, TestChannelListener> clientListeners = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+ addChannelListener(clientListeners, client, new TestChannelListener(client.getClass().getSimpleName()));
+
+ // required since we do not use an SFTP subsystem
+ PropertyResolverUtils.updateProperty(client, ChannelSubsystem.REQUEST_SUBSYSTEM_REPLY, false);
+ client.start();
+ try (ClientSession session = createTestClientSession()) {
+ addChannelListener(clientListeners, session, new TestChannelListener(session.getClass().getSimpleName()));
+ // no channel created yet: expect 0 active and 0 open channels
+ assertListenerSizes("ClientSessionOpen", clientListeners, 0, 0);
+
+ try (ClientChannel channel = session.createSubsystemChannel(SftpConstants.SFTP_SUBSYSTEM_NAME)) {
+ channel.open().verify(5L, TimeUnit.SECONDS);
+
+ // NOTE: this channel-level listener is not put into clientListeners,
+ // so the size assertions in this test do not inspect it
+ TestChannelListener channelListener = new TestChannelListener(channel.getClass().getSimpleName());
+ // need to emulate them since we are adding the listener AFTER the channel is open
+ channelListener.channelInitialized(channel);
+ channelListener.channelOpenSuccess(channel);
+ channel.addChannelListener(channelListener);
+ // channel is open: expect 1 active and 1 open channel
+ assertListenerSizes("ClientChannelOpen", clientListeners, 1, 1);
+ }
+
+ // channel was closed by the try-with-resources: expect 0 active, open count still 1
+ assertListenerSizes("ClientChannelClose", clientListeners, 0, 1);
+ } finally {
+ client.stop();
+ }
+
+ // same expectation once the client has been stopped
+ assertListenerSizes("ClientStop", clientListeners, 0, 1);
+ }
+
+ /**
+  * Convenience overload that checks the values of a name-to-listener map.
+  *
+  * @param phase Test phase - used as assertion message prefix
+  * @param listeners The listeners map whose values are checked
+  * @param activeSize Expected active channels count - ignored if negative
+  * @param openSize Expected open channels count - ignored if negative
+  */
+ private static void assertListenerSizes(String phase, Map<String, ? extends TestChannelListener> listeners, int activeSize, int openSize) {
+     Collection<? extends TestChannelListener> registered = listeners.values();
+     assertListenerSizes(phase, registered, activeSize, openSize);
+ }
+
+ /**
+  * Asserts the active, open and failed channel counts of every listener.
+  * The failed channels count is always expected to be zero.
+  *
+  * @param phase Test phase - used as assertion message prefix
+  * @param listeners The listeners to check - nothing is done if {@code null}/empty
+  * @param activeSize Expected active channels count - ignored if negative
+  * @param openSize Expected open channels count - ignored if negative
+  */
+ private static void assertListenerSizes(String phase, Collection<? extends TestChannelListener> listeners, int activeSize, int openSize) {
+     if (GenericUtils.isEmpty(listeners)) {
+         return;
+     }
+
+     boolean checkActive = activeSize >= 0;
+     boolean checkOpen = openSize >= 0;
+     for (TestChannelListener listener : listeners) {
+         String name = listener.getName();
+
+         if (checkActive) {
+             int activeCount = GenericUtils.size(listener.getActiveChannels());
+             assertEquals(phase + ": mismatched active channels size for " + name + " listener", activeSize, activeCount);
+         }
+
+         if (checkOpen) {
+             int openCount = GenericUtils.size(listener.getOpenChannels());
+             assertEquals(phase + ": mismatched open channels size for " + name + " listener", openSize, openCount);
+         }
+
+         int failedCount = GenericUtils.size(listener.getFailedChannels());
+         assertEquals(phase + ": unexpected failed channels size for " + name + " listener", 0, failedCount);
+     }
+ }
+
+ /**
+  * Records the listener under its name and registers it with the manager.
+  *
+  * @param <L> Type of named listener
+  * @param listeners Map of already recorded listeners - the listener's name must
+  * not be mapped yet
+  * @param manager The manager to register the listener with
+  * @param listener The listener to record and register
+  */
+ private static <L extends ChannelListener & NamedResource> void addChannelListener(Map<String, L> listeners, ChannelListenerManager manager, L listener) {
+     String name = listener.getName();
+     L previous = listeners.put(name, listener);
+     assertNull("Duplicate listener named " + name, previous);
+     manager.addChannelListener(listener);
+ }
+
+ /**
+  * Creates an authenticated session to {@code TEST_LOCALHOST} and checks that
+  * the session reports that host as its connect address.
+  *
+  * @return The validated session - the caller is responsible for closing it
+  * @throws IOException If failed to create the session (or to close it after a
+  * failed validation)
+  */
+ private ClientSession createTestClientSession() throws IOException {
+     ClientSession session = createTestClientSession(TEST_LOCALHOST);
+     boolean validated = false;
+     try {
+         InetSocketAddress connectAddress = SshdSocketAddress.toInetSocketAddress(session.getConnectAddress());
+         String connectHost = connectAddress.getHostString();
+         assertEquals("Mismatched connect host", TEST_LOCALHOST, connectHost);
+
+         validated = true;
+         return session;
+     } finally {
+         // only a session that failed validation is closed here
+         if (!validated) {
+             session.close();
+         }
+     }
+ }
+
+ /**
+  * Connects to the test server on the given host, authenticates with a
+  * password identity and checks the reported connect address and port.
+  * The current test name serves as both the username and the password.
+  *
+  * @param host The host to connect to
+  * @return The authenticated session - the caller is responsible for closing it
+  * @throws IOException If failed to connect or authenticate (or to close the
+  * session after a failed validation)
+  */
+ private ClientSession createTestClientSession(String host) throws IOException {
+     String username = getCurrentTestName();
+     ClientSession session = client.connect(username, host, port).verify(7L, TimeUnit.SECONDS).getSession();
+     boolean ready = false;
+     try {
+         assertNotNull("Client session creation not signalled", clientSessionHolder.get());
+
+         String password = getCurrentTestName();
+         session.addPasswordIdentity(password);
+         session.auth().verify(5L, TimeUnit.SECONDS);
+
+         InetSocketAddress connectAddress = SshdSocketAddress.toInetSocketAddress(session.getConnectAddress());
+         assertNotNull("No reported connect address", connectAddress);
+         assertEquals("Mismatched connect port", port, connectAddress.getPort());
+
+         ready = true;
+         return session;
+     } finally {
+         // only a session that failed authentication/validation is closed here
+         if (!ready) {
+             session.close();
+         }
+     }
+ }
+
+ public static class TestEchoShellFactory extends EchoShellFactory {
+ @Override
+ public Command create() {
+ return new TestEchoShell();
+ }
+ }
+
+ public static class TestEchoShell extends EchoShell {
+ // CHECKSTYLE:OFF
+ public static CountDownLatch latch;
+ // CHECKSTYLE:ON
+
+ public TestEchoShell() {
+ super();
+ }
+
+ @Override
+ public void destroy() {
+ if (latch != null) {
+ latch.countDown();
+ }
+ super.destroy();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/251db9b9/sshd-sftp/src/test/java/org/apache/sshd/client/simple/BaseSimpleClientTestSupport.java
----------------------------------------------------------------------
diff --git a/sshd-sftp/src/test/java/org/apache/sshd/client/simple/BaseSimpleClientTestSupport.java b/sshd-sftp/src/test/java/org/apache/sshd/client/simple/BaseSimpleClientTestSupport.java
new file mode 100644
index 0000000..60b9403
--- /dev/null
+++ b/sshd-sftp/src/test/java/org/apache/sshd/client/simple/BaseSimpleClientTestSupport.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.client.simple;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sshd.client.SshClient;
+import org.apache.sshd.server.SshServer;
+import org.apache.sshd.util.test.BaseTestSupport;
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public abstract class BaseSimpleClientTestSupport extends BaseTestSupport {
+ public static final long CONNECT_TIMEOUT = TimeUnit.SECONDS.toMillis(5L);
+ public static final long AUTH_TIMEOUT = TimeUnit.SECONDS.toMillis(7L);
+
+ protected SshServer sshd;
+ protected SshClient client;
+ protected int port;
+ protected SimpleClient simple;
+
+ protected BaseSimpleClientTestSupport() {
+ super();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ sshd = setupTestServer();
+ sshd.start();
+ port = sshd.getPort();
+ client = setupTestClient();
+
+ simple = SshClient.wrapAsSimpleClient(client);
+ simple.setConnectTimeout(CONNECT_TIMEOUT);
+ simple.setAuthenticationTimeout(AUTH_TIMEOUT);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (sshd != null) {
+ sshd.stop(true);
+ }
+ if (simple != null) {
+ simple.close();
+ }
+ if (client != null) {
+ client.stop();
+ }
+ }
+}