You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2014/01/30 15:51:03 UTC

[6/6] git commit: [SSHD-285] Automatic key re-exchange from server

[SSHD-285] Automatic key re-exchange from server

Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/d5b96d9f
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/d5b96d9f
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/d5b96d9f

Branch: refs/heads/master
Commit: d5b96d9fe6703ba43eedaf288eac6f5bf8c7e63c
Parents: df2133c
Author: Guillaume Nodet <gn...@apache.org>
Authored: Thu Jan 30 15:50:46 2014 +0100
Committer: Guillaume Nodet <gn...@apache.org>
Committed: Thu Jan 30 15:50:46 2014 +0100

----------------------------------------------------------------------
 .../sshd/common/session/AbstractSession.java    | 156 ++++++++++++++++---
 .../sshd/server/ServerFactoryManager.java       |  14 ++
 .../sshd/server/session/ServerSession.java      |  21 ++-
 .../java/org/apache/sshd/KeyReExchangeTest.java |  98 +++++++++---
 4 files changed, 248 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d5b96d9f/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
index efd5570..cc050e2 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
@@ -21,8 +21,10 @@ package org.apache.sshd.common.session;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicReference;
@@ -148,6 +150,16 @@ public abstract class AbstractSession implements Session {
     protected final AtomicReference<Buffer> requestResult = new AtomicReference<Buffer>();
     protected final Map<AttributeKey<?>, Object> attributes = new ConcurrentHashMap<AttributeKey<?>, Object>();
 
+    //
+    // Rekeying
+    //
+    protected long inPackets;
+    protected long outPackets;
+    protected long inBytes;
+    protected long outBytes;
+    protected long lastKeyTime;
+    protected final Queue<PendingWriteFuture> pendingPackets = new LinkedList<PendingWriteFuture>();
+
     protected Service currentService;
 
     /**
@@ -288,6 +300,12 @@ public abstract class AbstractSession implements Session {
      * @throws Exception if an exeption occurs while handling this packet.
      */
     protected void handleMessage(Buffer buffer) throws Exception {
+        synchronized (lock) {
+            doHandleMessage(buffer);
+        }
+    }
+
+    protected void doHandleMessage(Buffer buffer) throws Exception {
         byte cmd = buffer.getByte();
         switch (cmd) {
             case SSH_MSG_DISCONNECT: {
@@ -361,6 +379,12 @@ public abstract class AbstractSession implements Session {
                     reexchangeFuture.setValue(true);
                 }
                 sendEvent(SessionListener.Event.KeyEstablished);
+                synchronized (pendingPackets) {
+                    PendingWriteFuture future;
+                    while ((future = pendingPackets.poll()) != null) {
+                        doWritePacket(future.getBuffer()).addListener(future);
+                    }
+                }
                 break;
             default:
                 if (cmd >= SshConstants.SSH_MSG_KEX_FIRST && cmd <= SshConstants.SSH_MSG_KEX_LAST) {
@@ -381,6 +405,7 @@ public abstract class AbstractSession implements Session {
                 }
                 break;
         }
+        checkRekey();
     }
 
     /**
@@ -468,16 +493,35 @@ public abstract class AbstractSession implements Session {
      * @throws java.io.IOException if an error occured when encoding sending the packet
      */
     public IoWriteFuture writePacket(Buffer buffer) throws IOException {
-        try {
-            // Synchronize all write requests as needed by the encoding algorithm
-            // and also queue the write request in this synchronized block to ensure
-            // packets are sent in the correct order
-            synchronized (encodeLock) {
-                encode(buffer);
-                return ioSession.write(buffer);
+        // While exchanging key, queue high level packets
+        if (kexState != KEX_STATE_DONE) {
+            byte cmd = buffer.array()[buffer.rpos()];
+            if (cmd > SshConstants.SSH_MSG_KEX_LAST) {
+                synchronized (pendingPackets) {
+                    if (kexState != KEX_STATE_DONE) {
+                        log.info("Flag packet {} as pending until key exchange is done", cmd);
+                        PendingWriteFuture future = new PendingWriteFuture(buffer);
+                        pendingPackets.add(future);
+                        return future;
+                    }
+                }
             }
+        }
+        try {
+            return doWritePacket(buffer);
         } finally {
             resetIdleTimeout();
+            checkRekey();
+        }
+    }
+
+    protected IoWriteFuture doWritePacket(Buffer buffer) throws IOException {
+        // Synchronize all write requests as needed by the encoding algorithm
+        // and also queue the write request in this synchronized block to ensure
+        // packets are sent in the correct order
+        synchronized (encodeLock) {
+            encode(buffer);
+            return ioSession.write(buffer);
         }
     }
 
@@ -600,6 +644,9 @@ public abstract class AbstractSession implements Session {
             }
             // Increment packet id
             seqo = (seqo + 1) & 0xffffffffL;
+            // Update stats
+            outPackets ++;
+            outBytes += len;
             // Make buffer ready to be read
             buffer.rpos(off);
         } catch (SshException e) {
@@ -689,6 +736,9 @@ public abstract class AbstractSession implements Session {
                     if (log.isTraceEnabled()) {
                         log.trace("Received packet #{}: {}", seqi, buf.printHex());
                     }
+                    // Update stats
+                    inPackets ++;
+                    inBytes += buf.available();
                     // Process decoded packet
                     handleMessage(buf);
                     // Set ready to handle next packet
@@ -965,6 +1015,11 @@ public abstract class AbstractSession implements Session {
         if (inCompression != null) {
             inCompression.init(Compression.Type.Inflater, -1);
         }
+        inBytes = 0;
+        outBytes = 0;
+        inPackets = 0;
+        outPackets = 0;
+        lastKeyTime = System.currentTimeMillis();
     }
 
     /**
@@ -1067,13 +1122,13 @@ public abstract class AbstractSession implements Session {
         }
         negotiated = guess;
         log.info("Kex: server->client {} {} {}",
-                new Object[] { negotiated[SshConstants.PROPOSAL_ENC_ALGS_STOC],
-                               negotiated[SshConstants.PROPOSAL_MAC_ALGS_STOC],
-                               negotiated[SshConstants.PROPOSAL_COMP_ALGS_STOC]});
+                new Object[]{negotiated[SshConstants.PROPOSAL_ENC_ALGS_STOC],
+                        negotiated[SshConstants.PROPOSAL_MAC_ALGS_STOC],
+                        negotiated[SshConstants.PROPOSAL_COMP_ALGS_STOC]});
         log.info("Kex: client->server {} {} {}",
-                new Object[] { negotiated[SshConstants.PROPOSAL_ENC_ALGS_CTOS],
-                               negotiated[SshConstants.PROPOSAL_MAC_ALGS_CTOS],
-                               negotiated[SshConstants.PROPOSAL_COMP_ALGS_CTOS]});
+                new Object[]{negotiated[SshConstants.PROPOSAL_ENC_ALGS_CTOS],
+                        negotiated[SshConstants.PROPOSAL_MAC_ALGS_CTOS],
+                        negotiated[SshConstants.PROPOSAL_COMP_ALGS_CTOS]});
     }
 
     protected void requestSuccess(Buffer buffer) throws Exception{
@@ -1109,6 +1164,18 @@ public abstract class AbstractSession implements Session {
         return defaultValue;
     }
 
+    public long getLongProperty(String name, long defaultValue) {
+        try {
+            String v = factoryManager.getProperties().get(name);
+            if (v != null) {
+                return Long.parseLong(v);
+            }
+        } catch (Exception e) {
+            // Ignore
+        }
+        return defaultValue;
+    }
+
     /**
      * Returns the value of the user-defined attribute of this session.
      *
@@ -1168,13 +1235,18 @@ public abstract class AbstractSession implements Session {
      * {@inheritDoc}
      */
     public SshFuture reExchangeKeys() throws IOException {
-        if (kexState != KEX_STATE_DONE) {
-            throw new IllegalStateException("Can not perform key re-exchange while key exchange is already running");
+        synchronized (lock) {
+            if (kexState == KEX_STATE_DONE) {
+                log.info("Initiating key re-exchange");
+                kexState = KEX_STATE_INIT;
+                sendKexInit();
+                reexchangeFuture = new DefaultSshFuture(null);
+            }
+            return reexchangeFuture;
         }
-        kexState = KEX_STATE_INIT;
-        sendKexInit();
-        reexchangeFuture = new DefaultSshFuture(null);
-        return reexchangeFuture;
+    }
+
+    protected void checkRekey() throws IOException {
     }
 
     protected abstract void sendKexInit() throws IOException;
@@ -1190,4 +1262,50 @@ public abstract class AbstractSession implements Session {
 
     public abstract void resetIdleTimeout();
 
+    /**
+     * Future holding a packet pending key exchange termination.
+     */
+    protected static class PendingWriteFuture extends DefaultSshFuture<IoWriteFuture>
+            implements IoWriteFuture, SshFutureListener<IoWriteFuture> {
+
+        private final Buffer buffer;
+
+        protected PendingWriteFuture(Buffer buffer) {
+            super(null);
+            this.buffer = buffer;
+        }
+
+        public Buffer getBuffer() {
+            return buffer;
+        }
+
+        public boolean isWritten() {
+            return getValue() instanceof Boolean;
+        }
+
+        public Throwable getException() {
+            Object v = getValue();
+            return v instanceof Throwable ? (Throwable) v : null;
+        }
+
+        public void setWritten() {
+            setValue(Boolean.TRUE);
+        }
+
+        public void setException(Throwable cause) {
+            if (cause == null) {
+                throw new IllegalArgumentException();
+            }
+            setValue(cause);
+        }
+
+        public void operationComplete(IoWriteFuture future) { // listener callback for the real write of the queued buffer
+            if (future.isWritten()) {
+                setWritten();
+            } else {
+                setException(future.getException()); // fail THIS pending future; the source future is already complete
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d5b96d9f/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
index 49bac66..2f6a1e3 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
@@ -88,6 +88,20 @@ public interface ServerFactoryManager extends FactoryManager {
     public static final String COMMAND_EXIT_TIMEOUT = "command-exit-timeout";
 
     /**
+     * Key re-exchange will be automatically performed after the session
+     * has sent or received the given amount of bytes.
+     * The default value is 1 gigabyte.
+     */
+    public static final String REKEY_BYTES_LIMIT = "rekey-bytes-limit";
+
+    /**
+     * Key re-exchange will be automatically performed after the specified
+     * amount of time has elapsed since the last key exchange. In milliseconds.
+     * The default value is 1 hour.
+     */
+    public static final String REKEY_TIME_LIMIT = "rekey-time-limit";
+
+    /**
      * Retrieve the list of named factories for <code>UserAuth<code> objects.
      *
      * @return a list of named <code>UserAuth</code> factories, never <code>null</code>

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d5b96d9f/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java
index 4f10cdd..b92eea0 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSession.java
@@ -43,16 +43,23 @@ import org.apache.sshd.server.ServerFactoryManager;
  */
 public class ServerSession extends AbstractSession {
 
+    protected static final long MAX_PACKETS = (1l << 31);
+
     private long authTimeoutTimestamp;
     private long idleTimeoutTimestamp = 0L;
-    private int authTimeoutMs = 2 * 60 * 1000; // 2 minutes in milliseconds
-    private int idleTimeoutMs = 10 * 60 * 1000; // 10 minutes in milliseconds
+    private int authTimeoutMs = 2 * 60 * 1000;    // 2 minutes in milliseconds
+    private int idleTimeoutMs = 10 * 60 * 1000;   // 10 minutes in milliseconds
+    private long maxBytes = 1024L * 1024L * 1024L; // 1 GB (was 1024 * 1024, i.e. only 1 MB)
+    private long maxKeyInterval = 60 * 60 * 1000; // 1 hour
+
 
     public ServerSession(ServerFactoryManager server, IoSession ioSession) throws Exception {
         super(true, server, ioSession);
         authTimeoutMs = getIntProperty(ServerFactoryManager.AUTH_TIMEOUT, authTimeoutMs);
         authTimeoutTimestamp = System.currentTimeMillis() + authTimeoutMs;
         idleTimeoutMs = getIntProperty(ServerFactoryManager.IDLE_TIMEOUT, idleTimeoutMs);
+        maxBytes = Math.max(32, getLongProperty(ServerFactoryManager.REKEY_BYTES_LIMIT, maxBytes));
+        maxKeyInterval = getLongProperty(ServerFactoryManager.REKEY_TIME_LIMIT, maxKeyInterval);
         log.info("Session created from {}", ioSession.getRemoteAddress());
         sendServerIdentification();
         kexState = KEX_STATE_INIT;
@@ -98,6 +105,16 @@ public class ServerSession extends AbstractSession {
         }
     }
 
+    protected void checkRekey() throws IOException {
+        if (kexState == KEX_STATE_DONE) {
+            if (   inPackets > MAX_PACKETS || outPackets > MAX_PACKETS
+                || inBytes > maxBytes || outBytes > maxBytes
+                || maxKeyInterval > 0 && System.currentTimeMillis() - lastKeyTime > maxKeyInterval)
+            {
+                reExchangeKeys();
+            }
+        }
+    }
     public void resetIdleTimeout() {
         this.idleTimeoutTimestamp = System.currentTimeMillis() + idleTimeoutMs;
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d5b96d9f/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java b/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
index e6b1c54..b4170fa 100644
--- a/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
@@ -23,20 +23,13 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
-import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.jcraft.jsch.JSch;
 import org.apache.sshd.client.channel.ChannelShell;
-import org.apache.sshd.client.kex.DHG1;
-import org.apache.sshd.client.kex.DHG14;
-import org.apache.sshd.client.kex.DHGEX;
-import org.apache.sshd.client.kex.DHGEX256;
-import org.apache.sshd.client.kex.ECDHP256;
-import org.apache.sshd.client.kex.ECDHP384;
-import org.apache.sshd.client.kex.ECDHP521;
-import org.apache.sshd.common.KeyExchange;
-import org.apache.sshd.common.NamedFactory;
-import org.apache.sshd.common.util.SecurityUtils;
+import org.apache.sshd.common.Session;
+import org.apache.sshd.common.SessionListener;
+import org.apache.sshd.server.ServerFactoryManager;
 import org.apache.sshd.util.BogusPasswordAuthenticator;
 import org.apache.sshd.util.EchoShellFactory;
 import org.apache.sshd.util.JSchLogger;
@@ -44,7 +37,6 @@ import org.apache.sshd.util.SimpleUserInfo;
 import org.apache.sshd.util.TeeOutputStream;
 import org.apache.sshd.util.Utils;
 import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -61,11 +53,21 @@ public class KeyReExchangeTest {
     private SshServer sshd;
     private int port;
 
-    @Before
-    public void setUp() throws Exception {
+    @After
+    public void tearDown() throws Exception {
+        sshd.stop();
+    }
+
+    protected void setUp(long bytesLimit, long timeLimit) throws Exception {
         port = Utils.getFreePort();
 
         sshd = SshServer.setUpDefaultServer();
+        if (bytesLimit > 0) {
+            sshd.getProperties().put(ServerFactoryManager.REKEY_BYTES_LIMIT, Long.toString(bytesLimit));
+        }
+        if (timeLimit > 0) {
+            sshd.getProperties().put(ServerFactoryManager.REKEY_TIME_LIMIT, Long.toString(timeLimit));
+        }
         sshd.setPort(port);
         sshd.setKeyPairProvider(Utils.createTestHostKeyProvider());
         sshd.setShellFactory(new EchoShellFactory());
@@ -73,13 +75,10 @@ public class KeyReExchangeTest {
         sshd.start();
     }
 
-    @After
-    public void tearDown() throws Exception {
-        sshd.stop();
-    }
-
     @Test
     public void testReExchangeFromClient() throws Exception {
+        setUp(0, 0);
+
         JSchLogger.init();
         JSch.setConfig("kex", "diffie-hellman-group-exchange-sha1");
         JSch sch = new JSch();
@@ -105,6 +104,8 @@ public class KeyReExchangeTest {
 
     @Test
     public void testReExchangeFromNativeClient() throws Exception {
+        setUp(0, 0);
+
         SshClient client = SshClient.setUpDefaultClient();
         client.start();
         ClientSession session = client.connect("localhost", port).await().getSession();
@@ -133,7 +134,63 @@ public class KeyReExchangeTest {
         for (int i = 0; i < 10; i++) {
             teeOut.write(sb.toString().getBytes());
             teeOut.flush();
-            session.reExchangeKeys().await();
+            session.reExchangeKeys();
+        }
+        teeOut.write("exit\n".getBytes());
+        teeOut.flush();
+
+        channel.waitFor(ClientChannel.CLOSED, 0);
+
+        channel.close(false);
+        client.stop();
+
+        assertArrayEquals(sent.toByteArray(), out.toByteArray());
+    }
+
+    @Test
+    public void testReExchangeFromServer() throws Exception {
+        setUp(8192, 0);
+
+        SshClient client = SshClient.setUpDefaultClient();
+        client.start();
+        ClientSession session = client.connect("localhost", port).await().getSession();
+        session.authPassword("smx", "smx").await();
+        ChannelShell channel = session.createShellChannel();
+
+        ByteArrayOutputStream sent = new ByteArrayOutputStream();
+        PipedOutputStream pipedIn = new PipedOutputStream();
+        channel.setIn(new PipedInputStream(pipedIn));
+        OutputStream teeOut = new TeeOutputStream(sent, pipedIn);
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        ByteArrayOutputStream err = new ByteArrayOutputStream();
+        channel.setOut(out);
+        channel.setErr(err);
+        channel.open();
+
+        teeOut.write("this is my command\n".getBytes());
+        teeOut.flush();
+
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < 100; i++) {
+            sb.append("0123456789");
+        }
+        sb.append("\n");
+
+        final AtomicInteger exchanges = new AtomicInteger();
+        session.addListener(new SessionListener() {
+            public void sessionCreated(Session session) {
+            }
+            public void sessionEvent(Session sesssion, Event event) {
+                if (event == Event.KeyEstablished) {
+                    exchanges.incrementAndGet();
+                }
+            }
+            public void sessionClosed(Session session) {
+            }
+        });
+        for (int i = 0; i < 100; i++) {
+            teeOut.write(sb.toString().getBytes());
+            teeOut.flush();
         }
         teeOut.write("exit\n".getBytes());
         teeOut.flush();
@@ -143,6 +200,7 @@ public class KeyReExchangeTest {
         channel.close(false);
         client.stop();
 
+        assertTrue("Expected rekeying", exchanges.get() > 0);
         assertArrayEquals(sent.toByteArray(), out.toByteArray());
     }
 }