You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2014/01/30 15:51:03 UTC
[6/6] git commit: [SSHD-285] Automatic key re-rexchange from server
[SSHD-285] Automatic key re-rexchange from server
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/d5b96d9f
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/d5b96d9f
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/d5b96d9f
Branch: refs/heads/master
Commit: d5b96d9fe6703ba43eedaf288eac6f5bf8c7e63c
Parents: df2133c
Author: Guillaume Nodet <gn...@apache.org>
Authored: Thu Jan 30 15:50:46 2014 +0100
Committer: Guillaume Nodet <gn...@apache.org>
Committed: Thu Jan 30 15:50:46 2014 +0100
----------------------------------------------------------------------
.../sshd/common/session/AbstractSession.java | 156 ++++++++++++++++---
.../sshd/server/ServerFactoryManager.java | 14 ++
.../sshd/server/session/ServerSession.java | 21 ++-
.../java/org/apache/sshd/KeyReExchangeTest.java | 98 +++++++++---
4 files changed, 248 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d5b96d9f/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
index efd5570..cc050e2 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
@@ -21,8 +21,10 @@ package org.apache.sshd.common.session;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
@@ -148,6 +150,16 @@ public abstract class AbstractSession implements Session {
protected final AtomicReference<Buffer> requestResult = new AtomicReference<Buffer>();
protected final Map<AttributeKey<?>, Object> attributes = new ConcurrentHashMap<AttributeKey<?>, Object>();
+ //
+ // Rekeying
+ //
+ protected long inPackets;
+ protected long outPackets;
+ protected long inBytes;
+ protected long outBytes;
+ protected long lastKeyTime;
+ protected final Queue<PendingWriteFuture> pendingPackets = new LinkedList<PendingWriteFuture>();
+
protected Service currentService;
/**
@@ -288,6 +300,12 @@ public abstract class AbstractSession implements Session {
* @throws Exception if an exeption occurs while handling this packet.
*/
protected void handleMessage(Buffer buffer) throws Exception {
+ synchronized (lock) {
+ doHandleMessage(buffer);
+ }
+ }
+
+ protected void doHandleMessage(Buffer buffer) throws Exception {
byte cmd = buffer.getByte();
switch (cmd) {
case SSH_MSG_DISCONNECT: {
@@ -361,6 +379,12 @@ public abstract class AbstractSession implements Session {
reexchangeFuture.setValue(true);
}
sendEvent(SessionListener.Event.KeyEstablished);
+ synchronized (pendingPackets) {
+ PendingWriteFuture future;
+ while ((future = pendingPackets.poll()) != null) {
+ doWritePacket(future.getBuffer()).addListener(future);
+ }
+ }
break;
default:
if (cmd >= SshConstants.SSH_MSG_KEX_FIRST && cmd <= SshConstants.SSH_MSG_KEX_LAST) {
@@ -381,6 +405,7 @@ public abstract class AbstractSession implements Session {
}
break;
}
+ checkRekey();
}
/**
@@ -468,16 +493,35 @@ public abstract class AbstractSession implements Session {
* @throws java.io.IOException if an error occured when encoding sending the packet
*/
public IoWriteFuture writePacket(Buffer buffer) throws IOException {
- try {
- // Synchronize all write requests as needed by the encoding algorithm
- // and also queue the write request in this synchronized block to ensure
- // packets are sent in the correct order
- synchronized (encodeLock) {
- encode(buffer);
- return ioSession.write(buffer);
+ // While exchanging key, queue high level packets
+ if (kexState != KEX_STATE_DONE) {
+ byte cmd = buffer.array()[buffer.rpos()];
+ if (cmd > SshConstants.SSH_MSG_KEX_LAST) {
+ synchronized (pendingPackets) {
+ if (kexState != KEX_STATE_DONE) {
+ log.info("Flag packet {} as pending until key exchange is done", cmd);
+ PendingWriteFuture future = new PendingWriteFuture(buffer);
+ pendingPackets.add(future);
+ return future;
+ }
+ }
}
+ }
+ try {
+ return doWritePacket(buffer);
} finally {
resetIdleTimeout();
+ checkRekey();
+ }
+ }
+
+ protected IoWriteFuture doWritePacket(Buffer buffer) throws IOException {
+ // Synchronize all write requests as needed by the encoding algorithm
+ // and also queue the write request in this synchronized block to ensure
+ // packets are sent in the correct order
+ synchronized (encodeLock) {
+ encode(buffer);
+ return ioSession.write(buffer);
}
}
@@ -600,6 +644,9 @@ public abstract class AbstractSession implements Session {
}
// Increment packet id
seqo = (seqo + 1) & 0xffffffffL;
+ // Update stats
+ outPackets ++;
+ outBytes += len;
// Make buffer ready to be read
buffer.rpos(off);
} catch (SshException e) {
@@ -689,6 +736,9 @@ public abstract class AbstractSession implements Session {
if (log.isTraceEnabled()) {
log.trace("Received packet #{}: {}", seqi, buf.printHex());
}
+ // Update stats
+ inPackets ++;
+ inBytes += buf.available();
// Process decoded packet
handleMessage(buf);
// Set ready to handle next packet
@@ -965,6 +1015,11 @@ public abstract class AbstractSession implements Session {
if (inCompression != null) {
inCompression.init(Compression.Type.Inflater, -1);
}
+ inBytes = 0;
+ outBytes = 0;
+ inPackets = 0;
+ outPackets = 0;
+ lastKeyTime = System.currentTimeMillis();
}
/**
@@ -1067,13 +1122,13 @@ public abstract class AbstractSession implements Session {
}
negotiated = guess;
log.info("Kex: server->client {} {} {}",
- new Object[] { negotiated[SshConstants.PROPOSAL_ENC_ALGS_STOC],
- negotiated[SshConstants.PROPOSAL_MAC_ALGS_STOC],
- negotiated[SshConstants.PROPOSAL_COMP_ALGS_STOC]});
+ new Object[]{negotiated[SshConstants.PROPOSAL_ENC_ALGS_STOC],
+ negotiated[SshConstants.PROPOSAL_MAC_ALGS_STOC],
+ negotiated[SshConstants.PROPOSAL_COMP_ALGS_STOC]});
log.info("Kex: client->server {} {} {}",
- new Object[] { negotiated[SshConstants.PROPOSAL_ENC_ALGS_CTOS],
- negotiated[SshConstants.PROPOSAL_MAC_ALGS_CTOS],
- negotiated[SshConstants.PROPOSAL_COMP_ALGS_CTOS]});
+ new Object[]{negotiated[SshConstants.PROPOSAL_ENC_ALGS_CTOS],
+ negotiated[SshConstants.PROPOSAL_MAC_ALGS_CTOS],
+ negotiated[SshConstants.PROPOSAL_COMP_ALGS_CTOS]});
}
protected void requestSuccess(Buffer buffer) throws Exception{
@@ -1109,6 +1164,18 @@ public abstract class AbstractSession implements Session {
return defaultValue;
}
+ public long getLongProperty(String name, long defaultValue) {
+ try {
+ String v = factoryManager.getProperties().get(name);
+ if (v != null) {
+ return Long.parseLong(v);
+ }
+ } catch (Exception e) {
+ // Ignore
+ }
+ return defaultValue;
+ }
+
/**
* Returns the value of the user-defined attribute of this session.
*
@@ -1168,13 +1235,18 @@ public abstract class AbstractSession implements Session {
* {@inheritDoc}
*/
public SshFuture reExchangeKeys() throws IOException {
- if (kexState != KEX_STATE_DONE) {
- throw new IllegalStateException("Can not perform key re-exchange while key exchange is already running");
+ synchronized (lock) {
+ if (kexState == KEX_STATE_DONE) {
+ log.info("Initiating key re-exchange");
+ kexState = KEX_STATE_INIT;
+ sendKexInit();
+ reexchangeFuture = new DefaultSshFuture(null);
+ }
+ return reexchangeFuture;
}
- kexState = KEX_STATE_INIT;
- sendKexInit();
- reexchangeFuture = new DefaultSshFuture(null);
- return reexchangeFuture;
+ }
+
+ protected void checkRekey() throws IOException {
}
protected abstract void sendKexInit() throws IOException;
@@ -1190,4 +1262,50 @@ public abstract class AbstractSession implements Session {
public abstract void resetIdleTimeout();
+ /**
+ * Future holding a packet pending key exchange termination.
+ */
+ protected static class PendingWriteFuture extends DefaultSshFuture<IoWriteFuture>
+ implements IoWriteFuture, SshFutureListener<IoWriteFuture> {
+
+ private final Buffer buffer;
+
+ protected PendingWriteFuture(Buffer buffer) {
+ super(null);
+ this.buffer = buffer;
+ }
+
+ public Buffer getBuffer() {
+ return buffer;
+ }
+
+ public boolean isWritten() {
+ return getValue() instanceof Boolean;
+ }
+
+ public Throwable getException() {
+ Object v = getValue();
+ return v instanceof Throwable ? (Throwable) v : null;
+ }
+
+ public void setWritten() {
+ setValue(Boolean.TRUE);
+ }
+
+ public void setException(Throwable cause) {
+ if (cause == null) {
+ throw new IllegalArgumentException();
+ }
+ setValue(cause);
+ }
+
+ public void operationComplete(IoWriteFuture future) {
+ if (future.isWritten()) {
+ setWritten();
+ } else {
+ future.setException(future.getException());
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d5b96d9f/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
index 49bac66..2f6a1e3 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
@@ -88,6 +88,20 @@ public interface ServerFactoryManager extends FactoryManager {
public static final String COMMAND_EXIT_TIMEOUT = "command-exit-timeout";
/**
+ * Key re-exchange will be automatically performed after the session
+ * has sent or received the given amount of bytes.
+ * The default value is 1 gigabyte.
+ */
+ public static final String REKEY_BYTES_LIMIT = "rekey-bytes-limit";
+
+ /**
+ * Key re-exchange will be automatically performed after the specified
+ * amount of time has elapsed since the last key exchange. In milliseconds.
+ * The default value is 1 hour.
+ */
+ public static final String REKEY_TIME_LIMIT = "rekey-time-limit";
+
+ /**
* Retrieve the list of named factories for <code>UserAuth<code> objects.
*
* @return a list of named <code>UserAuth</code> factories, never <code>null</code>
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d5b96d9f/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java
index 4f10cdd..b92eea0 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java
@@ -43,16 +43,23 @@ import org.apache.sshd.server.ServerFactoryManager;
*/
public class ServerSession extends AbstractSession {
+ protected static final long MAX_PACKETS = (1l << 31);
+
private long authTimeoutTimestamp;
private long idleTimeoutTimestamp = 0L;
- private int authTimeoutMs = 2 * 60 * 1000; // 2 minutes in milliseconds
- private int idleTimeoutMs = 10 * 60 * 1000; // 10 minutes in milliseconds
+ private int authTimeoutMs = 2 * 60 * 1000; // 2 minutes in milliseconds
+ private int idleTimeoutMs = 10 * 60 * 1000; // 10 minutes in milliseconds
+ private long maxBytes = 1024 * 1024; // 1 GB
+ private long maxKeyInterval = 60 * 60 * 1000; // 1 hour
+
public ServerSession(ServerFactoryManager server, IoSession ioSession) throws Exception {
super(true, server, ioSession);
authTimeoutMs = getIntProperty(ServerFactoryManager.AUTH_TIMEOUT, authTimeoutMs);
authTimeoutTimestamp = System.currentTimeMillis() + authTimeoutMs;
idleTimeoutMs = getIntProperty(ServerFactoryManager.IDLE_TIMEOUT, idleTimeoutMs);
+ maxBytes = Math.max(32, getLongProperty(ServerFactoryManager.REKEY_BYTES_LIMIT, maxBytes));
+ maxKeyInterval = getLongProperty(ServerFactoryManager.REKEY_TIME_LIMIT, maxKeyInterval);
log.info("Session created from {}", ioSession.getRemoteAddress());
sendServerIdentification();
kexState = KEX_STATE_INIT;
@@ -98,6 +105,16 @@ public class ServerSession extends AbstractSession {
}
}
+ protected void checkRekey() throws IOException {
+ if (kexState == KEX_STATE_DONE) {
+ if ( inPackets > MAX_PACKETS || outPackets > MAX_PACKETS
+ || inBytes > maxBytes || outBytes > maxBytes
+ || maxKeyInterval > 0 && System.currentTimeMillis() - lastKeyTime > maxKeyInterval)
+ {
+ reExchangeKeys();
+ }
+ }
+ }
public void resetIdleTimeout() {
this.idleTimeoutTimestamp = System.currentTimeMillis() + idleTimeoutMs;
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d5b96d9f/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java b/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
index e6b1c54..b4170fa 100644
--- a/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
@@ -23,20 +23,13 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
-import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
import com.jcraft.jsch.JSch;
import org.apache.sshd.client.channel.ChannelShell;
-import org.apache.sshd.client.kex.DHG1;
-import org.apache.sshd.client.kex.DHG14;
-import org.apache.sshd.client.kex.DHGEX;
-import org.apache.sshd.client.kex.DHGEX256;
-import org.apache.sshd.client.kex.ECDHP256;
-import org.apache.sshd.client.kex.ECDHP384;
-import org.apache.sshd.client.kex.ECDHP521;
-import org.apache.sshd.common.KeyExchange;
-import org.apache.sshd.common.NamedFactory;
-import org.apache.sshd.common.util.SecurityUtils;
+import org.apache.sshd.common.Session;
+import org.apache.sshd.common.SessionListener;
+import org.apache.sshd.server.ServerFactoryManager;
import org.apache.sshd.util.BogusPasswordAuthenticator;
import org.apache.sshd.util.EchoShellFactory;
import org.apache.sshd.util.JSchLogger;
@@ -44,7 +37,6 @@ import org.apache.sshd.util.SimpleUserInfo;
import org.apache.sshd.util.TeeOutputStream;
import org.apache.sshd.util.Utils;
import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
@@ -61,11 +53,21 @@ public class KeyReExchangeTest {
private SshServer sshd;
private int port;
- @Before
- public void setUp() throws Exception {
+ @After
+ public void tearDown() throws Exception {
+ sshd.stop();
+ }
+
+ protected void setUp(long bytesLimit, long timeLimit) throws Exception {
port = Utils.getFreePort();
sshd = SshServer.setUpDefaultServer();
+ if (bytesLimit > 0) {
+ sshd.getProperties().put(ServerFactoryManager.REKEY_BYTES_LIMIT, Long.toString(bytesLimit));
+ }
+ if (timeLimit > 0) {
+ sshd.getProperties().put(ServerFactoryManager.REKEY_TIME_LIMIT, Long.toString(timeLimit));
+ }
sshd.setPort(port);
sshd.setKeyPairProvider(Utils.createTestHostKeyProvider());
sshd.setShellFactory(new EchoShellFactory());
@@ -73,13 +75,10 @@ public class KeyReExchangeTest {
sshd.start();
}
- @After
- public void tearDown() throws Exception {
- sshd.stop();
- }
-
@Test
public void testReExchangeFromClient() throws Exception {
+ setUp(0, 0);
+
JSchLogger.init();
JSch.setConfig("kex", "diffie-hellman-group-exchange-sha1");
JSch sch = new JSch();
@@ -105,6 +104,8 @@ public class KeyReExchangeTest {
@Test
public void testReExchangeFromNativeClient() throws Exception {
+ setUp(0, 0);
+
SshClient client = SshClient.setUpDefaultClient();
client.start();
ClientSession session = client.connect("localhost", port).await().getSession();
@@ -133,7 +134,63 @@ public class KeyReExchangeTest {
for (int i = 0; i < 10; i++) {
teeOut.write(sb.toString().getBytes());
teeOut.flush();
- session.reExchangeKeys().await();
+ session.reExchangeKeys();
+ }
+ teeOut.write("exit\n".getBytes());
+ teeOut.flush();
+
+ channel.waitFor(ClientChannel.CLOSED, 0);
+
+ channel.close(false);
+ client.stop();
+
+ assertArrayEquals(sent.toByteArray(), out.toByteArray());
+ }
+
+ @Test
+ public void testReExchangeFromServer() throws Exception {
+ setUp(8192, 0);
+
+ SshClient client = SshClient.setUpDefaultClient();
+ client.start();
+ ClientSession session = client.connect("localhost", port).await().getSession();
+ session.authPassword("smx", "smx").await();
+ ChannelShell channel = session.createShellChannel();
+
+ ByteArrayOutputStream sent = new ByteArrayOutputStream();
+ PipedOutputStream pipedIn = new PipedOutputStream();
+ channel.setIn(new PipedInputStream(pipedIn));
+ OutputStream teeOut = new TeeOutputStream(sent, pipedIn);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ByteArrayOutputStream err = new ByteArrayOutputStream();
+ channel.setOut(out);
+ channel.setErr(err);
+ channel.open();
+
+ teeOut.write("this is my command\n".getBytes());
+ teeOut.flush();
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 100; i++) {
+ sb.append("0123456789");
+ }
+ sb.append("\n");
+
+ final AtomicInteger exchanges = new AtomicInteger();
+ session.addListener(new SessionListener() {
+ public void sessionCreated(Session session) {
+ }
+ public void sessionEvent(Session sesssion, Event event) {
+ if (event == Event.KeyEstablished) {
+ exchanges.incrementAndGet();
+ }
+ }
+ public void sessionClosed(Session session) {
+ }
+ });
+ for (int i = 0; i < 100; i++) {
+ teeOut.write(sb.toString().getBytes());
+ teeOut.flush();
}
teeOut.write("exit\n".getBytes());
teeOut.flush();
@@ -143,6 +200,7 @@ public class KeyReExchangeTest {
channel.close(false);
client.stop();
+ assertTrue("Expected rekeying", exchanges.get() > 0);
assertArrayEquals(sent.toByteArray(), out.toByteArray());
}
}