You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2015/11/29 07:43:12 UTC
mina-sshd git commit: [SSHD-601] Add configurable re-key packets limit
Repository: mina-sshd
Updated Branches:
refs/heads/master b07095c9b -> 0a4becb3b
[SSHD-601] Add configurable re-key packets limit
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/0a4becb3
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/0a4becb3
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/0a4becb3
Branch: refs/heads/master
Commit: 0a4becb3bc1789ad0cf929cae1e3fa6af5e33a57
Parents: b07095c
Author: Lyor Goldstein <lg...@vmware.com>
Authored: Sun Nov 29 08:42:58 2015 +0200
Committer: Lyor Goldstein <lg...@vmware.com>
Committed: Sun Nov 29 08:42:58 2015 +0200
----------------------------------------------------------------------
.../apache/sshd/common/cipher/BaseCipher.java | 40 +++-
.../sshd/common/cipher/BaseRC4Cipher.java | 8 +-
.../org/apache/sshd/common/cipher/Cipher.java | 10 +
.../apache/sshd/common/cipher/CipherNone.java | 10 +
.../sshd/common/session/AbstractSession.java | 225 ++++++++++++++-----
.../sshd/common/util/buffer/BufferUtils.java | 1 +
.../sshd/server/ServerFactoryManager.java | 41 +++-
.../sshd/server/session/ServerSessionImpl.java | 23 +-
.../java/org/apache/sshd/KeyReExchangeTest.java | 151 ++++++++++---
9 files changed, 388 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0a4becb3/sshd-core/src/main/java/org/apache/sshd/common/cipher/BaseCipher.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/cipher/BaseCipher.java b/sshd-core/src/main/java/org/apache/sshd/common/cipher/BaseCipher.java
index 0b85a47..c6b534b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/cipher/BaseCipher.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/cipher/BaseCipher.java
@@ -24,6 +24,7 @@ import javax.crypto.spec.SecretKeySpec;
import org.apache.sshd.common.SshException;
import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.SecurityUtils;
+import org.apache.sshd.common.util.ValidateUtils;
/**
* Base class for all Cipher implementations delegating to the JCE provider.
@@ -32,17 +33,27 @@ import org.apache.sshd.common.util.SecurityUtils;
*/
public class BaseCipher implements Cipher {
- protected final int ivsize;
- protected final int bsize;
- protected final String algorithm;
- protected final String transformation;
protected javax.crypto.Cipher cipher;
+ private final int ivsize;
+ private final int bsize;
+ private final String algorithm;
+ private final String transformation;
public BaseCipher(int ivsize, int bsize, String algorithm, String transformation) {
this.ivsize = ivsize;
this.bsize = bsize;
- this.algorithm = algorithm;
- this.transformation = transformation;
+ this.algorithm = ValidateUtils.checkNotNullAndNotEmpty(algorithm, "No algorithm");
+ this.transformation = ValidateUtils.checkNotNullAndNotEmpty(transformation, "No transformation");
+ }
+
+ @Override
+ public String getAlgorithm() {
+ return algorithm;
+ }
+
+ @Override
+ public String getTransformation() {
+ return transformation;
}
@Override
@@ -57,12 +68,12 @@ public class BaseCipher implements Cipher {
@Override
public void init(Mode mode, byte[] key, byte[] iv) throws Exception {
- key = resize(key, bsize);
- iv = resize(iv, ivsize);
+ key = resize(key, getBlockSize());
+ iv = resize(iv, getIVSize());
try {
- cipher = SecurityUtils.getCipher(transformation);
- cipher.init(mode == Mode.Encrypt ? javax.crypto.Cipher.ENCRYPT_MODE : javax.crypto.Cipher.DECRYPT_MODE,
- new SecretKeySpec(key, algorithm),
+ cipher = SecurityUtils.getCipher(getTransformation());
+ cipher.init(Mode.Encrypt.equals(mode) ? javax.crypto.Cipher.ENCRYPT_MODE : javax.crypto.Cipher.DECRYPT_MODE,
+ new SecretKeySpec(key, getAlgorithm()),
new IvParameterSpec(iv));
} catch (Exception e) {
cipher = null;
@@ -91,6 +102,11 @@ public class BaseCipher implements Cipher {
@Override
public String toString() {
- return getClass().getSimpleName() + "[" + algorithm + "," + ivsize + "," + bsize + "," + transformation + "]";
+ return getClass().getSimpleName()
+ + "[" + getAlgorithm()
+ + "," + getIVSize()
+ + "," + getBlockSize()
+ + "," + getTransformation()
+ + "]";
}
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0a4becb3/sshd-core/src/main/java/org/apache/sshd/common/cipher/BaseRC4Cipher.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/cipher/BaseRC4Cipher.java b/sshd-core/src/main/java/org/apache/sshd/common/cipher/BaseRC4Cipher.java
index 19c9497..016ecee 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/cipher/BaseRC4Cipher.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/cipher/BaseRC4Cipher.java
@@ -35,11 +35,11 @@ public class BaseRC4Cipher extends BaseCipher {
@Override
public void init(Mode mode, byte[] key, byte[] iv) throws Exception {
- key = resize(key, bsize);
+ key = resize(key, getBlockSize());
try {
- cipher = SecurityUtils.getCipher(transformation);
- cipher.init(mode == Mode.Encrypt ? javax.crypto.Cipher.ENCRYPT_MODE : javax.crypto.Cipher.DECRYPT_MODE,
- new SecretKeySpec(key, algorithm));
+ cipher = SecurityUtils.getCipher(getTransformation());
+ cipher.init(Mode.Encrypt.equals(mode) ? javax.crypto.Cipher.ENCRYPT_MODE : javax.crypto.Cipher.DECRYPT_MODE,
+ new SecretKeySpec(key, getAlgorithm()));
byte[] foo = new byte[1];
for (int i = 0; i < SKIP_SIZE; i++) {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0a4becb3/sshd-core/src/main/java/org/apache/sshd/common/cipher/Cipher.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/cipher/Cipher.java b/sshd-core/src/main/java/org/apache/sshd/common/cipher/Cipher.java
index 63e2744..9e53cc9 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/cipher/Cipher.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/cipher/Cipher.java
@@ -31,6 +31,16 @@ public interface Cipher {
}
/**
+ * @return The cipher's algorithm
+ */
+ String getAlgorithm();
+
+ /**
+ * @return The actual transformation used - e.g., AES/CBC/NoPadding
+ */
+ String getTransformation();
+
+ /**
* @return Size of the initialization vector (in bytes)
*/
int getIVSize();
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0a4becb3/sshd-core/src/main/java/org/apache/sshd/common/cipher/CipherNone.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/cipher/CipherNone.java b/sshd-core/src/main/java/org/apache/sshd/common/cipher/CipherNone.java
index 751b70f..7f2c863 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/cipher/CipherNone.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/cipher/CipherNone.java
@@ -35,6 +35,16 @@ public class CipherNone implements Cipher {
}
@Override
+ public String getAlgorithm() {
+ return "none";
+ }
+
+ @Override
+ public String getTransformation() {
+ return "none";
+ }
+
+ @Override
public int getIVSize() {
return 8; // dummy
}
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0a4becb3/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
index d25b524..71a3893 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
+import java.util.Date;
import java.util.EnumMap;
import java.util.LinkedList;
import java.util.Map;
@@ -68,6 +69,7 @@ import org.apache.sshd.common.util.ValidateUtils;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.BufferUtils;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.server.ServerFactoryManager;
/**
* <P>
@@ -177,10 +179,17 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
protected final AtomicLong outPacketsCount = new AtomicLong(0L);
protected final AtomicLong inBytesCount = new AtomicLong(0L);
protected final AtomicLong outBytesCount = new AtomicLong(0L);
+ protected final AtomicLong inBlocksCount = new AtomicLong(0L);
+ protected final AtomicLong outBlocksCount = new AtomicLong(0L);
protected final AtomicLong lastKeyTimeValue = new AtomicLong(0L);
protected final Queue<PendingWriteFuture> pendingPackets = new LinkedList<>();
protected Service currentService;
+ // we initialize them here in case super constructor calls some methods that use these values
+ protected long maxRekyPackets = ServerFactoryManager.DEFAULT_REKEY_PACKETS_LIMIT;
+ protected long maxRekeyBytes = ServerFactoryManager.DEFAULT_REKEY_BYTES_LIMIT;
+ protected long maxRekeyInterval = ServerFactoryManager.DEFAULT_REKEY_TIME_LIMIT;
+ protected final AtomicLong maxRekeyBlocks = new AtomicLong(ServerFactoryManager.DEFAULT_REKEY_BYTES_LIMIT / 16);
/**
* The factory manager used to retrieve factories of Ciphers, Macs and other objects
@@ -201,6 +210,10 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
this.factoryManager = factoryManager;
this.ioSession = ioSession;
+ maxRekeyBytes = PropertyResolverUtils.getLongProperty(this, ServerFactoryManager.REKEY_BYTES_LIMIT, maxRekeyBytes);
+ maxRekeyInterval = PropertyResolverUtils.getLongProperty(this, ServerFactoryManager.REKEY_TIME_LIMIT, maxRekeyInterval);
+ maxRekyPackets = PropertyResolverUtils.getLongProperty(this, ServerFactoryManager.REKEY_PACKETS_LIMIT, maxRekyPackets);
+
ClassLoader loader = getClass().getClassLoader();
sessionListenerProxy = EventListenerUtils.proxyWrapper(SessionListener.class, loader, sessionListeners);
channelListenerProxy = EventListenerUtils.proxyWrapper(ChannelListener.class, loader, channelListeners);
@@ -479,6 +492,10 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
log.debug("handleServiceRequest({}) Service {} rejected: {} = {}",
this, serviceName, e.getClass().getSimpleName(), e.getMessage());
}
+
+ if (log.isTraceEnabled()) {
+ log.trace("handleServiceRequest(" + this + ") service=" + serviceName + " rejection details", e);
+ }
disconnect(SshConstants.SSH2_DISCONNECT_SERVICE_NOT_AVAILABLE, "Bad service request: " + serviceName);
return;
}
@@ -521,7 +538,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
protected void handleNewKeys(int cmd) throws Exception {
if (log.isDebugEnabled()) {
- log.debug("handleNewKeys({}) SSH_MSG_NEWKEYS", this);
+ log.debug("handleNewKeys({}) SSH_MSG_NEWKEYS command={}", this, SshConstants.getCommandMessageName(cmd));
}
validateKexState(cmd, KexState.KEYS);
receiveNewKeys();
@@ -539,7 +556,9 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
sendSessionEvent(SessionListener.Event.KeyEstablished);
synchronized (pendingPackets) {
if (!pendingPackets.isEmpty()) {
- log.debug("Dequeing pending packets");
+ if (log.isDebugEnabled()) {
+ log.debug("handleNewKeys({}) Dequeing pending packets", this);
+ }
synchronized (encodeLock) {
PendingWriteFuture future;
while ((future = pendingPackets.poll()) != null) {
@@ -672,7 +691,8 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
synchronized (pendingPackets) {
if (!KexState.DONE.equals(kexState.get())) {
if (pendingPackets.isEmpty()) {
- log.debug("Start flagging packets as pending until key exchange is done");
+ log.debug("writePacket({})[{}] Start flagging packets as pending until key exchange is done",
+ this, SshConstants.getCommandMessageName(cmd & 0xFF));
}
PendingWriteFuture future = new PendingWriteFuture(buffer);
pendingPackets.add(future);
@@ -701,7 +721,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
public void run() {
Throwable t = new TimeoutException("Timeout writing packet: " + timeout + " " + unit);
if (log.isDebugEnabled()) {
- log.debug(t.getMessage());
+ log.debug("writePacket({}): {}", AbstractSession.this, t.getMessage());
}
future.setValue(t);
}
@@ -810,7 +830,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
int off = buffer.rpos() - 5;
// Debug log the packet
if (log.isTraceEnabled()) {
- log.trace("Sending packet #{}: {}", Long.valueOf(seqo), buffer.printHex());
+ log.trace("encode({}) Sending packet #{}: {}", this, Long.valueOf(seqo), buffer.printHex());
}
// Compress the packet if needed
if ((outCompression != null) && (authed || !outCompression.isDelayed())) {
@@ -845,6 +865,9 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
// Encrypt packet, excluding mac
if (outCipher != null) {
outCipher.update(buffer.array(), off, len + 4);
+
+ int blocksCount = (len + 4) / outCipher.getBlockSize();
+ outBlocksCount.addAndGet(Math.max(1, blocksCount));
}
// Increment packet id
seqo = (seqo + 1) & 0xffffffffL;
@@ -877,12 +900,15 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
// Decrypt the first bytes
if (inCipher != null) {
inCipher.update(decoderBuffer.array(), 0, inCipherSize);
+
+ int blocksCount = inCipherSize / inCipher.getBlockSize();
+ inBlocksCount.addAndGet(Math.max(1, blocksCount));
}
// Read packet length
decoderLength = decoderBuffer.getInt();
// Check packet length validity
- if (decoderLength < 5 || decoderLength > (256 * 1024)) {
- log.warn("Error decoding packet (invalid length) {}", decoderBuffer.printHex());
+ if ((decoderLength < 5) || (decoderLength > (256 * 1024))) {
+ log.warn("decode({}) Error decoding packet(invalid length) {}", this, decoderBuffer.printHex());
throw new SshException(SshConstants.SSH2_DISCONNECT_PROTOCOL_ERROR,
"Invalid packet length: " + decoderLength);
}
@@ -902,7 +928,11 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
byte[] data = decoderBuffer.array();
// Decrypt the remaining of the packet
if (inCipher != null) {
- inCipher.update(data, inCipherSize, decoderLength + 4 - inCipherSize);
+ int updateLen = decoderLength + 4 - inCipherSize;
+ inCipher.update(data, inCipherSize, updateLen);
+
+ int blocksCount = updateLen / inCipher.getBlockSize();
+ inBlocksCount.addAndGet(Math.max(1, blocksCount));
}
// Check the mac of the packet
if (inMac != null) {
@@ -938,7 +968,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
buf = decoderBuffer;
}
if (log.isTraceEnabled()) {
- log.trace("Received packet #{}: {}", Long.valueOf(seqi), buf.printHex());
+ log.trace("decode({}) Received packet #{}: {}", this, Long.valueOf(seqi), buf.printHex());
}
// Update stats
inPacketsCount.incrementAndGet();
@@ -964,7 +994,9 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
* @param ident our identification to send
*/
protected void sendIdentification(String ident) {
- log.debug("Send identification: {}", ident);
+ if (log.isDebugEnabled()) {
+ log.debug("sendIdentification({}): {}", this, ident);
+ }
byte[] data = (ident + "\r\n").getBytes(StandardCharsets.UTF_8);
ioSession.write(new ByteArrayBuffer(data));
}
@@ -1073,7 +1105,9 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
* @throws IOException if an error occurred sending the packet
*/
protected byte[] sendKexInit(Map<KexProposalOption, String> proposal) throws IOException {
- log.debug("Send SSH_MSG_KEXINIT");
+ if (log.isDebugEnabled()) {
+ log.debug("sendKexInit({}) Send SSH_MSG_KEXINIT", this);
+ }
Buffer buffer = createBuffer(SshConstants.SSH_MSG_KEXINIT);
int p = buffer.wpos();
buffer.wpos(p + SshConstants.MSG_KEX_COOKIE_SIZE);
@@ -1158,7 +1192,9 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
* @throws IOException if an error occurs sending the message
*/
protected void sendNewKeys() throws IOException {
- log.debug("Send SSH_MSG_NEWKEYS");
+ if (log.isDebugEnabled()) {
+ log.debug("sendNewKeys({}) Send SSH_MSG_NEWKEYS", this);
+ }
Buffer buffer = createBuffer(SshConstants.SSH_MSG_NEWKEYS, Byte.SIZE);
writePacket(buffer);
}
@@ -1171,25 +1207,15 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
* @throws Exception if an error occurs
*/
protected void receiveNewKeys() throws Exception {
- byte[] iv_c2s;
- byte[] iv_s2c;
- byte[] e_c2s;
- byte[] e_s2c;
- byte[] mac_c2s;
- byte[] mac_s2c;
byte[] k = kex.getK();
byte[] h = kex.getH();
Digest hash = kex.getHash();
- Cipher s2ccipher;
- Cipher c2scipher;
- Mac s2cmac;
- Mac c2smac;
- Compression s2ccomp;
- Compression c2scomp;
if (sessionId == null) {
- sessionId = new byte[h.length];
- System.arraycopy(h, 0, sessionId, 0, h.length);
+ sessionId = h.clone();
+ if (log.isDebugEnabled()) {
+ log.debug("receiveNewKeys({}) session ID={}", this, BufferUtils.printHex(':', sessionId));
+ }
}
Buffer buffer = new ByteArrayBuffer();
@@ -1200,57 +1226,55 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
int pos = buffer.available();
byte[] buf = buffer.array();
hash.update(buf, 0, pos);
- iv_c2s = hash.digest();
+ byte[] iv_c2s = hash.digest();
int j = pos - sessionId.length - 1;
buf[j]++;
hash.update(buf, 0, pos);
- iv_s2c = hash.digest();
+ byte[] iv_s2c = hash.digest();
buf[j]++;
hash.update(buf, 0, pos);
- e_c2s = hash.digest();
+ byte[] e_c2s = hash.digest();
buf[j]++;
hash.update(buf, 0, pos);
- e_s2c = hash.digest();
+ byte[] e_s2c = hash.digest();
buf[j]++;
hash.update(buf, 0, pos);
- mac_c2s = hash.digest();
+ byte[] mac_c2s = hash.digest();
buf[j]++;
hash.update(buf, 0, pos);
- mac_s2c = hash.digest();
+ byte[] mac_s2c = hash.digest();
- String value;
-
- value = getNegotiatedKexParameter(KexProposalOption.S2CENC);
- s2ccipher = ValidateUtils.checkNotNull(NamedFactory.Utils.create(getCipherFactories(), value), "Unknown s2c cipher: %s", value);
+ String value = getNegotiatedKexParameter(KexProposalOption.S2CENC);
+ Cipher s2ccipher = ValidateUtils.checkNotNull(NamedFactory.Utils.create(getCipherFactories(), value), "Unknown s2c cipher: %s", value);
e_s2c = resizeKey(e_s2c, s2ccipher.getBlockSize(), hash, k, h);
s2ccipher.init(isServer ? Cipher.Mode.Encrypt : Cipher.Mode.Decrypt, e_s2c, iv_s2c);
value = getNegotiatedKexParameter(KexProposalOption.S2CMAC);
- s2cmac = ValidateUtils.checkNotNull(NamedFactory.Utils.create(getMacFactories(), value), "Unknown s2c mac: %s", value);
+ Mac s2cmac = ValidateUtils.checkNotNull(NamedFactory.Utils.create(getMacFactories(), value), "Unknown s2c mac: %s", value);
mac_s2c = resizeKey(mac_s2c, s2cmac.getBlockSize(), hash, k, h);
s2cmac.init(mac_s2c);
value = getNegotiatedKexParameter(KexProposalOption.S2CCOMP);
- s2ccomp = NamedFactory.Utils.create(getCompressionFactories(), value);
+ Compression s2ccomp = NamedFactory.Utils.create(getCompressionFactories(), value);
value = getNegotiatedKexParameter(KexProposalOption.C2SENC);
- c2scipher = ValidateUtils.checkNotNull(NamedFactory.Utils.create(getCipherFactories(), value), "Unknown c2s cipher: %s", value);
+ Cipher c2scipher = ValidateUtils.checkNotNull(NamedFactory.Utils.create(getCipherFactories(), value), "Unknown c2s cipher: %s", value);
e_c2s = resizeKey(e_c2s, c2scipher.getBlockSize(), hash, k, h);
c2scipher.init(isServer ? Cipher.Mode.Decrypt : Cipher.Mode.Encrypt, e_c2s, iv_c2s);
value = getNegotiatedKexParameter(KexProposalOption.C2SMAC);
- c2smac = ValidateUtils.checkNotNull(NamedFactory.Utils.create(getMacFactories(), value), "Unknown c2s mac: %s", value);
+ Mac c2smac = ValidateUtils.checkNotNull(NamedFactory.Utils.create(getMacFactories(), value), "Unknown c2s mac: %s", value);
mac_c2s = resizeKey(mac_c2s, c2smac.getBlockSize(), hash, k, h);
c2smac.init(mac_c2s);
value = getNegotiatedKexParameter(KexProposalOption.C2SCOMP);
- c2scomp = NamedFactory.Utils.create(getCompressionFactories(), value);
+ Compression c2scomp = NamedFactory.Utils.create(getCompressionFactories(), value);
if (isServer) {
outCipher = s2ccipher;
@@ -1276,10 +1300,25 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
if (inCompression != null) {
inCompression.init(Compression.Type.Inflater, -1);
}
+
+ // see https://tools.ietf.org/html/rfc4344#section-3.2
+ int inBlockSize = inCipher.getBlockSize();
+ int outBlockSize = outCipher.getBlockSize();
+ // use the smaller of the inbound/outbound cipher block sizes
+ int avgCipherBlockSize = Math.min(inBlockSize, outBlockSize);
+ long recommendedByteRekeyBlocks = 1L << Math.min((avgCipherBlockSize * Byte.SIZE) / 4, 63); // in case (block-size / 4) > 63
+ maxRekeyBlocks.set(PropertyResolverUtils.getLongProperty(this, ServerFactoryManager.REKEY_BLOCKS_LIMIT, recommendedByteRekeyBlocks));
+ if (log.isDebugEnabled()) {
+ log.debug("receiveNewKeys({}) inCipher={}, outCipher={}, recommended blocks limit={}, actual={}",
+ this, inCipher, outCipher, recommendedByteRekeyBlocks, maxRekeyBlocks);
+ }
+
inBytesCount.set(0L);
outBytesCount.set(0L);
inPacketsCount.set(0L);
outPacketsCount.set(0L);
+ inBlocksCount.set(0L);
+ outBlocksCount.set(0L);
lastKeyTimeValue.set(System.currentTimeMillis());
}
@@ -1403,14 +1442,14 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
}
if (log.isDebugEnabled()) {
- log.debug("Kex: server->client {} {} {}",
- guess.get(KexProposalOption.S2CENC),
- guess.get(KexProposalOption.S2CMAC),
- guess.get(KexProposalOption.S2CCOMP));
- log.debug("Kex: client->server {} {} {}",
- guess.get(KexProposalOption.C2SENC),
- guess.get(KexProposalOption.C2SMAC),
- guess.get(KexProposalOption.C2SCOMP));
+ log.debug("setNegotiationResult({}) Kex: server->client {} {} {}", this,
+ guess.get(KexProposalOption.S2CENC),
+ guess.get(KexProposalOption.S2CMAC),
+ guess.get(KexProposalOption.S2CCOMP));
+ log.debug("setNegotiationResult({}) Kex: client->server {} {} {}", this,
+ guess.get(KexProposalOption.C2SENC),
+ guess.get(KexProposalOption.C2SMAC),
+ guess.get(KexProposalOption.C2SCOMP));
}
return guess;
@@ -1557,7 +1596,89 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
}
protected void checkRekey() throws IOException {
- // nothing
+ if (isRekeyRequired()) {
+ reExchangeKeys();
+ }
+ }
+
+ protected boolean isRekeyRequired() {
+ KexState curState = kexState.get();
+ if (!KexState.DONE.equals(curState)) {
+ return false;
+ }
+
+ return isRekeyTimeIntervalExceeded()
+ || isRekeyPacketCountsExceeded()
+ || isRekeyBlocksCountExceeded()
+ || isRekeyDataSizeExceeded();
+ }
+
+ protected boolean isRekeyTimeIntervalExceeded() {
+ if (maxRekeyInterval <= 0L) {
+ return false; // disabled
+ }
+
+ long now = System.currentTimeMillis();
+ long rekeyDiff = now - lastKeyTimeValue.get();
+ boolean rekey = rekeyDiff > maxRekeyInterval;
+ if (rekey) {
+ if (log.isDebugEnabled()) {
+ log.debug("isRekeyTimeIntervalExceeded({}) re-keying: last={}, now={}, diff={}, max={}",
+ this, new Date(lastKeyTimeValue.get()), new Date(now),
+ rekeyDiff, maxRekeyInterval);
+ }
+ }
+
+ return rekey;
+ }
+
+ protected boolean isRekeyPacketCountsExceeded() {
+ if (maxRekyPackets <= 0L) {
+ return false; // disabled
+ }
+
+ boolean rekey = (inPacketsCount.get() > maxRekyPackets) || (outPacketsCount.get() > maxRekyPackets);
+ if (rekey) {
+ if (log.isDebugEnabled()) {
+ log.debug("isRekeyPacketCountsExceeded({}) re-keying: in={}, out={}, max={}",
+ this, inPacketsCount, outPacketsCount, maxRekyPackets);
+ }
+ }
+
+ return rekey;
+ }
+
+ protected boolean isRekeyDataSizeExceeded() {
+ if (maxRekeyBytes <= 0L) {
+ return false;
+ }
+
+ boolean rekey = (inBytesCount.get() > maxRekeyBytes) || (outBytesCount.get() > maxRekeyBytes);
+ if (rekey) {
+ if (log.isDebugEnabled()) {
+ log.debug("isRekeyDataSizeExceeded({}) re-keying: in={}, out={}, max={}",
+ this, inBytesCount, outBytesCount, maxRekeyBytes);
+ }
+ }
+
+ return rekey;
+ }
+
+ protected boolean isRekeyBlocksCountExceeded() {
+ long maxBlocks = maxRekeyBlocks.get();
+ if (maxBlocks <= 0L) {
+ return false;
+ }
+
+ boolean rekey = (inBlocksCount.get() > maxBlocks) || (outBlocksCount.get() > maxBlocks);
+ if (rekey) {
+ if (log.isDebugEnabled()) {
+ log.debug("isRekeyBlocksCountExceeded({}) re-keying: in={}, out={}, max={}",
+ this, inBlocksCount, outBlocksCount, maxBlocks);
+ }
+ }
+
+ return rekey;
}
protected byte[] sendKexInit() throws IOException {
@@ -1569,8 +1690,8 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
Map<KexProposalOption, String> proposal = createProposal(resolvedAlgorithms);
byte[] seed = sendKexInit(proposal);
- if (log.isDebugEnabled()) {
- log.debug("sendKexInit(" + proposal + ") seed: " + BufferUtils.printHex(':', seed));
+ if (log.isTraceEnabled()) {
+ log.trace("sendKexInit({}) proposal={} seed: {}", this, proposal, BufferUtils.printHex(':', seed));
}
setKexSeed(seed);
return seed;
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0a4becb3/sshd-core/src/main/java/org/apache/sshd/common/util/buffer/BufferUtils.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/buffer/BufferUtils.java b/sshd-core/src/main/java/org/apache/sshd/common/util/buffer/BufferUtils.java
index 676549f..dc9a9cb 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/buffer/BufferUtils.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/buffer/BufferUtils.java
@@ -311,6 +311,7 @@ public final class BufferUtils {
}
return true;
}
+
public static int getNextPowerOf2(int value) {
// for 0-7 return 8
return (value < Byte.SIZE) ? Byte.SIZE : NumberUtils.getNextPowerOf2(value);
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0a4becb3/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java b/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
index 92fd295..2e63121 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/ServerFactoryManager.java
@@ -92,19 +92,52 @@ public interface ServerFactoryManager extends FactoryManager, ServerAuthenticati
/**
* Key re-exchange will be automatically performed after the session
- * has sent or received the given amount of bytes.
- * The default value is 1 gigabyte.
+ * has sent or received the given amount of bytes. If non-positive,
+ * then disabled. The default value is {@link #DEFAULT_REKEY_BYTES_LIMIT}
*/
String REKEY_BYTES_LIMIT = "rekey-bytes-limit";
/**
+ * Default value for {@link #REKEY_BYTES_LIMIT} if none specified
+ * @see <A HREF="https://tools.ietf.org/html/rfc4253#page-23">RFC4253 section 9</A>
+ */
+ long DEFAULT_REKEY_BYTES_LIMIT = 1024L * 1024L * 1024L; // 1GB
+
+ /**
* Key re-exchange will be automatically performed after the specified
- * amount of time has elapsed since the last key exchange. In milliseconds.
- * The default value is 1 hour.
+ * amount of time has elapsed since the last key exchange - in milliseconds.
+ * If non-positive then disabled. The default value is {@link #DEFAULT_REKEY_TIME_LIMIT}
*/
String REKEY_TIME_LIMIT = "rekey-time-limit";
/**
+ * Default value for {@link #REKEY_TIME_LIMIT} if none specified
+ * @see <A HREF="https://tools.ietf.org/html/rfc4253#page-23">RFC4253 section 9</A>
+ */
+ long DEFAULT_REKEY_TIME_LIMIT = 60L * 60L * 1000L; // 1 hour
+
+ /**
+ * Key re-exchange will be automatically performed after the specified
+ * number of packets has been exchanged - positive 64-bit value. If
+ * non-positive then disabled. The default is {@link #DEFAULT_REKEY_PACKETS_LIMIT}
+ */
+ String REKEY_PACKETS_LIMIT = "rekey-packets-limit";
+
+ /**
+ * Default value for {@link #REKEY_PACKETS_LIMIT} if none specified
+ * @see <A HREF="https://tools.ietf.org/html/rfc4344#page-3">RFC4344 section 3.1</A>
+ */
+ long DEFAULT_REKEY_PACKETS_LIMIT = 1L << 31;
+
+ /**
+ * Key re-exchange will be automatically performed after the specified
+ * number of cipher blocks has been processed - positive 64-bit value. If
+ * non-positive then disabled. The default is calculated according to
+ * <A HREF="https://tools.ietf.org/html/rfc4344#page-3">RFC4344 section 3.2</A>
+ */
+ String REKEY_BLOCKS_LIMIT = "rekey-blocks-limit";
+
+ /**
* A URL pointing to the moduli file.
* If not specified, the default internal file will be used.
*/
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0a4becb3/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java
index 479159a..f506924 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/session/ServerSessionImpl.java
@@ -24,7 +24,6 @@ import java.security.KeyPair;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
-
import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.common.NamedResource;
import org.apache.sshd.common.PropertyResolverUtils;
@@ -51,15 +50,9 @@ import org.apache.sshd.server.ServerFactoryManager;
* @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
*/
public class ServerSessionImpl extends AbstractServerSession {
- protected static final long MAX_PACKETS = 1L << 31;
-
- private long maxBytes = 1024 * 1024 * 1024; // 1 GB
- private long maxKeyInterval = 60 * 60 * 1000; // 1 hour
-
public ServerSessionImpl(ServerFactoryManager server, IoSession ioSession) throws Exception {
super(server, ioSession);
- maxBytes = Math.max(32, PropertyResolverUtils.getLongProperty(this, ServerFactoryManager.REKEY_BYTES_LIMIT, maxBytes));
- maxKeyInterval = PropertyResolverUtils.getLongProperty(this, ServerFactoryManager.REKEY_TIME_LIMIT, maxKeyInterval);
+
if (log.isDebugEnabled()) {
log.debug("Server session created {}", ioSession);
}
@@ -82,20 +75,6 @@ public class ServerSessionImpl extends AbstractServerSession {
disconnect(SshConstants.SSH2_DISCONNECT_PROTOCOL_ERROR, "Unsupported packet: SSH_MSG_SERVICE_ACCEPT");
}
- @Override
- protected void checkRekey() throws IOException {
- if (KexState.DONE.equals(kexState.get())) {
- long now = System.currentTimeMillis();
- if ((inPacketsCount.get() > MAX_PACKETS)
- || (outPacketsCount.get() > MAX_PACKETS)
- || (inBytesCount.get() > maxBytes)
- || (outBytesCount.get() > maxBytes)
- || ((maxKeyInterval > 0L) && ((now - lastKeyTimeValue.get()) > maxKeyInterval))) {
- reExchangeKeys();
- }
- }
- }
-
protected void sendServerIdentification() {
FactoryManager manager = getFactoryManager();
String ident = PropertyResolverUtils.getString(manager, ServerFactoryManager.SERVER_IDENTIFICATION);
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/0a4becb3/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java b/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
index 077e531..34323a6 100644
--- a/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
@@ -34,13 +34,12 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.channel.ChannelShell;
import org.apache.sshd.client.channel.ClientChannel;
import org.apache.sshd.client.session.ClientSession;
-import org.apache.sshd.common.PropertyResolverUtils;
import org.apache.sshd.common.NamedFactory;
+import org.apache.sshd.common.PropertyResolverUtils;
import org.apache.sshd.common.cipher.BuiltinCiphers;
import org.apache.sshd.common.future.KeyExchangeFuture;
import org.apache.sshd.common.kex.BuiltinDHFactories;
@@ -49,6 +48,7 @@ import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.session.SessionListener;
import org.apache.sshd.common.subsystem.sftp.SftpConstants;
import org.apache.sshd.common.util.SecurityUtils;
+import org.apache.sshd.common.util.io.NullOutputStream;
import org.apache.sshd.server.ServerFactoryManager;
import org.apache.sshd.server.SshServer;
import org.apache.sshd.util.test.BaseTestSupport;
@@ -83,7 +83,7 @@ public class KeyReExchangeTest extends BaseTestSupport {
sshd.stop(true);
}
- protected void setUp(long bytesLimit, long timeLimit) throws Exception {
+ protected void setUp(long bytesLimit, long timeLimit, long packetsLimit) throws Exception {
sshd = setupTestServer();
if (bytesLimit > 0L) {
PropertyResolverUtils.updateProperty(sshd, ServerFactoryManager.REKEY_BYTES_LIMIT, bytesLimit);
@@ -91,6 +91,9 @@ public class KeyReExchangeTest extends BaseTestSupport {
if (timeLimit > 0L) {
PropertyResolverUtils.updateProperty(sshd, ServerFactoryManager.REKEY_TIME_LIMIT, timeLimit);
}
+ if (packetsLimit > 0L) {
+ PropertyResolverUtils.updateProperty(sshd, ServerFactoryManager.REKEY_PACKETS_LIMIT, packetsLimit);
+ }
sshd.start();
port = sshd.getPort();
@@ -98,7 +101,7 @@ public class KeyReExchangeTest extends BaseTestSupport {
@Test
public void testSwitchToNoneCipher() throws Exception {
- setUp(0, 0);
+ setUp(0L, 0L, 0L);
sshd.getCipherFactories().add(BuiltinCiphers.none);
try (SshClient client = setupTestClient()) {
@@ -122,7 +125,7 @@ public class KeyReExchangeTest extends BaseTestSupport {
@Test // see SSHD-558
public void testKexFutureExceptionPropagation() throws Exception {
- setUp(0, 0);
+ setUp(0L, 0L, 0L);
sshd.getCipherFactories().add(BuiltinCiphers.none);
try (SshClient client = setupTestClient()) {
@@ -195,7 +198,7 @@ public class KeyReExchangeTest extends BaseTestSupport {
@Test
public void testReExchangeFromJschClient() throws Exception {
Assume.assumeTrue("DH Group Exchange not supported", SecurityUtils.isDHGroupExchangeSupported());
- setUp(0, 0);
+ setUp(0L, 0L, 0L);
JSchLogger.init();
JSch.setConfig("kex", BuiltinDHFactories.Constants.DIFFIE_HELLMAN_GROUP_EXCHANGE_SHA1);
@@ -232,7 +235,7 @@ public class KeyReExchangeTest extends BaseTestSupport {
@Test
public void testReExchangeFromSshdClient() throws Exception {
- setUp(0, 0);
+ setUp(0L, 0L, 0L);
try (SshClient client = setupTestClient()) {
client.start();
@@ -288,22 +291,22 @@ public class KeyReExchangeTest extends BaseTestSupport {
@Test
public void testReExchangeFromServerBySize() throws Exception {
- final long LIMIT = 8192;
- setUp(LIMIT, 0);
+ final long LIMIT = 8192L;
+ setUp(LIMIT, 0L, 0L);
try (SshClient client = setupTestClient()) {
client.start();
- try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession()) {
+ try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession();
+ ByteArrayOutputStream sent = new ByteArrayOutputStream();
+ ByteArrayOutputStream out = new ByteArrayOutputStream()) {
session.addPasswordIdentity(getCurrentTestName());
session.auth().verify(5L, TimeUnit.SECONDS);
try (ChannelShell channel = session.createShellChannel();
- ByteArrayOutputStream sent = new ByteArrayOutputStream();
PipedOutputStream pipedIn = new PipedOutputStream();
OutputStream teeOut = new TeeOutputStream(sent, pipedIn);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ByteArrayOutputStream err = new ByteArrayOutputStream();
+ OutputStream err = new NullOutputStream();
InputStream inPipe = new PipedInputStream(pipedIn)) {
channel.setIn(inPipe);
@@ -347,6 +350,7 @@ public class KeyReExchangeTest extends BaseTestSupport {
teeOut.flush();
// no need to wait until the limit is reached if a re-key occurred
if (exchanges.get() > 0) {
+ outputDebugMessage("Stop sending after %d bytes - exchanges=%s", sentSize + data.length, exchanges);
break;
}
}
@@ -359,32 +363,36 @@ public class KeyReExchangeTest extends BaseTestSupport {
assertFalse("Timeout while waiting for channel closure", result.contains(ClientChannel.ClientChannelEvent.TIMEOUT));
assertTrue("Expected rekeying", exchanges.get() > 0);
- assertEquals("Mismatched sent data length", sent.toByteArray().length, out.toByteArray().length);
- assertArrayEquals("Mismatched sent data content", sent.toByteArray(), out.toByteArray());
}
+
+ byte[] sentData = sent.toByteArray();
+ byte[] outData = out.toByteArray();
+ assertEquals("Mismatched sent data length", sentData.length, outData.length);
+ assertArrayEquals("Mismatched sent data content", sentData, outData);
} finally {
client.stop();
}
}
}
+
@Test
public void testReExchangeFromServerByTime() throws Exception {
final long TIME = TimeUnit.SECONDS.toMillis(2L);
- setUp(0, TIME);
+ setUp(0L, TIME, 0L);
try (SshClient client = setupTestClient()) {
client.start();
- try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession()) {
+ try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession();
+ ByteArrayOutputStream sent = new ByteArrayOutputStream();
+ ByteArrayOutputStream out = new ByteArrayOutputStream()) {
session.addPasswordIdentity(getCurrentTestName());
session.auth().verify(5L, TimeUnit.SECONDS);
try (ChannelShell channel = session.createShellChannel();
- ByteArrayOutputStream sent = new ByteArrayOutputStream();
PipedOutputStream pipedIn = new PipedOutputStream();
OutputStream teeOut = new TeeOutputStream(sent, pipedIn);
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ByteArrayOutputStream err = new ByteArrayOutputStream();
+ OutputStream err = new NullOutputStream();
InputStream inPipe = new PipedInputStream(pipedIn)) {
channel.setIn(inPipe);
@@ -426,21 +434,25 @@ public class KeyReExchangeTest extends BaseTestSupport {
final long MAX_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(3L * TIME);
final long MIN_WAIT = 10L;
final long MIN_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(MIN_WAIT);
- for (long timePassed = 0L; timePassed < MAX_WAIT_NANOS; timePassed++) {
+ for (long timePassed = 0L, sentSize = 0L; timePassed < MAX_WAIT_NANOS; timePassed++) {
long nanoStart = System.nanoTime();
teeOut.write(data);
teeOut.write('\n');
teeOut.flush();
+ long nanoEnd = System.nanoTime();
+ long nanoDuration = nanoEnd - nanoStart;
+
+ timePassed += nanoDuration;
+ sentSize += data.length + 1;
+
// no need to wait until the timeout expires if a re-key occurred
if (exchanges.get() > 0) {
+ outputDebugMessage("Stop sending after %d nanos and size=%d - exchanges=%s",
+ timePassed, sentSize, exchanges);
break;
}
- long nanoEnd = System.nanoTime();
- long nanoDuration = nanoEnd - nanoStart;
-
- timePassed += nanoDuration;
if ((timePassed < MAX_WAIT_NANOS) && (nanoDuration < MIN_WAIT_NANOS)) {
Thread.sleep(MIN_WAIT);
}
@@ -454,9 +466,94 @@ public class KeyReExchangeTest extends BaseTestSupport {
assertFalse("Timeout while waiting for channel closure", result.contains(ClientChannel.ClientChannelEvent.TIMEOUT));
assertTrue("Expected rekeying", exchanges.get() > 0);
- assertEquals("Mismatched sent data length", sent.toByteArray().length, out.toByteArray().length);
- assertArrayEquals("Mismatched sent data content", sent.toByteArray(), out.toByteArray());
}
+
+ byte[] sentData = sent.toByteArray();
+ byte[] outData = out.toByteArray();
+ assertEquals("Mismatched sent data length", sentData.length, outData.length);
+ assertArrayEquals("Mismatched sent data content", sentData, outData);
+ } finally {
+ client.stop();
+ }
+ }
+ }
+
+ @Test // see SSHD-601
+ public void testReExchangeFromServerByPackets() throws Exception {
+ final int PACKETS = 128;
+ setUp(0L, 0L, PACKETS);
+
+ try (SshClient client = setupTestClient()) {
+ client.start();
+
+ try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession();
+ ByteArrayOutputStream sent = new ByteArrayOutputStream();
+ ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+ session.addPasswordIdentity(getCurrentTestName());
+ session.auth().verify(5L, TimeUnit.SECONDS);
+
+ try (ChannelShell channel = session.createShellChannel();
+ PipedOutputStream pipedIn = new PipedOutputStream();
+ OutputStream teeOut = new TeeOutputStream(sent, pipedIn);
+ OutputStream err = new NullOutputStream();
+ InputStream inPipe = new PipedInputStream(pipedIn)) {
+
+ channel.setIn(inPipe);
+ channel.setOut(out);
+ channel.setErr(err);
+ channel.open();
+
+ teeOut.write("this is my command\n".getBytes(StandardCharsets.UTF_8));
+ teeOut.flush();
+
+ final AtomicInteger exchanges = new AtomicInteger();
+ session.addSessionListener(new SessionListener() {
+ @Override
+ public void sessionCreated(Session session) {
+ // ignored
+ }
+
+ @Override
+ public void sessionEvent(Session session, Event event) {
+ if (Event.KeyEstablished.equals(event)) {
+ int count = exchanges.incrementAndGet();
+ outputDebugMessage("Key established for %s - count=%d", session, count);
+ }
+ }
+
+ @Override
+ public void sessionClosed(Session session) {
+ // ignored
+ }
+ });
+
+ byte[] data = (getClass().getName() + "#" + getCurrentTestName() + "\n").getBytes(StandardCharsets.UTF_8);
+ for (int index = 0; index < (PACKETS * 2); index++) {
+ teeOut.write(data);
+ teeOut.flush();
+
+ // no need to wait until the packets limit is reached if a re-key occurred
+ if (exchanges.get() > 0) {
+ outputDebugMessage("Stop sending after %d packets and %d bytes - exchanges=%s",
+ (index + 1), (index + 1L) * data.length, exchanges);
+ break;
+ }
+ }
+
+ teeOut.write("exit\n".getBytes(StandardCharsets.UTF_8));
+ teeOut.flush();
+
+ Collection<ClientChannel.ClientChannelEvent> result =
+ channel.waitFor(EnumSet.of(ClientChannel.ClientChannelEvent.CLOSED), TimeUnit.SECONDS.toMillis(15L));
+ assertFalse("Timeout while waiting for channel closure", result.contains(ClientChannel.ClientChannelEvent.TIMEOUT));
+
+ assertTrue("Expected rekeying", exchanges.get() > 0);
+ }
+
+ byte[] sentData = sent.toByteArray();
+ byte[] outData = out.toByteArray();
+ assertEquals("Mismatched sent data length", sentData.length, outData.length);
+ assertArrayEquals("Mismatched sent data content", sentData, outData);
} finally {
client.stop();
}