You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2016/02/21 12:39:48 UTC

[1/5] mina-sshd git commit: Separate channel close packet write success/failure to specific methods

Repository: mina-sshd
Updated Branches:
  refs/heads/master 5c9a90274 -> 751ea6a71


Separate channel close packet write success/failure to specific methods


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/845fc3c8
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/845fc3c8
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/845fc3c8

Branch: refs/heads/master
Commit: 845fc3c8f866a4c70fc1f065b9d0237721ad0005
Parents: 5c9a902
Author: Lyor Goldstein <ly...@gmail.com>
Authored: Sun Feb 21 12:21:19 2016 +0200
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Sun Feb 21 12:21:19 2016 +0200

----------------------------------------------------------------------
 .../sshd/common/channel/AbstractChannel.java    | 49 +++++++++++++-------
 1 file changed, 31 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/845fc3c8/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index e560e54..8a78d6e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -500,28 +500,12 @@ public abstract class AbstractChannel
                 try {
                     long timeout = PropertyResolverUtils.getLongProperty(channel, FactoryManager.CHANNEL_CLOSE_TIMEOUT, FactoryManager.DEFAULT_CHANNEL_CLOSE_TIMEOUT);
                     s.writePacket(buffer, timeout, TimeUnit.MILLISECONDS).addListener(new SshFutureListener<IoWriteFuture>() {
-                        @SuppressWarnings("synthetic-access")
                         @Override
                         public void operationComplete(IoWriteFuture future) {
                             if (future.isWritten()) {
-                                if (log.isDebugEnabled()) {
-                                    log.debug("close({})[immediately={}] SSH_MSG_CHANNEL_CLOSE written on channel", channel, immediately);
-                                }
-                                if (gracefulState.compareAndSet(GracefulState.Opened, GracefulState.CloseSent)) {
-                                    // Waiting for CLOSE message to come back from the remote side
-                                } else if (gracefulState.compareAndSet(GracefulState.CloseReceived, GracefulState.Closed)) {
-                                    gracefulFuture.setClosed();
-                                }
+                                handleClosePacketWritten(channel, immediately);
                             } else {
-                                Throwable t = future.getException();
-                                if (log.isDebugEnabled()) {
-                                    log.debug("close({})[immediately={}] failed ({}) to write SSH_MSG_CHANNEL_CLOSE on channel: {}",
-                                              channel, immediately, t.getClass().getSimpleName(), t.getMessage());
-                                }
-                                if (log.isTraceEnabled()) {
-                                    log.trace("close(" + channel + ") SSH_MSG_CHANNEL_CLOSE failure details", t);
-                                }
-                                channel.close(true);
+                                handleClosePacketWriteFailure(channel, immediately, future.getException());
                             }
                         }
                     });
@@ -549,6 +533,35 @@ public abstract class AbstractChannel
 
             return gracefulFuture;
         }
+
+        protected void handleClosePacketWritten(Channel channel, boolean immediately) {
+            if (log.isDebugEnabled()) {
+                log.debug("handleClosePacketWritten({})[immediately={}] SSH_MSG_CHANNEL_CLOSE written on channel",
+                          channel, immediately);
+            }
+
+            if (gracefulState.compareAndSet(GracefulState.Opened, GracefulState.CloseSent)) {
+                // Waiting for CLOSE message to come back from the remote side
+            } else if (gracefulState.compareAndSet(GracefulState.CloseReceived, GracefulState.Closed)) {
+                gracefulFuture.setClosed();
+            }
+        }
+
+        protected void handleClosePacketWriteFailure(Channel channel, boolean immediately, Throwable t) {
+            if (log.isDebugEnabled()) {
+                log.debug("handleClosePacketWriteFailure({})[immediately={}] failed ({}) to write SSH_MSG_CHANNEL_CLOSE on channel: {}",
+                          this, immediately, t.getClass().getSimpleName(), t.getMessage());
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("handleClosePacketWriteFailure(" + channel + ") SSH_MSG_CHANNEL_CLOSE failure details", t);
+            }
+            channel.close(true);
+        }
+
+        @Override
+        public String toString() {
+            return getClass().getSimpleName() + "[" + AbstractChannel.this + "]";
+        }
     }
 
     @Override


[5/5] mina-sshd git commit: Expose IoWriteFuture as return value wherever applicable

Posted by lg...@apache.org.
Expose IoWriteFuture as return value wherever applicable


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/751ea6a7
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/751ea6a7
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/751ea6a7

Branch: refs/heads/master
Commit: 751ea6a718d383042e15b013bc8ad08f139d93d2
Parents: 51a767b
Author: Lyor Goldstein <ly...@gmail.com>
Authored: Sun Feb 21 13:40:27 2016 +0200
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Sun Feb 21 13:40:27 2016 +0200

----------------------------------------------------------------------
 .../client/auth/password/UserAuthPassword.java  |   7 +-
 .../client/session/ClientConnectionService.java |  18 +++-
 .../org/apache/sshd/common/FactoryManager.java  |  11 ++
 .../sshd/common/channel/AbstractChannel.java    |  25 +++--
 .../common/forward/DefaultTcpipForwarder.java   |   2 +-
 .../session/AbstractConnectionService.java      |  16 ++-
 .../sshd/common/session/AbstractSession.java    | 103 ++++++++++++-------
 .../org/apache/sshd/common/session/Session.java |   3 +-
 .../sshd/server/auth/gss/UserAuthGSS.java       |   1 -
 .../sshd/server/channel/ChannelSession.java     |  15 +--
 .../sshd/common/auth/AuthenticationTest.java    |   7 +-
 11 files changed, 144 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/751ea6a7/sshd-core/src/main/java/org/apache/sshd/client/auth/password/UserAuthPassword.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/auth/password/UserAuthPassword.java b/sshd-core/src/main/java/org/apache/sshd/client/auth/password/UserAuthPassword.java
