You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2020/04/28 14:07:30 UTC
[mina-sshd] 01/02: [SSHD-979] Allow more flexible extension of
improved SFTP API implementations
This is an automated email from the ASF dual-hosted git repository.
lgoldstein pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
commit 15d6c73538e0477d4a57783b4ff5aebdbad405c0
Author: Lyor Goldstein <lg...@apache.org>
AuthorDate: Thu Apr 23 21:10:27 2020 +0300
[SSHD-979] Allow more flexible extension of improved SFTP API implementations
---
.../sshd/client/subsystem/sftp/RawSftpClient.java | 1 -
.../subsystem/sftp/impl/AbstractSftpClient.java | 34 +----
.../subsystem/sftp/impl/DefaultSftpClient.java | 163 +++++++++++++--------
.../client/subsystem/sftp/impl/SftpAckData.java | 45 ++++++
.../subsystem/sftp/impl/SftpInputStreamAsync.java | 78 +++++-----
.../subsystem/sftp/impl/SftpOutputStreamAsync.java | 46 +++---
.../subsystem/sftp/impl/SftpRemotePathChannel.java | 24 +--
.../sshd/server/subsystem/sftp/FileHandle.java | 1 +
8 files changed, 214 insertions(+), 178 deletions(-)
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/RawSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/RawSftpClient.java
index 560ce55..0f1afd7 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/RawSftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/RawSftpClient.java
@@ -49,5 +49,4 @@ public interface RawSftpClient {
* @throws IOException If connection closed or interrupted
*/
Buffer receive(int id, long timeout) throws IOException;
-
}
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java
index 211f3ee..338f666 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/AbstractSftpClient.java
@@ -58,6 +58,8 @@ import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
public abstract class AbstractSftpClient extends AbstractSubsystemClient implements SftpClient, RawSftpClient {
+ public static final int INIT_COMMAND_SIZE = Byte.BYTES /* command */ + Integer.BYTES /* version */;
+
/**
* Property used to avoid large buffers when {@link #write(Handle, long, byte[], int, int)} is invoked with a large
* buffer size.
@@ -158,7 +160,7 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme
/**
* Sends the specified command, waits for the response and then invokes {@link #checkResponseStatus(int, Buffer)}
- *
+ *
* @param cmd The command to send
* @param request The request {@link Buffer}
* @throws IOException If failed to send, receive or check the returned status
@@ -872,7 +874,7 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme
@Override
public void write(Handle handle, long fileOffset, byte[] src, int srcOffset, int len) throws IOException {
// do some bounds checking first
- if ((fileOffset < 0) || (srcOffset < 0) || (len < 0)) {
+ if ((fileOffset < 0L) || (srcOffset < 0) || (len < 0)) {
throw new IllegalArgumentException(
"write(" + handle + ") please ensure all parameters "
+ " are non-negative values: file-offset=" + fileOffset
@@ -1281,13 +1283,6 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme
checkCommandStatus(SftpConstants.SSH_FXP_UNBLOCK, buffer);
}
- /**
- * @param path The remote directory path
- * @return An {@link Iterable} that can be used to iterate over all the directory entries (unlike
- * {@link #readDir(Handle)})
- * @throws IOException If failed to access the remote site
- * @see #readDir(Handle)
- */
@Override
public Iterable<DirEntry> readDir(String path) throws IOException {
if (!isOpen()) {
@@ -1297,13 +1292,6 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme
return new SftpIterableDirEntry(this, path);
}
- /**
- * @param handle A directory {@link Handle}
- * @return An {@link Iterable} that can be used to iterate over all the directory entries (like
- * {@link #readDir(String)}). <B>Note:</B> the iterable instance is not re-usable - i.e., files
- * can be iterated only <U>once</U>
- * @throws IOException If failed to access the directory
- */
@Override
public Iterable<DirEntry> listDir(Handle handle) throws IOException {
if (!isOpen()) {
@@ -1313,20 +1301,8 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme
return new StfpIterableDirHandle(this, handle);
}
- /**
- * Opens an {@link FileChannel} on the specified remote path
- *
- * @param path The remote path
- * @param modes The access mode(s) - if {@code null}/empty then the {@link #DEFAULT_CHANNEL_MODES} are used
- * @return The open {@link FileChannel} - <B>Note:</B> do not close this owner client instance until the
- * channel is no longer needed since it uses the client for providing the channel's
- * functionality.
- * @throws IOException If failed to open the channel
- * @see java.nio.channels.Channels#newInputStream(java.nio.channels.ReadableByteChannel)
- * @see java.nio.channels.Channels#newOutputStream(java.nio.channels.WritableByteChannel)
- */
@Override
- public SftpRemotePathChannel openRemoteFileChannel(String path, Collection<OpenMode> modes) throws IOException {
+ public FileChannel openRemoteFileChannel(String path, Collection<OpenMode> modes) throws IOException {
return new SftpRemotePathChannel(path, this, false, GenericUtils.isEmpty(modes) ? DEFAULT_CHANNEL_MODES : modes);
}
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java
index 2c13f5b..cf00605 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/DefaultSftpClient.java
@@ -50,6 +50,8 @@ import org.apache.sshd.common.SshException;
import org.apache.sshd.common.channel.Channel;
import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.session.ConnectionService;
import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.subsystem.sftp.SftpConstants;
@@ -79,7 +81,7 @@ public class DefaultSftpClient extends AbstractSftpClient {
this.nameDecodingCharset = PropertyResolverUtils.getCharset(
clientSession, NAME_DECODING_CHARSET, DEFAULT_NAME_DECODING_CHARSET);
this.clientSession = Objects.requireNonNull(clientSession, "No client session");
- this.channel = new SftpChannelSubsystem();
+ this.channel = createSftpChannelSubsystem(clientSession);
clientSession.getService(ConnectionService.class).registerChannel(channel);
long initializationTimeout = clientSession.getLongProperty(
@@ -153,7 +155,7 @@ public class DefaultSftpClient extends AbstractSftpClient {
/**
* Receive binary data
- *
+ *
* @param buf The buffer for the incoming data
* @param start Offset in buffer to place the data
* @param len Available space in buffer for the data
@@ -278,7 +280,10 @@ public class DefaultSftpClient extends AbstractSftpClient {
buf.putInt(id);
buf.putBuffer(buffer);
}
- channel.getAsyncIn().writePacket(buf).verify();
+
+ IoOutputStream asyncIn = channel.getAsyncIn();
+ IoWriteFuture writeFuture = asyncIn.writePacket(buf);
+ writeFuture.verify();
return id;
}
@@ -315,7 +320,7 @@ public class DefaultSftpClient extends AbstractSftpClient {
if (buffer != null) {
return buffer;
}
- if (idleTimeout > 0) {
+ if (idleTimeout > 0L) {
try {
messages.wait(idleTimeout);
} catch (InterruptedException e) {
@@ -327,65 +332,37 @@ public class DefaultSftpClient extends AbstractSftpClient {
}
protected void init(long initializationTimeout) throws IOException {
- ValidateUtils.checkTrue(initializationTimeout > 0L, "Invalid initialization timeout: %d", initializationTimeout);
-
// Send init packet
- Buffer buf = new ByteArrayBuffer(9);
- buf.putInt(5);
+ Buffer buf = new ByteArrayBuffer(INIT_COMMAND_SIZE + SshConstants.SSH_PACKET_HEADER_LEN);
+ buf.putInt(INIT_COMMAND_SIZE);
buf.putByte((byte) SftpConstants.SSH_FXP_INIT);
buf.putInt(SftpConstants.SFTP_V6);
- channel.getAsyncIn().writePacket(buf).verify();
-
- Buffer buffer;
- Integer reqId;
- synchronized (messages) {
- /*
- * We need to use a timeout since if the remote server does not support SFTP, we will not know it
- * immediately. This is due to the fact that the request for the subsystem does not contain a reply as to
- * its success or failure. Thus, the SFTP channel is created by the client, but there is no one on the other
- * side to reply - thus the need for the timeout
- */
- for (long remainingTimeout = initializationTimeout;
- (remainingTimeout > 0L) && messages.isEmpty() && (!isClosing()) && isOpen();) {
- try {
- long sleepStart = System.nanoTime();
- messages.wait(remainingTimeout);
- long sleepEnd = System.nanoTime();
- long sleepDuration = sleepEnd - sleepStart;
- long sleepMillis = TimeUnit.NANOSECONDS.toMillis(sleepDuration);
- if (sleepMillis < 1L) {
- remainingTimeout--;
- } else {
- remainingTimeout -= sleepMillis;
- }
- } catch (InterruptedException e) {
- throw (IOException) new InterruptedIOException(
- "Interrupted init() while " + remainingTimeout + " msec. remaining").initCause(e);
- }
- }
- if (isClosing() || (!isOpen())) {
- throw new EOFException("Closing while await init message");
- }
-
- if (messages.isEmpty()) {
- throw new SocketTimeoutException(
- "No incoming initialization response received within " + initializationTimeout + " msec.");
- }
+ boolean traceEnabled = log.isTraceEnabled();
+ IoOutputStream asyncIn = channel.getAsyncIn();
+ ClientChannel clientChannel = getClientChannel();
+ if (traceEnabled) {
+ log.trace("init({}) send SSH_FXP_INIT", clientChannel);
+ }
+ IoWriteFuture writeFuture = asyncIn.writePacket(buf);
+ writeFuture.verify();
- Collection<Integer> ids = messages.keySet();
- Iterator<Integer> iter = ids.iterator();
- reqId = iter.next();
- buffer = messages.remove(reqId);
+ if (traceEnabled) {
+ log.trace("init({}) wait for SSH_FXP_INIT respose (timeout={})", clientChannel, initializationTimeout);
}
+ Buffer buffer = waitForInitResponse(initializationTimeout);
+ handleInitResponse(buffer);
+ }
+ protected void handleInitResponse(Buffer buffer) throws IOException {
+ boolean traceEnabled = log.isTraceEnabled();
+ ClientChannel clientChannel = getClientChannel();
int length = buffer.getInt();
int type = buffer.getUByte();
int id = buffer.getInt();
- boolean traceEnabled = log.isTraceEnabled();
if (traceEnabled) {
- log.trace("init({}) id={} type={} len={}",
- getClientChannel(), id, SftpConstants.getCommandMessageName(type), length);
+ log.trace("handleInitResponse({}) id={} type={} len={}",
+ clientChannel, id, SftpConstants.getCommandMessageName(type), length);
}
if (type == SftpConstants.SSH_FXP_VERSION) {
@@ -395,14 +372,14 @@ public class DefaultSftpClient extends AbstractSftpClient {
versionHolder.set(id);
if (traceEnabled) {
- log.trace("init({}) version={}", getClientChannel(), versionHolder);
+ log.trace("handleInitResponse({}) version={}", clientChannel, versionHolder);
}
while (buffer.available() > 0) {
String name = buffer.getString();
byte[] data = buffer.getBytes();
if (traceEnabled) {
- log.trace("init({}) added extension={}", getClientChannel(), name);
+ log.trace("handleInitResponse({}) added extension={}", clientChannel, name);
}
extensions.put(name, data);
}
@@ -411,8 +388,8 @@ public class DefaultSftpClient extends AbstractSftpClient {
String msg = buffer.getString();
String lang = buffer.getString();
if (traceEnabled) {
- log.trace("init({})[id={}] - status: {} [{}] {}",
- getClientChannel(), id, SftpConstants.getStatusName(substatus), lang, msg);
+ log.trace("handleInitResponse({})[id={}] - status: {} [{}] {}",
+ clientChannel, id, SftpConstants.getStatusName(substatus), lang, msg);
}
throwStatusException(SftpConstants.SSH_FXP_INIT, id, substatus, msg, lang);
@@ -426,6 +403,51 @@ public class DefaultSftpClient extends AbstractSftpClient {
}
}
+ protected Buffer waitForInitResponse(long initializationTimeout) throws IOException {
+ ValidateUtils.checkTrue(initializationTimeout > 0L, "Invalid initialization timeout: %d", initializationTimeout);
+
+ synchronized (messages) {
+ /*
+ * We need to use a timeout since if the remote server does not support SFTP, we will not know it
+ * immediately. This is due to the fact that the request for the subsystem does not contain a reply as to
+ * its success or failure. Thus, the SFTP channel is created by the client, but there is no one on the other
+ * side to reply - thus the need for the timeout
+ */
+ for (long remainingTimeout = initializationTimeout;
+ (remainingTimeout > 0L) && messages.isEmpty() && (!isClosing()) && isOpen();) {
+ try {
+ long sleepStart = System.nanoTime();
+ messages.wait(remainingTimeout);
+ long sleepEnd = System.nanoTime();
+ long sleepDuration = sleepEnd - sleepStart;
+ long sleepMillis = TimeUnit.NANOSECONDS.toMillis(sleepDuration);
+ if (sleepMillis < 1L) {
+ remainingTimeout--;
+ } else {
+ remainingTimeout -= sleepMillis;
+ }
+ } catch (InterruptedException e) {
+ throw (IOException) new InterruptedIOException(
+ "Interrupted init() while " + remainingTimeout + " msec. remaining").initCause(e);
+ }
+ }
+
+ if (isClosing() || (!isOpen())) {
+ throw new EOFException("Closing while await init message");
+ }
+
+ if (messages.isEmpty()) {
+ throw new SocketTimeoutException(
+ "No incoming initialization response received within " + initializationTimeout + " msec.");
+ }
+
+ Collection<Integer> ids = messages.keySet();
+ Iterator<Integer> iter = ids.iterator();
+ Integer reqId = iter.next();
+ return messages.remove(reqId);
+ }
+ }
+
/**
* @param selector The {@link SftpVersionSelector} to use - ignored if {@code null}
* @return The selected version (may be same as current)
@@ -485,9 +507,12 @@ public class DefaultSftpClient extends AbstractSftpClient {
return selected;
}
- private class SftpChannelSubsystem extends ChannelSubsystem {
+ protected ChannelSubsystem createSftpChannelSubsystem(ClientSession clientSession) {
+ return new SftpChannelSubsystem();
+ }
- SftpChannelSubsystem() {
+ protected class SftpChannelSubsystem extends ChannelSubsystem {
+ protected SftpChannelSubsystem() {
super(SftpConstants.SFTP_SUBSYSTEM_NAME);
}
@@ -506,14 +531,19 @@ public class DefaultSftpClient extends AbstractSftpClient {
addPendingRequest(Channel.CHANNEL_SUBSYSTEM, wantReply);
writePacket(buffer);
- asyncIn = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA) {
+ asyncIn = createAsyncInput(session);
+ setOut(createStdOutputStream(session));
+ setErr(createErrOutputStream(session));
+ }
+
+ protected ChannelAsyncOutputStream createAsyncInput(Session session) {
+ return new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA) {
@SuppressWarnings("synthetic-access")
@Override
protected CloseFuture doCloseGracefully() {
try {
sendEof();
} catch (IOException e) {
- Session session = getSession();
session.exceptionCaught(e);
}
return super.doCloseGracefully();
@@ -521,7 +551,7 @@ public class DefaultSftpClient extends AbstractSftpClient {
@Override
protected Buffer createSendBuffer(Buffer buffer, Channel channel, long length) {
- if (buffer.rpos() >= 9 && length == buffer.available()) {
+ if ((buffer.rpos() >= 9) && (length == buffer.available())) {
int rpos = buffer.rpos();
int wpos = buffer.wpos();
buffer.rpos(rpos - 9);
@@ -537,7 +567,10 @@ public class DefaultSftpClient extends AbstractSftpClient {
}
}
};
- out = new OutputStream() {
+ }
+
+ protected OutputStream createStdOutputStream(Session session) {
+ return new OutputStream() {
private final byte[] singleByte = new byte[1];
@Override
@@ -553,7 +586,11 @@ public class DefaultSftpClient extends AbstractSftpClient {
data(b, off, len);
}
};
- err = new ByteArrayOutputStream();
+ }
+
+ protected OutputStream createErrOutputStream(Session session) {
+ // TODO use some limit in case some data is constantly written to this stream
+ return new ByteArrayOutputStream();
}
}
}
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpAckData.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpAckData.java
new file mode 100644
index 0000000..e21d48e
--- /dev/null
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpAckData.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.client.subsystem.sftp.impl;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+@SuppressWarnings("checkstyle:VisibilityModifier")
+public class SftpAckData {
+ public final int id;
+ public final long offset;
+ public final int length;
+
+ public SftpAckData(int id, long offset, int length) {
+ this.id = id;
+ this.offset = offset;
+ this.length = length;
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName()
+ + "[id=" + id
+ + ", offset=" + offset
+ + ", length=" + length
+ + "]";
+ }
+}
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
index ec3e593..c1a73d2 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpInputStreamAsync.java
@@ -32,6 +32,9 @@ import org.apache.sshd.client.subsystem.sftp.SftpClient;
import org.apache.sshd.client.subsystem.sftp.SftpClient.CloseableHandle;
import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode;
import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.channel.Channel;
+import org.apache.sshd.common.channel.Window;
+import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.subsystem.sftp.SftpConstants;
import org.apache.sshd.common.subsystem.sftp.SftpHelper;
import org.apache.sshd.common.util.buffer.Buffer;
@@ -39,30 +42,18 @@ import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
import org.apache.sshd.common.util.io.InputStreamWithChannel;
public class SftpInputStreamAsync extends InputStreamWithChannel {
-
- static class Ack {
- int id;
- long offset;
- int length;
-
- Ack(int id, long offset, int length) {
- this.id = id;
- this.offset = offset;
- this.length = length;
- }
- }
+ protected final byte[] bb = new byte[1];
+ protected final int bufferSize;
+ protected final long fileSize;
+ protected Buffer buffer;
+ protected CloseableHandle handle;
+ protected long requestOffset;
+ protected long clientOffset;
+ protected final Deque<SftpAckData> pendingReads = new LinkedList<>();
+ protected boolean eofIndicator;
private final AbstractSftpClient client;
private final String path;
- private final byte[] bb = new byte[1];
- private final int bufferSize;
- private final long fileSize;
- private Buffer buffer;
- private CloseableHandle handle;
- private long requestOffset;
- private long clientOffset;
- private final Deque<Ack> pendingReads = new LinkedList<>();
- private boolean eofIndicator;
public SftpInputStreamAsync(AbstractSftpClient client, int bufferSize,
String path, Collection<OpenMode> mode) throws IOException {
@@ -156,8 +147,9 @@ public class SftpInputStreamAsync extends InputStreamWithChannel {
if (!isOpen()) {
throw new IOException("transferTo(" + getPath() + ") stream closed");
}
+
long orgOffset = clientOffset;
- while (!eofIndicator && max > 0) {
+ while ((!eofIndicator) && (max > 0L)) {
if (hasNoData()) {
fillData();
if (eofIndicator && hasNoData()) {
@@ -179,11 +171,11 @@ public class SftpInputStreamAsync extends InputStreamWithChannel {
return clientOffset - orgOffset;
}
- @SuppressWarnings("PMD.MissingOverride")
public long transferTo(OutputStream out) throws IOException {
if (!isOpen()) {
throw new IOException("transferTo(" + getPath() + ") stream closed");
}
+
long orgOffset = clientOffset;
while (!eofIndicator) {
if (hasNoData()) {
@@ -207,41 +199,46 @@ public class SftpInputStreamAsync extends InputStreamWithChannel {
if (!isOpen()) {
throw new IOException("skip(" + getPath() + ") stream closed");
}
- if (clientOffset == 0 && pendingReads.isEmpty()) {
+ if ((clientOffset == 0L) && pendingReads.isEmpty()) {
clientOffset = n;
return n;
}
return super.skip(n);
}
- boolean hasNoData() {
- return buffer == null || buffer.available() == 0;
+ protected boolean hasNoData() {
+ return (buffer == null) || (buffer.available() == 0);
}
- void sendRequests() throws IOException {
+ protected void sendRequests() throws IOException {
if (!eofIndicator) {
- long windowSize = client.getChannel().getLocalWindow().getMaxSize();
- while (pendingReads.size() < (int) (windowSize / bufferSize) && requestOffset < fileSize + bufferSize
+ Channel channel = client.getChannel();
+ Window localWindow = channel.getLocalWindow();
+ long windowSize = localWindow.getMaxSize();
+ Session session = client.getSession();
+ byte[] id = handle.getIdentifier();
+
+ while ((pendingReads.size() < (int) (windowSize / bufferSize)) && (requestOffset < (fileSize + bufferSize))
|| pendingReads.isEmpty()) {
- Buffer buf = client.getSession().createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA,
- 23 /* sftp packet */ + 16 + handle.getIdentifier().length);
+ Buffer buf = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA,
+ 23 /* sftp packet */ + 16 + id.length);
buf.rpos(23);
buf.wpos(23);
- buf.putBytes(handle.getIdentifier());
+ buf.putBytes(id);
buf.putLong(requestOffset);
buf.putInt(bufferSize);
int reqId = client.send(SftpConstants.SSH_FXP_READ, buf);
- pendingReads.add(new Ack(reqId, requestOffset, bufferSize));
+ pendingReads.add(new SftpAckData(reqId, requestOffset, bufferSize));
requestOffset += bufferSize;
}
}
}
- void fillData() throws IOException {
- Ack ack = pendingReads.pollFirst();
+ protected void fillData() throws IOException {
+ SftpAckData ack = pendingReads.pollFirst();
if (ack != null) {
pollBuffer(ack);
- if (!eofIndicator && clientOffset < ack.offset) {
+ if ((!eofIndicator) && (clientOffset < ack.offset)) {
// we are actually missing some data
// so request is synchronously
byte[] data = new byte[(int) (ack.offset - clientOffset + buffer.available())];
@@ -250,7 +247,8 @@ public class SftpInputStreamAsync extends InputStreamWithChannel {
AtomicReference<Boolean> eof = new AtomicReference<>();
while (cur < nb) {
int dlen = client.read(handle, clientOffset, data, cur, nb - cur, eof);
- eofIndicator = dlen < 0 || eof.get() != null && eof.get();
+ Boolean eofSignal = eof.getAndSet(null);
+ eofIndicator = (dlen < 0) || ((eofSignal != null) && eofSignal.booleanValue());
cur += dlen;
}
buffer.getRawBytes(data, nb, buffer.available());
@@ -259,7 +257,7 @@ public class SftpInputStreamAsync extends InputStreamWithChannel {
}
}
- void pollBuffer(Ack ack) throws IOException {
+ protected void pollBuffer(SftpAckData ack) throws IOException {
Buffer buf = client.receive(ack.id);
int length = buf.getInt();
int type = buf.getUByte();
@@ -270,7 +268,7 @@ public class SftpInputStreamAsync extends InputStreamWithChannel {
int rpos = buf.rpos();
buf.rpos(rpos + dlen);
Boolean b = SftpHelper.getEndOfFileIndicatorValue(buf, client.getVersion());
- eofIndicator = b != null && b;
+ eofIndicator = (b != null) && b.booleanValue();
buf.rpos(rpos);
buf.wpos(rpos + dlen);
this.buffer = buf;
@@ -298,7 +296,7 @@ public class SftpInputStreamAsync extends InputStreamWithChannel {
try {
try {
while (!pendingReads.isEmpty()) {
- Ack ack = pendingReads.removeFirst();
+ SftpAckData ack = pendingReads.removeFirst();
pollBuffer(ack);
}
} finally {
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java
index d8f1974..b5b809c 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpOutputStreamAsync.java
@@ -28,6 +28,7 @@ import org.apache.sshd.client.subsystem.sftp.SftpClient;
import org.apache.sshd.client.subsystem.sftp.SftpClient.CloseableHandle;
import org.apache.sshd.client.subsystem.sftp.SftpClient.OpenMode;
import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.subsystem.sftp.SftpConstants;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
@@ -39,27 +40,15 @@ import org.apache.sshd.common.util.io.OutputStreamWithChannel;
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
public class SftpOutputStreamAsync extends OutputStreamWithChannel {
-
- static class Ack {
- int id;
- long offset;
- int length;
-
- Ack(int id, long offset, int length) {
- this.id = id;
- this.offset = offset;
- this.length = length;
- }
- }
+ protected final byte[] bb = new byte[1];
+ protected final int bufferSize;
+ protected Buffer buffer;
+ protected CloseableHandle handle;
+ protected long offset;
+ protected final Deque<SftpAckData> pendingWrites = new LinkedList<>();
private final AbstractSftpClient client;
private final String path;
- private final byte[] bb = new byte[1];
- private final int bufferSize;
- private Buffer buffer;
- private CloseableHandle handle;
- private long offset;
- private final Deque<Ack> pendingWrites = new LinkedList<>();
public SftpOutputStreamAsync(AbstractSftpClient client, int bufferSize,
String path, Collection<OpenMode> mode) throws IOException {
@@ -112,14 +101,17 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel {
@Override
public void write(byte[] b, int off, int len) throws IOException {
+ byte[] id = handle.getIdentifier();
+ Session session = client.getSession();
+
do {
if (buffer == null) {
- buffer = client.getSession().createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA, bufferSize);
- int hdr = (9 + 16 + 8 + handle.getIdentifier().length) + buffer.wpos();
+ buffer = session.createBuffer(SshConstants.SSH_MSG_CHANNEL_DATA, bufferSize);
+ int hdr = 9 + 16 + 8 + id.length + buffer.wpos();
buffer.rpos(hdr);
buffer.wpos(hdr);
}
- int max = bufferSize - (9 + 16 + handle.getIdentifier().length + 72);
+ int max = bufferSize - (9 + 16 + id.length + 72);
int nb = Math.min(len, max - (buffer.wpos() - buffer.rpos()));
buffer.putRawBytes(b, off, nb);
if (buffer.available() == max) {
@@ -137,9 +129,9 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel {
}
for (;;) {
- Ack ack = pendingWrites.peek();
+ SftpAckData ack = pendingWrites.peek();
if (ack != null) {
- Buffer response = client.receive(ack.id, 0);
+ Buffer response = client.receive(ack.id, 0L);
if (response != null) {
pendingWrites.removeFirst();
client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response);
@@ -154,7 +146,7 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel {
byte[] id = handle.getIdentifier();
int avail = buffer.available();
Buffer buf;
- if (buffer.rpos() >= 16 + id.length) {
+ if (buffer.rpos() >= (16 + id.length)) {
int wpos = buffer.wpos();
buffer.rpos(buffer.rpos() - 16 - id.length);
buffer.wpos(buffer.rpos());
@@ -171,7 +163,7 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel {
}
int reqId = client.send(SftpConstants.SSH_FXP_WRITE, buf);
- pendingWrites.add(new Ack(reqId, offset, avail));
+ pendingWrites.add(new SftpAckData(reqId, offset, avail));
offset += avail;
buffer = null;
@@ -182,11 +174,11 @@ public class SftpOutputStreamAsync extends OutputStreamWithChannel {
if (isOpen()) {
try {
try {
- if (buffer != null && buffer.available() > 0) {
+ if ((buffer != null) && (buffer.available() > 0)) {
flush();
}
while (!pendingWrites.isEmpty()) {
- Ack ack = pendingWrites.removeFirst();
+ SftpAckData ack = pendingWrites.removeFirst();
Buffer response = client.receive(ack.id);
client.checkResponseStatus(SftpConstants.SSH_FXP_WRITE, response);
}
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannel.java b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannel.java
index b6275d1..b698c9f 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannel.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/client/subsystem/sftp/impl/SftpRemotePathChannel.java
@@ -227,18 +227,6 @@ public class SftpRemotePathChannel extends FileChannel {
return doWrite(buffers, -1L);
}
- static class Ack {
- int id;
- long offset;
- int length;
-
- Ack(int id, long offset, int length) {
- this.id = id;
- this.offset = offset;
- this.length = length;
- }
- }
-
protected long doWrite(Collection<? extends ByteBuffer> buffers, long position) throws IOException {
ensureOpen(WRITE_MODES);
@@ -362,11 +350,12 @@ public class SftpRemotePathChannel extends FileChannel {
try {
beginBlocking("transferTo");
+ // DO NOT CLOSE THE STREAM AS IT WOULD CLOSE THE HANDLE
+ @SuppressWarnings("resource")
SftpInputStreamAsync input = new SftpInputStreamAsync(
(AbstractSftpClient) sftp,
copySize, position, count, getRemotePath(), handle);
totalRead = input.transferTo(count, target);
- // DO NOT CLOSE THE STREAM AS IT WOULD CLOSE THE HANDLE
eof = input.isEof();
completed = true;
} finally {
@@ -379,7 +368,7 @@ public class SftpRemotePathChannel extends FileChannel {
this, position, count, copySize, totalRead, eof, target);
}
- return totalRead > 0L ? totalRead : eof ? -1L : 0L;
+ return (totalRead > 0L) ? totalRead : eof ? -1L : 0L;
}
@Override
@@ -400,7 +389,6 @@ public class SftpRemotePathChannel extends FileChannel {
}
boolean completed = false;
- long curPos = (position >= 0L) ? position : posTracker.get();
long totalRead = 0L;
byte[] buffer = new byte[(int) Math.min(copySize, count)];
@@ -408,6 +396,8 @@ public class SftpRemotePathChannel extends FileChannel {
try {
beginBlocking("transferFrom");
+ // DO NOT CLOSE THE OUTPUT STREAM AS IT WOULD CLOSE THE HANDLE
+ @SuppressWarnings("resource")
SftpOutputStreamAsync output = new SftpOutputStreamAsync(
(AbstractSftpClient) sftp,
copySize, getRemotePath(), handle);
@@ -417,7 +407,6 @@ public class SftpRemotePathChannel extends FileChannel {
int read = src.read(wrap);
if (read > 0) {
output.write(buffer, 0, read);
- curPos += read;
totalRead += read;
} else {
break;
@@ -441,8 +430,7 @@ public class SftpRemotePathChannel extends FileChannel {
@Override
public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
throw new UnsupportedOperationException(
- "map(" + getRemotePath() + ")"
- + "[" + mode + "," + position + "," + size + "] N/A");
+ "map(" + getRemotePath() + ")[" + mode + "," + position + "," + size + "] N/A");
}
@Override
diff --git a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/FileHandle.java b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/FileHandle.java
index 6676eb4..9e4237d 100644
--- a/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/FileHandle.java
+++ b/sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/FileHandle.java
@@ -110,6 +110,7 @@ public class FileHandle extends Handle {
return read(data, 0, data.length, offset);
}
+ @SuppressWarnings("resource")
public int read(byte[] data, int doff, int length, long offset) throws IOException {
SeekableByteChannel channel = getFileChannel();
channel = channel.position(offset);