You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2014/06/05 18:07:54 UTC
[1/2] [SSHD-312] Provide fully asynchronous interfaces for
ClientChannel and Command
Repository: mina-sshd
Updated Branches:
refs/heads/master f1381667d -> 58c7a835d
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/test/java/org/apache/sshd/util/AsyncEchoShellFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/util/AsyncEchoShellFactory.java b/sshd-core/src/test/java/org/apache/sshd/util/AsyncEchoShellFactory.java
new file mode 100644
index 0000000..3d02bb7
--- /dev/null
+++ b/sshd-core/src/test/java/org/apache/sshd/util/AsyncEchoShellFactory.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.sshd.common.Factory;
+import org.apache.sshd.common.channel.BufferedIoOutputStream;
+import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.util.Buffer;
+import org.apache.sshd.server.AsyncCommand;
+import org.apache.sshd.server.ChannelSessionAware;
+import org.apache.sshd.server.Command;
+import org.apache.sshd.server.Environment;
+import org.apache.sshd.server.ExitCallback;
+import org.apache.sshd.server.channel.ChannelDataReceiver;
+import org.apache.sshd.server.channel.ChannelSession;
+
+/**
+ * Test {@link Factory} creating shells that echo back every complete line
+ * they receive, using the asynchronous streams of {@link AsyncCommand}.
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public class AsyncEchoShellFactory implements Factory<Command> {
+
+    /**
+     * @return a new, independent {@link EchoShell}
+     */
+    public Command create() {
+        return new EchoShell();
+    }
+
+    /**
+     * Shell that registers itself as the data receiver of its channel session
+     * and writes each newline-terminated line back to its output stream.
+     * The local window is only credited once the echoed line has been written.
+     */
+    public static class EchoShell implements AsyncCommand, ChannelDataReceiver, ChannelSessionAware {
+
+        /**
+         * ISO-8859-1 maps every byte to exactly one char and back, so the echoed
+         * bytes (and the window credit derived from their count) always match the
+         * bytes received, whatever the platform default charset is.
+         */
+        private static final String CHARSET = "ISO-8859-1";
+
+        private IoOutputStream out;
+        private IoOutputStream err;
+        private ExitCallback callback;
+        private Environment environment;
+        private ChannelSession session;
+        /** Data received so far that has not been echoed yet (no newline seen). */
+        private final StringBuilder buffer = new StringBuilder();
+
+        public IoOutputStream getOut() {
+            return out;
+        }
+
+        public IoOutputStream getErr() {
+            return err;
+        }
+
+        public Environment getEnvironment() {
+            return environment;
+        }
+
+        // The blocking streams are deliberately ignored: this shell only uses
+        // the asynchronous output streams and the channel data receiver.
+
+        public void setInputStream(InputStream in) {
+        }
+
+        public void setOutputStream(OutputStream out) {
+        }
+
+        public void setErrorStream(OutputStream err) {
+        }
+
+        public void setIoInputStream(IoInputStream in) {
+        }
+
+        /** Wraps the stream so that several echo writes can be queued. */
+        public void setIoOutputStream(IoOutputStream out) {
+            this.out = new BufferedIoOutputStream(out);
+        }
+
+        /** Wraps the stream so that several writes can be queued. */
+        public void setIoErrorStream(IoOutputStream err) {
+            this.err = new BufferedIoOutputStream(err);
+        }
+
+        public void setExitCallback(ExitCallback callback) {
+            this.callback = callback;
+        }
+
+        public void setChannelSession(ChannelSession session) {
+            this.session = session;
+        }
+
+        /**
+         * Stores the environment and registers this shell as the receiver of
+         * the data arriving on the channel session.
+         */
+        public void start(Environment env) throws IOException {
+            environment = env;
+            session.setDataReceiver(this);
+        }
+
+        /**
+         * Gracefully closes the output stream and reports exit status 0 once
+         * that close has completed.
+         */
+        public void close() throws IOException {
+            out.close(false).addListener(new SshFutureListener<CloseFuture>() {
+                public void operationComplete(CloseFuture future) {
+                    callback.onExit(0);
+                }
+            });
+        }
+
+        public void destroy() {
+        }
+
+        /**
+         * Accumulates the incoming bytes and echoes every complete
+         * (newline-terminated) line; a trailing partial line stays buffered
+         * until its newline arrives.
+         *
+         * @param channel the channel session the data was received on
+         * @param buf     array holding the received bytes
+         * @param start   offset of the first received byte in {@code buf}
+         * @param len     number of received bytes
+         * @return always 0: the local window is credited later, from the
+         *         listener invoked when the echoed line has been written
+         * @throws IOException if the charset is unexpectedly unsupported
+         */
+        public int data(final ChannelSession channel, byte[] buf, int start, int len) throws IOException {
+            buffer.append(new String(buf, start, len, CHARSET));
+            // Searching from position 0 after every extracted line makes sure a
+            // line starting right at the front of the remainder (e.g. an empty
+            // line) is echoed on its own instead of being skipped.
+            int newline;
+            while ((newline = buffer.indexOf("\n")) >= 0) {
+                final byte[] bytes = buffer.substring(0, newline + 1).getBytes(CHARSET);
+                buffer.delete(0, newline + 1);
+                out.write(new Buffer(bytes)).addListener(new SshFutureListener<IoWriteFuture>() {
+                    public void operationComplete(IoWriteFuture future) {
+                        try {
+                            channel.getLocalWindow().consumeAndCheck(bytes.length);
+                        } catch (IOException e) {
+                            channel.getSession().exceptionCaught(e);
+                        }
+                    }
+                });
+            }
+            return 0;
+        }
+    }
+
+}
[2/2] git commit: [SSHD-312] Provide fully asynchronous interfaces
for ClientChannel and Command
Posted by gn...@apache.org.
[SSHD-312] Provide fully asynchronous interfaces for ClientChannel and Command
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/58c7a835
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/58c7a835
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/58c7a835
Branch: refs/heads/master
Commit: 58c7a835d653b45befb801175a2cb33a99889368
Parents: f138166
Author: Guillaume Nodet <gn...@apache.org>
Authored: Thu Jun 5 18:07:44 2014 +0200
Committer: Guillaume Nodet <gn...@apache.org>
Committed: Thu Jun 5 18:07:44 2014 +0200
----------------------------------------------------------------------
.../java/org/apache/sshd/ClientChannel.java | 17 ++
.../java/org/apache/sshd/ClientSession.java | 1 -
.../main/java/org/apache/sshd/SshServer.java | 2 +-
.../sshd/agent/local/AgentForwardedChannel.java | 3 +
.../sshd/agent/unix/AgentForwardedChannel.java | 3 +
.../client/channel/AbstractClientChannel.java | 67 +++++++-
.../sshd/client/channel/ChannelDirectTcpip.java | 15 +-
.../sshd/client/channel/ChannelSession.java | 63 ++++---
.../sshd/common/channel/AbstractChannel.java | 9 +-
.../common/channel/BufferedIoOutputStream.java | 92 ++++++++++
.../common/channel/ChannelAsyncInputStream.java | 171 +++++++++++++++++++
.../channel/ChannelAsyncOutputStream.java | 161 +++++++++++++++++
.../common/forward/DefaultTcpipForwarder.java | 2 +-
.../sshd/common/forward/TcpipClientChannel.java | 3 +
.../sshd/common/forward/TcpipServerChannel.java | 15 +-
.../apache/sshd/common/io/IoInputStream.java | 31 ++++
.../apache/sshd/common/io/IoOutputStream.java | 31 ++++
.../org/apache/sshd/common/io/IoReadFuture.java | 40 +++++
.../org/apache/sshd/common/io/IoSession.java | 1 +
.../apache/sshd/common/io/IoWriteFuture.java | 22 +--
.../sshd/common/io/ReadPendingException.java | 23 +++
.../sshd/common/io/WritePendingException.java | 23 +++
.../apache/sshd/common/io/mina/MinaSession.java | 15 +-
.../apache/sshd/common/io/nio2/Nio2Session.java | 15 +-
.../sshd/common/session/AbstractSession.java | 14 +-
.../org/apache/sshd/common/util/Buffer.java | 11 +-
.../apache/sshd/common/util/CloseableUtils.java | 122 ++++++++++++-
.../org/apache/sshd/server/AsyncCommand.java | 53 ++++++
.../sshd/server/channel/AsyncDataReceiver.java | 48 ++++++
.../sshd/server/channel/ChannelSession.java | 48 ++++--
.../sshd/server/x11/X11ForwardSupport.java | 7 +-
.../test/java/org/apache/sshd/ClientTest.java | 95 ++++++++++-
.../apache/sshd/util/AsyncEchoShellFactory.java | 140 +++++++++++++++
33 files changed, 1279 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/ClientChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/ClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/ClientChannel.java
index 4b01b0a..653f490 100644
--- a/sshd-core/src/main/java/org/apache/sshd/ClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/ClientChannel.java
@@ -23,6 +23,8 @@ import java.io.InputStream;
import java.io.OutputStream;
import org.apache.sshd.client.future.OpenFuture;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoOutputStream;
import org.apache.sshd.common.future.CloseFuture;
/**
@@ -47,6 +49,21 @@ public interface ClientChannel {
int EXIT_SIGNAL = 0x0040;
int OPENED = 0x0080;
+ enum Streaming {
+ Async,
+ Sync
+ }
+
+ Streaming getStreaming();
+
+ void setStreaming(Streaming streaming);
+
+ IoOutputStream getAsyncIn();
+
+ IoInputStream getAsyncOut();
+
+ IoInputStream getAsyncErr();
+
/**
* Access to an output stream to send data directly to the remote channel.
* This can be used instead of using {@link #setIn(java.io.InputStream)} method
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/ClientSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/ClientSession.java b/sshd-core/src/main/java/org/apache/sshd/ClientSession.java
index 15b0005..bc4971d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/ClientSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/ClientSession.java
@@ -31,7 +31,6 @@ import org.apache.sshd.client.channel.ChannelShell;
import org.apache.sshd.client.channel.ChannelSubsystem;
import org.apache.sshd.client.future.AuthFuture;
import org.apache.sshd.common.Session;
-import org.apache.sshd.common.SshException;
import org.apache.sshd.common.SshdSocketAddress;
import org.apache.sshd.common.future.CloseFuture;
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/SshServer.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/SshServer.java b/sshd-core/src/main/java/org/apache/sshd/SshServer.java
index 7dd4403..69a5033 100644
--- a/sshd-core/src/main/java/org/apache/sshd/SshServer.java
+++ b/sshd-core/src/main/java/org/apache/sshd/SshServer.java
@@ -370,7 +370,7 @@ public class SshServer extends AbstractFactoryManager implements ServerFactoryMa
}
@Override
- protected SshFuture<CloseFuture> doCloseGracefully() {
+ protected CloseFuture doCloseGracefully() {
stopSessionTimeoutListener();
CloseFuture future;
if (acceptor != null) {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
index 84d51eb..e632e2c 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/AgentForwardedChannel.java
@@ -75,6 +75,9 @@ public class AgentForwardedChannel extends AbstractClientChannel {
@Override
protected void doOpen() throws IOException {
+ if (streaming == Streaming.Async) {
+ throw new IllegalArgumentException("Asynchronous streaming isn't supported yet on this channel");
+ }
invertedIn = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
index 4f097e4..ca7172a 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
@@ -62,6 +62,9 @@ public class AgentForwardedChannel extends AbstractClientChannel implements Runn
@Override
protected synchronized void doOpen() throws IOException {
+ if (streaming == Streaming.Async) {
+ throw new IllegalArgumentException("Asynchronous streaming isn't supported yet on this channel");
+ }
invertedIn = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
index 1e89c84..793e8b8 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
@@ -26,10 +26,15 @@ import org.apache.sshd.ClientChannel;
import org.apache.sshd.client.future.DefaultOpenFuture;
import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.common.Channel;
+import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.RequestHandler;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.channel.AbstractChannel;
+import org.apache.sshd.common.channel.ChannelAsyncInputStream;
+import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoOutputStream;
import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.util.Buffer;
@@ -45,6 +50,13 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
protected volatile boolean opened;
protected final String type;
+
+ protected Streaming streaming;
+
+ protected ChannelAsyncOutputStream asyncIn;
+ protected ChannelAsyncInputStream asyncOut;
+ protected ChannelAsyncInputStream asyncErr;
+
protected InputStream in;
protected OutputStream invertedIn;
protected OutputStream out;
@@ -59,10 +71,31 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
protected AbstractClientChannel(String type) {
this.type = type;
+ this.streaming = Streaming.Sync;
addRequestHandler(new ExitStatusChannelRequestHandler());
addRequestHandler(new ExitSignalChannelRequestHandler());
}
+ public Streaming getStreaming() {
+ return streaming;
+ }
+
+ public void setStreaming(Streaming streaming) {
+ this.streaming = streaming;
+ }
+
+ public IoOutputStream getAsyncIn() {
+ return asyncIn;
+ }
+
+ public IoInputStream getAsyncOut() {
+ return asyncOut;
+ }
+
+ public IoInputStream getAsyncErr() {
+ return asyncErr;
+ }
+
public OutputStream getInvertedIn() {
return invertedIn;
}
@@ -137,6 +170,14 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
}
@Override
+ protected Closeable getInnerCloseable() {
+ return builder()
+ .parallel(asyncIn, asyncOut, asyncErr)
+ .close(super.getInnerCloseable())
+ .build();
+ }
+
+ @Override
protected void doCloseImmediately() {
// Close inverted streams after
// If the inverted stream is closed before, there's a small time window
@@ -146,6 +187,8 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
// which leads to an IOException("Pipe closed") when reading.
IoUtils.closeQuietly(in, out, err);
IoUtils.closeQuietly(invertedIn, invertedOut, invertedErr);
+ // TODO: graceful close ?
+ CloseableUtils.parallel(asyncIn, asyncOut, asyncErr).close(true);
super.doCloseImmediately();
}
@@ -252,23 +295,39 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
if (state.get() != CloseableUtils.AbstractCloseable.OPENED) {
return;
}
- if (out != null) {
+ if (asyncOut != null) {
+ asyncOut.write(new Buffer(data, off, len));
+ } else if (out != null) {
out.write(data, off, len);
out.flush();
+ localWindow.consumeAndCheck(len);
} else {
throw new IllegalStateException("No output stream for channel");
}
- localWindow.consumeAndCheck(len);
}
protected void doWriteExtendedData(byte[] data, int off, int len) throws IOException {
- if (err != null) {
+ // If we're already closing, ignore incoming data
+ if (state.get() != CloseableUtils.AbstractCloseable.OPENED) {
+ return;
+ }
+ if (asyncErr != null) {
+ asyncErr.write(new Buffer(data, off, len));
+ } else if (err != null) {
err.write(data, off, len);
err.flush();
+ localWindow.consumeAndCheck(len);
} else {
throw new IllegalStateException("No error stream for channel");
}
- localWindow.consumeAndCheck(len);
+ }
+
+ @Override
+ public void handleWindowAdjust(Buffer buffer) throws IOException {
+ super.handleWindowAdjust(buffer);
+ if (asyncIn != null) {
+ asyncIn.onWindowExpanded();
+ }
}
public Integer getExitStatus() {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
index 66d3456..36ffd28 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelDirectTcpip.java
@@ -27,6 +27,8 @@ import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.SshdSocketAddress;
+import org.apache.sshd.common.channel.ChannelAsyncInputStream;
+import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
import org.apache.sshd.common.channel.ChannelOutputStream;
import org.apache.sshd.common.channel.ChannelPipedInputStream;
import org.apache.sshd.common.channel.ChannelPipedOutputStream;
@@ -81,10 +83,15 @@ public class ChannelDirectTcpip extends AbstractClientChannel {
@Override
protected void doOpen() throws IOException {
- invertedIn = out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
- ChannelPipedInputStream pis = new ChannelPipedInputStream(localWindow);
- pipe = new ChannelPipedOutputStream(pis);
- invertedOut = in = pis;
+ if (streaming == Streaming.Async) {
+ asyncIn = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA);
+ asyncOut = new ChannelAsyncInputStream(this);
+ } else {
+ invertedIn = out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
+ ChannelPipedInputStream pis = new ChannelPipedInputStream(localWindow);
+ pipe = new ChannelPipedOutputStream(pis);
+ invertedOut = in = pis;
+ }
}
public OpenFuture open() throws IOException {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
index ac2b713..e02160b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
@@ -26,8 +26,9 @@ import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.channel.ChannelOutputStream;
import org.apache.sshd.common.channel.ChannelPipedInputStream;
import org.apache.sshd.common.channel.ChannelPipedOutputStream;
+import org.apache.sshd.common.channel.ChannelAsyncInputStream;
+import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
import org.apache.sshd.common.future.CloseFuture;
-import org.apache.sshd.common.util.Buffer;
import org.apache.sshd.common.util.CloseableUtils;
/**
@@ -49,31 +50,47 @@ public class ChannelSession extends AbstractClientChannel {
@Override
protected void doOpen() throws IOException {
- invertedIn = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
- if (out == null) {
- ChannelPipedInputStream pis = new ChannelPipedInputStream(localWindow);
- ChannelPipedOutputStream pos = new ChannelPipedOutputStream(pis);
- out = pos;
- invertedOut = pis;
- }
- if (err == null) {
- ChannelPipedInputStream pis = new ChannelPipedInputStream(localWindow);
- ChannelPipedOutputStream pos = new ChannelPipedOutputStream(pis);
- err = pos;
- invertedErr = pis;
- }
- if (in != null) {
- streamPumper = new Thread("ClientInputStreamPump") {
+ if (streaming == Streaming.Async) {
+ asyncIn = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA) {
@Override
- public void run() {
- pumpInputStream();
+ protected CloseFuture doCloseGracefully() {
+ try {
+ sendEof();
+ } catch (IOException e) {
+ session.exceptionCaught(e);
+ }
+ return super.doCloseGracefully();
}
};
- // Interrupt does not really work and the thread will only exit when
- // the call to read() will return. So ensure this thread is a daemon
- // to avoid blocking the whole app
- streamPumper.setDaemon(true);
- streamPumper.start();
+ asyncOut = new ChannelAsyncInputStream(this);
+ asyncErr = new ChannelAsyncInputStream(this);
+ } else {
+ invertedIn = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
+ if (out == null) {
+ ChannelPipedInputStream pis = new ChannelPipedInputStream(localWindow);
+ ChannelPipedOutputStream pos = new ChannelPipedOutputStream(pis);
+ out = pos;
+ invertedOut = pis;
+ }
+ if (err == null) {
+ ChannelPipedInputStream pis = new ChannelPipedInputStream(localWindow);
+ ChannelPipedOutputStream pos = new ChannelPipedOutputStream(pis);
+ err = pos;
+ invertedErr = pis;
+ }
+ if (in != null) {
+ streamPumper = new Thread("ClientInputStreamPump") {
+ @Override
+ public void run() {
+ pumpInputStream();
+ }
+ };
+ // Interrupt does not really work and the thread will only exit when
+ // the call to read() will return. So ensure this thread is a daemon
+ // to avoid blocking the whole app
+ streamPumper.setDaemon(true);
+ streamPumper.start();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index 2e333e2..4bfb4e6 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -139,7 +139,7 @@ public abstract class AbstractChannel extends CloseableUtils.AbstractInnerClosea
}
public void handleClose() throws IOException {
- log.debug("Received SSH_MSG_CHANNEL_CLOSE on channel {}", this);
+ log.debug("Received SSH_MSG_CHANNEL_CLOSE on channel {}", this);
if (gracefulState.compareAndSet(0, CLOSE_RECV)) {
close(false);
} else if (gracefulState.compareAndSet(CLOSE_SENT, CLOSE_SENT | CLOSE_RECV)) {
@@ -147,7 +147,7 @@ public abstract class AbstractChannel extends CloseableUtils.AbstractInnerClosea
}
}
- protected Closeable getGracefulCloseable() {
+ protected Closeable getInnerCloseable() {
return new Closeable() {
public boolean isClosed() {
return gracefulFuture.isClosed();
@@ -188,11 +188,6 @@ public abstract class AbstractChannel extends CloseableUtils.AbstractInnerClosea
}
@Override
- protected Closeable getInnerCloseable() {
- return getGracefulCloseable();
- }
-
- @Override
protected void doCloseImmediately() {
super.doCloseImmediately();
service.unregisterChannel(AbstractChannel.this);
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
new file mode 100644
index 0000000..3c342c3
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.channel;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.sshd.common.Closeable;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.util.Buffer;
+import org.apache.sshd.common.util.CloseableUtils;
+
+/**
+ * An {@link IoOutputStream} capable of queuing write requests.
+ * <p>
+ * Every accepted request is appended to a queue and requests are handed to
+ * the wrapped stream one at a time: the next request is only submitted from
+ * the completion listener of the previous one.
+ */
+public class BufferedIoOutputStream extends CloseableUtils.AbstractInnerCloseable implements IoOutputStream {
+
+ // Wrapped stream that actually performs the writes
+ private final IoOutputStream out;
+ // Requests accepted but not yet completed, in the order they were added
+ private final Queue<ChannelAsyncOutputStream.IoWriteFutureImpl> writes = new ConcurrentLinkedQueue<ChannelAsyncOutputStream.IoWriteFutureImpl>();
+ // Request currently handed to 'out', or null when no write is in flight
+ private final AtomicReference<ChannelAsyncOutputStream.IoWriteFutureImpl> currentWrite = new AtomicReference<ChannelAsyncOutputStream.IoWriteFutureImpl>();
+
+ /**
+ * @param out the stream the queued requests are forwarded to
+ */
+ public BufferedIoOutputStream(IoOutputStream out) {
+ this.out = out;
+ }
+
+ /**
+ * Queues the given buffer for writing.
+ *
+ * @param buffer the data to write
+ * @return a future for this request; it is failed immediately with an
+ * {@link IOException} if this stream is already closing
+ */
+ public IoWriteFuture write(Buffer buffer) {
+ final ChannelAsyncOutputStream.IoWriteFutureImpl future = new ChannelAsyncOutputStream.IoWriteFutureImpl(buffer);
+ if (isClosing()) {
+ // Reject instead of queuing once closing has started
+ future.setValue(new IOException("Closed"));
+ } else {
+ writes.add(future);
+ startWriting();
+ }
+ return future;
+ }
+
+ /**
+ * Submits the head of the queue to the wrapped stream unless another
+ * request is already in flight.
+ */
+ // NOTE(review): peek() and the compareAndSet below are two separate steps; it
+ // looks like a head that completes (and clears currentWrite) in between could
+ // be submitted a second time by a concurrent caller - confirm.
+ private void startWriting() {
+ final ChannelAsyncOutputStream.IoWriteFutureImpl future = writes.peek();
+ if (future != null) {
+ // Only the caller that installs the request as current forwards it
+ if (currentWrite.compareAndSet(null, future)) {
+ out.write(future.getBuffer()).addListener(new SshFutureListener<IoWriteFuture>() {
+ public void operationComplete(IoWriteFuture f) {
+ // Propagate the outcome of the underlying write to the queued future
+ if (f.isWritten()) {
+ future.setValue(true);
+ } else {
+ future.setValue(f.getException());
+ }
+ finishWrite();
+ }
+ // Dequeues the completed request, frees the in-flight slot and moves on
+ private void finishWrite() {
+ writes.remove(future);
+ currentWrite.compareAndSet(future, null);
+ startWriting();
+ }
+ });
+ }
+ }
+ }
+
+ // Presumably waits for the queued writes to complete before closing 'out';
+ // the builder semantics are not visible here - verify against CloseableUtils.
+ @Override
+ protected Closeable getInnerCloseable() {
+ return builder()
+ .when(writes)
+ .close(out)
+ .build();
+ }
+
+ @Override
+ public String toString() {
+ return "BufferedIoOutputStream[" + out + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
new file mode 100644
index 0000000..71155f1
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.channel;
+
+import java.io.IOException;
+
+import org.apache.sshd.common.Channel;
+import org.apache.sshd.common.Closeable;
+import org.apache.sshd.common.RuntimeSshException;
+import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.future.DefaultCloseFuture;
+import org.apache.sshd.common.future.DefaultSshFuture;
+import org.apache.sshd.common.future.SshFuture;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoReadFuture;
+import org.apache.sshd.common.io.ReadPendingException;
+import org.apache.sshd.common.util.Buffer;
+import org.apache.sshd.common.util.CloseableUtils;
+import org.apache.sshd.common.util.Readable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link IoInputStream} fed with the data received on a {@link Channel}.
+ * <p>
+ * Incoming data is accumulated in an internal buffer; at most one read may be
+ * pending at a time and it is completed as soon as data is available. The
+ * channel local window is only consumed for bytes actually handed to a reader.
+ */
+public class ChannelAsyncInputStream extends CloseableUtils.AbstractInnerCloseable implements IoInputStream {
+
+    private final Channel channel;
+    /** Received data not yet handed to a reader; also used as the lock guarding {@link #pending}. */
+    private final Buffer buffer = new Buffer();
+    /** The read waiting for data, or {@code null} if there is none. */
+    private IoReadFutureImpl pending;
+
+    /**
+     * @param channel the channel whose local window is consumed when data is read
+     */
+    public ChannelAsyncInputStream(Channel channel) {
+        this.channel = channel;
+    }
+
+    /**
+     * Appends data received from the channel and completes the pending read, if any.
+     *
+     * @param src the received data
+     * @throws IOException declared for callers; not thrown by the code visible here
+     */
+    public void write(Readable src) throws IOException {
+        synchronized (buffer) {
+            buffer.putBuffer(src);
+        }
+        doRead(true);
+    }
+
+    /**
+     * Starts an asynchronous read into the given buffer.
+     *
+     * @param buf the buffer receiving the data
+     * @return a future completed once data has been transferred, or failed
+     *         immediately with an {@link IOException} if this stream is closing
+     * @throws ReadPendingException if a previous read has not completed yet
+     */
+    public IoReadFuture read(Buffer buf) {
+        IoReadFutureImpl future = new IoReadFutureImpl(buf);
+        if (isClosing()) {
+            future.setValue(new IOException("Closed"));
+        } else {
+            synchronized (buffer) {
+                if (pending != null) {
+                    throw new ReadPendingException();
+                }
+                pending = future;
+            }
+            doRead(false);
+        }
+        return future;
+    }
+
+    @Override
+    protected Closeable getInnerCloseable() {
+        synchronized (buffer) {
+            // Nothing left to deliver: fail the waiting read right away
+            if (buffer.available() == 0) {
+                if (pending != null) {
+                    pending.setValue(new SshException("Closed"));
+                }
+            }
+            // Presumably delays the close until the pending read is done;
+            // the builder semantics are not visible here - TODO confirm.
+            return builder()
+                    .when(pending)
+                    .build();
+        }
+    }
+
+    /**
+     * Hands buffered data to the pending read, if there is both data and a reader.
+     *
+     * @param resume {@code true} when triggered by incoming data, {@code false}
+     *               when triggered by a new read request (informational only)
+     */
+    private void doRead(boolean resume) {
+        IoReadFutureImpl future = null;
+        int nbRead = 0;
+        synchronized (buffer) {
+            if (buffer.available() > 0) {
+                future = pending;
+                pending = null;
+                if (future != null) {
+                    nbRead = future.buffer.putBuffer(buffer, false);
+                    buffer.compact();
+                }
+            }
+        }
+        // Window update and listener notification happen outside the lock
+        if (nbRead > 0) {
+            try {
+                channel.getLocalWindow().consumeAndCheck(nbRead);
+            } catch (IOException e) {
+                channel.getSession().exceptionCaught(e);
+            }
+            future.setValue(nbRead);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "ChannelAsyncInputStream[" + channel + "]";
+    }
+
+    /**
+     * Future of a single read; its value is either the number of bytes read
+     * (an {@link Integer}) or the {@link Throwable} that made the read fail.
+     */
+    public static class IoReadFutureImpl extends DefaultSshFuture<IoReadFuture> implements IoReadFuture {
+
+        final Buffer buffer;
+
+        /**
+         * @param buffer the buffer the read data is transferred into
+         */
+        public IoReadFutureImpl(Buffer buffer) {
+            super(null);
+            this.buffer = buffer;
+        }
+
+        public Buffer getBuffer() {
+            return buffer;
+        }
+
+        /**
+         * Waits for the read to complete.
+         *
+         * @throws SshException if the wait is interrupted or the read failed
+         */
+        public void verify() throws SshException {
+            try {
+                await();
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to the caller's thread
+                Thread.currentThread().interrupt();
+                throw new SshException("Interrupted", e);
+            }
+            if (getValue() instanceof Throwable) {
+                throw new SshException("Read failed", getException());
+            }
+        }
+
+        /**
+         * @return the number of bytes read
+         * @throws RuntimeSshException   if the read failed with a checked exception
+         * @throws IllegalStateException if the future holds no result
+         */
+        public int getRead() {
+            Object v = getValue();
+            if (v instanceof RuntimeException) {
+                throw (RuntimeException) v;
+            } else if (v instanceof Error) {
+                throw (Error) v;
+            } else if (v instanceof Throwable) {
+                throw (RuntimeSshException) new RuntimeSshException("Error reading from channel.").initCause((Throwable) v);
+            } else if (v instanceof Integer) {
+                return (Integer) v;
+            } else {
+                throw new IllegalStateException();
+            }
+        }
+
+        /**
+         * @return the failure cause, or {@code null} if the value is not a {@link Throwable}
+         */
+        public Throwable getException() {
+            Object v = getValue();
+            if (v instanceof Throwable) {
+                return (Throwable) v;
+            } else {
+                return null;
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
new file mode 100644
index 0000000..3455c15
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.channel;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.sshd.common.Channel;
+import org.apache.sshd.common.Closeable;
+import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.future.DefaultSshFuture;
+import org.apache.sshd.common.io.IoOutputStream;
+import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.io.WritePendingException;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.util.Buffer;
+import org.apache.sshd.common.util.CloseableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Asynchronous output stream sending the content of a {@link Buffer} to the
+ * remote side of a channel. The data is split in chunks that fit both the
+ * remote window and the remote packet size; when the window is exhausted the
+ * write is suspended until {@link #onWindowExpanded()} is called.
+ * At most one write can be pending at any time.
+ */
+public class ChannelAsyncOutputStream extends CloseableUtils.AbstractInnerCloseable implements IoOutputStream {
+
+    private final Channel channel;
+    /** Message type used when creating the outgoing packets. */
+    private final byte cmd;
+    /** The write currently in progress, or <code>null</code> when idle. */
+    private final AtomicReference<IoWriteFutureImpl> pendingWrite = new AtomicReference<IoWriteFutureImpl>();
+
+    public ChannelAsyncOutputStream(Channel channel, byte cmd) {
+        this.channel = channel;
+        this.cmd = cmd;
+    }
+
+    /**
+     * Resumes the pending write, if any, now that the remote window may have more room.
+     */
+    public void onWindowExpanded() throws IOException {
+        doWriteIfPossible(true);
+    }
+
+    /**
+     * Starts writing the readable content of the given buffer.
+     *
+     * @param buffer the data to send
+     * @return a future completed with <code>true</code> once all the data has been
+     *         written, or with the failure; if this stream is closing the future is
+     *         returned already failed with an {@link IOException}
+     * @throws WritePendingException if a previous write has not completed yet
+     */
+    public synchronized IoWriteFuture write(final Buffer buffer) {
+        final IoWriteFutureImpl future = new IoWriteFutureImpl(buffer);
+        if (isClosing()) {
+            future.setValue(new IOException("Closed"));
+        } else {
+            if (!pendingWrite.compareAndSet(null, future)) {
+                throw new WritePendingException();
+            }
+            doWriteIfPossible(false);
+        }
+        return future;
+    }
+
+    @Override
+    protected Closeable getInnerCloseable() {
+        return builder()
+                .when(pendingWrite.get())
+                .build();
+    }
+
+    /**
+     * Sends the next chunk of the pending write, if there is one and the remote
+     * window has room for it.
+     *
+     * @param resume <code>true</code> when called because the remote window was
+     *               expanded; only affects what is logged
+     */
+    protected synchronized void doWriteIfPossible(boolean resume) {
+        final IoWriteFutureImpl future = pendingWrite.get();
+        if (future == null) {
+            return;
+        }
+        final Buffer buffer = future.buffer;
+        final int total = buffer.available();
+        if (total <= 0) {
+            // Nothing (left) to send: the write is complete.
+            pendingWrite.compareAndSet(future, null);
+            future.setValue(true);
+            return;
+        }
+        final int length = Math.min(Math.min(channel.getRemoteWindow().getSize(), total), channel.getRemoteWindow().getPacketSize());
+        if (length <= 0) {
+            if (!resume) {
+                log.debug("Delaying write until space is available in the remote window");
+            }
+            return;
+        }
+        if (resume) {
+            log.debug("Resuming write due to more space available in the remote window");
+        }
+        // Up to three 4-byte integers precede the payload: recipient,
+        // optional data type code and payload length.
+        Buffer buf = channel.getSession().createBuffer(cmd, length + 12);
+        buf.putInt(channel.getRecipient());
+        if (cmd == SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA) {
+            // Data type code 1 is SSH_EXTENDED_DATA_STDERR (RFC 4254, section 5.2)
+            buf.putInt(1);
+        }
+        buf.putInt(length);
+        buf.putRawBytes(buffer.array(), buffer.rpos(), length);
+        buffer.rpos(buffer.rpos() + length);
+        channel.getRemoteWindow().consume(length);
+        try {
+            channel.getSession().writePacket(buf).addListener(new SshFutureListener<IoWriteFuture>() {
+                public void operationComplete(IoWriteFuture f) {
+                    if (!f.isWritten()) {
+                        // The packet could not be sent: fail the whole write instead of
+                        // sending further chunks or reporting success.
+                        Throwable cause = f.getException();
+                        failPendingWrite(future, cause != null ? cause : new IOException("Write failed"));
+                    } else if (total > length) {
+                        doWriteIfPossible(false);
+                    } else {
+                        pendingWrite.compareAndSet(future, null);
+                        future.setValue(true);
+                    }
+                }
+            });
+        } catch (IOException e) {
+            failPendingWrite(future, e);
+        }
+    }
+
+    /**
+     * Fails the given write and releases the pending slot so that later calls to
+     * {@link #write(Buffer)} are not rejected with a {@link WritePendingException}.
+     */
+    private void failPendingWrite(IoWriteFutureImpl future, Throwable cause) {
+        pendingWrite.compareAndSet(future, null);
+        future.setValue(cause);
+    }
+
+    @Override
+    public String toString() {
+        return "ChannelAsyncOutputStream[" + channel + "]";
+    }
+
+    /**
+     * Future describing one asynchronous write. Its value is a {@link Boolean}
+     * on success or a {@link Throwable} on failure.
+     */
+    public static class IoWriteFutureImpl extends DefaultSshFuture<IoWriteFuture> implements IoWriteFuture {
+
+        /** Buffer handed in by the caller of the write. */
+        final Buffer buffer;
+
+        public IoWriteFutureImpl(Buffer buffer) {
+            super(null);
+            this.buffer = buffer;
+        }
+
+        public Buffer getBuffer() {
+            return buffer;
+        }
+
+        /**
+         * Waits for the write to complete and checks that it succeeded.
+         *
+         * @throws SshException if the wait is interrupted or the write failed
+         */
+        public void verify() throws SshException {
+            try {
+                await();
+            }
+            catch (InterruptedException e) {
+                throw new SshException("Interrupted", e);
+            }
+            if (!isWritten()) {
+                throw new SshException("Write failed", getException());
+            }
+        }
+
+        public boolean isWritten() {
+            return getValue() instanceof Boolean;
+        }
+
+        /**
+         * @return the failure stored in this future, or <code>null</code> if the
+         *         value is not a {@link Throwable}
+         */
+        public Throwable getException() {
+            Object v = getValue();
+            if (v instanceof Throwable) {
+                return (Throwable) v;
+            } else {
+                return null;
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
index 57e73f0..6d664d7 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
@@ -164,7 +164,7 @@ public class DefaultTcpipForwarder extends CloseableUtils.AbstractInnerCloseable
@Override
protected synchronized Closeable getInnerCloseable() {
- return acceptor != null ? acceptor : new CloseableUtils.AbstractCloseable() { };
+ return builder().close(acceptor).build();
}
//
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
index 479fb24..e449c8d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipClientChannel.java
@@ -96,6 +96,9 @@ public class TcpipClientChannel extends AbstractClientChannel {
@Override
protected synchronized void doOpen() throws IOException {
+ if (streaming == Streaming.Async) {
+ throw new IllegalArgumentException("Asynchronous streaming isn't supported yet on this channel");
+ }
invertedIn = out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
index 667876a..9bbdea3 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/TcpipServerChannel.java
@@ -36,6 +36,7 @@ import org.apache.sshd.common.io.IoConnectFuture;
import org.apache.sshd.common.io.IoConnector;
import org.apache.sshd.common.io.IoHandler;
import org.apache.sshd.common.io.IoSession;
+import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.util.Buffer;
import org.apache.sshd.common.util.Readable;
import org.apache.sshd.server.channel.AbstractServerChannel;
@@ -107,6 +108,7 @@ public class TcpipServerChannel extends AbstractServerChannel {
return f;
}
+ // TODO: revisit for better threading. Use async io ?
out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
IoHandler handler = new IoHandler() {
public void messageReceived(IoSession session, Readable message) throws Exception {
@@ -182,12 +184,19 @@ public class TcpipServerChannel extends AbstractServerChannel {
});
}
- protected void doWriteData(byte[] data, int off, int len) throws IOException {
- localWindow.consumeAndCheck(len);
+ protected void doWriteData(byte[] data, int off, final int len) throws IOException {
// Make sure we copy the data as the incoming buffer may be reused
Buffer buf = new Buffer(data, off, len);
buf = new Buffer(buf.getCompactData());
- ioSession.write(buf);
+ ioSession.write(buf).addListener(new SshFutureListener<IoWriteFuture>() {
+ public void operationComplete(IoWriteFuture future) {
+ try {
+ localWindow.consumeAndCheck(len);
+ } catch (IOException e) {
+ session.exceptionCaught(e);
+ }
+ }
+ });
}
protected void doWriteExtendedData(byte[] data, int off, int len) throws IOException {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/io/IoInputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoInputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoInputStream.java
new file mode 100644
index 0000000..c585b67
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoInputStream.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io;
+
+import org.apache.sshd.common.Closeable;
+import org.apache.sshd.common.util.Buffer;
+
+/**
+ * A closeable source of data that is read asynchronously: a read is
+ * requested with a buffer and its outcome is reported through a future.
+ */
+public interface IoInputStream extends Closeable {
+
+ /**
+ * Requests a read into the given buffer.
+ * NOTE: the buffer must not be touched until the returned read future is completed.
+ *
+ * @param buffer the buffer associated with the read
+ * @return the future reporting the outcome of the read
+ */
+ IoReadFuture read(Buffer buffer);
+
+}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/io/IoOutputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoOutputStream.java
new file mode 100644
index 0000000..f2b8994
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoOutputStream.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io;
+
+import org.apache.sshd.common.Closeable;
+import org.apache.sshd.common.util.Buffer;
+
+/**
+ * A closeable sink of data that is written asynchronously: a write is
+ * requested with a buffer and its outcome is reported through a future.
+ */
+public interface IoOutputStream extends Closeable {
+
+ /**
+ * Requests a write of the given buffer.
+ * NOTE: the buffer must not be touched until the returned write future is completed.
+ *
+ * @param buffer the buffer holding the data to write
+ * @return the future reporting the outcome of the write
+ */
+ IoWriteFuture write(Buffer buffer);
+
+}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/io/IoReadFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoReadFuture.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoReadFuture.java
new file mode 100644
index 0000000..d4f07a9
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoReadFuture.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io;
+
+import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.future.SshFuture;
+import org.apache.sshd.common.util.Buffer;
+
+/**
+ * Future reporting the outcome of an asynchronous read.
+ */
+public interface IoReadFuture extends SshFuture<IoReadFuture> {
+
+ /**
+ * Wait and verify that the read succeeded.
+ *
+ * @throws org.apache.sshd.common.SshException if the read failed for any reason
+ */
+ void verify() throws SshException;
+
+ /**
+ * Returns the buffer associated with this read.
+ */
+ Buffer getBuffer();
+
+ /**
+ * Returns the number of bytes read.
+ * NOTE(review): the implementation shipped with this change throws an
+ * unchecked exception from this method when the read failed or no
+ * count is available - confirm whether that is part of the contract.
+ */
+ int getRead();
+
+ /**
+ * Returns the cause of the read failure, or <tt>null</tt> if there is none.
+ */
+ Throwable getException();
+
+}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/io/IoSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoSession.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoSession.java
index 9e78fba..caccfca 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/IoSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoSession.java
@@ -62,6 +62,7 @@ public interface IoSession extends Closeable {
/**
* Write a packet on the socket.
+ * Multiple writes can be issued concurrently and will be queued.
*/
IoWriteFuture write(Buffer buffer);
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/io/IoWriteFuture.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/IoWriteFuture.java b/sshd-core/src/main/java/org/apache/sshd/common/io/IoWriteFuture.java
index 79d1ae2..121d135 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/IoWriteFuture.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/IoWriteFuture.java
@@ -18,11 +18,19 @@
*/
package org.apache.sshd.common.io;
+import org.apache.sshd.common.SshException;
import org.apache.sshd.common.future.SshFuture;
public interface IoWriteFuture extends SshFuture<IoWriteFuture> {
/**
+ * Wait and verify that the write succeeded.
+ *
+ * @throws SshException if the write failed for any reason
+ */
+ void verify() throws SshException;
+
+ /**
* Returns <tt>true</tt> if the write operation is finished successfully.
*/
boolean isWritten();
@@ -34,18 +42,4 @@ public interface IoWriteFuture extends SshFuture<IoWriteFuture> {
*/
Throwable getException();
- /**
- * Sets the message is written, and notifies all threads waiting for
- * this future. This method is invoked by MINA internally. Please do
- * not call this method directly.
- */
- void setWritten();
-
- /**
- * Sets the cause of the write failure, and notifies all threads waiting
- * for this future. This method is invoked by MINA internally. Please
- * do not call this method directly.
- */
- void setException(Throwable cause);
-
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/io/ReadPendingException.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/ReadPendingException.java b/sshd-core/src/main/java/org/apache/sshd/common/io/ReadPendingException.java
new file mode 100644
index 0000000..261acac
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/ReadPendingException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io;
+
+/**
+ * Unchecked exception, counterpart of {@link WritePendingException} for reads.
+ * NOTE(review): presumably thrown when a read is requested while a previous
+ * read has not completed yet - confirm against the code throwing it.
+ */
+public class ReadPendingException extends IllegalStateException {
+
+}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/io/WritePendingException.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/WritePendingException.java b/sshd-core/src/main/java/org/apache/sshd/common/io/WritePendingException.java
new file mode 100644
index 0000000..5fde663
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/WritePendingException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.common.io;
+
+/**
+ * Unchecked exception thrown when a write is requested on an asynchronous
+ * output stream while a previous write has not completed yet.
+ */
+public class WritePendingException extends IllegalStateException {
+
+}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java b/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java
index d366fb6..6f07207 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java
@@ -25,6 +25,7 @@ import org.apache.mina.core.future.IoFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.future.WriteFuture;
import org.apache.sshd.common.Closeable;
+import org.apache.sshd.common.SshException;
import org.apache.sshd.common.future.DefaultCloseFuture;
import org.apache.sshd.common.future.DefaultSshFuture;
import org.apache.sshd.common.io.IoService;
@@ -106,6 +107,18 @@ public class MinaSession extends CloseableUtils.AbstractInnerCloseable implement
super(lock);
}
+ public void verify() throws SshException {
+ try {
+ await();
+ }
+ catch (InterruptedException e) {
+ throw new SshException("Interrupted", e);
+ }
+ if (!isWritten()) {
+ throw new SshException("Write failed", getException());
+ }
+ }
+
public boolean isWritten() {
return getValue() instanceof Boolean;
}
@@ -126,7 +139,7 @@ public class MinaSession extends CloseableUtils.AbstractInnerCloseable implement
setValue(exception);
}
}
- final IoWriteFuture future = new Future(null);
+ final Future future = new Future(null);
session.write(MinaSupport.asIoBuffer(buffer)).addListener(new IoFutureListener<WriteFuture>() {
public void operationComplete(WriteFuture cf) {
if (cf.getException() != null) {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
index c044815..03b17e6 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Session.java
@@ -31,6 +31,7 @@ import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.sshd.common.SshException;
import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.future.DefaultSshFuture;
import org.apache.sshd.common.future.SshFuture;
@@ -118,7 +119,7 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
private void exceptionCaught(Throwable exc) {
if (!closeFuture.isClosed()) {
- if (state.get() != OPENED || !socket.isOpen()) {
+ if (isClosing() || !socket.isOpen()) {
close(true);
} else {
try {
@@ -253,6 +254,18 @@ public class Nio2Session extends CloseableUtils.AbstractCloseable implements IoS
super(lock);
this.buffer = buffer;
}
+ public void verify() throws SshException {
+ try {
+ await();
+ }
+ catch (InterruptedException e) {
+ throw new SshException("Interrupted", e);
+ }
+ if (!isWritten()) {
+ throw new SshException("Write failed", getException());
+ }
+ }
+
public boolean isWritten() {
return getValue() instanceof Boolean;
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
index f2cddf9..ac859d7 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
@@ -1266,6 +1266,18 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
return buffer;
}
+ public void verify() throws SshException {
+ try {
+ await();
+ }
+ catch (InterruptedException e) {
+ throw new SshException("Interrupted", e);
+ }
+ if (!isWritten()) {
+ throw new SshException("Write failed", getException());
+ }
+ }
+
public boolean isWritten() {
return getValue() instanceof Boolean;
}
@@ -1290,7 +1302,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
if (future.isWritten()) {
setWritten();
} else {
- future.setException(future.getException());
+ setException(future.getException());
}
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/util/Buffer.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/Buffer.java b/sshd-core/src/main/java/org/apache/sshd/common/util/Buffer.java
index 42c0d50..824144b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/Buffer.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/Buffer.java
@@ -116,6 +116,10 @@ public final class Buffer implements Readable {
return wpos - rpos;
}
+ public int capacity() {
+ return data.length - wpos;
+ }
+
public byte[] array() {
return data;
}
@@ -372,10 +376,15 @@ public final class Buffer implements Readable {
}
public void putBuffer(Readable buffer) {
- int r = buffer.available();
+ putBuffer(buffer, true);
+ }
+
+ public int putBuffer(Readable buffer, boolean expand) {
+ int r = expand ? buffer.available() : Math.min(buffer.available(), capacity());
ensureCapacity(r);
buffer.getRawBytes(data, wpos, r);
wpos += r;
+ return r;
}
/**
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java b/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java
index 2ae71c9..5d1d97f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/CloseableUtils.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.sshd.common.Closeable;
+import org.apache.sshd.common.SshException;
import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.future.DefaultCloseFuture;
import org.apache.sshd.common.future.DefaultSshFuture;
@@ -197,7 +198,11 @@ public class CloseableUtils {
}
public static <T extends SshFuture> CloseFuture parallel(final SshFuture<T>... futures) {
- final CloseFuture future = new DefaultCloseFuture(null);
+ return parallel(null, futures);
+ }
+
+ public static <T extends SshFuture> CloseFuture parallel(Object lock, final SshFuture<T>... futures) {
+ final CloseFuture future = new DefaultCloseFuture(lock);
if (futures.length > 0) {
final AtomicInteger count = new AtomicInteger(futures.length);
SshFutureListener<T> listener = new SshFutureListener<T>() {
@@ -220,6 +225,98 @@ public class CloseableUtils {
return future;
}
+ /**
+ * Creates a new, empty {@link Builder} using the no-argument constructor,
+ * i.e. without a shared lock object.
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+  * Creates a new, empty {@link Builder} bound to the given lock.
+  *
+  * @param logger currently unused, {@link Builder} has no logger
+  * @param lock   the lock object handed to the futures and closeables
+  *               created by the builder, may be <code>null</code>
+  * @return a new builder
+  */
+ public static Builder builder(Logger logger, Object lock) {
+     // Hand the lock over instead of silently dropping it.
+     return new Builder(lock);
+ }
+
+ /**
+ * Fluent helper assembling a single {@link Closeable} out of several
+ * parts. Each part added is chained after the previous ones using
+ * {@link CloseableUtils#sequential}; {@link #build()} returns the result.
+ */
+ public static class Builder {
+ // Lock object passed to every future / closeable created by this builder (may be null)
+ private final Object lock;
+ // The closeable assembled so far, null until the first part is added
+ private Closeable closeable = null;
+ public Builder() {
+ this(null);
+ }
+ public Builder(Object lock) {
+ this.lock = lock;
+ }
+ /**
+ * Adds a part which is closed once the given futures are completed.
+ * Closing it immediately completes every {@link DefaultSshFuture} among
+ * them with an {@link SshException}; closing it gracefully waits for
+ * the futures through {@link CloseableUtils#parallel}.
+ */
+ public <T extends SshFuture> Builder when(final SshFuture<T>... futures) {
+ return close(new Closeable() {
+ private volatile boolean closing;
+ private volatile boolean closed;
+ public CloseFuture close(boolean immediately) {
+ closing = true;
+ if (immediately) {
+ // Only futures whose value can be set from here are completed
+ for (SshFuture<?> future : futures) {
+ if (future instanceof DefaultSshFuture) {
+ ((DefaultSshFuture<?>) future).setValue(new SshException("Closed"));
+ }
+ }
+ closed = true;
+ // closed() is defined outside this block; presumably returns an already closed future - TODO confirm
+ return closed();
+ } else {
+ // Graceful close: flagged as closed only once the aggregated future completes
+ return CloseableUtils.parallel(lock, futures).addListener(new SshFutureListener<CloseFuture>() {
+ public void operationComplete(CloseFuture future) {
+ closed = true;
+ }
+ });
+ }
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ public boolean isClosing() {
+ return closing || closed;
+ }
+ });
+ }
+ /** Collection variant of {@link #when(SshFuture[])}. */
+ public <T extends SshFuture> Builder when(Collection<? extends SshFuture<T>> futures) {
+ return when(futures.toArray(new SshFuture[futures.size()]));
+ }
+ /** Adds the result of {@link CloseableUtils#sequential} for the given closeables as one part. */
+ public Builder sequential(Closeable... closeables) {
+ return close(CloseableUtils.sequential(lock, closeables));
+ }
+ public Builder sequential(Collection<Closeable> closeables) {
+ return close(CloseableUtils.sequential(lock, closeables));
+ }
+ /** Adds the result of {@link CloseableUtils#parallel} for the given closeables as one part. */
+ public Builder parallel(Closeable... closeables) {
+ return close(CloseableUtils.parallel(lock, closeables));
+ }
+ public Builder parallel(Collection<? extends Closeable> closeables) {
+ return close(CloseableUtils.parallel(lock, closeables));
+ }
+ /**
+ * Adds a part: the first one is stored as is, every further one is
+ * chained after the parts added so far.
+ */
+ public Builder close(Closeable c) {
+ if (closeable == null) {
+ closeable = c;
+ } else {
+ closeable = CloseableUtils.sequential(lock, closeable, c);
+ }
+ return this;
+ }
+ /**
+ * Returns the assembled closeable. If no part was added, a closeable
+ * which has nothing to close and only records that it was closed is
+ * created and returned.
+ */
+ public Closeable build() {
+ if (closeable == null) {
+ closeable = new Closeable() {
+ private volatile boolean closed;
+ public CloseFuture close(boolean immediately) {
+ closed = true;
+ return closed();
+ }
+ public boolean isClosed() {
+ return closed;
+ }
+ public boolean isClosing() {
+ return closed;
+ }
+ };
+ }
+ return closeable;
+ }
+ }
+
public static abstract class AbstractCloseable implements Closeable {
protected static final int OPENED = 0;
@@ -230,11 +327,20 @@ public class CloseableUtils {
/** Our logger */
protected final Logger log = LoggerFactory.getLogger(getClass());
/** Lock object for this session state */
- protected final Object lock = new Object();
+ protected final Object lock;
/** State of this object */
protected final AtomicInteger state = new AtomicInteger(OPENED);
/** A future that will be set 'closed' when the object is actually closed */
- protected final CloseFuture closeFuture = new DefaultCloseFuture(lock);
+ protected final CloseFuture closeFuture;
+
+ protected AbstractCloseable() {
+ this(new Object());
+ }
+
+ protected AbstractCloseable(Object lock) {
+ this.lock = lock;
+ this.closeFuture = new DefaultCloseFuture(lock);
+ }
public CloseFuture close(boolean immediately) {
if (immediately) {
@@ -256,7 +362,7 @@ public class CloseableUtils {
public void operationComplete(CloseFuture future) {
if (state.compareAndSet(GRACEFUL, IMMEDIATE)) {
doCloseImmediately();
- log.debug("{} closed", this);
+ log.debug("{} closed", AbstractCloseable.this);
}
}
});
@@ -284,7 +390,7 @@ public class CloseableUtils {
protected void preClose() {
}
- protected SshFuture<CloseFuture> doCloseGracefully() {
+ protected CloseFuture doCloseGracefully() {
return null;
}
@@ -293,6 +399,10 @@ public class CloseableUtils {
state.set(CLOSED);
}
+ protected Builder builder() {
+ return new Builder(lock);
+ }
+
}
public static abstract class AbstractInnerCloseable extends AbstractCloseable {
@@ -300,7 +410,7 @@ public class CloseableUtils {
protected abstract Closeable getInnerCloseable();
@Override
- protected SshFuture<CloseFuture> doCloseGracefully() {
+ protected CloseFuture doCloseGracefully() {
return getInnerCloseable().close(false);
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/server/AsyncCommand.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/AsyncCommand.java b/sshd-core/src/main/java/org/apache/sshd/server/AsyncCommand.java
new file mode 100644
index 0000000..8654dce
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/server/AsyncCommand.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.server;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.io.IoOutputStream;
+
+/**
+ * Represents a command capable of doing non-blocking io.
+ * If this interface is implemented by a command, the usual
+ * blocking input / output / error streams won't be set;
+ * the streams declared below are supplied instead.
+ */
+public interface AsyncCommand extends Command {
+
+ /**
+ * Set the input stream that can be used by the command to read input.
+ * @param in the stream the command reads its input from
+ */
+ void setIoInputStream(IoInputStream in);
+
+ /**
+ * Set the output stream that can be used by the command to write its output.
+ * @param out the stream the command writes its regular output to
+ */
+ void setIoOutputStream(IoOutputStream out);
+
+ /**
+ * Set the error stream that can be used by the command to write its errors.
+ * @param err the stream the command writes its error output to
+ */
+ void setIoErrorStream(IoOutputStream err);
+
+}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/server/channel/AsyncDataReceiver.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/AsyncDataReceiver.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/AsyncDataReceiver.java
new file mode 100644
index 0000000..24d19b3
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/AsyncDataReceiver.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sshd.server.channel;
+
+import java.io.IOException;
+
+import org.apache.sshd.common.Channel;
+import org.apache.sshd.common.channel.ChannelAsyncInputStream;
+import org.apache.sshd.common.io.IoInputStream;
+import org.apache.sshd.common.util.Buffer;
+/**
+ * {@link ChannelDataReceiver} that hands every chunk of received data to a
+ * {@link ChannelAsyncInputStream}; that stream is exposed through {@link #getIn()}.
+ */
+public class AsyncDataReceiver implements ChannelDataReceiver {
+ /** Stream that receives each buffer passed to {@code data(...)}; created in the constructor. */
+ private ChannelAsyncInputStream in;
+ /**
+ * Creates the receiver together with its backing stream.
+ *
+ * @param channel the channel passed to the {@link ChannelAsyncInputStream} constructor
+ */
+ public AsyncDataReceiver(Channel channel) {
+ in = new ChannelAsyncInputStream(channel);
+ }
+ /**
+ * @return the stream that the received data is written to
+ */
+ public IoInputStream getIn() {
+ return in;
+ }
+ /**
+ * Wraps the given region of {@code buf} in a {@link Buffer} and writes it to the stream.
+ * The {@code channel} argument is not used here.
+ *
+ * @param channel the session channel the data arrived on (unused)
+ * @param buf array holding the received bytes
+ * @param start offset of the first byte to forward
+ * @param len number of bytes to forward
+ * @return always 0 -- NOTE(review): presumably this leaves window accounting to the
+ * stream rather than to the caller; confirm against ChannelDataReceiver
+ * @throws IOException declared by this method; whether the write can raise it is not visible here
+ */
+ public int data(ChannelSession channel, byte[] buf, int start, int len) throws IOException {
+ in.write(new Buffer(buf, start, len));
+ return 0;
+ }
+ /**
+ * Closes the backing stream by calling {@code close(false)} on it.
+ * NOTE(review): {@code false} presumably requests a non-immediate close -- confirm.
+ */
+ public void close() throws IOException {
+ in.close(false);
+ }
+}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
index 5866a59..b797346 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
@@ -39,6 +39,8 @@ import org.apache.sshd.common.NamedFactory;
import org.apache.sshd.common.PtyMode;
import org.apache.sshd.common.RequestHandler;
import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.channel.ChannelAsyncInputStream;
+import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
import org.apache.sshd.common.channel.ChannelOutputStream;
import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.future.DefaultCloseFuture;
@@ -47,6 +49,7 @@ import org.apache.sshd.common.util.Buffer;
import org.apache.sshd.common.util.CloseableUtils;
import org.apache.sshd.common.util.IoUtils;
import org.apache.sshd.common.util.LoggingFilterOutputStream;
+import org.apache.sshd.server.AsyncCommand;
import org.apache.sshd.server.ChannelSessionAware;
import org.apache.sshd.server.Command;
import org.apache.sshd.server.Environment;
@@ -173,6 +176,8 @@ public class ChannelSession extends AbstractServerChannel {
}
protected String type;
+ protected ChannelAsyncOutputStream asyncOut;
+ protected ChannelAsyncOutputStream asyncErr;
protected OutputStream out;
protected OutputStream err;
protected Command command;
@@ -238,6 +243,8 @@ public class ChannelSession extends AbstractServerChannel {
}
remoteWindow.notifyClosed();
IoUtils.closeQuietly(out, err, receiver);
+ // TODO: graceful close ?
+ CloseableUtils.parallel(asyncOut, asyncErr).close(true);
super.doCloseImmediately();
}
@@ -436,7 +443,8 @@ public class ChannelSession extends AbstractServerChannel {
/**
* For {@link Command} to install {@link ChannelDataReceiver}.
- * When you do this, {@link Command#setInputStream(InputStream)}
+ * When you do this, {@link Command#setInputStream(InputStream)} or
+ * {@link org.apache.sshd.server.AsyncCommand#setIoInputStream(org.apache.sshd.common.io.IoInputStream)}
* will no longer be invoked. If you call this method from {@link Command#start(Environment)},
* the input stream you received in {@link Command#setInputStream(InputStream)} will
* not read any data.
@@ -447,7 +455,7 @@ public class ChannelSession extends AbstractServerChannel {
protected void prepareCommand() throws IOException {
// Add the user
- addEnvVariable(Environment.ENV_USER, ((ServerSession) session).getUsername());
+ addEnvVariable(Environment.ENV_USER, session.getUsername());
// If the shell wants to be aware of the session, let's do that
if (command instanceof SessionAware) {
((SessionAware) command).setSession((ServerSession) session);
@@ -460,21 +468,35 @@ public class ChannelSession extends AbstractServerChannel {
FileSystemFactory factory = ((ServerSession) session).getFactoryManager().getFileSystemFactory();
((FileSystemAware) command).setFileSystemView(factory.createFileSystemView(session));
}
- out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
- err = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA);
- if (log != null && log.isTraceEnabled()) {
- // Wrap in logging filters
- out = new LoggingFilterOutputStream(out, "OUT:", log);
- err = new LoggingFilterOutputStream(err, "ERR:", log);
+ // If the shell wants to use non-blocking io
+ if (command instanceof AsyncCommand) {
+ asyncOut = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_DATA);
+ asyncErr = new ChannelAsyncOutputStream(this, SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA);
+ ((AsyncCommand) command).setIoOutputStream(asyncOut);
+ ((AsyncCommand) command).setIoErrorStream(asyncErr);
+ } else {
+ out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
+ err = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_EXTENDED_DATA);
+ if (log.isTraceEnabled()) {
+ // Wrap in logging filters
+ out = new LoggingFilterOutputStream(out, "OUT:", log);
+ err = new LoggingFilterOutputStream(err, "ERR:", log);
+ }
+ command.setOutputStream(out);
+ command.setErrorStream(err);
}
- command.setOutputStream(out);
- command.setErrorStream(err);
if (this.receiver==null) {
// if the command hasn't installed any ChannelDataReceiver, install the default
// and give the command an InputStream
- PipeDataReceiver recv = new PipeDataReceiver(localWindow);
- setDataReceiver(recv);
- command.setInputStream(recv.getIn());
+ if (command instanceof AsyncCommand) {
+ AsyncDataReceiver recv = new AsyncDataReceiver(this);
+ setDataReceiver(recv);
+ ((AsyncCommand) command).setIoInputStream(recv.getIn());
+ } else {
+ PipeDataReceiver recv = new PipeDataReceiver(localWindow);
+ setDataReceiver(recv);
+ command.setInputStream(recv.getIn());
+ }
}
if (tempBuffer != null) {
Buffer buffer = tempBuffer;
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
index f43f528..2b878b8 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/x11/X11ForwardSupport.java
@@ -101,7 +101,7 @@ public class X11ForwardSupport extends CloseableUtils.AbstractInnerCloseable imp
}
int displayNumber, port;
- InetSocketAddress addr = null;
+ InetSocketAddress addr;
for (displayNumber = X11_DISPLAY_OFFSET; displayNumber < MAX_DISPLAYS; displayNumber++) {
port = 6000 + displayNumber;
@@ -124,7 +124,7 @@ public class X11ForwardSupport extends CloseableUtils.AbstractInnerCloseable imp
// only support non windows systems
String os = System.getProperty("os.name").toLowerCase();
- if (os.indexOf("windows") < 0) {
+ if (!os.contains("windows")) {
try {
String authDisplay = "unix:" + displayNumber + "." + screen;
Process p = new ProcessBuilder(xauthCommand, "remove", authDisplay).start();
@@ -204,6 +204,9 @@ public class X11ForwardSupport extends CloseableUtils.AbstractInnerCloseable imp
@Override
protected synchronized void doOpen() throws IOException {
+ if (streaming == Streaming.Async) {
+ throw new IllegalArgumentException("Asynchronous streaming isn't supported yet on this channel");
+ }
invertedIn = out = new ChannelOutputStream(this, remoteWindow, log, SshConstants.SSH_MSG_CHANNEL_DATA);
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/58c7a835/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/ClientTest.java b/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
index a391e9e..b1dfa41 100644
--- a/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/ClientTest.java
@@ -53,6 +53,8 @@ import org.apache.sshd.common.SshConstants;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.forward.TcpipServerChannel;
import org.apache.sshd.common.future.CloseFuture;
+import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoReadFuture;
import org.apache.sshd.common.io.IoSession;
import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.io.mina.MinaSession;
@@ -71,6 +73,7 @@ import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
import org.apache.sshd.server.session.ServerConnectionService;
import org.apache.sshd.server.session.ServerSession;
import org.apache.sshd.server.session.ServerUserAuthService;
+import org.apache.sshd.util.AsyncEchoShellFactory;
import org.apache.sshd.util.BaseTest;
import org.apache.sshd.util.BogusPasswordAuthenticator;
import org.apache.sshd.util.BogusPublickeyAuthenticator;
@@ -83,7 +86,6 @@ import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -145,6 +147,11 @@ public class ClientTest extends BaseTest {
}
return super.open(recipient, rwsize, rmpsize, buffer);
}
+ // Describes this channel by its id and recipient values.
+ @Override
+ public String toString() {
+ return "ChannelSession" + "[id=" + id + ", recipient=" + recipient + "]";
+ }
};
}
},
@@ -161,6 +168,92 @@ public class ClientTest extends BaseTest {
}
@Test
+ public void testAsyncClient() throws Exception {
+ sshd.getProperties().put(SshServer.WINDOW_SIZE, "1024");
+ sshd.setShellFactory(new AsyncEchoShellFactory());
+ // The client gets the same "1024" window size property as the server; the shell channel is set to Async streaming before it is opened.
+ SshClient client = SshClient.setUpDefaultClient();
+ client.getProperties().put(SshClient.WINDOW_SIZE, "1024");
+ client.start();
+ ClientSession session = client.connect("smx", "localhost", port).await().getSession();
+ session.addPasswordIdentity("smx");
+ session.auth().verify();
+ final ChannelShell channel = session.createShellChannel();
+ channel.setStreaming(ClientChannel.Streaming.Async);
+ channel.open().verify();
+ // The 11-byte message is written nbMessages times in total: one initial write plus one per listener callback while the counter stays above zero.
+
+ final byte[] message = "0123456789\n".getBytes();
+ final int nbMessages = 1000;
+ // Sinks for what is read from the channel's out/err streams, and the number of writes still outstanding.
+ final ByteArrayOutputStream baosOut = new ByteArrayOutputStream();
+ final ByteArrayOutputStream baosErr = new ByteArrayOutputStream();
+ final AtomicInteger writes = new AtomicInteger(nbMessages);
+ /*
+ * Writer: each successful write decrements the counter and, while it is still
+ * positive, issues the next write with the same listener; once it reaches zero
+ * the async input is closed with close(false). An unsuccessful write is turned
+ * into an SshException (assuming SshException is an IOException, it is handled
+ * by the catch block below).
+ * Readers (out and err): each completed read appends the available bytes to the
+ * matching sink, advances rpos past them, compacts the buffer and re-issues the
+ * read with the same listener. verify() is presumably what throws when a read
+ * failed -- confirm.
+ * In all three listeners an IOException is printed and the channel closed with
+ * close(true), unless the channel is already closing.
+ */
+ channel.getAsyncIn().write(new Buffer(message))
+ .addListener(new SshFutureListener<IoWriteFuture>() {
+ public void operationComplete(IoWriteFuture future) {
+ try {
+ if (future.isWritten()) {
+ if (writes.decrementAndGet() > 0) {
+ channel.getAsyncIn().write(new Buffer(message)).addListener(this);
+ } else {
+ channel.getAsyncIn().close(false);
+ }
+ } else {
+ throw new SshException("Error writing", future.getException());
+ }
+ } catch (IOException e) {
+ if (!channel.isClosing()) {
+ e.printStackTrace();
+ channel.close(true);
+ }
+ }
+ }
+ });
+ channel.getAsyncOut().read(new Buffer())
+ .addListener(new SshFutureListener<IoReadFuture>() {
+ public void operationComplete(IoReadFuture future) {
+ try {
+ future.verify();
+ Buffer buffer = future.getBuffer();
+ baosOut.write(buffer.array(), buffer.rpos(), buffer.available());
+ buffer.rpos(buffer.rpos() + buffer.available());
+ buffer.compact();
+ channel.getAsyncOut().read(buffer).addListener(this);
+ } catch (IOException e) {
+ if (!channel.isClosing()) {
+ e.printStackTrace();
+ channel.close(true);
+ }
+ }
+ }
+ });
+ channel.getAsyncErr().read(new Buffer())
+ .addListener(new SshFutureListener<IoReadFuture>() {
+ public void operationComplete(IoReadFuture future) {
+ try {
+ future.verify();
+ Buffer buffer = future.getBuffer();
+ baosErr.write(buffer.array(), buffer.rpos(), buffer.available());
+ buffer.rpos(buffer.rpos() + buffer.available());
+ buffer.compact();
+ channel.getAsyncErr().read(buffer).addListener(this);
+ } catch (IOException e) {
+ if (!channel.isClosing()) {
+ e.printStackTrace();
+ channel.close(true);
+ }
+ }
+ }
+ });
+ // Wait for the CLOSED state (timeout argument 0 -- presumably "no timeout", confirm), then expect every written byte back on the out stream.
+ channel.waitFor(ClientChannel.CLOSED, 0);
+ // NOTE(review): neither the session nor the client is closed/stopped inside this method -- confirm that teardown covers it.
+ assertEquals(nbMessages * message.length, baosOut.size());
+ }
+
+ @Test
public void testCommandDeadlock() throws Exception {
SshClient client = SshClient.setUpDefaultClient();
client.start();