You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2015/11/29 07:43:12 UTC

mina-sshd git commit: [SSHD-601] Add configurable re-key packets limit

Repository: mina-sshd
Updated Branches:
  refs/heads/master b07095c9b -> 0a4becb3b


[SSHD-601] Add configurable re-key packets limit


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/0a4becb3
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/0a4becb3
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/0a4becb3

Branch: refs/heads/master
Commit: 0a4becb3bc1789ad0cf929cae1e3fa6af5e33a57
Parents: b07095c
Author: Lyor Goldstein <lg...@vmware.com>
Authored: Sun Nov 29 08:42:58 2015 +0200
Committer: Lyor Goldstein <lg...@vmware.com>
Committed: Sun Nov 29 08:42:58 2015 +0200

----------------------------------------------------------------------
 .../apache/sshd/common/cipher/BaseCipher.java   |  40 +++-
 .../sshd/common/cipher/BaseRC4Cipher.java       |   8 +-
 .../org/apache/sshd/common/cipher/Cipher.java   |  10 +
 .../apache/sshd/common/cipher/CipherNone.java   |  10 +
 .../sshd/common/session/AbstractSession.java    | 225 ++++++++++++++-----
 .../sshd/common/util/buffer/BufferUtils.java    |   1 +
 .../sshd/server/ServerFactoryManager.java       |  41 +++-
 .../sshd/server/session/ServerSessionImpl.java  |  23 +-
 .../java/org/apache/sshd/KeyReExchangeTest.java | 151 ++++++++++---
 9 files changed, 388 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0a4becb3/sshd-core/src/main/java/org/apache/sshd/common/cipher/BaseCipher.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/cipher/BaseCipher.java b/sshd-core/src/main/java/org/apache/sshd/common/cipher/BaseCipher.java
index 0b85a47..c6b534b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/cipher/BaseCipher.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/cipher/BaseCipher.java
@@ -24,6 +24,7 @@ import javax.crypto.spec.SecretKeySpec;
 import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.SecurityUtils;
