You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2016/02/15 17:51:19 UTC
[1/6] mina-sshd git commit: Remove superfluous override of ClientSessionImpl#handleMessage
Repository: mina-sshd
Updated Branches:
refs/heads/master 1b0e4bf21 -> 33d9dc65a
Remove superfluous override of ClientSessionImpl#handleMessage
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/9d22091f
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/9d22091f
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/9d22091f
Branch: refs/heads/master
Commit: 9d22091f83edc8cbca53303c44620b8acf447985
Parents: 1b0e4bf
Author: Lyor Goldstein <ly...@gmail.com>
Authored: Mon Feb 15 18:42:39 2016 +0200
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Mon Feb 15 18:42:39 2016 +0200
----------------------------------------------------------------------
.../org/apache/sshd/client/session/ClientSessionImpl.java | 7 -------
1 file changed, 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/9d22091f/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
index 37656ba..14b5681 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
@@ -269,13 +269,6 @@ public class ClientSessionImpl extends AbstractClientSession {
}
@Override
- protected void handleMessage(Buffer buffer) throws Exception {
- synchronized (lock) {
- super.handleMessage(buffer);
- }
- }
-
- @Override
public Set<ClientSessionEvent> waitFor(Collection<ClientSessionEvent> mask, long timeout) {
ValidateUtils.checkNotNull(mask, "No mask specified");
long t = 0L;
[2/6] mina-sshd git commit: Added ChannelHolder interface
Posted by lg...@apache.org.
Added ChannelHolder interface
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/28675bdb
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/28675bdb
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/28675bdb
Branch: refs/heads/master
Commit: 28675bdb0b00bfcc3eadfa2e9f7e42fa10f1591c
Parents: 9d22091
Author: Lyor Goldstein <ly...@gmail.com>
Authored: Mon Feb 15 18:48:41 2016 +0200
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Mon Feb 15 18:48:41 2016 +0200
----------------------------------------------------------------------
.../sshd/client/subsystem/SubsystemClient.java | 8 +++++-
.../subsystem/sftp/AbstractSftpClient.java | 7 ++++-
.../common/channel/ChannelAsyncInputStream.java | 24 +++++++++++-----
.../sshd/common/channel/ChannelHolder.java | 30 ++++++++++++++++++++
.../common/channel/ChannelOutputStream.java | 28 +++++++++++-------
.../org/apache/sshd/common/channel/Window.java | 18 ++++++++----
6 files changed, 89 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/28675bdb/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java
index dca1b38..cfc7970 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/SubsystemClient.java
@@ -25,12 +25,18 @@ import org.apache.sshd.client.channel.ClientChannel;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.client.session.ClientSessionHolder;
import org.apache.sshd.common.NamedResource;
+import org.apache.sshd.common.channel.ChannelHolder;
import org.apache.sshd.common.session.SessionHolder;
/**
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public interface SubsystemClient extends SessionHolder<ClientSession>, ClientSessionHolder, NamedResource, Channel {
+public interface SubsystemClient
+ extends SessionHolder<ClientSession>,
+ ClientSessionHolder,
+ NamedResource,
+ ChannelHolder,
+ Channel {
/**
* @return The underlying {@link ClientChannel} used
*/
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/28675bdb/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/AbstractSftpClient.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/AbstractSftpClient.java b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/AbstractSftpClient.java
index 81292c3..16b9105 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/AbstractSftpClient.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/subsystem/sftp/AbstractSftpClient.java
@@ -36,6 +36,7 @@ import org.apache.sshd.client.subsystem.sftp.extensions.BuiltinSftpClientExtensi
import org.apache.sshd.client.subsystem.sftp.extensions.SftpClientExtension;
import org.apache.sshd.client.subsystem.sftp.extensions.SftpClientExtensionFactory;
import org.apache.sshd.common.SshException;
+import org.apache.sshd.common.channel.Channel;
import org.apache.sshd.common.subsystem.sftp.SftpConstants;
import org.apache.sshd.common.subsystem.sftp.SftpException;
import org.apache.sshd.common.subsystem.sftp.SftpHelper;
@@ -51,7 +52,6 @@ import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
public abstract class AbstractSftpClient extends AbstractSubsystemClient implements SftpClient, RawSftpClient {
-
private final AtomicReference<Map<String, Object>> parsedExtensionsHolder = new AtomicReference<>(null);
protected AbstractSftpClient() {
@@ -59,6 +59,11 @@ public abstract class AbstractSftpClient extends AbstractSubsystemClient impleme
}
@Override
+ public Channel getChannel() {
+ return getClientChannel();
+ }
+
+ @Override
public String getName() {
return SftpConstants.SFTP_SUBSYSTEM_NAME;
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/28675bdb/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
index 11be85b..75ca78d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncInputStream.java
@@ -19,6 +19,7 @@
package org.apache.sshd.common.channel;
import java.io.IOException;
+
import org.apache.sshd.common.RuntimeSshException;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.future.CloseFuture;
@@ -26,7 +27,9 @@ import org.apache.sshd.common.future.DefaultVerifiableSshFuture;
import org.apache.sshd.common.io.IoInputStream;
import org.apache.sshd.common.io.IoReadFuture;
import org.apache.sshd.common.io.ReadPendingException;
+import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.Readable;
+import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
import org.apache.sshd.common.util.closeable.AbstractCloseable;
@@ -34,14 +37,18 @@ import org.apache.sshd.common.util.closeable.AbstractCloseable;
/**
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public class ChannelAsyncInputStream extends AbstractCloseable implements IoInputStream {
-
- private final Channel channel;
+public class ChannelAsyncInputStream extends AbstractCloseable implements IoInputStream, ChannelHolder {
+ private final Channel channelInstance;
private final Buffer buffer = new ByteArrayBuffer();
private IoReadFutureImpl pending;
public ChannelAsyncInputStream(Channel channel) {
- this.channel = channel;
+ this.channelInstance = ValidateUtils.checkNotNull(channel, "No channel");
+ }
+
+ @Override
+ public Channel getChannel() {
+ return channelInstance;
}
public void write(Readable src) throws IOException {
@@ -109,10 +116,13 @@ public class ChannelAsyncInputStream extends AbstractCloseable implements IoInpu
}
}
if (nbRead > 0) {
+ Channel channel = getChannel();
try {
- channel.getLocalWindow().consumeAndCheck(nbRead);
+ Window wLocal = channel.getLocalWindow();
+ wLocal.consumeAndCheck(nbRead);
} catch (IOException e) {
- channel.getSession().exceptionCaught(e);
+ Session session = channel.getSession();
+ session.exceptionCaught(e);
}
future.setValue(nbRead);
}
@@ -120,7 +130,7 @@ public class ChannelAsyncInputStream extends AbstractCloseable implements IoInpu
@Override
public String toString() {
- return "ChannelAsyncInputStream[" + channel + "]";
+ return getClass().getSimpleName() + "[" + getChannel() + "]";
}
public static class IoReadFutureImpl extends DefaultVerifiableSshFuture<IoReadFuture> implements IoReadFuture {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/28675bdb/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelHolder.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelHolder.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelHolder.java
new file mode 100644
index 0000000..869dc9c
--- /dev/null
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelHolder.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.channel;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public interface ChannelHolder {
+ /**
+ * @return The associated {@link Channel} instance
+ */
+ Channel getChannel();
+}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/28675bdb/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
index 9525bd2..3ed3f59 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelOutputStream.java
@@ -21,7 +21,6 @@ package org.apache.sshd.common.channel;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
-import java.nio.channels.Channel;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,18 +33,16 @@ import org.apache.sshd.common.util.buffer.Buffer;
import org.slf4j.Logger;
/**
- * TODO Add javadoc
- *
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public class ChannelOutputStream extends OutputStream implements Channel {
+public class ChannelOutputStream extends OutputStream implements java.nio.channels.Channel, ChannelHolder {
/**
* Configure max. wait time (millis) to wait for space to become available
*/
public static final String WAIT_FOR_SPACE_TIMEOUT = "channel-output-wait-for-space-timeout";
public static final long DEFAULT_WAIT_FOR_SPACE_TIMEOUT = TimeUnit.SECONDS.toMillis(30L);
- private final AbstractChannel channel;
+ private final AbstractChannel channelInstance;
private final Window remoteWindow;
private final long maxWaitTimeout;
private final Logger log;
@@ -63,7 +60,7 @@ public class ChannelOutputStream extends OutputStream implements Channel {
}
public ChannelOutputStream(AbstractChannel channel, Window remoteWindow, long maxWaitTimeout, Logger log, byte cmd, boolean eofOnClose) {
- this.channel = ValidateUtils.checkNotNull(channel, "No channel");
+ this.channelInstance = ValidateUtils.checkNotNull(channel, "No channel");
this.remoteWindow = ValidateUtils.checkNotNull(remoteWindow, "No remote window");
ValidateUtils.checkTrue(maxWaitTimeout > 0L, "Non-positive max. wait time: %d", maxWaitTimeout);
this.maxWaitTimeout = maxWaitTimeout;
@@ -73,6 +70,11 @@ public class ChannelOutputStream extends OutputStream implements Channel {
newBuffer(0);
}
+ @Override // co-variant return
+ public AbstractChannel getChannel() {
+ return channelInstance;
+ }
+
public boolean isEofOnClose() {
return eofOnClose;
}
@@ -102,6 +104,7 @@ public class ChannelOutputStream extends OutputStream implements Channel {
throw new SshException("write(" + this + ") len=" + l + " - channel already closed");
}
+ Channel channel = getChannel();
Session session = channel.getSession();
while (l > 0) {
// The maximum amount we should admit without flushing again
@@ -152,6 +155,7 @@ public class ChannelOutputStream extends OutputStream implements Channel {
}
try {
+ AbstractChannel channel = getChannel();
Session session = channel.getSession();
while (bufferLength > 0) {
session.resetIdleTimeout();
@@ -207,6 +211,7 @@ public class ChannelOutputStream extends OutputStream implements Channel {
try {
flush();
if (isEofOnClose()) {
+ AbstractChannel channel = getChannel();
channel.sendEof();
}
} finally {
@@ -215,12 +220,8 @@ public class ChannelOutputStream extends OutputStream implements Channel {
}
}
- @Override
- public String toString() {
- return getClass().getSimpleName() + "[" + channel + "]";
- }
-
protected void newBuffer(int size) {
+ Channel channel = getChannel();
Session session = channel.getSession();
buffer = session.createBuffer(cmd, size <= 0 ? 12 : 12 + size);
buffer.putInt(channel.getRecipient());
@@ -230,4 +231,9 @@ public class ChannelOutputStream extends OutputStream implements Channel {
buffer.putInt(0);
bufferLength = 0;
}
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "[" + getChannel() + "] " + SshConstants.getCommandMessageName(cmd & 0xFF);
+ }
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/28675bdb/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java
index b393112..b9a26be 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/Window.java
@@ -43,7 +43,7 @@ import org.apache.sshd.common.util.logging.AbstractLoggingBean;
*
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
-public class Window extends AbstractLoggingBean implements java.nio.channels.Channel, PropertyResolver {
+public class Window extends AbstractLoggingBean implements java.nio.channels.Channel, ChannelHolder, PropertyResolver {
/**
* Default {@link Predicate} used to test if space became available
*/
@@ -59,7 +59,7 @@ public class Window extends AbstractLoggingBean implements java.nio.channels.Cha
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final AtomicInteger sizeHolder = new AtomicInteger(0);
- private final AbstractChannel channel;
+ private final AbstractChannel channelInstance;
private final Object lock;
private final String suffix;
@@ -68,9 +68,9 @@ public class Window extends AbstractLoggingBean implements java.nio.channels.Cha
private Map<String, Object> props = Collections.<String, Object>emptyMap();
public Window(AbstractChannel channel, Object lock, boolean client, boolean local) {
- this.channel = ValidateUtils.checkNotNull(channel, "No channel provided");
+ this.channelInstance = ValidateUtils.checkNotNull(channel, "No channel provided");
this.lock = (lock != null) ? lock : this;
- this.suffix = ": " + (client ? "client" : "server") + " " + (local ? "local" : "remote") + " window";
+ this.suffix = (client ? "client" : "server") + "/" + (local ? "local" : "remote");
}
@Override
@@ -80,7 +80,12 @@ public class Window extends AbstractLoggingBean implements java.nio.channels.Cha
@Override
public PropertyResolver getParentPropertyResolver() {
- return channel;
+ return getChannel();
+ }
+
+ @Override // co-variant return
+ public AbstractChannel getChannel() {
+ return channelInstance;
}
public int getSize() {
@@ -191,6 +196,7 @@ public class Window extends AbstractLoggingBean implements java.nio.channels.Cha
checkInitialized("check");
int adjustSize = -1;
+ AbstractChannel channel = getChannel();
synchronized (lock) {
// TODO make the adjust factor configurable via FactoryManager property
int size = sizeHolder.get();
@@ -339,6 +345,6 @@ public class Window extends AbstractLoggingBean implements java.nio.channels.Cha
@Override
public String toString() {
- return String.valueOf(channel) + suffix;
+ return getClass().getSimpleName() + "[" + suffix + "](" + String.valueOf(getChannel()) + ")";
}
}
[3/6] mina-sshd git commit: Display more details at TRACE level if channel close fails
Posted by lg...@apache.org.
Display more details at TRACE level if channel close fails
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/2bde68e1
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/2bde68e1
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/2bde68e1
Branch: refs/heads/master
Commit: 2bde68e173eac1ced093d021ec61898884014f58
Parents: 28675bd
Author: Lyor Goldstein <ly...@gmail.com>
Authored: Mon Feb 15 18:49:18 2016 +0200
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Mon Feb 15 18:49:18 2016 +0200
----------------------------------------------------------------------
.../java/org/apache/sshd/common/channel/AbstractChannel.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/2bde68e1/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index 7090868..d74a46f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -472,8 +472,13 @@ public abstract class AbstractChannel
gracefulFuture.setClosed();
}
} else {
+ Throwable t = future.getException();
if (log.isDebugEnabled()) {
- log.debug("close({})[immediately={}] failed to write SSH_MSG_CHANNEL_CLOSE on channel", channel, immediately);
+ log.debug("close({})[immediately={}] failed ({}) to write SSH_MSG_CHANNEL_CLOSE on channel: {}",
+ channel, immediately, t.getClass().getSimpleName(), t.getMessage());
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("close(" + channel + ") SSH_MSG_CHANNEL_CLOSE failure details", t);
}
channel.close(true);
}
[6/6] mina-sshd git commit: Fix a few minor code style issues
Posted by lg...@apache.org.
Fix a few minor code style issues
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/33d9dc65
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/33d9dc65
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/33d9dc65
Branch: refs/heads/master
Commit: 33d9dc65afc9e70b376ede8458d75e14a6af7aff
Parents: 711e01a
Author: Lyor Goldstein <ly...@gmail.com>
Authored: Mon Feb 15 18:51:57 2016 +0200
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Mon Feb 15 18:51:57 2016 +0200
----------------------------------------------------------------------
.../apache/sshd/server/session/ServerSessionImpl.java | 13 +++++++------
.../test/java/org/apache/sshd/client/ClientTest.java | 2 ++
2 files changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/33d9dc65/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java
index 8e6343c..12f0105 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java
@@ -203,12 +203,13 @@ public class ServerSessionImpl extends AbstractServerSession {
if (!clientVersion.startsWith(DEFAULT_SSH_VERSION_PREFIX)) {
String msg = "Unsupported protocol version: " + clientVersion;
- ioSession.write(new ByteArrayBuffer((msg + "\n").getBytes(StandardCharsets.UTF_8))).addListener(new SshFutureListener<IoWriteFuture>() {
- @Override
- public void operationComplete(IoWriteFuture future) {
- close(true);
- }
- });
+ ioSession.write(new ByteArrayBuffer((msg + "\n").getBytes(StandardCharsets.UTF_8)))
+ .addListener(new SshFutureListener<IoWriteFuture>() {
+ @Override
+ public void operationComplete(IoWriteFuture future) {
+ close(true);
+ }
+ });
throw new SshException(msg);
} else {
kexState.set(KexState.INIT);
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/33d9dc65/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
index 666c9cd..f3b882d 100644
--- a/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/client/ClientTest.java
@@ -548,6 +548,7 @@ public class ClientTest extends BaseTestSupport {
public void operationComplete(IoReadFuture future) {
try {
future.verify(5L, TimeUnit.SECONDS);
+
Buffer buffer = future.getBuffer();
baosOut.write(buffer.array(), buffer.rpos(), buffer.available());
buffer.rpos(buffer.rpos() + buffer.available());
@@ -567,6 +568,7 @@ public class ClientTest extends BaseTestSupport {
public void operationComplete(IoReadFuture future) {
try {
future.verify(5L, TimeUnit.SECONDS);
+
Buffer buffer = future.getBuffer();
baosErr.write(buffer.array(), buffer.rpos(), buffer.available());
buffer.rpos(buffer.rpos() + buffer.available());
[5/6] mina-sshd git commit: Use the randomizer instance as lock for some internal values instead of the "master" lock
Posted by lg...@apache.org.
Use the randomizer instance as lock for some internal values instead of the "master" lock
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/711e01a1
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/711e01a1
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/711e01a1
Branch: refs/heads/master
Commit: 711e01a1e9041692a84d33d98e3d8d19aa919ed8
Parents: 823540d
Author: Lyor Goldstein <ly...@gmail.com>
Authored: Mon Feb 15 18:51:41 2016 +0200
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Mon Feb 15 18:51:41 2016 +0200
----------------------------------------------------------------------
.../apache/sshd/common/session/AbstractSession.java | 15 ++++++++++-----
1 file changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/711e01a1/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
index 8455313..daaa0c1 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
@@ -392,7 +392,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
* Refresh whatever internal configuration is not {@code final}
*/
protected void refreshConfiguration() {
- synchronized (lock) {
+ synchronized (random) {
// re-keying configuration
maxRekeyBytes = PropertyResolverUtils.getLongProperty(this, FactoryManager.REKEY_BYTES_LIMIT, maxRekeyBytes);
maxRekeyInterval = PropertyResolverUtils.getLongProperty(this, FactoryManager.REKEY_TIME_LIMIT, maxRekeyInterval);
@@ -915,7 +915,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
ignoreBuf.putInt(ignoreDataLen);
int wpos = ignoreBuf.wpos();
- synchronized (lock) {
+ synchronized (random) {
random.fill(ignoreBuf.array(), wpos, ignoreDataLen);
}
ignoreBuf.wpos(wpos + ignoreDataLen);
@@ -954,7 +954,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
return 0;
}
- synchronized (lock) {
+ synchronized (random) {
ignorePacketsCount.set(calculateNextIgnorePacketCount(random, ignorePacketsFrequency, ignorePacketsVariance));
return ignorePacketDataLength + random.random(ignorePacketDataLength);
}
@@ -1135,7 +1135,10 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
buffer.putByte((byte) pad);
// Fill padding
buffer.wpos(off + oldLen + SshConstants.SSH_PACKET_HEADER_LEN + pad);
- random.fill(buffer.array(), buffer.wpos() - pad, pad);
+ synchronized (random) {
+ random.fill(buffer.array(), buffer.wpos() - pad, pad);
+ }
+
// Compute mac
if (outMac != null) {
int macSize = outMac.getBlockSize();
@@ -1398,7 +1401,9 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
Buffer buffer = createBuffer(SshConstants.SSH_MSG_KEXINIT);
int p = buffer.wpos();
buffer.wpos(p + SshConstants.MSG_KEX_COOKIE_SIZE);
- random.fill(buffer.array(), p, SshConstants.MSG_KEX_COOKIE_SIZE);
+ synchronized (random) {
+ random.fill(buffer.array(), p, SshConstants.MSG_KEX_COOKIE_SIZE);
+ }
if (log.isTraceEnabled()) {
log.trace("sendKexInit(" + toString() + ") cookie=" + BufferUtils.toHex(buffer.array(), p, SshConstants.MSG_KEX_COOKIE_SIZE, ':'));
}
[4/6] mina-sshd git commit: Take into account IoWriteFuture#isWritten result
Posted by lg...@apache.org.
Take into account IoWriteFuture#isWritten result
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/823540d4
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/823540d4
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/823540d4
Branch: refs/heads/master
Commit: 823540d4c68014cb02a272e04c743a4b14a29926
Parents: 2bde68e
Author: Lyor Goldstein <ly...@gmail.com>
Authored: Mon Feb 15 18:51:10 2016 +0200
Committer: Lyor Goldstein <ly...@gmail.com>
Committed: Mon Feb 15 18:51:10 2016 +0200
----------------------------------------------------------------------
.../channel/ChannelAsyncOutputStream.java | 63 +++++++++++++++++---
.../sshd/server/forward/TcpipServerChannel.java | 32 ++++++++--
.../java/org/apache/sshd/WindowAdjustTest.java | 16 +++--
.../sshd/util/test/AsyncEchoShellFactory.java | 17 ++++--
4 files changed, 104 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/823540d4/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
index c790f29..7305937 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/ChannelAsyncOutputStream.java
@@ -28,20 +28,26 @@ import org.apache.sshd.common.io.IoOutputStream;
import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.io.WritePendingException;
import org.apache.sshd.common.session.Session;
+import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.closeable.AbstractCloseable;
-public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOutputStream {
+public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOutputStream, ChannelHolder {
- private final Channel channel;
+ private final Channel channelInstance;
private final byte cmd;
private final AtomicReference<IoWriteFutureImpl> pendingWrite = new AtomicReference<>();
public ChannelAsyncOutputStream(Channel channel, byte cmd) {
- this.channel = channel;
+ this.channelInstance = ValidateUtils.checkNotNull(channel, "No channel");
this.cmd = cmd;
}
+ @Override
+ public Channel getChannel() {
+ return channelInstance;
+ }
+
public void onWindowExpanded() throws IOException {
doWriteIfPossible(true);
}
@@ -77,6 +83,7 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
final Buffer buffer = future.getBuffer();
final int total = buffer.available();
if (total > 0) {
+ Channel channel = getChannel();
Window remoteWindow = channel.getRemoteWindow();
final int length = Math.min(Math.min(remoteWindow.getSize(), total), remoteWindow.getPacketSize());
if (log.isTraceEnabled()) {
@@ -101,32 +108,72 @@ public class ChannelAsyncOutputStream extends AbstractCloseable implements IoOut
buffer.rpos(buffer.rpos() + length);
remoteWindow.consume(length);
try {
+ final ChannelAsyncOutputStream stream = this;
s.writePacket(buf).addListener(new SshFutureListener<IoWriteFuture>() {
- @SuppressWarnings("synthetic-access")
@Override
public void operationComplete(IoWriteFuture f) {
+ if (f.isWritten()) {
+ handleOperationCompleted();
+ } else {
+ handleOperationFailed(f.getException());
+ }
+ }
+
+ @SuppressWarnings("synthetic-access")
+ private void handleOperationCompleted() {
if (total > length) {
+ if (log.isTraceEnabled()) {
+ log.trace("doWriteIfPossible({}) completed write of {} out of {}", stream, length, total);
+ }
doWriteIfPossible(false);
} else {
- pendingWrite.compareAndSet(future, null);
+ boolean nullified = pendingWrite.compareAndSet(future, null);
+ if (log.isTraceEnabled()) {
+ log.trace("doWriteIfPossible({}) completed write len={}, more={}",
+ stream, total, !nullified);
+ }
future.setValue(Boolean.TRUE);
}
}
+
+ @SuppressWarnings("synthetic-access")
+ private void handleOperationFailed(Throwable reason) { // failed write: log it, clear the pending slot, hand the reason to the future
+ if (log.isDebugEnabled()) {
+ log.debug("doWriteIfPossible({}) failed ({}) to complete write of {} out of {}: {}",
+ stream, reason.getClass().getSimpleName(), length, total, reason.getMessage());
+ }
+ // Log 'stream', not 'this': inside this anonymous listener 'this' is the listener, not the output stream
+ if (log.isTraceEnabled()) {
+ log.trace("doWriteIfPossible(" + stream + ") write failure details", reason);
+ }
+
+ boolean nullified = pendingWrite.compareAndSet(future, null);
+ if (log.isTraceEnabled()) {
+ log.trace("doWriteIfPossible({}) failed write len={}, more={}",
+ stream, total, !nullified);
+ }
+ future.setValue(reason);
+ }
});
} catch (IOException e) {
future.setValue(e);
}
} else if (!resume) {
- log.debug("Delaying write to {} until space is available in the remote window", this);
+ if (log.isDebugEnabled()) {
+ log.debug("doWriteIfPossible({}) delaying write until space is available in the remote window", this);
+ }
}
} else {
- pendingWrite.compareAndSet(future, null);
+ boolean nullified = pendingWrite.compareAndSet(future, null);
+ if (log.isTraceEnabled()) {
+ log.trace("doWriteIfPossible({}) current buffer sent - more={}", this, !nullified);
+ }
future.setValue(Boolean.TRUE);
}
}
@Override
public String toString() {
- return "ChannelAsyncOutputStream[" + channel + "]";
+ return getClass().getSimpleName() + "[" + getChannel() + "] cmd=" + SshConstants.getCommandMessageName(cmd & 0xFF);
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/823540d4/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
index 11ff1b4..4b66a24 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
@@ -52,6 +52,7 @@ import org.apache.sshd.common.util.net.SshdSocketAddress;
import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
import org.apache.sshd.common.util.threads.ThreadUtils;
import org.apache.sshd.server.channel.AbstractServerChannel;
+import org.apache.sshd.server.channel.ServerChannel;
/**
* TODO Add javadoc
@@ -348,15 +349,34 @@ public class TcpipServerChannel extends AbstractServerChannel {
protected void doWriteData(byte[] data, int off, final int len) throws IOException {
// Make sure we copy the data as the incoming buffer may be reused
Buffer buf = ByteArrayBuffer.getCompactClone(data, off, len);
+ final ServerChannel channel = this;
ioSession.write(buf).addListener(new SshFutureListener<IoWriteFuture>() {
@Override
+ @SuppressWarnings("synthetic-access")
public void operationComplete(IoWriteFuture future) {
- try {
- Window wLocal = getLocalWindow();
- wLocal.consumeAndCheck(len);
- } catch (IOException e) {
- Session session = getSession();
- session.exceptionCaught(e);
+ Session session = getSession();
+ if (future.isWritten()) {
+ try {
+ Window wLocal = getLocalWindow();
+ wLocal.consumeAndCheck(len);
+ } catch (IOException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("doWriteData({}) failed ({}) to consume len={}: {}",
+ channel, e.getClass().getSimpleName(), len, e.getMessage());
+ }
+ session.exceptionCaught(e);
+ }
+ } else {
+ Throwable t = future.getException();
+ if (log.isDebugEnabled()) {
+ log.debug("doWriteData({}) failed ({}) to write len={}: {}",
+ channel, t.getClass().getSimpleName(), len, t.getMessage());
+ }
+
+ if (log.isTraceEnabled()) {
+ log.trace("doWriteData(" + channel + ") len=" + len + " write failure details", t);
+ }
+ session.exceptionCaught(t);
}
}
});
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/823540d4/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java b/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
index f7017b3..b20cfb7 100644
--- a/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/WindowAdjustTest.java
@@ -112,7 +112,6 @@ public class WindowAdjustTest extends BaseTestSupport {
@Test(timeout = 6L * 60L * 1000L)
public void testTrafficHeavyLoad() throws Exception {
-
try (SshClient client = setupTestClient()) {
client.start();
@@ -274,7 +273,7 @@ public class WindowAdjustTest extends BaseTestSupport {
/**
* Wrapper for asyncIn stream that catches Pending exception and queues the pending messages for later retry (send after previous messages were fully transfered)
*/
- private static class AsyncInPendingWrapper {
+ private static class AsyncInPendingWrapper extends AbstractLoggingBean {
private IoOutputStream asyncIn;
// Order has to be preserved for queued writes
@@ -309,11 +308,16 @@ public class WindowAdjustTest extends BaseTestSupport {
asyncIn.write(msg).addListener(new SshFutureListener<IoWriteFuture>() {
@SuppressWarnings("synthetic-access")
@Override
- public void operationComplete(final IoWriteFuture future) {
- if (wasPending) {
- pending.remove();
+ public void operationComplete(IoWriteFuture future) {
+ if (future.isWritten()) {
+ if (wasPending) {
+ pending.remove();
+ }
+ writePendingIfAny();
+ } else {
+ Throwable t = future.getException();
+ log.warn("Failed to write message", t);
}
- writePendingIfAny();
}
});
} catch (final WritePendingException e) {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/823540d4/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java b/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
index a66c3b6..4623a52 100644
--- a/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
+++ b/sshd-core/src/test/java/org/apache/sshd/util/test/AsyncEchoShellFactory.java
@@ -25,11 +25,13 @@ import java.nio.charset.StandardCharsets;
import org.apache.sshd.common.Factory;
import org.apache.sshd.common.channel.BufferedIoOutputStream;
+import org.apache.sshd.common.channel.Window;
import org.apache.sshd.common.future.CloseFuture;
import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.io.IoInputStream;
import org.apache.sshd.common.io.IoOutputStream;
import org.apache.sshd.common.io.IoWriteFuture;
+import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
import org.apache.sshd.server.AsyncCommand;
import org.apache.sshd.server.ChannelSessionAware;
@@ -144,10 +146,17 @@ public class AsyncEchoShellFactory implements Factory<Command> {
out.write(new ByteArrayBuffer(bytes)).addListener(new SshFutureListener<IoWriteFuture>() {
@Override
public void operationComplete(IoWriteFuture future) {
- try {
- channel.getLocalWindow().consumeAndCheck(bytes.length);
- } catch (IOException e) {
- channel.getSession().exceptionCaught(e);
+ Session session = channel.getSession();
+ if (future.isWritten()) {
+ try {
+ Window wLocal = channel.getLocalWindow();
+ wLocal.consumeAndCheck(bytes.length);
+ } catch (IOException e) {
+ session.exceptionCaught(e);
+ }
+ } else {
+ Throwable t = future.getException();
+ session.exceptionCaught(t);
}
}
});