index 5ab7ebf..37f3ba5 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/auth/password/UserAuthPassword.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/auth/password/UserAuthPassword.java
@@ -27,6 +27,7 @@ import org.apache.sshd.client.auth.keyboard.UserInteraction;
 import org.apache.sshd.client.session.ClientSession;
 import org.apache.sshd.common.RuntimeSshException;
 import org.apache.sshd.common.SshConstants;
+import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 
@@ -125,9 +126,11 @@ public class UserAuthPassword extends AbstractUserAuth {
      * @param session The target {@link ClientSession}
      * @param oldPassword The previous password
      * @param newPassword The new password
+     * @return An {@link IoWriteFuture} that can be used to wait and check
+     * on the success/failure of the request packet being sent
      * @throws IOException If failed to send the message.
      */
-    protected void sendPassword(Buffer buffer, ClientSession session, String oldPassword, String newPassword) throws IOException {
+    protected IoWriteFuture sendPassword(Buffer buffer, ClientSession session, String oldPassword, String newPassword) throws IOException {
         String username = session.getUsername();
         String service = getService();
         String name = getName();
@@ -150,6 +153,6 @@ public class UserAuthPassword extends AbstractUserAuth {
         if (modified) {
             buffer.putString(newPassword);
         }
-        session.writePacket(buffer);
+        return session.writePacket(buffer);
     }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/751ea6a7/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java
index fff23fd..5e2677e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java
@@ -28,6 +28,8 @@ import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.PropertyResolverUtils;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.io.AbstractIoWriteFuture;
+import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.session.AbstractConnectionService;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.server.x11.X11ForwardSupport;
@@ -74,18 +76,30 @@ public class ClientConnectionService extends AbstractConnectionService<AbstractC
         }
     }
 
-    protected void sendHeartBeat() {
+    /**
+     * Sends a heartbeat message
+     * @return The {@link IoWriteFuture} that can be used to wait for the
+     * message write completion
+     */
+    protected IoWriteFuture sendHeartBeat() {
         ClientSession session = getClientSession();
         String request = PropertyResolverUtils.getStringProperty(session, ClientFactoryManager.HEARTBEAT_REQUEST, ClientFactoryManager.DEFAULT_KEEP_ALIVE_HEARTBEAT_STRING);
         try {
             Buffer buf = session.createBuffer(SshConstants.SSH_MSG_GLOBAL_REQUEST, request.length() + Byte.SIZE);
             buf.putString(request);
             buf.putBoolean(false);
-            session.writePacket(buf);
+            return session.writePacket(buf);
         } catch (IOException e) {
             if (log.isDebugEnabled()) {
                 log.debug("Error (" + e.getClass().getSimpleName() + ") sending keepalive message=" + request + ": " + e.getMessage());
             }
+
+            final Throwable t = e;
+            return new AbstractIoWriteFuture(null) {
+                {
+                    setValue(t);
+                }
+            };
         }
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/751ea6a7/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
index bc9dd14..9ab1f7e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/FactoryManager.java
@@ -216,6 +216,17 @@ public interface FactoryManager
     String DEFAULT_VERSION = "SSHD-UNKNOWN";
 
     /**
+     * Maximum allowed size of the initial identification text sent during
+     * the handshake
+     */
+    String MAX_IDENTIFICATION_SIZE = "max-identification-size";
+
+    /**
+     * Default value for {@link #MAX_IDENTIFICATION_SIZE} if none set
+     */
+    int DEFAULT_MAX_IDENTIFICATION_SIZE = 16 * 1024;
+
+    /**
      * Key re-exchange will be automatically performed after the session
      * has sent or received the given amount of bytes. If non-positive,
      * then disabled. The default value is {@link #DEFAULT_REKEY_BYTES_LIMIT}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/751ea6a7/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index 8a78d6e..afced68 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -18,6 +18,7 @@
  */
 package org.apache.sshd.common.channel;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -41,6 +42,7 @@ import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.DefaultCloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.AbstractIoWriteFuture;
 import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.session.ConnectionService;
 import org.apache.sshd.common.session.Session;
@@ -313,13 +315,17 @@ public abstract class AbstractChannel
         return RequestHandler.Result.Unsupported;
     }
 
-    protected void sendResponse(Buffer buffer, String req, RequestHandler.Result result, boolean wantReply) throws IOException {
+    protected IoWriteFuture sendResponse(Buffer buffer, String req, RequestHandler.Result result, boolean wantReply) throws IOException {
         if (log.isDebugEnabled()) {
             log.debug("sendResponse({}) request={} result={}, want-reply={}", this, req, result, wantReply);
         }
 
         if (RequestHandler.Result.Replied.equals(result) || (!wantReply)) {
-            return;
+            return new AbstractIoWriteFuture(null) {
+                {
+                    setValue(Boolean.TRUE);
+                }
+            };
         }
 
         byte cmd = RequestHandler.Result.ReplySuccess.equals(result)
@@ -328,7 +334,7 @@ public abstract class AbstractChannel
         Session session = getSession();
         Buffer rsp = session.createBuffer(cmd, Integer.SIZE / Byte.SIZE);
         rsp.putInt(recipient);
-        session.writePacket(rsp);
+        return session.writePacket(rsp);
     }
 
     @Override
@@ -616,12 +622,19 @@ public abstract class AbstractChannel
         super.doCloseImmediately();
     }
 
-    protected void writePacket(Buffer buffer) throws IOException {
+    protected IoWriteFuture writePacket(Buffer buffer) throws IOException {
         if (!isClosing()) {
             Session s = getSession();
-            s.writePacket(buffer);
+            return s.writePacket(buffer);
         } else {
-            log.debug("writePacket({}) Discarding output packet because channel is being closed", this);
+            if (log.isDebugEnabled()) {
+                log.debug("writePacket({}) Discarding output packet because channel is being closed", this);
+            }
+            return new AbstractIoWriteFuture(null) {
+                {
+                    setValue(new EOFException("Channel is being closed"));
+                }
+            };
         }
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/751ea6a7/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
index 905a16d..b8aa438 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/forward/DefaultTcpipForwarder.java
@@ -188,7 +188,7 @@ public class DefaultTcpipForwarder
         buffer.putInt(remotePort);
 
         long timeout = PropertyResolverUtils.getLongProperty(session, FORWARD_REQUEST_TIMEOUT, DEFAULT_FORWARD_REQUEST_TIMEOUT);
-        Buffer result = session.request(buffer, timeout, TimeUnit.MILLISECONDS);
+        Buffer result = session.request("tcpip-forward", buffer, timeout, TimeUnit.MILLISECONDS);
         if (result == null) {
             throw new SshException("Tcpip forwarding request denied by server");
         }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/751ea6a7/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java
index b3696fc..7f26424 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractConnectionService.java
@@ -45,6 +45,8 @@ import org.apache.sshd.common.channel.Window;
 import org.apache.sshd.common.forward.TcpipForwarder;
 import org.apache.sshd.common.forward.TcpipForwarderFactory;
 import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.AbstractIoWriteFuture;
+import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.Int2IntFunction;
 import org.apache.sshd.common.util.ValidateUtils;
@@ -555,7 +557,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession> exten
         });
     }
 
-    protected void sendChannelOpenFailure(Buffer buffer, int sender, int reasonCode, String message, String lang) throws IOException {
+    protected IoWriteFuture sendChannelOpenFailure(Buffer buffer, int sender, int reasonCode, String message, String lang) throws IOException {
         if (log.isDebugEnabled()) {
             log.debug("sendChannelOpenFailure({}) sender={}, reason={}, lang={}, message='{}'",
                       this, sender, SshConstants.getOpenErrorCodeName(reasonCode), lang, message);
@@ -568,7 +570,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession> exten
         buf.putInt(reasonCode);
         buf.putString(message);
         buf.putString(lang);
-        session.writePacket(buf);
+        return session.writePacket(buf);
     }
 
     /**
@@ -624,13 +626,17 @@ public abstract class AbstractConnectionService<S extends AbstractSession> exten
         sendGlobalResponse(buffer, req, RequestHandler.Result.Unsupported, wantReply);
     }
 
-    protected void sendGlobalResponse(Buffer buffer, String req, RequestHandler.Result result, boolean wantReply) throws IOException {
+    protected IoWriteFuture sendGlobalResponse(Buffer buffer, String req, RequestHandler.Result result, boolean wantReply) throws IOException {
         if (log.isDebugEnabled()) {
             log.debug("sendGlobalResponse({})[{}] result={}, want-reply={}", this, req, result, wantReply);
         }
 
         if (RequestHandler.Result.Replied.equals(result) || (!wantReply)) {
-            return;
+            return new AbstractIoWriteFuture(null) {
+                {
+                    setValue(Boolean.TRUE);
+                }
+            };
         }
 
         byte cmd = RequestHandler.Result.ReplySuccess.equals(result)
@@ -638,7 +644,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession> exten
                  : SshConstants.SSH_MSG_REQUEST_FAILURE;
         Session session = getSession();
         Buffer rsp = session.createBuffer(cmd, 2);
-        session.writePacket(rsp);
+        return session.writePacket(rsp);
     }
 
     protected void requestSuccess(Buffer buffer) throws Exception {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/751ea6a7/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
index 967650f..a7c7242 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
@@ -517,7 +517,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
             Pair<String, String> result = comparePreferredKexProposalOption(option);
             if (result != null) {
                 if (log.isDebugEnabled()) {
-                    log.debug("handleKexMessage({})[{}] 1st follow KEX packet {} option mismatch: client={}, server={}",
+                    log.debug("handleFirstKexPacketFollows({})[{}] 1st follow KEX packet {} option mismatch: client={}, server={}",
                               this, SshConstants.getCommandMessageName(cmd), option, result.getFirst(), result.getSecond());
                 }
                 return false;
@@ -574,6 +574,10 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
         if (log.isDebugEnabled()) {
             log.debug("handleUnimplented({}) SSH_MSG_UNIMPLEMENTED #{}", this, seqNo);
         }
+
+        if (log.isTraceEnabled()) {
+            log.trace("handleUnimplemented({}) data: {}", this, buffer.toHex());
+        }
     }
 
     protected void handleDebug(Buffer buffer) throws Exception {
@@ -611,7 +615,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
         validateKexState(SshConstants.SSH_MSG_SERVICE_REQUEST, KexState.DONE);
         try {
             startService(serviceName);
-        } catch (Exception e) {
+        } catch (Throwable e) {
             if (log.isDebugEnabled()) {
                 log.debug("handleServiceRequest({}) Service {} rejected: {} = {}",
                           this, serviceName, e.getClass().getSimpleName(), e.getMessage());
@@ -847,15 +851,6 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
         throw new IllegalStateException("Attempted to access unknown service " + clazz.getSimpleName());
     }
 
-    /**
-     * Encode and send the given buffer.
-     * The buffer has to have 5 bytes free at the beginning to allow the encoding to take place.
-     * Also, the write position of the buffer has to be set to the position of the last byte to write.
-     *
-     * @param buffer the buffer to encode and send
-     * @return a future that can be used to check when the packet has actually been sent
-     * @throws IOException if an error occurred when encoding sending the packet
-     */
     @Override
     public IoWriteFuture writePacket(Buffer buffer) throws IOException {
         // While exchanging key, queue high level packets
@@ -981,12 +976,16 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
     }
 
     @Override
-    public Buffer request(Buffer buffer, long timeout, TimeUnit unit) throws IOException {
+    public Buffer request(String request, Buffer buffer, long timeout, TimeUnit unit) throws IOException {
         ValidateUtils.checkTrue(timeout > 0L, "Non-positive timeout requested: %d", timeout);
 
         long maxWaitMillis = TimeUnit.MILLISECONDS.convert(timeout, unit);
         if (maxWaitMillis <= 0L) {
-            throw new IllegalArgumentException("Requested timeout below 1 msec: " + timeout + " " + unit);
+            throw new IllegalArgumentException("Requested timeout for " + request + " below 1 msec: " + timeout + " " + unit);
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("request({}) request={}, timeout={} {}", this, request, timeout, unit);
         }
 
         Object result;
@@ -1011,16 +1010,21 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
                     result = requestResult.getAndSet(null);
                 }
             } catch (InterruptedException e) {
-                throw (InterruptedIOException) new InterruptedIOException("Interrupted while waiting for request result").initCause(e);
+                throw (InterruptedIOException) new InterruptedIOException("Interrupted while waiting for request=" + request + " result").initCause(e);
             }
         }
 
         if (!isOpen()) {
-            throw new IOException("Session is closed or closing");
+            throw new IOException("Session is closed or closing while awaiting reply for request=" + request);
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("request({}) request={}, timeout={} {}, result received={}",
+                      this, request, timeout, unit, result != null);
         }
 
         if (result == null) {
-            throw new SocketTimeoutException("No response received after " + timeout + " " + unit);
+            throw new SocketTimeoutException("No response received after " + timeout + " " + unit + " for request=" + request);
         }
 
         if (result instanceof Buffer) {
@@ -1286,10 +1290,10 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
      * @param ident our identification to send
      */
     protected void sendIdentification(String ident) {
+        byte[] data = (ident + "\r\n").getBytes(StandardCharsets.UTF_8);
         if (log.isDebugEnabled()) {
             log.debug("sendIdentification({}): {}", this, ident);
         }
-        byte[] data = (ident + "\r\n").getBytes(StandardCharsets.UTF_8);
         ioSession.write(new ByteArrayBuffer(data));
     }
 
@@ -1318,8 +1322,15 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
      * @return the remote identification or {@code null} if more data is needed
      */
     protected String doReadIdentification(Buffer buffer, boolean server) {
-        byte[] data = new byte[256];
-        for (;;) {
+        int maxIdentSize = PropertyResolverUtils.getIntProperty(this,
+                FactoryManager.MAX_IDENTIFICATION_SIZE, FactoryManager.DEFAULT_MAX_IDENTIFICATION_SIZE);
+        /*
+         * see https://tools.ietf.org/html/rfc4253 section 4.2
+         *
+         *      The maximum length of the string is 255 characters,
+         *      including the Carriage Return and Line Feed.
+         */
+        for (byte[] data = new byte[256];;) {
             int rpos = buffer.rpos();
             int pos = 0;
             boolean needLf = false;
@@ -1345,12 +1356,13 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
                 }
                 data[pos++] = b;
             }
-            String str = new String(data, 0, pos);
+
+            String str = new String(data, 0, pos, StandardCharsets.UTF_8);
             if (server || str.startsWith("SSH-")) {
                 return str;
             }
-            if (buffer.rpos() > (16 * 1024)) {
-                throw new IllegalStateException("Incorrect identification: too many header lines");
+            if (buffer.rpos() > maxIdentSize) {
+                throw new IllegalStateException("Incorrect identification: too many header lines - size > " + maxIdentSize);
             }
         }
     }
@@ -1407,13 +1419,14 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
             random.fill(buffer.array(), p, SshConstants.MSG_KEX_COOKIE_SIZE);
         }
         if (log.isTraceEnabled()) {
-            log.trace("sendKexInit(" + toString() + ") cookie=" + BufferUtils.toHex(buffer.array(), p, SshConstants.MSG_KEX_COOKIE_SIZE, ':'));
+            log.trace("sendKexInit({}) cookie={}",
+                      this, BufferUtils.toHex(buffer.array(), p, SshConstants.MSG_KEX_COOKIE_SIZE, ':'));
         }
 
         for (KexProposalOption paramType : KexProposalOption.VALUES) {
             String s = proposal.get(paramType);
             if (log.isTraceEnabled()) {
-                log.trace("sendKexInit(" + toString() + ")[" + paramType.getDescription() + "] " + s);
+                log.trace("sendKexInit({})[{}] {}", this, paramType.getDescription(), s);
             }
             buffer.putString(GenericUtils.trimToEmpty(s));
         }
@@ -1446,7 +1459,8 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
         buffer.rpos(cookieStartPos + SshConstants.MSG_KEX_COOKIE_SIZE);
         size += SshConstants.MSG_KEX_COOKIE_SIZE;
         if (log.isTraceEnabled()) {
-            log.trace("receiveKexInit(" + toString() + ") cookie=" + BufferUtils.toHex(d, cookieStartPos, SshConstants.MSG_KEX_COOKIE_SIZE, ':'));
+            log.trace("receiveKexInit({}) cookie={}",
+                      this, BufferUtils.toHex(d, cookieStartPos, SshConstants.MSG_KEX_COOKIE_SIZE, ':'));
         }
 
         // Read proposal
@@ -1454,7 +1468,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
             int lastPos = buffer.rpos();
             String value = buffer.getString();
             if (log.isTraceEnabled()) {
-                log.trace("receiveKexInit(" + toString() + ")[" + paramType.getDescription() + "] " + value);
+                log.trace("receiveKexInit({})[{}] {}", this, paramType.getDescription(), value);
             }
             int curPos = buffer.rpos();
             int readLen = curPos - lastPos;
@@ -1464,13 +1478,13 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
 
         firstKexPacketFollows = buffer.getBoolean();
         if (log.isTraceEnabled()) {
-            log.trace("receiveKexInit(" + toString() + ") first kex packet follows: " + firstKexPacketFollows);
+            log.trace("receiveKexInit({}) first kex packet follows: {}", this, firstKexPacketFollows);
         }
 
         long reserved = buffer.getUInt();
         if (reserved != 0) {
             if (log.isTraceEnabled()) {
-                log.trace("receiveKexInit(" + toString() + ") non-zero reserved value: " + reserved);
+                log.trace("receiveKexInit({}) non-zero reserved value: {}", this, reserved);
             }
         }
 
@@ -1483,14 +1497,16 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
     /**
      * Send a message to put new keys into use.
      *
+     * @return An {@link IoWriteFuture} that can be used to wait and
+     * check the result of sending the packet
      * @throws IOException if an error occurs sending the message
      */
-    protected void sendNewKeys() throws IOException {
+    protected IoWriteFuture sendNewKeys() throws IOException {
         if (log.isDebugEnabled()) {
             log.debug("sendNewKeys({}) Send SSH_MSG_NEWKEYS", this);
         }
         Buffer buffer = createBuffer(SshConstants.SSH_MSG_NEWKEYS, Byte.SIZE);
-        writePacket(buffer);
+        return writePacket(buffer);
     }
 
     /**
@@ -1517,11 +1533,12 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
         buffer.putRawBytes(h);
         buffer.putByte((byte) 0x41);
         buffer.putRawBytes(sessionId);
+
         int pos = buffer.available();
         byte[] buf = buffer.array();
         hash.update(buf, 0, pos);
-        byte[] iv_c2s = hash.digest();
 
+        byte[] iv_c2s = hash.digest();
         int j = pos - sessionId.length - 1;
 
         buf[j]++;
@@ -1664,6 +1681,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
         buffer.putInt(reason);
         buffer.putString(msg);
         buffer.putString("");   // TODO configure language...
+
         // Write the packet with a timeout to ensure a timely close of the session
         // in case the consumer does not read packets anymore.
         long disconnectTimeoutMs = PropertyResolverUtils.getLongProperty(this, FactoryManager.DISCONNECT_TIMEOUT, FactoryManager.DEFAULT_DISCONNECT_TIMEOUT);
@@ -1699,12 +1717,25 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
      * sequence id of the unsupported packet: this number is assumed to
      * be the last packet received.
      *
+     * @return An {@link IoWriteFuture} that can be used to wait for packet write completion
+     * @throws IOException if an error occurred sending the packet
+     * @see #sendNotImplemented(long)
+     */
+    protected IoWriteFuture notImplemented() throws IOException {
+        return sendNotImplemented(seqi - 1);
+    }
+
+    /**
+     * Sends a {@code SSH_MSG_UNIMPLEMENTED} message
+     *
+     * @param seqNoValue The referenced sequence number
+     * @return An {@link IoWriteFuture} that can be used to wait for packet write completion
      * @throws IOException if an error occurred sending the packet
      */
-    protected void notImplemented() throws IOException {
+    protected IoWriteFuture sendNotImplemented(long seqNoValue) throws IOException {
         Buffer buffer = createBuffer(SshConstants.SSH_MSG_UNIMPLEMENTED, Byte.SIZE);
-        buffer.putInt(seqi - 1);
-        writePacket(buffer);
+        buffer.putInt(seqNoValue);
+        return writePacket(buffer);
     }
 
     /**
@@ -1743,14 +1774,14 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
                 // OK if could not negotiate languages
                 if (KexProposalOption.S2CLANG.equals(paramType) || KexProposalOption.C2SLANG.equals(paramType)) {
                     if (log.isTraceEnabled()) {
-                        log.trace(message);
+                        log.trace("negotiate({}) {}", this, message);
                     }
                 } else {
                     throw new IllegalStateException(message);
                 }
             } else {
                 if (log.isTraceEnabled()) {
-                    log.trace("Kex: negotiate(" + paramType.getDescription() + ") guess=" + value
+                    log.trace("negotiate(" + this + ")[" + paramType.getDescription() + "] guess=" + value
                             + " (client: " + clientParamValue + " / server: " + serverParamValue + ")");
                 }
             }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/751ea6a7/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
index 43d0f7c..94f08b4 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
@@ -182,13 +182,14 @@ public interface Session
      * Send a global request and wait for the response. This must only be used when sending
      * a {@code SSH_MSG_GLOBAL_REQUEST} with a result expected, else it will time out
      *
+     * @param request the request name - used mainly for logging and debugging
      * @param buffer the buffer containing the global request
      * @param timeout The number of time units to wait - must be <U>positive</U>
      * @param unit The {@link TimeUnit} to wait for the response
      * @return the return buffer if the request was successful, {@code null} otherwise.
      * @throws IOException if an error occurred when encoding sending the packet
      */
-    Buffer request(Buffer buffer, long timeout, TimeUnit unit) throws IOException;
+    Buffer request(String request, Buffer buffer, long timeout, TimeUnit unit) throws IOException;
 
     /**
      * Handle any exceptions that occurred on this session.

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/751ea6a7/sshd-core/src/main/java/org/apache/sshd/server/auth/gss/UserAuthGSS.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/auth/gss/UserAuthGSS.java b/sshd-core/src/main/java/org/apache/sshd/server/auth/gss/UserAuthGSS.java
index 68116ec..1687cd1 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/auth/gss/UserAuthGSS.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/auth/gss/UserAuthGSS.java
@@ -174,7 +174,6 @@ public class UserAuthGSS extends AbstractUserAuth {
 
                 if (NumberUtils.length(out) > 0) {
                     Buffer b = session.createBuffer(SshConstants.SSH_MSG_USERAUTH_INFO_RESPONSE, out.length + Integer.SIZE);
-
                     b.putBytes(out);
                     session.writePacket(b);
                     return null;

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/751ea6a7/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
index 45a8f4f..61241e1 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
@@ -54,6 +54,7 @@ import org.apache.sshd.common.file.FileSystemFactory;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.DefaultCloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
+import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.ValidateUtils;
@@ -332,32 +333,31 @@ public class ChannelSession extends AbstractServerChannel {
     }
 
     @Override
-    protected void sendResponse(Buffer buffer, String req, Result result, boolean wantReply) throws IOException {
-        super.sendResponse(buffer, req, result, wantReply);
-
+    protected IoWriteFuture sendResponse(Buffer buffer, String req, Result result, boolean wantReply) throws IOException {
+        IoWriteFuture future = super.sendResponse(buffer, req, result, wantReply);
         if (!RequestHandler.Result.ReplySuccess.equals(result)) {
-            return;
+            return future;
         }
 
         if (commandInstance == null) {
             if (log.isDebugEnabled()) {
                 log.debug("sendResponse({}) request={} no pending command", this, req);
             }
-            return; // no pending command to activate
+            return future; // no pending command to activate
         }
 
         if (!Objects.equals(this.type, req)) {
             if (log.isDebugEnabled()) {
                 log.debug("sendResponse({}) request={} mismatched channel type: {}", this, req, this.type);
             }
-            return; // request does not match the current channel type
+            return future; // request does not match the current channel type
         }
 
         if (commandStarted.getAndSet(true)) {
             if (log.isDebugEnabled()) {
                 log.debug("sendResponse({}) request={} pending command already started", this, req);
             }
-            return;
+            return future;
         }
 
         // TODO - consider if (Channel.CHANNEL_SHELL.equals(req) || Channel.CHANNEL_EXEC.equals(req) || Channel.CHANNEL_SUBSYSTEM.equals(req)) {
@@ -365,6 +365,7 @@ public class ChannelSession extends AbstractServerChannel {
             log.debug("sendResponse({}) request={} activate command", this, req);
         }
         commandInstance.start(getEnvironment());
+        return future;
     }
 
     protected RequestHandler.Result handleEnv(Buffer buffer, boolean wantReply) throws IOException {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/751ea6a7/sshd-core/src/test/java/org/apache/sshd/common/auth/AuthenticationTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/common/auth/AuthenticationTest.java b/sshd-core/src/test/java/org/apache/sshd/common/auth/AuthenticationTest.java
index a3de28f..09de71d 100644
--- a/sshd-core/src/test/java/org/apache/sshd/common/auth/AuthenticationTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/common/auth/AuthenticationTest.java
@@ -49,6 +49,7 @@ import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.config.keys.KeyUtils;
 import org.apache.sshd.common.io.IoSession;
+import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.keyprovider.KeyPairProvider;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.session.SessionListener;
@@ -226,14 +227,14 @@ public class AuthenticationTest extends BaseTestSupport {
                     public org.apache.sshd.client.auth.password.UserAuthPassword create() {
                         return new org.apache.sshd.client.auth.password.UserAuthPassword() {
                             @Override
-                            protected void sendPassword(Buffer buffer, ClientSession session, String oldPassword, String newPassword) throws IOException {
+                            protected IoWriteFuture sendPassword(Buffer buffer, ClientSession session, String oldPassword, String newPassword) throws IOException {
                                 int count = sentCount.incrementAndGet();
                                 // 1st one is the original one (which is denied by the server)
                                 // 2nd one is the updated one retrieved from the user interaction
                                 if (count == 2) {
-                                    super.sendPassword(buffer, session, getClass().getName(), newPassword);
+                                    return super.sendPassword(buffer, session, getClass().getName(), newPassword);
                                 } else {
-                                    super.sendPassword(buffer, session, oldPassword, newPassword);
+                                    return super.sendPassword(buffer, session, oldPassword, newPassword);
                                 }
                             }
                         };


[4/5] mina-sshd git commit: Made members and methods of BufferedIoOutputStream protected to allow implementation overrides

Posted by lg...@apache.org.
Made members and methods of BufferedIoOutputStream protected to allow implementation overrides


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/51a767b0
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/51a767b0
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/51a767b0

Branch: refs/heads/master
Commit: 51a767b07684e988fa10ffb2bc3b0cd074d3a3db
Parents: 6f246ac
Author: Lyor Goldstein <ly...@gmail.com>
Authored: Sun Feb 21 12:23:38 2016 +0200
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Sun Feb 21 12:23:38 2016 +0200

----------------------------------------------------------------------
 .../sshd/common/channel/BufferedIoOutputStream.java | 16 +++++++---------
 1 file changed, 7 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/51a767b0/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
index b035c1d..d496e24 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/BufferedIoOutputStream.java
@@ -31,13 +31,12 @@ import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.closeable.AbstractInnerCloseable;
 
 /**
- * An IoOutputStream capable of queuing write requests
+ * An {@link IoOutputStream} capable of queuing write requests
  */
 public class BufferedIoOutputStream extends AbstractInnerCloseable implements IoOutputStream {
-
-    private final IoOutputStream out;
-    private final Queue<IoWriteFutureImpl> writes = new ConcurrentLinkedQueue<IoWriteFutureImpl>();
-    private final AtomicReference<IoWriteFutureImpl> currentWrite = new AtomicReference<IoWriteFutureImpl>();
+    protected final IoOutputStream out;
+    protected final Queue<IoWriteFutureImpl> writes = new ConcurrentLinkedQueue<IoWriteFutureImpl>();
+    protected final AtomicReference<IoWriteFutureImpl> currentWrite = new AtomicReference<IoWriteFutureImpl>();
 
     public BufferedIoOutputStream(IoOutputStream out) {
         this.out = out;
@@ -45,7 +44,7 @@ public class BufferedIoOutputStream extends AbstractInnerCloseable implements Io
 
     @Override
     public IoWriteFuture write(Buffer buffer) {
-        final IoWriteFutureImpl future = new IoWriteFutureImpl(buffer);
+        IoWriteFutureImpl future = new IoWriteFutureImpl(buffer);
         if (isClosing()) {
             future.setValue(new IOException("Closed"));
         } else {
@@ -55,7 +54,7 @@ public class BufferedIoOutputStream extends AbstractInnerCloseable implements Io
         return future;
     }
 
-    private void startWriting() {
+    protected void startWriting() {
         final IoWriteFutureImpl future = writes.peek();
         if (future != null) {
             if (currentWrite.compareAndSet(null, future)) {
@@ -70,7 +69,6 @@ public class BufferedIoOutputStream extends AbstractInnerCloseable implements Io
                         finishWrite();
                     }
 
-                    @SuppressWarnings("synthetic-access")
                     private void finishWrite() {
                         writes.remove(future);
                         currentWrite.compareAndSet(future, null);
@@ -91,6 +89,6 @@ public class BufferedIoOutputStream extends AbstractInnerCloseable implements Io
 
     @Override
     public String toString() {
-        return "BufferedIoOutputStream[" + out + "]";
+        return getClass().getSimpleName() + "[" + out + "]";
     }
 }


[2/5] mina-sshd git commit: Separate TcpipServerChannel data writing success/failure handling to different methods

Posted by lg...@apache.org.
Separate TcpipServerChannel data writing success/failure handling to different methods


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/d421ac4a
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/d421ac4a
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/d421ac4a

Branch: refs/heads/master
Commit: d421ac4a8382b98ef2d3309aebdb4f7282dbe8f9
Parents: 845fc3c
Author: Lyor Goldstein <ly...@gmail.com>
Authored: Sun Feb 21 12:22:14 2016 +0200
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Sun Feb 21 12:22:14 2016 +0200

----------------------------------------------------------------------
 .../sshd/server/forward/TcpipServerChannel.java | 59 +++++++++++---------
 1 file changed, 34 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d421ac4a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
index 4b66a24..d558d74 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
@@ -52,7 +52,6 @@ import org.apache.sshd.common.util.net.SshdSocketAddress;
 import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
 import org.apache.sshd.common.util.threads.ThreadUtils;
 import org.apache.sshd.server.channel.AbstractServerChannel;
-import org.apache.sshd.server.channel.ServerChannel;
 
 /**
  * TODO Add javadoc
@@ -348,35 +347,14 @@ public class TcpipServerChannel extends AbstractServerChannel {
     @Override
     protected void doWriteData(byte[] data, int off, final int len) throws IOException {
         // Make sure we copy the data as the incoming buffer may be reused
-        Buffer buf = ByteArrayBuffer.getCompactClone(data, off, len);
-        final ServerChannel channel = this;
+        final Buffer buf = ByteArrayBuffer.getCompactClone(data, off, len);
         ioSession.write(buf).addListener(new SshFutureListener<IoWriteFuture>() {
             @Override
-            @SuppressWarnings("synthetic-access")
             public void operationComplete(IoWriteFuture future) {
-                Session session = getSession();
                 if (future.isWritten()) {
-                    try {
-                        Window wLocal = getLocalWindow();
-                        wLocal.consumeAndCheck(len);
-                    } catch (IOException e) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("doWriteData({}) failed ({}) to consume len={}: {}",
-                                      channel, e.getClass().getSimpleName(), len, e.getMessage());
-                        }
-                        session.exceptionCaught(e);
-                    }
+                    handleWriteDataSuccess(SshConstants.SSH_MSG_CHANNEL_DATA, buf.array(), 0, len);
                 } else {
-                    Throwable t = future.getException();
-                    if (log.isDebugEnabled()) {
-                        log.debug("doWriteData({}) failed ({}) to write len={}: {}",
-                                  channel, t.getClass().getSimpleName(), len, t.getMessage());
-                    }
-
-                    if (log.isTraceEnabled()) {
-                        log.trace("doWriteData(" + channel + ") len=" + len + " write failure details", t);
-                    }
-                    session.exceptionCaught(t);
+                    handleWriteDataFailure(SshConstants.SSH_MSG_CHANNEL_DATA, buf.array(), 0, len, future.getException());
                 }
             }
         });
@@ -386,4 +364,35 @@ public class TcpipServerChannel extends AbstractServerChannel {
     protected void doWriteExtendedData(byte[] data, int off, int len) throws IOException {
         throw new UnsupportedOperationException(type + "Tcpip channel does not support extended data");
     }
+
+    protected void handleWriteDataSuccess(byte cmd, byte[] data, int off, int len) {
+        Session session = getSession();
+        try {
+            Window wLocal = getLocalWindow();
+            wLocal.consumeAndCheck(len);
+        } catch (Throwable e) {
+            if (log.isDebugEnabled()) {
+                log.debug("handleWriteDataSuccess({})[{}] failed ({}) to consume len={}: {}",
+                          this, SshConstants.getCommandMessageName(cmd & 0xFF),
+                          e.getClass().getSimpleName(), len, e.getMessage());
+            }
+            session.exceptionCaught(e);
+        }
+    }
+
+    protected void handleWriteDataFailure(byte cmd, byte[] data, int off, int len, Throwable t) {
+        Session session = getSession();
+        if (log.isDebugEnabled()) {
+            log.debug("handleWriteDataFailure({})[{}] failed ({}) to write len={}: {}",
+                      this, SshConstants.getCommandMessageName(cmd & 0xFF),
+                      t.getClass().getSimpleName(), len, t.getMessage());
+        }
+
+        if (log.isTraceEnabled()) {
+            log.trace("doWriteData(" + this + ")[" + SshConstants.getCommandMessageName(cmd & 0xFF) + "]"
+                    + " len=" + len + " write failure details", t);
+        }
+
+        session.exceptionCaught(t);
+    }
 }


[3/5] mina-sshd git commit: Use only IoWriteFuture as return value of MinaSession#write methods

Posted by lg...@apache.org.
Use only IoWriteFuture as return value of MinaSession#write methods


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/6f246acf
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/6f246acf
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/6f246acf

Branch: refs/heads/master
Commit: 6f246acf24c919550160df17236deb1768813e5f
Parents: d421ac4
Author: Lyor Goldstein <ly...@gmail.com>
Authored: Sun Feb 21 12:22:59 2016 +0200
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Sun Feb 21 12:22:59 2016 +0200

----------------------------------------------------------------------
 .../apache/sshd/common/io/mina/MinaSession.java | 34 +++++++++++++-------
 1 file changed, 23 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/6f246acf/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java b/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java
index 24c26a6..9a34af1 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/mina/MinaSession.java
@@ -32,6 +32,7 @@ import org.apache.sshd.common.io.AbstractIoWriteFuture;
 import org.apache.sshd.common.io.IoService;
 import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.util.NumberUtils;
 import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.closeable.AbstractInnerCloseable;
@@ -83,11 +84,6 @@ public class MinaSession extends AbstractInnerCloseable implements IoSession {
         return session.getId();
     }
 
-    public WriteFuture write(byte[] data, int offset, int len) {
-        IoBuffer buffer = IoBuffer.wrap(data, offset, len);
-        return session.write(buffer);
-    }
-
     @Override
     protected Closeable getInnerCloseable() {
         return new IoBaseCloseable() {
@@ -130,14 +126,30 @@ public class MinaSession extends AbstractInnerCloseable implements IoSession {
         };
     }
 
-    @Override
+    // NOTE !!! data buffer may NOT be re-used when method returns - at least until IoWriteFuture is signalled
+    public IoWriteFuture write(byte[] data) {
+        return write(data, 0, NumberUtils.length(data));
+    }
+
+    // NOTE !!! data buffer may NOT be re-used when method returns - at least until IoWriteFuture is signalled
+    public IoWriteFuture write(byte[] data, int offset, int len) {
+        return write(IoBuffer.wrap(data, offset, len));
+    }
+
+    @Override // NOTE !!! data buffer may NOT be re-used when method returns - at least until IoWriteFuture is signalled
     public IoWriteFuture write(Buffer buffer) {
+        return write(MinaSupport.asIoBuffer(buffer));
+    }
+
+    // NOTE !!! data buffer may NOT be re-used when method returns - at least until IoWriteFuture is signalled
+    public IoWriteFuture write(IoBuffer buffer) {
         final Future future = new Future(null);
-        session.write(MinaSupport.asIoBuffer(buffer)).addListener(new IoFutureListener<WriteFuture>() {
+        session.write(buffer).addListener(new IoFutureListener<WriteFuture>() {
             @Override
             public void operationComplete(WriteFuture cf) {
-                if (cf.getException() != null) {
-                    future.setException(cf.getException());
+                Throwable t = cf.getException();
+                if (t != null) {
+                    future.setException(t);
                 } else {
                     future.setWritten();
                 }
@@ -146,8 +158,8 @@ public class MinaSession extends AbstractInnerCloseable implements IoSession {
         return future;
     }
 
-    private static class Future extends AbstractIoWriteFuture {
-        Future(Object lock) {
+    public static class Future extends AbstractIoWriteFuture {
+        public Future(Object lock) {
             super(lock);
         }