+import org.apache.sshd.common.util.ValidateUtils;
 
 /**
  * Base class for all Cipher implementations delegating to the JCE provider.
@@ -32,17 +33,27 @@ import org.apache.sshd.common.util.SecurityUtils;
  */
 public class BaseCipher implements Cipher {
 
-    protected final int ivsize;
-    protected final int bsize;
-    protected final String algorithm;
-    protected final String transformation;
     protected javax.crypto.Cipher cipher;
+    private final int ivsize;
+    private final int bsize;
+    private final String algorithm;
+    private final String transformation;
 
     public BaseCipher(int ivsize, int bsize, String algorithm, String transformation) {
         this.ivsize = ivsize;
         this.bsize = bsize;
-        this.algorithm = algorithm;
-        this.transformation = transformation;
+        this.algorithm = ValidateUtils.checkNotNullAndNotEmpty(algorithm, "No algorithm");
+        this.transformation = ValidateUtils.checkNotNullAndNotEmpty(transformation, "No transformation");
+    }
+
+    @Override
+    public String getAlgorithm() {
+        return algorithm;
+    }
+
+    @Override
+    public String getTransformation() {
+        return transformation;
     }
 
     @Override
@@ -57,12 +68,12 @@ public class BaseCipher implements Cipher {
 
     @Override
     public void init(Mode mode, byte[] key, byte[] iv) throws Exception {
-        key = resize(key, bsize);
-        iv = resize(iv, ivsize);
+        key = resize(key, getBlockSize());
+        iv = resize(iv, getIVSize());
         try {
-            cipher = SecurityUtils.getCipher(transformation);
-            cipher.init(mode == Mode.Encrypt ? javax.crypto.Cipher.ENCRYPT_MODE : javax.crypto.Cipher.DECRYPT_MODE,
-                    new SecretKeySpec(key, algorithm),
+            cipher = SecurityUtils.getCipher(getTransformation());
+            cipher.init(Mode.Encrypt.equals(mode) ? javax.crypto.Cipher.ENCRYPT_MODE : javax.crypto.Cipher.DECRYPT_MODE,
+                    new SecretKeySpec(key, getAlgorithm()),
                     new IvParameterSpec(iv));
         } catch (Exception e) {
             cipher = null;
@@ -91,6 +102,11 @@ public class BaseCipher implements Cipher {
 
     @Override
     public String toString() {
-        return getClass().getSimpleName() + "[" + algorithm + "," + ivsize + "," + bsize + "," + transformation + "]";
+        return getClass().getSimpleName()
+             + "[" + getAlgorithm()
+             + "," + getIVSize()
+             + "," + getBlockSize()
+             + "," + getTransformation()
+             + "]";
     }
 }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0a4becb3/sshd-core/src/main/java/org/apache/sshd/common/cipher/BaseRC4Cipher.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/cipher/BaseRC4Cipher.java b/sshd-core/src/main/java/org/apache/sshd/common/cipher/BaseRC4Cipher.java
index 19c9497..016ecee 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/cipher/BaseRC4Cipher.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/cipher/BaseRC4Cipher.java
@@ -35,11 +35,11 @@ public class BaseRC4Cipher extends BaseCipher {
 
     @Override
     public void init(Mode mode, byte[] key, byte[] iv) throws Exception {
-        key = resize(key, bsize);
+        key = resize(key, getBlockSize());
         try {
-            cipher = SecurityUtils.getCipher(transformation);
-            cipher.init(mode == Mode.Encrypt ? javax.crypto.Cipher.ENCRYPT_MODE : javax.crypto.Cipher.DECRYPT_MODE,
-                    new SecretKeySpec(key, algorithm));
+            cipher = SecurityUtils.getCipher(getTransformation());
+            cipher.init(Mode.Encrypt.equals(mode)  ? javax.crypto.Cipher.ENCRYPT_MODE : javax.crypto.Cipher.DECRYPT_MODE,
+                    new SecretKeySpec(key, getAlgorithm()));
 
             byte[] foo = new byte[1];
             for (int i = 0; i < SKIP_SIZE; i++) {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0a4becb3/sshd-core/src/main/java/org/apache/sshd/common/cipher/Cipher.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/cipher/Cipher.java b/sshd-core/src/main/java/org/apache/sshd/common/cipher/Cipher.java
index 63e2744..9e53cc9 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/cipher/Cipher.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/cipher/Cipher.java
@@ -31,6 +31,16 @@ public interface Cipher {
     }
 
     /**
+     * @return The cipher's algorithm
+     */
+    String getAlgorithm();
+
+    /**
+     * @return The actual transformation used - e.g., AES/CBC/NoPadding
+     */
+    String getTransformation();
+
+    /**
      * @return Size of the initialization vector (in bytes)
      */
     int getIVSize();

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0a4becb3/sshd-core/src/main/java/org/apache/sshd/common/cipher/CipherNone.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/cipher/CipherNone.java b/sshd-core/src/main/java/org/apache/sshd/common/cipher/CipherNone.java
index 751b70f..7f2c863 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/cipher/CipherNone.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/cipher/CipherNone.java
@@ -35,6 +35,16 @@ public class CipherNone implements Cipher {
     }
 
     @Override
+    public String getAlgorithm() {
+        return "none";
+    }
+
+    @Override
+    public String getTransformation() {
+        return "none";
+    }
+
+    @Override
     public int getIVSize() {
         return 8;   // dummy
     }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0a4becb3/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
index d25b524..71a3893 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
+import java.util.Date;
 import java.util.EnumMap;
 import java.util.LinkedList;
 import java.util.Map;
@@ -68,6 +69,7 @@ import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.BufferUtils;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.server.ServerFactoryManager;
 
 /**
  * <P>
@@ -177,10 +179,17 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
     protected final AtomicLong outPacketsCount = new AtomicLong(0L);
     protected final AtomicLong inBytesCount = new AtomicLong(0L);
     protected final AtomicLong outBytesCount = new AtomicLong(0L);
+    protected final AtomicLong inBlocksCount = new AtomicLong(0L);
+    protected final AtomicLong outBlocksCount = new AtomicLong(0L);
     protected final AtomicLong lastKeyTimeValue = new AtomicLong(0L);
     protected final Queue<PendingWriteFuture> pendingPackets = new LinkedList<>();
 
     protected Service currentService;
+    // we initialize them here in case super constructor calls some methods that use these values
+    protected long maxRekyPackets = ServerFactoryManager.DEFAULT_REKEY_PACKETS_LIMIT;
+    protected long maxRekeyBytes = ServerFactoryManager.DEFAULT_REKEY_BYTES_LIMIT;
+    protected long maxRekeyInterval = ServerFactoryManager.DEFAULT_REKEY_TIME_LIMIT;
+    protected final AtomicLong maxRekeyBlocks = new AtomicLong(ServerFactoryManager.DEFAULT_REKEY_BYTES_LIMIT / 16);
 
     /**
      * The factory manager used to retrieve factories of Ciphers, Macs and other objects
@@ -201,6 +210,10 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
         this.factoryManager = factoryManager;
         this.ioSession = ioSession;
 
+        maxRekeyBytes = PropertyResolverUtils.getLongProperty(this, ServerFactoryManager.REKEY_BYTES_LIMIT, maxRekeyBytes);
+        maxRekeyInterval = PropertyResolverUtils.getLongProperty(this, ServerFactoryManager.REKEY_TIME_LIMIT, maxRekeyInterval);
+        maxRekyPackets = PropertyResolverUtils.getLongProperty(this, ServerFactoryManager.REKEY_PACKETS_LIMIT, maxRekyPackets);
+
         ClassLoader loader = getClass().getClassLoader();
         sessionListenerProxy = EventListenerUtils.proxyWrapper(SessionListener.class, loader, sessionListeners);
         channelListenerProxy = EventListenerUtils.proxyWrapper(ChannelListener.class, loader, channelListeners);
@@ -479,6 +492,10 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
                 log.debug("handleServiceRequest({}) Service {} rejected: {} = {}",
                           this, serviceName, e.getClass().getSimpleName(), e.getMessage());
             }
+
+            if (log.isTraceEnabled()) {
+                log.trace("handleServiceRequest(" + this + ") service=" + serviceName + " rejection details", e);
+            }
             disconnect(SshConstants.SSH2_DISCONNECT_SERVICE_NOT_AVAILABLE, "Bad service request: " + serviceName);
             return;
         }
@@ -521,7 +538,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
 
     protected void handleNewKeys(int cmd) throws Exception {
         if (log.isDebugEnabled()) {
-            log.debug("handleNewKeys({}) SSH_MSG_NEWKEYS", this);
+            log.debug("handleNewKeys({}) SSH_MSG_NEWKEYS command={}", this, SshConstants.getCommandMessageName(cmd));
         }
         validateKexState(cmd, KexState.KEYS);
         receiveNewKeys();
@@ -539,7 +556,9 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
         sendSessionEvent(SessionListener.Event.KeyEstablished);
         synchronized (pendingPackets) {
             if (!pendingPackets.isEmpty()) {
-                log.debug("Dequeing pending packets");
+                if (log.isDebugEnabled()) {
+                    log.debug("handleNewKeys({}) Dequeing pending packets", this);
+                }
                 synchronized (encodeLock) {
                     PendingWriteFuture future;
                     while ((future = pendingPackets.poll()) != null) {
@@ -672,7 +691,8 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
                 synchronized (pendingPackets) {
                     if (!KexState.DONE.equals(kexState.get())) {
                         if (pendingPackets.isEmpty()) {
-                            log.debug("Start flagging packets as pending until key exchange is done");
+                            log.debug("writePacket({})[{}] Start flagging packets as pending until key exchange is done",
+                                      this, SshConstants.getCommandMessageName(cmd & 0xFF));
                         }
                         PendingWriteFuture future = new PendingWriteFuture(buffer);
                         pendingPackets.add(future);
@@ -701,7 +721,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
                 public void run() {
                     Throwable t = new TimeoutException("Timeout writing packet: " + timeout + " " + unit);
                     if (log.isDebugEnabled()) {
-                        log.debug(t.getMessage());
+                        log.debug("writePacket({}): {}", AbstractSession.this, t.getMessage());
                     }
                     future.setValue(t);
                 }
@@ -810,7 +830,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
             int off = buffer.rpos() - 5;
             // Debug log the packet
             if (log.isTraceEnabled()) {
-                log.trace("Sending packet #{}: {}", Long.valueOf(seqo), buffer.printHex());
+                log.trace("encode({}) Sending packet #{}: {}", this, Long.valueOf(seqo), buffer.printHex());
             }
             // Compress the packet if needed
             if ((outCompression != null) && (authed || !outCompression.isDelayed())) {
@@ -845,6 +865,9 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
             // Encrypt packet, excluding mac
             if (outCipher != null) {
                 outCipher.update(buffer.array(), off, len + 4);
+
+                int blocksCount = (len + 4) / outCipher.getBlockSize();
+                outBlocksCount.addAndGet(Math.max(1, blocksCount));
             }
             // Increment packet id
             seqo = (seqo + 1) & 0xffffffffL;
@@ -877,12 +900,15 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
                     // Decrypt the first bytes
                     if (inCipher != null) {
                         inCipher.update(decoderBuffer.array(), 0, inCipherSize);
+
+                        int blocksCount = inCipherSize / inCipher.getBlockSize();
+                        inBlocksCount.addAndGet(Math.max(1, blocksCount));
                     }
                     // Read packet length
                     decoderLength = decoderBuffer.getInt();
                     // Check packet length validity
-                    if (decoderLength < 5 || decoderLength > (256 * 1024)) {
-                        log.warn("Error decoding packet (invalid length) {}", decoderBuffer.printHex());
+                    if ((decoderLength < 5) || (decoderLength > (256 * 1024))) {
+                        log.warn("decode({}) Error decoding packet(invalid length) {}", this, decoderBuffer.printHex());
                         throw new SshException(SshConstants.SSH2_DISCONNECT_PROTOCOL_ERROR,
                                 "Invalid packet length: " + decoderLength);
                     }
@@ -902,7 +928,11 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
                     byte[] data = decoderBuffer.array();
                     // Decrypt the remaining of the packet
                     if (inCipher != null) {
-                        inCipher.update(data, inCipherSize, decoderLength + 4 - inCipherSize);
+                        int updateLen = decoderLength + 4 - inCipherSize;
+                        inCipher.update(data, inCipherSize, updateLen);
+
+                        int blocksCount = updateLen / inCipher.getBlockSize();
+                        inBlocksCount.addAndGet(Math.max(1, blocksCount));
                     }
                     // Check the mac of the packet
                     if (inMac != null) {
@@ -938,7 +968,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
                         buf = decoderBuffer;
                     }
                     if (log.isTraceEnabled()) {
-                        log.trace("Received packet #{}: {}", Long.valueOf(seqi), buf.printHex());
+                        log.trace("decode({}) Received packet #{}: {}", this, Long.valueOf(seqi), buf.printHex());
                     }
                     // Update stats
                     inPacketsCount.incrementAndGet();
@@ -964,7 +994,9 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
      * @param ident our identification to send
      */
     protected void sendIdentification(String ident) {
-        log.debug("Send identification: {}", ident);
+        if (log.isDebugEnabled()) {
+            log.debug("sendIdentification({}): {}", this, ident);
+        }
         byte[] data = (ident + "\r\n").getBytes(StandardCharsets.UTF_8);
         ioSession.write(new ByteArrayBuffer(data));
     }
@@ -1073,7 +1105,9 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
      * @throws IOException if an error occurred sending the packet
      */
     protected byte[] sendKexInit(Map<KexProposalOption, String> proposal) throws IOException {
-        log.debug("Send SSH_MSG_KEXINIT");
+        if (log.isDebugEnabled()) {
+            log.debug("sendKexInit({}) Send SSH_MSG_KEXINIT", this);
+        }
         Buffer buffer = createBuffer(SshConstants.SSH_MSG_KEXINIT);
         int p = buffer.wpos();
         buffer.wpos(p + SshConstants.MSG_KEX_COOKIE_SIZE);
@@ -1158,7 +1192,9 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
      * @throws IOException if an error occurs sending the message
      */
     protected void sendNewKeys() throws IOException {
-        log.debug("Send SSH_MSG_NEWKEYS");
+        if (log.isDebugEnabled()) {
+            log.debug("sendNewKeys({}) Send SSH_MSG_NEWKEYS", this);
+        }
         Buffer buffer = createBuffer(SshConstants.SSH_MSG_NEWKEYS, Byte.SIZE);
         writePacket(buffer);
     }
@@ -1171,25 +1207,15 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
      * @throws Exception if an error occurs
      */
     protected void receiveNewKeys() throws Exception {
-        byte[] iv_c2s;
-        byte[] iv_s2c;
-        byte[] e_c2s;
-        byte[] e_s2c;
-        byte[] mac_c2s;
-        byte[] mac_s2c;
         byte[] k = kex.getK();
         byte[] h = kex.getH();
         Digest hash = kex.getHash();
-        Cipher s2ccipher;
-        Cipher c2scipher;
-        Mac s2cmac;
-        Mac c2smac;
-        Compression s2ccomp;
-        Compression c2scomp;
 
         if (sessionId == null) {
-            sessionId = new byte[h.length];
-            System.arraycopy(h, 0, sessionId, 0, h.length);
+            sessionId = h.clone();
+            if (log.isDebugEnabled()) {
+                log.debug("receiveNewKeys({}) session ID={}", this, BufferUtils.printHex(':', sessionId));
+            }
         }
 
         Buffer buffer = new ByteArrayBuffer();
@@ -1200,57 +1226,55 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
         int pos = buffer.available();
         byte[] buf = buffer.array();
         hash.update(buf, 0, pos);
-        iv_c2s = hash.digest();
+        byte[] iv_c2s = hash.digest();
 
         int j = pos - sessionId.length - 1;
 
         buf[j]++;
         hash.update(buf, 0, pos);
-        iv_s2c = hash.digest();
+        byte[] iv_s2c = hash.digest();
 
         buf[j]++;
         hash.update(buf, 0, pos);
-        e_c2s = hash.digest();
+        byte[] e_c2s = hash.digest();
 
         buf[j]++;
         hash.update(buf, 0, pos);
-        e_s2c = hash.digest();
+        byte[] e_s2c = hash.digest();
 
         buf[j]++;
         hash.update(buf, 0, pos);
-        mac_c2s = hash.digest();
+        byte[] mac_c2s = hash.digest();
 
         buf[j]++;
         hash.update(buf, 0, pos);
-        mac_s2c = hash.digest();
+        byte[] mac_s2c = hash.digest();
 
-        String value;
-
-        value = getNegotiatedKexParameter(KexProposalOption.S2CENC);
-        s2ccipher = ValidateUtils.checkNotNull(NamedFactory.Utils.create(getCipherFactories(), value), "Unknown s2c cipher: %s", value);
+        String value = getNegotiatedKexParameter(KexProposalOption.S2CENC);
+        Cipher s2ccipher = ValidateUtils.checkNotNull(NamedFactory.Utils.create(getCipherFactories(), value), "Unknown s2c cipher: %s", value);
         e_s2c = resizeKey(e_s2c, s2ccipher.getBlockSize(), hash, k, h);
         s2ccipher.init(isServer ? Cipher.Mode.Encrypt : Cipher.Mode.Decrypt, e_s2c, iv_s2c);
 
         value = getNegotiatedKexParameter(KexProposalOption.S2CMAC);
-        s2cmac = ValidateUtils.checkNotNull(NamedFactory.Utils.create(getMacFactories(), value), "Unknown s2c mac: %s", value);
+        Mac s2cmac = ValidateUtils.checkNotNull(NamedFactory.Utils.create(getMacFactories(), value), "Unknown s2c mac: %s", value);
         mac_s2c = resizeKey(mac_s2c, s2cmac.getBlockSize(), hash, k, h);
         s2cmac.init(mac_s2c);
 
         value = getNegotiatedKexParameter(KexProposalOption.S2CCOMP);
-        s2ccomp = NamedFactory.Utils.create(getCompressionFactories(), value);
+        Compression s2ccomp = NamedFactory.Utils.create(getCompressionFactories(), value);
 
         value = getNegotiatedKexParameter(KexProposalOption.C2SENC);
-        c2scipher = ValidateUtils.checkNotNull(NamedFactory.Utils.create(getCipherFactories(), value), "Unknown c2s cipher: %s", value);
+        Cipher c2scipher = ValidateUtils.checkNotNull(NamedFactory.Utils.create(getCipherFactories(), value), "Unknown c2s cipher: %s", value);
         e_c2s = resizeKey(e_c2s, c2scipher.getBlockSize(), hash, k, h);
         c2scipher.init(isServer ? Cipher.Mode.Decrypt : Cipher.Mode.Encrypt, e_c2s, iv_c2s);
 
         value = getNegotiatedKexParameter(KexProposalOption.C2SMAC);
-        c2smac = ValidateUtils.checkNotNull(NamedFactory.Utils.create(getMacFactories(), value), "Unknown c2s mac: %s", value);
+        Mac c2smac = ValidateUtils.checkNotNull(NamedFactory.Utils.create(getMacFactories(), value), "Unknown c2s mac: %s", value);
         mac_c2s = resizeKey(mac_c2s, c2smac.getBlockSize(), hash, k, h);
         c2smac.init(mac_c2s);
 
         value = getNegotiatedKexParameter(KexProposalOption.C2SCOMP);
-        c2scomp = NamedFactory.Utils.create(getCompressionFactories(), value);
+        Compression c2scomp = NamedFactory.Utils.create(getCompressionFactories(), value);
 
         if (isServer) {
             outCipher = s2ccipher;
@@ -1276,10 +1300,25 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
         if (inCompression != null) {
             inCompression.init(Compression.Type.Inflater, -1);
         }
+
+        // see https://tools.ietf.org/html/rfc4344#section-3.2
+        int inBlockSize = inCipher.getBlockSize();
+        int outBlockSize = outCipher.getBlockSize();
+        // select the lowest cipher size
+        int avgCipherBlockSize = Math.min(inBlockSize, outBlockSize);
+        long recommendedByteRekeyBlocks = 1L << Math.min((avgCipherBlockSize * Byte.SIZE) / 4, 63);    // in case (block-size / 4) > 63
+        maxRekeyBlocks.set(PropertyResolverUtils.getLongProperty(this, ServerFactoryManager.REKEY_BLOCKS_LIMIT, recommendedByteRekeyBlocks));
+        if (log.isDebugEnabled()) {
+            log.debug("receiveNewKeys({}) inCipher={}, outCipher={}, recommended blocks limit={}, actual={}",
+                      this, inCipher, outCipher, recommendedByteRekeyBlocks, maxRekeyBlocks);
+        }
+
         inBytesCount.set(0L);
         outBytesCount.set(0L);
         inPacketsCount.set(0L);
         outPacketsCount.set(0L);
+        inBlocksCount.set(0L);
+        outBlocksCount.set(0L);
         lastKeyTimeValue.set(System.currentTimeMillis());
     }
 
@@ -1403,14 +1442,14 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
         }
 
         if (log.isDebugEnabled()) {
-            log.debug("Kex: server->client {} {} {}",
-                    guess.get(KexProposalOption.S2CENC),
-                    guess.get(KexProposalOption.S2CMAC),
-                    guess.get(KexProposalOption.S2CCOMP));
-            log.debug("Kex: client->server {} {} {}",
-                    guess.get(KexProposalOption.C2SENC),
-                    guess.get(KexProposalOption.C2SMAC),
-                    guess.get(KexProposalOption.C2SCOMP));
+            log.debug("setNegotiationResult({}) Kex: server->client {} {} {}", this,
+                      guess.get(KexProposalOption.S2CENC),
+                      guess.get(KexProposalOption.S2CMAC),
+                      guess.get(KexProposalOption.S2CCOMP));
+            log.debug("setNegotiationResult({}) Kex: client->server {} {} {}", this,
+                      guess.get(KexProposalOption.C2SENC),
+                      guess.get(KexProposalOption.C2SMAC),
+                      guess.get(KexProposalOption.C2SCOMP));
         }
 
         return guess;
@@ -1557,7 +1596,89 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
     }
 
     protected void checkRekey() throws IOException {
-        // nothing
+        if (isRekeyRequired()) {
+            reExchangeKeys();
+        }
+    }
+
+    protected boolean isRekeyRequired() {
+        KexState curState = kexState.get();
+        if (!KexState.DONE.equals(curState)) {
+            return false;
+        }
+
+        return isRekeyTimeIntervalExceeded()
+            || isRekeyPacketCountsExceeded()
+            || isRekeyBlocksCountExceeded()
+            || isRekeyDataSizeExceeded();
+    }
+
+    protected boolean isRekeyTimeIntervalExceeded() {
+        if (maxRekeyInterval <= 0L) {
+            return false;   // disabled
+        }
+
+        long now = System.currentTimeMillis();
+        long rekeyDiff = now - lastKeyTimeValue.get();
+        boolean rekey = rekeyDiff > maxRekeyInterval;
+        if (rekey) {
+            if (log.isDebugEnabled()) {
+                log.debug("isRekeyTimeIntervalExceeded({}) re-keying: last={}, now={}, diff={}, max={}",
+                          this, new Date(lastKeyTimeValue.get()), new Date(now),
+                          rekeyDiff, maxRekeyInterval);
+            }
+        }
+
+        return rekey;
+    }
+
+    protected boolean isRekeyPacketCountsExceeded() {
+        if (maxRekyPackets <= 0L) {
+            return false;   // disabled
+        }
+
+        boolean rekey = (inPacketsCount.get() > maxRekyPackets) || (outPacketsCount.get() > maxRekyPackets);
+        if (rekey) {
+            if (log.isDebugEnabled()) {
+                log.debug("isRekeyPacketCountsExceeded({}) re-keying: in={}, out={}, max={}",
+                          this, inPacketsCount, outPacketsCount, maxRekyPackets);
+            }
+        }
+
+        return rekey;
+    }
+
+    protected boolean isRekeyDataSizeExceeded() {
+        if (maxRekeyBytes <= 0L) {
+            return false;
+        }
+
+        boolean rekey = (inBytesCount.get() > maxRekeyBytes) || (outBytesCount.get() > maxRekeyBytes);
+        if (rekey) {
+            if (log.isDebugEnabled()) {
+                log.debug("isRekeyDataSizeExceeded({}) re-keying: in={}, out={}, max={}",
+                          this, inBytesCount, outBytesCount, maxRekeyBytes);
+            }
+        }
+
+        return rekey;
+    }
+
+    protected boolean isRekeyBlocksCountExceeded() {
+        long maxBlocks = maxRekeyBlocks.get();
+        if (maxBlocks <= 0L) {
+            return false;
+        }
+
+        boolean rekey = (inBlocksCount.get() > maxBlocks) || (outBlocksCount.get() > maxBlocks);
+        if (rekey) {
+            if (log.isDebugEnabled()) {
+                log.debug("isRekeyBlocksCountExceeded({}) re-keying: in={}, out={}, max={}",
+                          this, inBlocksCount, outBlocksCount, maxBlocks);
+            }
+        }
+
+        return rekey;
     }
 
     protected byte[] sendKexInit() throws IOException {
@@ -1569,8 +1690,8 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
 
         Map<KexProposalOption, String> proposal = createProposal(resolvedAlgorithms);
         byte[] seed = sendKexInit(proposal);
-        if (log.isDebugEnabled()) {
-            log.debug("sendKexInit(" + proposal + ") seed: " + BufferUtils.printHex(':', seed));
+        if (log.isTraceEnabled()) {
+            log.trace("sendKexInit({}) proposal={} seed: {}", this, proposal, BufferUtils.printHex(':', seed));
         }
         setKexSeed(seed);
         return seed;

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0a4becb3/sshd-core/src/main/java/org/apache/sshd/common/util/buffer/BufferUtils.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/buffer/BufferUtils.java b/sshd-core/src/main/java/org/apache/sshd/common/util/buffer/BufferUtils.java
index 676549f..dc9a9cb 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/buffer/BufferUtils.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/buffer/BufferUtils.java
@@ -311,6 +311,7 @@ public final class BufferUtils {
         }
         return true;
     }
+
     public static int getNextPowerOf2(int value) {
         // for 0-7 return 8
         return (value < Byte.SIZE) ? Byte.SIZE : NumberUtils.getNextPowerOf2(value);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0a4becb3/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
index 92fd295..2e63121 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
@@ -92,19 +92,52 @@ public interface ServerFactoryManager extends FactoryManager, ServerAuthenticati
 
     /**
      * Key re-exchange will be automatically performed after the session
-     * has sent or received the given amount of bytes.
-     * The default value is 1 gigabyte.
+     * has sent or received the given amount of bytes. If non-positive,
+     * then disabled. The default value is {@link #DEFAULT_REKEY_BYTES_LIMIT}
      */
     String REKEY_BYTES_LIMIT = "rekey-bytes-limit";
 
     /**
+     * Default value for {@link #REKEY_BYTES_LIMIT} if no override
+     * @see <A HREF="https://tools.ietf.org/html/rfc4253#page-23">RFC4253 section 9</A>
+     */
+    long DEFAULT_REKEY_BYTES_LIMIT = 1024L * 1024L * 1024L; // 1GB
+
+    /**
      * Key re-exchange will be automatically performed after the specified
-     * amount of time has elapsed since the last key exchange. In milliseconds.
-     * The default value is 1 hour.
+     * amount of time has elapsed since the last key exchange - in milliseconds.
+     * If non-positive then disabled. The default value is {@link #DEFAULT_REKEY_TIME_LIMIT}
      */
     String REKEY_TIME_LIMIT = "rekey-time-limit";
 
     /**
+     * Default value for {@link #REKEY_TIME_LIMIT} if none specified
+     * @see <A HREF="https://tools.ietf.org/html/rfc4253#page-23">RFC4253 section 9</A>
+     */
+    long DEFAULT_REKEY_TIME_LIMIT = 60L * 60L * 1000L; // 1 hour
+
+    /**
+     * Key re-exchange will be automatically performed after the specified
+     * number of packets has been exchanged - positive 64-bit value. If
+     * non-positive then disabled. The default is {@link #DEFAULT_REKEY_PACKETS_LIMIT}
+     */
+    String REKEY_PACKETS_LIMIT = "rekey-packets-limit";
+
+    /**
+     * Default value for {@link #REKEY_PACKETS_LIMIT} if none specified
+     * @see <A HREF="https://tools.ietf.org/html/rfc4344#page-3">RFC4344 section 3.1</A>
+     */
+    long DEFAULT_REKEY_PACKETS_LIMIT = 1L << 31;
+
+    /**
+     * Key re-exchange will be automatically performed after the specified
+     * number of cipher blocks has been processed - positive 64-bit value. If
+     * non-positive then disabled. The default is calculated according to
+     * <A HREF="https://tools.ietf.org/html/rfc4344#page-3">RFC4344 section 3.2</A>
+     */
+    String REKEY_BLOCKS_LIMIT = "rekey-blocks-limit";
+
+    /**
      * A URL pointing to the moduli file.
      * If not specified, the default internal file will be used.
      */

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0a4becb3/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java
index 479159a..f506924 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java
@@ -24,7 +24,6 @@ import java.security.KeyPair;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Objects;
-
 import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.NamedResource;
 import org.apache.sshd.common.PropertyResolverUtils;
@@ -51,15 +50,9 @@ import org.apache.sshd.server.ServerFactoryManager;
  * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
  */
 public class ServerSessionImpl extends AbstractServerSession {
-    protected static final long MAX_PACKETS = 1L << 31;
-
-    private long maxBytes = 1024 * 1024 * 1024;   // 1 GB
-    private long maxKeyInterval = 60 * 60 * 1000; // 1 hour
-
     public ServerSessionImpl(ServerFactoryManager server, IoSession ioSession) throws Exception {
         super(server, ioSession);
-        maxBytes = Math.max(32, PropertyResolverUtils.getLongProperty(this, ServerFactoryManager.REKEY_BYTES_LIMIT, maxBytes));
-        maxKeyInterval = PropertyResolverUtils.getLongProperty(this, ServerFactoryManager.REKEY_TIME_LIMIT, maxKeyInterval);
+
         if (log.isDebugEnabled()) {
             log.debug("Server session created {}", ioSession);
         }
@@ -82,20 +75,6 @@ public class ServerSessionImpl extends AbstractServerSession {
         disconnect(SshConstants.SSH2_DISCONNECT_PROTOCOL_ERROR, "Unsupported packet: SSH_MSG_SERVICE_ACCEPT");
     }
 
-    @Override
-    protected void checkRekey() throws IOException {
-        if (KexState.DONE.equals(kexState.get())) {
-            long now = System.currentTimeMillis();
-            if ((inPacketsCount.get() > MAX_PACKETS)
-                    || (outPacketsCount.get() > MAX_PACKETS)
-                    || (inBytesCount.get() > maxBytes)
-                    || (outBytesCount.get() > maxBytes)
-                    || ((maxKeyInterval > 0L) && ((now - lastKeyTimeValue.get()) > maxKeyInterval))) {
-                reExchangeKeys();
-            }
-        }
-    }
-
     protected void sendServerIdentification() {
         FactoryManager manager = getFactoryManager();
         String ident = PropertyResolverUtils.getString(manager, ServerFactoryManager.SERVER_IDENTIFICATION);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0a4becb3/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java b/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
index 077e531..34323a6 100644
--- a/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
@@ -34,13 +34,12 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.sshd.client.SshClient;
 import org.apache.sshd.client.channel.ChannelShell;
 import org.apache.sshd.client.channel.ClientChannel;
 import org.apache.sshd.client.session.ClientSession;
-import org.apache.sshd.common.PropertyResolverUtils;
 import org.apache.sshd.common.NamedFactory;
+import org.apache.sshd.common.PropertyResolverUtils;
 import org.apache.sshd.common.cipher.BuiltinCiphers;
 import org.apache.sshd.common.future.KeyExchangeFuture;
 import org.apache.sshd.common.kex.BuiltinDHFactories;
@@ -49,6 +48,7 @@ import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.session.SessionListener;
 import org.apache.sshd.common.subsystem.sftp.SftpConstants;
 import org.apache.sshd.common.util.SecurityUtils;
+import org.apache.sshd.common.util.io.NullOutputStream;
 import org.apache.sshd.server.ServerFactoryManager;
 import org.apache.sshd.server.SshServer;
 import org.apache.sshd.util.test.BaseTestSupport;
@@ -83,7 +83,7 @@ public class KeyReExchangeTest extends BaseTestSupport {
         sshd.stop(true);
     }
 
-    protected void setUp(long bytesLimit, long timeLimit) throws Exception {
+    protected void setUp(long bytesLimit, long timeLimit, long packetsLimit) throws Exception {
         sshd = setupTestServer();
         if (bytesLimit > 0L) {
             PropertyResolverUtils.updateProperty(sshd, ServerFactoryManager.REKEY_BYTES_LIMIT, bytesLimit);
@@ -91,6 +91,9 @@ public class KeyReExchangeTest extends BaseTestSupport {
         if (timeLimit > 0L) {
             PropertyResolverUtils.updateProperty(sshd, ServerFactoryManager.REKEY_TIME_LIMIT, timeLimit);
         }
+        if (packetsLimit > 0L) {
+            PropertyResolverUtils.updateProperty(sshd, ServerFactoryManager.REKEY_PACKETS_LIMIT, packetsLimit);
+        }
 
         sshd.start();
         port = sshd.getPort();
@@ -98,7 +101,7 @@ public class KeyReExchangeTest extends BaseTestSupport {
 
     @Test
     public void testSwitchToNoneCipher() throws Exception {
-        setUp(0, 0);
+        setUp(0L, 0L, 0L);
 
         sshd.getCipherFactories().add(BuiltinCiphers.none);
         try (SshClient client = setupTestClient()) {
@@ -122,7 +125,7 @@ public class KeyReExchangeTest extends BaseTestSupport {
 
     @Test   // see SSHD-558
     public void testKexFutureExceptionPropagation() throws Exception {
-        setUp(0, 0);
+        setUp(0L, 0L, 0L);
         sshd.getCipherFactories().add(BuiltinCiphers.none);
 
         try (SshClient client = setupTestClient()) {
@@ -195,7 +198,7 @@ public class KeyReExchangeTest extends BaseTestSupport {
     @Test
     public void testReExchangeFromJschClient() throws Exception {
         Assume.assumeTrue("DH Group Exchange not supported", SecurityUtils.isDHGroupExchangeSupported());
-        setUp(0, 0);
+        setUp(0L, 0L, 0L);
 
         JSchLogger.init();
         JSch.setConfig("kex", BuiltinDHFactories.Constants.DIFFIE_HELLMAN_GROUP_EXCHANGE_SHA1);
@@ -232,7 +235,7 @@ public class KeyReExchangeTest extends BaseTestSupport {
 
     @Test
     public void testReExchangeFromSshdClient() throws Exception {
-        setUp(0, 0);
+        setUp(0L, 0L, 0L);
 
         try (SshClient client = setupTestClient()) {
             client.start();
@@ -288,22 +291,22 @@ public class KeyReExchangeTest extends BaseTestSupport {
 
     @Test
     public void testReExchangeFromServerBySize() throws Exception {
-        final long LIMIT = 8192;
-        setUp(LIMIT, 0);
+        final long LIMIT = 8192L;
+        setUp(LIMIT, 0L, 0L);
 
         try (SshClient client = setupTestClient()) {
             client.start();
 
-            try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession()) {
+            try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession();
+                 ByteArrayOutputStream sent = new ByteArrayOutputStream();
+                 ByteArrayOutputStream out = new ByteArrayOutputStream()) {
                 session.addPasswordIdentity(getCurrentTestName());
                 session.auth().verify(5L, TimeUnit.SECONDS);
 
                 try (ChannelShell channel = session.createShellChannel();
-                     ByteArrayOutputStream sent = new ByteArrayOutputStream();
                      PipedOutputStream pipedIn = new PipedOutputStream();
                      OutputStream teeOut = new TeeOutputStream(sent, pipedIn);
-                     ByteArrayOutputStream out = new ByteArrayOutputStream();
-                     ByteArrayOutputStream err = new ByteArrayOutputStream();
+                     OutputStream err = new NullOutputStream();
                      InputStream inPipe = new PipedInputStream(pipedIn)) {
 
                     channel.setIn(inPipe);
@@ -347,6 +350,7 @@ public class KeyReExchangeTest extends BaseTestSupport {
                         teeOut.flush();
                         // no need to wait until the limit is reached if a re-key occurred
                         if (exchanges.get() > 0) {
+                            outputDebugMessage("Stop sending after %d bytes - exchanges=%s", sentSize + data.length, exchanges);
                             break;
                         }
                     }
@@ -359,32 +363,36 @@ public class KeyReExchangeTest extends BaseTestSupport {
                     assertFalse("Timeout while waiting for channel closure", result.contains(ClientChannel.ClientChannelEvent.TIMEOUT));
 
                     assertTrue("Expected rekeying", exchanges.get() > 0);
-                    assertEquals("Mismatched sent data length", sent.toByteArray().length, out.toByteArray().length);
-                    assertArrayEquals("Mismatched sent data content", sent.toByteArray(), out.toByteArray());
                 }
+
+                byte[] sentData = sent.toByteArray();
+                byte[] outData = out.toByteArray();
+                assertEquals("Mismatched sent data length", sentData.length, outData.length);
+                assertArrayEquals("Mismatched sent data content", sentData, outData);
             } finally {
                 client.stop();
             }
         }
     }
+
     @Test
     public void testReExchangeFromServerByTime() throws Exception {
         final long TIME = TimeUnit.SECONDS.toMillis(2L);
-        setUp(0, TIME);
+        setUp(0L, TIME, 0L);
 
         try (SshClient client = setupTestClient()) {
             client.start();
 
-            try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession()) {
+            try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession();
+                 ByteArrayOutputStream sent = new ByteArrayOutputStream();
+                 ByteArrayOutputStream out = new ByteArrayOutputStream()) {
                 session.addPasswordIdentity(getCurrentTestName());
                 session.auth().verify(5L, TimeUnit.SECONDS);
 
                 try (ChannelShell channel = session.createShellChannel();
-                     ByteArrayOutputStream sent = new ByteArrayOutputStream();
                      PipedOutputStream pipedIn = new PipedOutputStream();
                      OutputStream teeOut = new TeeOutputStream(sent, pipedIn);
-                     ByteArrayOutputStream out = new ByteArrayOutputStream();
-                     ByteArrayOutputStream err = new ByteArrayOutputStream();
+                     OutputStream err = new NullOutputStream();
                      InputStream inPipe = new PipedInputStream(pipedIn)) {
 
                     channel.setIn(inPipe);
@@ -426,21 +434,25 @@ public class KeyReExchangeTest extends BaseTestSupport {
                     final long MAX_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(3L * TIME);
                     final long MIN_WAIT = 10L;
                     final long MIN_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(MIN_WAIT);
-                    for (long timePassed = 0L; timePassed < MAX_WAIT_NANOS; timePassed++) {
+                    for (long timePassed = 0L, sentSize = 0L; timePassed < MAX_WAIT_NANOS; timePassed++) {
                         long nanoStart = System.nanoTime();
                         teeOut.write(data);
                         teeOut.write('\n');
                         teeOut.flush();
 
+                        long nanoEnd = System.nanoTime();
+                        long nanoDuration = nanoEnd - nanoStart;
+
+                        timePassed += nanoDuration;
+                        sentSize += data.length + 1;
+
                         // no need to wait until the timeout expires if a re-key occurred
                         if (exchanges.get() > 0) {
+                            outputDebugMessage("Stop sending after %d nanos and size=%d - exchanges=%s",
+                                               timePassed, sentSize, exchanges);
                             break;
                         }
 
-                        long nanoEnd = System.nanoTime();
-                        long nanoDuration = nanoEnd - nanoStart;
-
-                        timePassed += nanoDuration;
                         if ((timePassed < MAX_WAIT_NANOS) && (nanoDuration < MIN_WAIT_NANOS)) {
                             Thread.sleep(MIN_WAIT);
                         }
@@ -454,9 +466,94 @@ public class KeyReExchangeTest extends BaseTestSupport {
                     assertFalse("Timeout while waiting for channel closure", result.contains(ClientChannel.ClientChannelEvent.TIMEOUT));
 
                     assertTrue("Expected rekeying", exchanges.get() > 0);
-                    assertEquals("Mismatched sent data length", sent.toByteArray().length, out.toByteArray().length);
-                    assertArrayEquals("Mismatched sent data content", sent.toByteArray(), out.toByteArray());
                 }
+
+                byte[] sentData = sent.toByteArray();
+                byte[] outData = out.toByteArray();
+                assertEquals("Mismatched sent data length", sentData.length, outData.length);
+                assertArrayEquals("Mismatched sent data content", sentData, outData);
+            } finally {
+                client.stop();
+            }
+        }
+    }
+
+    @Test   // see SSHD-601
+    public void testReExchangeFromServerByPackets() throws Exception {
+        final int PACKETS = 128;
+        setUp(0L, 0L, PACKETS);
+
+        try (SshClient client = setupTestClient()) {
+            client.start();
+
+            try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession();
+                 ByteArrayOutputStream sent = new ByteArrayOutputStream();
+                 ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+                session.addPasswordIdentity(getCurrentTestName());
+                session.auth().verify(5L, TimeUnit.SECONDS);
+
+                try (ChannelShell channel = session.createShellChannel();
+                     PipedOutputStream pipedIn = new PipedOutputStream();
+                     OutputStream teeOut = new TeeOutputStream(sent, pipedIn);
+                     OutputStream err = new NullOutputStream();
+                     InputStream inPipe = new PipedInputStream(pipedIn)) {
+
+                    channel.setIn(inPipe);
+                    channel.setOut(out);
+                    channel.setErr(err);
+                    channel.open();
+
+                    teeOut.write("this is my command\n".getBytes(StandardCharsets.UTF_8));
+                    teeOut.flush();
+
+                    final AtomicInteger exchanges = new AtomicInteger();
+                    session.addSessionListener(new SessionListener() {
+                        @Override
+                        public void sessionCreated(Session session) {
+                            // ignored
+                        }
+
+                        @Override
+                        public void sessionEvent(Session session, Event event) {
+                            if (Event.KeyEstablished.equals(event)) {
+                                int count = exchanges.incrementAndGet();
+                                outputDebugMessage("Key established for %s - count=%d", session, count);
+                            }
+                        }
+
+                        @Override
+                        public void sessionClosed(Session session) {
+                            // ignored
+                        }
+                    });
+
+                    byte[] data = (getClass().getName() + "#" + getCurrentTestName() + "\n").getBytes(StandardCharsets.UTF_8);
+                    for (int index = 0; index < (PACKETS * 2); index++) {
+                        teeOut.write(data);
+                        teeOut.flush();
+
+                        // no need to wait until the packets limit is reached if a re-key occurred
+                        if (exchanges.get() > 0) {
+                            outputDebugMessage("Stop sending after %d packets and %d bytes - exchanges=%s",
+                                               (index + 1), (index + 1L) * data.length, exchanges);
+                            break;
+                        }
+                    }
+
+                    teeOut.write("exit\n".getBytes(StandardCharsets.UTF_8));
+                    teeOut.flush();
+
+                    Collection<ClientChannel.ClientChannelEvent> result =
+                            channel.waitFor(EnumSet.of(ClientChannel.ClientChannelEvent.CLOSED), TimeUnit.SECONDS.toMillis(15L));
+                    assertFalse("Timeout while waiting for channel closure", result.contains(ClientChannel.ClientChannelEvent.TIMEOUT));
+
+                    assertTrue("Expected rekeying", exchanges.get() > 0);
+                }
+
+                byte[] sentData = sent.toByteArray();
+                byte[] outData = out.toByteArray();
+                assertEquals("Mismatched sent data length", sentData.length, outData.length);
+                assertArrayEquals("Mismatched sent data content", sentData, outData);
             } finally {
                 client.stop();
             }