You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2016/02/15 17:51:19 UTC

[1/6] mina-sshd git commit: Remove superfluous override of ClientSessionImpl#handleMessage

Repository: mina-sshd
Updated Branches:
  refs/heads/master 1b0e4bf21 -> 33d9dc65a


Remove superfluous override of ClientSessionImpl#handleMessage


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/9d22091f
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/9d22091f
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/9d22091f

Branch: refs/heads/master
Commit: 9d22091f83edc8cbca53303c44620b8acf447985
Parents: 1b0e4bf
Author: Lyor Goldstein <ly...@gmail.com>
Authored: Mon Feb 15 18:42:39 2016 +0200
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Mon Feb 15 18:42:39 2016 +0200

----------------------------------------------------------------------
 .../org/apache/sshd/client/session/ClientSessionImpl.java     | 7 -------
 1 file changed, 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9d22091f/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
index 37656ba..14b5681 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
@@ -269,13 +269,6 @@ public class ClientSessionImpl extends AbstractClientSession {
     }
 
     @Override
-    protected void handleMessage(Buffer buffer) throws Exception {
-        synchronized (lock) {
-            super.handleMessage(buffer);
-        }
-    }
-
-    @Override
     public Set<ClientSessionEvent> waitFor(Collection<ClientSessionEvent> mask, long timeout) {
         ValidateUtils.checkNotNull(mask, "No mask specified");
         long t = 0L;


[2/6] mina-sshd git commit: Added ChannelHolder interface

Posted by lg...@apache.org.
Added ChannelHolder interface


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/28675bdb
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/28675bdb
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/28675bdb

Branch: refs/heads/master
Commit: 28675bdb0b00bfcc3eadfa2e9f7e42fa10f1591c
Parents: 9d22091
Author: Lyor Goldstein <ly...@gmail.com>
Authored: Mon Feb 15 18:48:41 2016 +0200
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Mon Feb 15 18:48:41 2016 +0200

----------------------------------------------------------------------
 .../sshd/client/subsystem/SubsystemClient.java  |  8 +++++-
 .../subsystem/sftp/AbstractSftpClient.java      |  7 ++++-
 .../common/channel/ChannelAsyncInputStream.java | 24 +++++++++++-----
 .../sshd/common/channel/ChannelHolder.java      | 30 ++++++++++++++++++++
 .../common/channel/ChannelOutputStream.java     | 28 +++++++++++-------
 .../org/apache/sshd/common/channel/Window.java  | 18 ++++++++----
 6 files changed, 89 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/28675bdb/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java
index dca1b38..cfc7970 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java
@@ -25,12 +25,18 @@ import org.apache.sshd.client.channel.ClientChannel;
 import org.apache.sshd.client.session.ClientSession;
 import org.apache.sshd.client.session.ClientSessionHolder;
 import org.apache.sshd.common.NamedResource;
+import org.apache.sshd.common.channel.ChannelHolder;
 import org.apache.sshd.common.session.SessionHolder;
 
 /**
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public interface SubsystemClient extends SessionHolder<ClientSession>, ClientSessionHolder, NamedResource, Channel {
+public interface SubsystemClient
+        extends SessionHolder<ClientSession>,
+                ClientSessionHolder,
+                NamedResource,
+                ChannelHolder,
+                Channel {
     /**
      * @return The underlying {@link ClientChannel} used
      */

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/28675bdb/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/AbstractSftpClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/AbstractSftpClient.java b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/AbstractSftpClient.java
index 81292c3..16b9105 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/AbstractSftpClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/AbstractSftpClient.java
@@ -36,6 +36,7 @@ import org.apache.sshd.client.subsystem.sftp.extensions.BuiltinSftpClientExtensi
 import org.apache.sshd.client.subsystem.sftp.extensions.SftpClientExtension;
 import org.apache.sshd.client.subsystem.sftp.extensions.SftpClientExtensionFactory;
 import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.channel.Channel;
 import org.apache.sshd.common.subsystem.sftp.SftpConstants;
 import org.apache.sshd.common.subsystem.sftp.SftpException;
 import org.apache.sshd.common.subsystem.sftp.SftpHelper;
@@ -51,7 +52,6 @@ import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
 public abstract class AbstractSftpClient extends AbstractSubsystemClient implements SftpClient, RawSftpClient {
-
     private final AtomicReference<Map<String, Object>> parsedExtensionsHolder = new AtomicReference<>(null);
 
     protected AbstractSftpClient() {
@@ -59,6 +59,11 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme
     }
 
     @Override
+    public Channel getChannel() {
+        return getClientChannel();
+    }
+
+    @Override
     public String getName() {
         return SftpConstants.SFTP_SUBSYSTEM_NAME;
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/28675bdb/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
index 11be85b..75ca78d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
@@ -19,6 +19,7 @@
 package org.apache.sshd.common.channel;
 
 import java.io.IOException;
+
 import org.apache.sshd.common.RuntimeSshException;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.future.CloseFuture;
@@ -26,7 +27,9 @@ import org.apache.sshd.common.future.DefaultVerifiableSshFuture;
 import org.apache.sshd.common.io.IoInputStream;
 import org.apache.sshd.common.io.IoReadFuture;
 import org.apache.sshd.common.io.ReadPendingException;
+import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.Readable;
+import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.common.util.closeable.AbstractCloseable;
@@ -34,14 +37,18 @@ import org.apache.sshd.common.util.closeable.AbstractCloseable;
 /**
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public class ChannelAsyncInputStream extends AbstractCloseable implements IoInputStream {
-
-    private final Channel channel;
+public class ChannelAsyncInputStream extends AbstractCloseable implements IoInputStream, ChannelHolder {
+    private final Channel channelInstance;
     private final Buffer buffer = new ByteArrayBuffer();
     private IoReadFutureImpl pending;
 
     public ChannelAsyncInputStream(Channel channel) {
-        this.channel = channel;
+        this.channelInstance = ValidateUtils.checkNotNull(channel, "No channel");
+    }
+
+    @Override
+    public Channel getChannel() {
+        return channelInstance;
     }
 
     public void write(Readable src) throws IOException {
@@ -109,10 +116,13 @@ public class ChannelAsyncInputStream extends AbstractCloseable implements IoInpu
             }
         }
         if (nbRead > 0) {
+            Channel channel = getChannel();
             try {
-                channel.getLocalWindow().consumeAndCheck(nbRead);
+                Window wLocal = channel.getLocalWindow();
+                wLocal.consumeAndCheck(nbRead);
             } catch (IOException e) {
-                channel.getSession().exceptionCaught(e);
+                Session session = channel.getSession();
+                session.exceptionCaught(e);
             }
             future.setValue(nbRead);
         }
@@ -120,7 +130,7 @@ public class ChannelAsyncInputStream extends AbstractCloseable implements IoInpu
 
     @Override
     public String toString() {
-        return "ChannelAsyncInputStream[" + channel + "]";
+        return getClass().getSimpleName() + "[" + getChannel() + "]";
     }
 
     public static class IoReadFutureImpl extends DefaultVerifiableSshFuture<IoReadFuture> implements IoReadFuture {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/28675bdb/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelHolder.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelHolder.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelHolder.java
new file mode 100644
index 0000000..869dc9c
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelHolder.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.channel;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface ChannelHolder {
+    /**
+     * @return The associated {@link Channel} instance
+     */
+    Channel getChannel();
+}

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/28675bdb/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
index 9525bd2..3ed3f59 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
@@ -21,7 +21,6 @@ package org.apache.sshd.common.channel;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
-import java.nio.channels.Channel;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -34,18 +33,16 @@ import org.apache.sshd.common.util.buffer.Buffer;
 import org.slf4j.Logger;
 
 /**
- * TODO Add javadoc
- *
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public class ChannelOutputStream extends OutputStream implements Channel {
+public class ChannelOutputStream extends OutputStream implements java.nio.channels.Channel, ChannelHolder {
     /**
      * Configure max. wait time (millis) to wait for space to become available
      */
     public static final String WAIT_FOR_SPACE_TIMEOUT = "channel-output-wait-for-space-timeout";
     public static final long DEFAULT_WAIT_FOR_SPACE_TIMEOUT = TimeUnit.SECONDS.toMillis(30L);
 
-    private final AbstractChannel channel;
+    private final AbstractChannel channelInstance;
     private final Window remoteWindow;
     private final long maxWaitTimeout;
     private final Logger log;
@@ -63,7 +60,7 @@ public class ChannelOutputStream extends OutputStream implements Channel {
     }
 
     public ChannelOutputStream(AbstractChannel channel, Window remoteWindow, long maxWaitTimeout, Logger log, byte cmd, boolean eofOnClose) {
-        this.channel = ValidateUtils.checkNotNull(channel, "No channel");
+        this.channelInstance = ValidateUtils.checkNotNull(channel, "No channel");
         this.remoteWindow = ValidateUtils.checkNotNull(remoteWindow, "No remote window");
         ValidateUtils.checkTrue(maxWaitTimeout > 0L, "Non-positive max. wait time: %d", maxWaitTimeout);
         this.maxWaitTimeout = maxWaitTimeout;
@@ -73,6 +70,11 @@ public class ChannelOutputStream extends OutputStream implements Channel {
         newBuffer(0);
     }
 
+    @Override   // co-variant return
+    public AbstractChannel getChannel() {
+        return channelInstance;
+    }
+
     public boolean isEofOnClose() {
         return eofOnClose;
     }
@@ -102,6 +104,7 @@ public class ChannelOutputStream extends OutputStream implements Channel {
             throw new SshException("write(" + this + ") len=" + l + " - channel already closed");
         }
 
+        Channel channel = getChannel();
         Session session = channel.getSession();
         while (l > 0) {
             // The maximum amount we should admit without flushing again
@@ -152,6 +155,7 @@ public class ChannelOutputStream extends OutputStream implements Channel {
         }
 
         try {
+            AbstractChannel channel = getChannel();
             Session session = channel.getSession();
             while (bufferLength > 0) {
                 session.resetIdleTimeout();
@@ -207,6 +211,7 @@ public class ChannelOutputStream extends OutputStream implements Channel {
             try {
                 flush();
                 if (isEofOnClose()) {
+                    AbstractChannel channel = getChannel();
                     channel.sendEof();
                 }
             } finally {
@@ -215,12 +220,8 @@ public class ChannelOutputStream extends OutputStream implements Channel {
         }
     }
 
-    @Override
-    public String toString() {
-        return getClass().getSimpleName() + "[" + channel + "]";
-    }
-
     protected void newBuffer(int size) {
+        Channel channel = getChannel();
         Session session = channel.getSession();
         buffer = session.createBuffer(cmd, size <= 0 ? 12 : 12 + size);
         buffer.putInt(channel.getRecipient());
@@ -230,4 +231,9 @@ public class ChannelOutputStream extends OutputStream implements Channel {
         buffer.putInt(0);
         bufferLength = 0;
     }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + "[" + getChannel() + "] " + SshConstants.getCommandMessageName(cmd & 0xFF);
+    }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/28675bdb/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java
index b393112..b9a26be 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java
@@ -43,7 +43,7 @@ import org.apache.sshd.common.util.logging.AbstractLoggingBean;
  *
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
-public class Window extends AbstractLoggingBean implements java.nio.channels.Channel, PropertyResolver {
+public class Window extends AbstractLoggingBean implements java.nio.channels.Channel, ChannelHolder, PropertyResolver {
     /**
      * Default {@link Predicate} used to test if space became available
      */
@@ -59,7 +59,7 @@ public class Window extends AbstractLoggingBean implements java.nio.channels.Cha
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final AtomicBoolean initialized = new AtomicBoolean(false);
     private final AtomicInteger sizeHolder = new AtomicInteger(0);
-    private final AbstractChannel channel;
+    private final AbstractChannel channelInstance;
     private final Object lock;
     private final String suffix;
 
@@ -68,9 +68,9 @@ public class Window extends AbstractLoggingBean implements java.nio.channels.Cha
     private Map<String, Object> props = Collections.<String, Object>emptyMap();
 
     public Window(AbstractChannel channel, Object lock, boolean client, boolean local) {
-        this.channel = ValidateUtils.checkNotNull(channel, "No channel provided");
+        this.channelInstance = ValidateUtils.checkNotNull(channel, "No channel provided");
         this.lock = (lock != null) ? lock : this;
-        this.suffix = ": " + (client ? "client" : "server") + " " + (local ? "local" : "remote") + " window";
+        this.suffix = (client ? "client" : "server") + "/" + (local ? "local" : "remote");
     }
 
     @Override
@@ -80,7 +80,12 @@ public class Window extends AbstractLoggingBean implements java.nio.channels.Cha
 
     @Override
     public PropertyResolver getParentPropertyResolver() {
-        return channel;
+        return getChannel();
+    }
+
+    @Override   // co-variant return
+    public AbstractChannel getChannel() {
+        return channelInstance;
     }
 
     public int getSize() {
@@ -191,6 +196,7 @@ public class Window extends AbstractLoggingBean implements java.nio.channels.Cha
         checkInitialized("check");
 
         int adjustSize = -1;
+        AbstractChannel channel = getChannel();
         synchronized (lock) {
             // TODO make the adjust factor configurable via FactoryManager property
             int size = sizeHolder.get();
@@ -339,6 +345,6 @@ public class Window extends AbstractLoggingBean implements java.nio.channels.Cha
 
     @Override
     public String toString() {
-        return String.valueOf(channel) + suffix;
+        return getClass().getSimpleName() + "[" + suffix + "](" + String.valueOf(getChannel()) + ")";
     }
 }


[3/6] mina-sshd git commit: Display more details at TRACE level if channel close fails

Posted by lg...@apache.org.
Display more details at TRACE level if channel close fails


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/2bde68e1
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/2bde68e1
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/2bde68e1

Branch: refs/heads/master
Commit: 2bde68e173eac1ced093d021ec61898884014f58
Parents: 28675bd
Author: Lyor Goldstein <ly...@gmail.com>
Authored: Mon Feb 15 18:49:18 2016 +0200
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Mon Feb 15 18:49:18 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/sshd/common/channel/AbstractChannel.java  | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/2bde68e1/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index 7090868..d74a46f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -472,8 +472,13 @@ public abstract class AbstractChannel
                                     gracefulFuture.setClosed();
                                 }
                             } else {
+                                Throwable t = future.getException();
                                 if (log.isDebugEnabled()) {
-                                    log.debug("close({})[immediately={}] failed to write SSH_MSG_CHANNEL_CLOSE on channel", channel, immediately);
+                                    log.debug("close({})[immediately={}] failed ({}) to write SSH_MSG_CHANNEL_CLOSE on channel: {}",
+                                              channel, immediately, t.getClass().getSimpleName(), t.getMessage());
+                                }
+                                if (log.isTraceEnabled()) {
+                                    log.trace("close(" + channel + ") SSH_MSG_CHANNEL_CLOSE failure details", t);
                                 }
                                 channel.close(true);
                             }


[6/6] mina-sshd git commit: Fix a few minor code style issues

Posted by lg...@apache.org.
Fix a few minor code style issues


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/33d9dc65
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/33d9dc65
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/33d9dc65

Branch: refs/heads/master
Commit: 33d9dc65afc9e70b376ede8458d75e14a6af7aff
Parents: 711e01a
Author: Lyor Goldstein <ly...@gmail.com>
Authored: Mon Feb 15 18:51:57 2016 +0200
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Mon Feb 15 18:51:57 2016 +0200

----------------------------------------------------------------------
 .../apache/sshd/server/session/ServerSessionImpl.java  | 13 +++++++------
 .../test/java/org/apache/sshd/client/ClientTest.java   |  2 ++
 2 files changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/33d9dc65/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java
index 8e6343c..12f0105 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java
@@ -203,12 +203,13 @@ public class ServerSessionImpl extends AbstractServerSession {
 
         if (!clientVersion.startsWith(DEFAULT_SSH_VERSION_PREFIX)) {
             String msg = "Unsupported protocol version: " + clientVersion;
-            ioSession.write(new ByteArrayBuffer((msg + "\n").getBytes(StandardCharsets.UTF_8))).addListener(new SshFutureListener<IoWriteFuture>() {
-                @Override
-                public void operationComplete(IoWriteFuture future) {
-                    close(true);
-                }
-            });
+            ioSession.write(new ByteArrayBuffer((msg + "\n").getBytes(StandardCharsets.UTF_8)))
+                     .addListener(new SshFutureListener<IoWriteFuture>() {
+                         @Override
+                         public void operationComplete(IoWriteFuture future) {
+                             close(true);
+                         }
+                     });
             throw new SshException(msg);
         } else {
             kexState.set(KexState.INIT);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/33d9dc65/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
index 666c9cd..f3b882d 100644
--- a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
@@ -548,6 +548,7 @@ public class ClientTest extends BaseTestSupport {
                                 public void operationComplete(IoReadFuture future) {
                                     try {
                                         future.verify(5L, TimeUnit.SECONDS);
+
                                         Buffer buffer = future.getBuffer();
                                         baosOut.write(buffer.array(), buffer.rpos(), buffer.available());
                                         buffer.rpos(buffer.rpos() + buffer.available());
@@ -567,6 +568,7 @@ public class ClientTest extends BaseTestSupport {
                                 public void operationComplete(IoReadFuture future) {
                                     try {
                                         future.verify(5L, TimeUnit.SECONDS);
+
                                         Buffer buffer = future.getBuffer();
                                         baosErr.write(buffer.array(), buffer.rpos(), buffer.available());
                                         buffer.rpos(buffer.rpos() + buffer.available());


[5/6] mina-sshd git commit: Use the randomizer instance as lock for some internal values instead of the "master" lock

Posted by lg...@apache.org.
Use the randomizer instance as lock for some internal values instead of the "master" lock


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/711e01a1
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/711e01a1
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/711e01a1

Branch: refs/heads/master
Commit: 711e01a1e9041692a84d33d98e3d8d19aa919ed8
Parents: 823540d
Author: Lyor Goldstein <ly...@gmail.com>
Authored: Mon Feb 15 18:51:41 2016 +0200
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Mon Feb 15 18:51:41 2016 +0200

----------------------------------------------------------------------
 .../apache/sshd/common/session/AbstractSession.java  | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/711e01a1/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
index 8455313..daaa0c1 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
@@ -392,7 +392,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
      * Refresh whatever internal configuration is not {@code final}
      */
     protected void refreshConfiguration() {
-        synchronized (lock) {
+        synchronized (random) {
             // re-keying configuration
             maxRekeyBytes = PropertyResolverUtils.getLongProperty(this, FactoryManager.REKEY_BYTES_LIMIT, maxRekeyBytes);
             maxRekeyInterval = PropertyResolverUtils.getLongProperty(this, FactoryManager.REKEY_TIME_LIMIT, maxRekeyInterval);
@@ -915,7 +915,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
             ignoreBuf.putInt(ignoreDataLen);
 
             int wpos = ignoreBuf.wpos();
-            synchronized (lock) {
+            synchronized (random) {
                 random.fill(ignoreBuf.array(), wpos, ignoreDataLen);
             }
             ignoreBuf.wpos(wpos + ignoreDataLen);
@@ -954,7 +954,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
             return 0;
         }
 
-        synchronized (lock) {
+        synchronized (random) {
             ignorePacketsCount.set(calculateNextIgnorePacketCount(random, ignorePacketsFrequency, ignorePacketsVariance));
             return ignorePacketDataLength + random.random(ignorePacketDataLength);
         }
@@ -1135,7 +1135,10 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
             buffer.putByte((byte) pad);
             // Fill padding
             buffer.wpos(off + oldLen + SshConstants.SSH_PACKET_HEADER_LEN + pad);
-            random.fill(buffer.array(), buffer.wpos() - pad, pad);
+            synchronized (random) {
+                random.fill(buffer.array(), buffer.wpos() - pad, pad);
+            }
+
             // Compute mac
             if (outMac != null) {
                 int macSize = outMac.getBlockSize();
@@ -1398,7 +1401,9 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
         Buffer buffer = createBuffer(SshConstants.SSH_MSG_KEXINIT);
         int p = buffer.wpos();
         buffer.wpos(p + SshConstants.MSG_KEX_COOKIE_SIZE);
-        random.fill(buffer.array(), p, SshConstants.MSG_KEX_COOKIE_SIZE);
+        synchronized (random) {
+            random.fill(buffer.array(), p, SshConstants.MSG_KEX_COOKIE_SIZE);
+        }
         if (log.isTraceEnabled()) {
             log.trace("sendKexInit(" + toString() + ") cookie=" + BufferUtils.toHex(buffer.array(), p, SshConstants.MSG_KEX_COOKIE_SIZE, ':'));
         }


[4/6] mina-sshd git commit: Take into account IoWriteFuture#isWritten result

Posted by lg...@apache.org.
Take into account IoWriteFuture#isWritten result


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/823540d4
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/823540d4
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/823540d4

Branch: refs/heads/master
Commit: 823540d4c68014cb02a272e04c743a4b14a29926
Parents: 2bde68e
Author: Lyor Goldstein <ly...@gmail.com>
Authored: Mon Feb 15 18:51:10 2016 +0200
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Mon Feb 15 18:51:10 2016 +0200

----------------------------------------------------------------------
 .../channel/ChannelAsyncOutputStream.java       | 63 +++++++++++++++++---
 .../sshd/server/forward/TcpipServerChannel.java | 32 ++++++++--
 .../java/org/apache/sshd/WindowAdjustTest.java  | 16 +++--
 .../sshd/util/test/AsyncEchoShellFactory.java   | 17 ++++--
 4 files changed, 104 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/823540d4/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
index c790f29..7305937 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
@@ -28,20 +28,26 @@ import org.apache.sshd.common.io.IoOutputStream;
 import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.io.WritePendingException;
 import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.closeable.AbstractCloseable;
 
-public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOutputStream {
+public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOutputStream, ChannelHolder {
 
-    private final Channel channel;
+    private final Channel channelInstance;
     private final byte cmd;
     private final AtomicReference<IoWriteFutureImpl> pendingWrite = new AtomicReference<>();
 
     public ChannelAsyncOutputStream(Channel channel, byte cmd) {
-        this.channel = channel;
+        this.channelInstance = ValidateUtils.checkNotNull(channel, "No channel");
         this.cmd = cmd;
     }
 
+    @Override
+    public Channel getChannel() {
+        return channelInstance;
+    }
+
     public void onWindowExpanded() throws IOException {
         doWriteIfPossible(true);
     }
@@ -77,6 +83,7 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
         final Buffer buffer = future.getBuffer();
         final int total = buffer.available();
         if (total > 0) {
+            Channel channel = getChannel();
             Window remoteWindow = channel.getRemoteWindow();
             final int length = Math.min(Math.min(remoteWindow.getSize(), total), remoteWindow.getPacketSize());
             if (log.isTraceEnabled()) {
@@ -101,32 +108,72 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
                 buffer.rpos(buffer.rpos() + length);
                 remoteWindow.consume(length);
                 try {
+                    final ChannelAsyncOutputStream stream = this;
                     s.writePacket(buf).addListener(new SshFutureListener<IoWriteFuture>() {
-                        @SuppressWarnings("synthetic-access")
                         @Override
                         public void operationComplete(IoWriteFuture f) {
+                            if (f.isWritten()) {
+                                handleOperationCompleted();
+                            } else {
+                                handleOperationFailed(f.getException());
+                            }
+                        }
+
+                        @SuppressWarnings("synthetic-access")
+                        private void handleOperationCompleted() {
                             if (total > length) {
+                                if (log.isTraceEnabled()) {
+                                    log.trace("doWriteIfPossible({}) completed write of {} out of {}", stream, length, total);
+                                }
                                 doWriteIfPossible(false);
                             } else {
-                                pendingWrite.compareAndSet(future, null);
+                                boolean nullified = pendingWrite.compareAndSet(future, null);
+                                if (log.isTraceEnabled()) {
+                                    log.trace("doWriteIfPossible({}) completed write len={}, more={}",
+                                              stream, total, !nullified);
+                                }
                                 future.setValue(Boolean.TRUE);
                             }
                         }
+
+                        @SuppressWarnings("synthetic-access")
+                        private void handleOperationFailed(Throwable reason) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("doWriteIfPossible({}) failed ({}) to complete write of {} out of {}: {}",
+                                          stream, reason.getClass().getSimpleName(), length, total, reason.getMessage());
+                            }
+
+                            if (log.isTraceEnabled()) {
+                                log.trace("doWriteIfPossible(" + this + ") write failure details", reason);
+                            }
+
+                            boolean nullified = pendingWrite.compareAndSet(future, null);
+                            if (log.isTraceEnabled()) {
+                                log.trace("doWriteIfPossible({}) failed write len={}, more={}",
+                                          stream, total, !nullified);
+                            }
+                            future.setValue(reason);
+                        }
                     });
                 } catch (IOException e) {
                     future.setValue(e);
                 }
             } else if (!resume) {
-                log.debug("Delaying write to {} until space is available in the remote window", this);
+                if (log.isDebugEnabled()) {
+                    log.debug("doWriteIfPossible({}) delaying write until space is available in the remote window", this);
+                }
             }
         } else {
-            pendingWrite.compareAndSet(future, null);
+            boolean nullified = pendingWrite.compareAndSet(future, null);
+            if (log.isTraceEnabled()) {
+                log.trace("doWriteIfPossible({}) current buffer sent - more={}", this, !nullified);
+            }
             future.setValue(Boolean.TRUE);
         }
     }
 
     @Override
     public String toString() {
-        return "ChannelAsyncOutputStream[" + channel + "]";
+        return getClass().getSimpleName() + "[" + getChannel() + "] cmd=" + SshConstants.getCommandMessageName(cmd & 0xFF);
     }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/823540d4/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
index 11ff1b4..4b66a24 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
@@ -52,6 +52,7 @@ import org.apache.sshd.common.util.net.SshdSocketAddress;
 import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
 import org.apache.sshd.common.util.threads.ThreadUtils;
 import org.apache.sshd.server.channel.AbstractServerChannel;
+import org.apache.sshd.server.channel.ServerChannel;
 
 /**
  * TODO Add javadoc
@@ -348,15 +349,34 @@ public class TcpipServerChannel extends AbstractServerChannel {
     protected void doWriteData(byte[] data, int off, final int len) throws IOException {
         // Make sure we copy the data as the incoming buffer may be reused
         Buffer buf = ByteArrayBuffer.getCompactClone(data, off, len);
+        final ServerChannel channel = this;
         ioSession.write(buf).addListener(new SshFutureListener<IoWriteFuture>() {
             @Override
+            @SuppressWarnings("synthetic-access")
             public void operationComplete(IoWriteFuture future) {
-                try {
-                    Window wLocal = getLocalWindow();
-                    wLocal.consumeAndCheck(len);
-                } catch (IOException e) {
-                    Session session = getSession();
-                    session.exceptionCaught(e);
+                Session session = getSession();
+                if (future.isWritten()) {
+                    try {
+                        Window wLocal = getLocalWindow();
+                        wLocal.consumeAndCheck(len);
+                    } catch (IOException e) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("doWriteData({}) failed ({}) to consume len={}: {}",
+                                      channel, e.getClass().getSimpleName(), len, e.getMessage());
+                        }
+                        session.exceptionCaught(e);
+                    }
+                } else {
+                    Throwable t = future.getException();
+                    if (log.isDebugEnabled()) {
+                        log.debug("doWriteData({}) failed ({}) to write len={}: {}",
+                                  channel, t.getClass().getSimpleName(), len, t.getMessage());
+                    }
+
+                    if (log.isTraceEnabled()) {
+                        log.trace("doWriteData(" + channel + ") len=" + len + " write failure details", t);
+                    }
+                    session.exceptionCaught(t);
                 }
             }
         });

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/823540d4/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java b/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
index f7017b3..b20cfb7 100644
--- a/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
@@ -112,7 +112,6 @@ public class WindowAdjustTest extends BaseTestSupport {
 
     @Test(timeout = 6L * 60L * 1000L)
     public void testTrafficHeavyLoad() throws Exception {
-
         try (SshClient client = setupTestClient()) {
             client.start();
 
@@ -274,7 +273,7 @@ public class WindowAdjustTest extends BaseTestSupport {
     /**
      * Wrapper for asyncIn stream that catches Pending exception and queues the pending messages for later retry (send after previous messages were fully transfered)
      */
-    private static class AsyncInPendingWrapper {
+    private static class AsyncInPendingWrapper extends AbstractLoggingBean {
         private IoOutputStream asyncIn;
 
         // Order has to be preserved for queued writes
@@ -309,11 +308,16 @@ public class WindowAdjustTest extends BaseTestSupport {
                 asyncIn.write(msg).addListener(new SshFutureListener<IoWriteFuture>() {
                     @SuppressWarnings("synthetic-access")
                     @Override
-                    public void operationComplete(final IoWriteFuture future) {
-                        if (wasPending) {
-                            pending.remove();
+                    public void operationComplete(IoWriteFuture future) {
+                        if (future.isWritten()) {
+                            if (wasPending) {
+                                pending.remove();
+                            }
+                            writePendingIfAny();
+                        } else {
+                            Throwable t = future.getException();
+                            log.warn("Failed to write message", t);
                         }
-                        writePendingIfAny();
                     }
                 });
             } catch (final WritePendingException e) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/823540d4/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java b/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
index a66c3b6..4623a52 100644
--- a/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
+++ b/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
@@ -25,11 +25,13 @@ import java.nio.charset.StandardCharsets;
 
 import org.apache.sshd.common.Factory;
 import org.apache.sshd.common.channel.BufferedIoOutputStream;
+import org.apache.sshd.common.channel.Window;
 import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.io.IoInputStream;
 import org.apache.sshd.common.io.IoOutputStream;
 import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
 import org.apache.sshd.server.AsyncCommand;
 import org.apache.sshd.server.ChannelSessionAware;
@@ -144,10 +146,17 @@ public class AsyncEchoShellFactory implements Factory<Command> {
                     out.write(new ByteArrayBuffer(bytes)).addListener(new SshFutureListener<IoWriteFuture>() {
                         @Override
                         public void operationComplete(IoWriteFuture future) {
-                            try {
-                                channel.getLocalWindow().consumeAndCheck(bytes.length);
-                            } catch (IOException e) {
-                                channel.getSession().exceptionCaught(e);
+                            Session session = channel.getSession();
+                            if (future.isWritten()) {
+                                try {
+                                    Window wLocal = channel.getLocalWindow();
+                                    wLocal.consumeAndCheck(bytes.length);
+                                } catch (IOException e) {
+                                    session.exceptionCaught(e);
+                                }
+                            } else {
+                                Throwable t = future.getException();
+                                session.exceptionCaught(t);
                             }
                         }
                     });