You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2014/02/13 13:09:49 UTC
[2/2] git commit: Fix synchronisation problem which causes out of order packets during key reexchange
Fix synchronisation problem which causes out of order packets during key reexchange
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/f9a9ce3a
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/f9a9ce3a
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/f9a9ce3a
Branch: refs/heads/master
Commit: f9a9ce3a47555ded36c3a20b1dd390ba252d62ce
Parents: 1a2626e
Author: Guillaume Nodet <gn...@apache.org>
Authored: Thu Feb 13 13:09:41 2014 +0100
Committer: Guillaume Nodet <gn...@apache.org>
Committed: Thu Feb 13 13:09:41 2014 +0100
----------------------------------------------------------------------
.../sshd/client/session/ClientSessionImpl.java | 2 +-
.../sshd/common/session/AbstractSession.java | 63 +++++++++++---------
.../sshd/server/session/ServerSession.java | 5 +-
.../java/org/apache/sshd/KeyReExchangeTest.java | 1 +
4 files changed, 39 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f9a9ce3a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
index c4a7f86..1298acf 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientSessionImpl.java
@@ -98,7 +98,7 @@ public class ClientSessionImpl extends AbstractSession implements ClientSession
authFuture = new DefaultAuthFuture(lock);
authFuture.setAuthed(false);
sendClientIdentification();
- kexState = KEX_STATE_INIT;
+ kexState.set(KEX_STATE_INIT);
sendKexInit();
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f9a9ce3a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
index 6ffeb8c..22b8229 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.sshd.common.Cipher;
@@ -110,7 +111,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
protected byte[] I_C; // the payload of the client's SSH_MSG_KEXINIT
protected byte[] I_S; // the payload of the factoryManager's SSH_MSG_KEXINIT
protected KeyExchange kex;
- protected int kexState;
+ protected final AtomicInteger kexState = new AtomicInteger();
protected DefaultSshFuture reexchangeFuture;
//
@@ -140,11 +141,11 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
//
// Rekeying
//
- protected long inPackets;
- protected long outPackets;
- protected long inBytes;
- protected long outBytes;
- protected long lastKeyTime;
+ protected volatile long inPackets;
+ protected volatile long outPackets;
+ protected volatile long inBytes;
+ protected volatile long outBytes;
+ protected volatile long lastKeyTime;
protected final Queue<PendingWriteFuture> pendingPackets = new LinkedList<PendingWriteFuture>();
protected Service currentService;
@@ -323,7 +324,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
case SSH_MSG_SERVICE_REQUEST:
String service = buffer.getString();
log.debug("Received SSH_MSG_SERVICE_REQUEST '{}'", service);
- if (kexState != KEX_STATE_DONE) {
+ if (kexState.get() != KEX_STATE_DONE) {
throw new IllegalStateException("Received command " + cmd + " before key exchange is finished");
}
try {
@@ -340,7 +341,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
break;
case SSH_MSG_SERVICE_ACCEPT:
log.debug("Received SSH_MSG_SERVICE_ACCEPT");
- if (kexState != KEX_STATE_DONE) {
+ if (kexState.get() != KEX_STATE_DONE) {
throw new IllegalStateException("Received command " + cmd + " before key exchange is finished");
}
serviceAccept();
@@ -348,44 +349,48 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
case SSH_MSG_KEXINIT:
log.debug("Received SSH_MSG_KEXINIT");
receiveKexInit(buffer);
- if (kexState == KEX_STATE_DONE) {
+ if (kexState.compareAndSet(KEX_STATE_DONE, KEX_STATE_RUN)) {
sendKexInit();
- } else if (kexState != KEX_STATE_INIT) {
+ } else if (!kexState.compareAndSet(KEX_STATE_INIT, KEX_STATE_RUN)) {
throw new IllegalStateException("Received SSH_MSG_KEXINIT while key exchange is running");
}
- kexState = KEX_STATE_RUN;
negotiate();
kex = NamedFactory.Utils.create(factoryManager.getKeyExchangeFactories(), negotiated[SshConstants.PROPOSAL_KEX_ALGS]);
kex.init(this, serverVersion.getBytes(), clientVersion.getBytes(), I_S, I_C);
break;
case SSH_MSG_NEWKEYS:
log.debug("Received SSH_MSG_NEWKEYS");
- if (kexState != KEX_STATE_KEYS) {
+ if (kexState.get() != KEX_STATE_KEYS) {
throw new IllegalStateException("Received command " + cmd + " before key exchange is finished");
}
receiveNewKeys();
- kexState = KEX_STATE_DONE;
if (reexchangeFuture != null) {
reexchangeFuture.setValue(true);
}
sendEvent(SessionListener.Event.KeyEstablished);
synchronized (pendingPackets) {
- PendingWriteFuture future;
- while ((future = pendingPackets.poll()) != null) {
- doWritePacket(future.getBuffer()).addListener(future);
+ if (!pendingPackets.isEmpty()) {
+ log.info("Dequeing pending packets");
+ synchronized (encodeLock) {
+ PendingWriteFuture future;
+ while ((future = pendingPackets.poll()) != null) {
+ doWritePacket(future.getBuffer()).addListener(future);
+ }
+ }
}
+ kexState.set(KEX_STATE_DONE);
}
break;
default:
if (cmd >= SshConstants.SSH_MSG_KEX_FIRST && cmd <= SshConstants.SSH_MSG_KEX_LAST) {
- if (kexState != KEX_STATE_RUN) {
+ if (kexState.get() != KEX_STATE_RUN) {
throw new IllegalStateException("Received kex command " + cmd + " while not in key exchange");
}
buffer.rpos(buffer.rpos() - 1);
if (kex.next(buffer)) {
checkKeys();
sendNewKeys();
- kexState = KEX_STATE_KEYS;
+ kexState.set(KEX_STATE_KEYS);
}
} else if (currentService != null) {
currentService.process(cmd, buffer);
@@ -457,12 +462,14 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
*/
public IoWriteFuture writePacket(Buffer buffer) throws IOException {
// While exchanging key, queue high level packets
- if (kexState != KEX_STATE_DONE) {
+ if (kexState.get() != KEX_STATE_DONE) {
byte cmd = buffer.array()[buffer.rpos()];
if (cmd > SshConstants.SSH_MSG_KEX_LAST) {
synchronized (pendingPackets) {
- if (kexState != KEX_STATE_DONE) {
- log.info("Flag packet {} as pending until key exchange is done", cmd);
+ if (kexState.get() != KEX_STATE_DONE) {
+ if (pendingPackets.isEmpty()) {
+ log.info("Start flagging packets as pending until key exchange is done");
+ }
PendingWriteFuture future = new PendingWriteFuture(buffer);
pendingPackets.add(future);
return future;
@@ -821,6 +828,7 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
* @throws IOException if an error occurred sending the packet
*/
protected byte[] sendKexInit(String[] proposal) throws IOException {
+ log.debug("Send SSH_MSG_KEXINIT");
Buffer buffer = createBuffer(SshConstants.SSH_MSG_KEXINIT);
int p = buffer.wpos();
buffer.wpos(p + 16);
@@ -1203,15 +1211,12 @@ public abstract class AbstractSession extends CloseableUtils.AbstractInnerClosea
* {@inheritDoc}
*/
public SshFuture reExchangeKeys() throws IOException {
- synchronized (lock) {
- if (kexState == KEX_STATE_DONE) {
- log.info("Initiating key re-exchange");
- kexState = KEX_STATE_INIT;
- sendKexInit();
- reexchangeFuture = new DefaultSshFuture(null);
- }
- return reexchangeFuture;
+ if (kexState.compareAndSet(KEX_STATE_DONE, KEX_STATE_INIT)) {
+ log.info("Initiating key re-exchange");
+ sendKexInit();
+ reexchangeFuture = new DefaultSshFuture(null);
}
+ return reexchangeFuture;
}
protected void checkRekey() throws IOException {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f9a9ce3a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java
index b5597a2..169e434 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java
@@ -56,7 +56,7 @@ public class ServerSession extends AbstractSession {
maxKeyInterval = getLongProperty(ServerFactoryManager.REKEY_TIME_LIMIT, maxKeyInterval);
log.info("Server session created from {}", ioSession.getRemoteAddress());
sendServerIdentification();
- kexState = KEX_STATE_INIT;
+ kexState.set(KEX_STATE_INIT);
sendKexInit();
}
@@ -100,7 +100,7 @@ public class ServerSession extends AbstractSession {
}
protected void checkRekey() throws IOException {
- if (kexState == KEX_STATE_DONE) {
+ if (kexState.get() == KEX_STATE_DONE) {
if ( inPackets > MAX_PACKETS || outPackets > MAX_PACKETS
|| inBytes > maxBytes || outBytes > maxBytes
|| maxKeyInterval > 0 && System.currentTimeMillis() - lastKeyTime > maxKeyInterval)
@@ -109,6 +109,7 @@ public class ServerSession extends AbstractSession {
}
}
}
+
public void resetIdleTimeout() {
this.idleTimeoutTimestamp = System.currentTimeMillis() + idleTimeoutMs;
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/f9a9ce3a/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java b/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
index 590e3ba..4be1137 100644
--- a/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
@@ -202,6 +202,7 @@ public class KeyReExchangeTest extends BaseTest {
client.stop();
assertTrue("Expected rekeying", exchanges.get() > 0);
+ assertEquals(sent.toByteArray().length, out.toByteArray().length);
assertArrayEquals(sent.toByteArray(), out.toByteArray());
}
}