You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2015/12/09 11:07:07 UTC

[4/6] mina-sshd git commit: Fixed keys re-exchange request logic in case of automatic re-exchange due to thresholds

Fixed keys re-exchange request logic in case of automatic re-exchange due to thresholds


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/d99d4806
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/d99d4806
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/d99d4806

Branch: refs/heads/master
Commit: d99d4806e00a68a5dcbe5d634fe8b3113c360360
Parents: 05f5d36
Author: Lyor Goldstein <lg...@vmware.com>
Authored: Wed Dec 9 12:00:06 2015 +0200
Committer: Lyor Goldstein <lg...@vmware.com>
Committed: Wed Dec 9 12:00:06 2015 +0200

----------------------------------------------------------------------
 .../sshd/common/session/AbstractSession.java    | 85 +++++++++++++-------
 .../org/apache/sshd/common/session/Session.java |  4 +-
 .../java/org/apache/sshd/KeyReExchangeTest.java | 36 +++++++++
 .../java/org/apache/sshd/server/ServerTest.java |  3 +-
 .../test/OutputCountTrackingOutputStream.java   |  4 +-
 5 files changed, 96 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d99d4806/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
index 74d275f..c7fddef 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/AbstractSession.java
@@ -424,7 +424,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
                 handleIgnore(buffer);
                 break;
             case SshConstants.SSH_MSG_UNIMPLEMENTED:
-                handleUnimplented(buffer);
+                handleUnimplemented(buffer);
                 break;
             case SshConstants.SSH_MSG_DEBUG:
                 handleDebug(buffer);
@@ -439,7 +439,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
                 handleKexInit(buffer);
                 break;
             case SshConstants.SSH_MSG_NEWKEYS:
-                handleNewKeys(cmd);
+                handleNewKeys(cmd, buffer);
                 break;
             default:
                 if ((cmd >= SshConstants.SSH_MSG_KEX_FIRST) && (cmd <= SshConstants.SSH_MSG_KEX_LAST)) {
@@ -521,17 +521,21 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
         }
     }
 
-    protected void handleUnimplented(Buffer buffer) throws Exception {
-        int seqNo = buffer.getInt();
+    protected void handleUnimplemented(Buffer buffer) throws Exception {
+        handleUnimplemented(buffer.getInt(), buffer);
+    }
+
+    protected void handleUnimplemented(int seqNo, Buffer buffer) throws Exception {
         if (log.isDebugEnabled()) {
             log.debug("handleUnimplented({}) SSH_MSG_UNIMPLEMENTED #{}", this, seqNo);
         }
     }
 
     protected void handleDebug(Buffer buffer) throws Exception {
-        boolean display = buffer.getBoolean();
-        String msg = buffer.getString();
-        String lang = buffer.getString();
+        handleDebug(buffer.getBoolean(), buffer.getString(), buffer.getString(), buffer);
+    }
+
+    protected void handleDebug(boolean display, String msg, String lang, Buffer buffer) throws Exception {
         if (log.isDebugEnabled()) {
             log.debug("handleDebug({}) SSH_MSG_DEBUG (display={}) [lang={}] '{}'",
                       this, display, lang, msg);
@@ -539,18 +543,23 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
     }
 
     protected void handleDisconnect(Buffer buffer) throws Exception  {
-        int code = buffer.getInt();
-        String msg = buffer.getString();
-        String lang = buffer.getString();
+        handleDisconnect(buffer.getInt(), buffer.getString(), buffer.getString(), buffer);
+    }
+
+    protected void handleDisconnect(int code, String msg, String lang, Buffer buffer) throws Exception {
         if (log.isDebugEnabled()) {
             log.debug("handleDisconnect({}) SSH_MSG_DISCONNECT reason={}, [lang={}] msg={}",
                       this, SshConstants.getDisconnectReasonName(code), lang, msg);
         }
+
         close(true);
     }
 
-    protected void handleServiceRequest(Buffer buffer) throws IOException {
-        String serviceName = buffer.getString();
+    protected void handleServiceRequest(Buffer buffer) throws Exception {
+        handleServiceRequest(buffer.getString(), buffer);
+    }
+
+    protected void handleServiceRequest(String serviceName, Buffer buffer) throws Exception {
         if (log.isDebugEnabled()) {
             log.debug("handleServiceRequest({}) SSH_MSG_SERVICE_REQUEST '{}'", this, serviceName);
         }
@@ -569,16 +578,21 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
             disconnect(SshConstants.SSH2_DISCONNECT_SERVICE_NOT_AVAILABLE, "Bad service request: " + serviceName);
             return;
         }
+
         if (log.isDebugEnabled()) {
             log.debug("handleServiceRequest({}) Accepted service {}", this, serviceName);
         }
+
         Buffer response = prepareBuffer(SshConstants.SSH_MSG_SERVICE_ACCEPT, BufferUtils.clear(buffer));
         response.putString(serviceName);
         writePacket(response);
     }
 
-    protected void handleServiceAccept(Buffer buffer) throws IOException {
-        String serviceName = buffer.getString();
+    protected void handleServiceAccept(Buffer buffer) throws Exception {
+        handleServiceAccept(buffer.getString(), buffer);
+    }
+
+    protected void handleServiceAccept(String serviceName, Buffer buffer) throws Exception {
         if (log.isDebugEnabled()) {
             log.debug("handleServiceAccept({}) SSH_MSG_SERVICE_ACCEPT service={}", this, serviceName);
         }
@@ -605,7 +619,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
         sendSessionEvent(SessionListener.Event.KexCompleted);
     }
 
-    protected void handleNewKeys(int cmd) throws Exception {
+    protected void handleNewKeys(int cmd, Buffer buffer) throws Exception {
         if (log.isDebugEnabled()) {
             log.debug("handleNewKeys({}) SSH_MSG_NEWKEYS command={}", this, SshConstants.getCommandMessageName(cmd));
         }
@@ -1692,27 +1706,36 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
 
     @Override
     public KeyExchangeFuture reExchangeKeys() throws IOException {
-        if (kexState.compareAndSet(KexState.DONE, KexState.INIT)) {
-            log.info("reExchangeKeys({}) Initiating key re-exchange", this);
-            sendKexInit();
-
-            DefaultKeyExchangeFuture kexFuture = kexFutureHolder.getAndSet(new DefaultKeyExchangeFuture(null));
-            if (kexFuture != null) {
-                synchronized (kexFuture) {
-                    Object value = kexFuture.getValue();
-                    if (value == null) {
-                        kexFuture.setValue(new SshException("New KEX started while previous one still ongoing"));
-                    }
-                }
-            }
-        }
-
+        requestNewKeysExchange();
         return ValidateUtils.checkNotNull(kexFutureHolder.get(), "No current KEX future on state=%s", kexState.get());
     }
 
     protected void checkRekey() throws IOException {
         if (isRekeyRequired()) {
-            reExchangeKeys();
+            requestNewKeysExchange();
+        }
+    }
+
+    protected void requestNewKeysExchange() throws IOException {
+        if (!kexState.compareAndSet(KexState.DONE, KexState.INIT)) {
+            if (log.isDebugEnabled()) {
+                log.debug("requestNewKeysExchange({}) KEX state not DONE: {}", this, kexState.get());
+            }
+
+            return;
+        }
+
+        log.info("requestNewKeysExchange({}) Initiating key re-exchange", this);
+        sendKexInit();
+
+        DefaultKeyExchangeFuture kexFuture = kexFutureHolder.getAndSet(new DefaultKeyExchangeFuture(null));
+        if (kexFuture != null) {
+            synchronized (kexFuture) {
+                Object value = kexFuture.getValue();
+                if (value == null) {
+                    kexFuture.setValue(new SshException("New KEX started while previous one still ongoing"));
+                }
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d99d4806/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
index db9de87..c31d589 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/Session.java
@@ -226,8 +226,8 @@ public interface Session
     /**
      * Initiate a new key exchange.
      *
-     * @return An {@link KeyExchangeFuture} for awaiting the completion of the exchange
-     * @throws IOException If failed to negotiate keys
+     * @return A {@link KeyExchangeFuture} for awaiting the completion of the exchange
+     * @throws IOException If the request for keys re-negotiation failed
      */
     KeyExchangeFuture reExchangeKeys() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d99d4806/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java b/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
index 34d1a40..242e3b4 100644
--- a/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/KeyReExchangeTest.java
@@ -264,17 +264,26 @@ public class KeyReExchangeTest extends BaseTestSupport {
                      InputStream inPipe = new PipedInputStream(pipedIn);
                      OutputStream teeOut = new TeeOutputStream(sent, pipedIn);
                      ByteArrayOutputStream out = new ByteArrayOutputStream() {
+                         private long writeCount = 0L;
+
                          @Override
                          public void write(int b) {
                              super.write(b);
+                             updateWriteCount(1L);
                              pipedCount.release(1);
                          }
 
                          @Override
                          public void write(byte[] b, int off, int len) {
                              super.write(b, off, len);
+                             updateWriteCount(len);
                              pipedCount.release(len);
                          }
+
+                         private void updateWriteCount(long delta) {
+                             writeCount += delta;
+                             outputDebugMessage("OUT write count=%d", writeCount);
+                         }
                      };
                      ByteArrayOutputStream err = new ByteArrayOutputStream()) {
 
@@ -333,17 +342,26 @@ public class KeyReExchangeTest extends BaseTestSupport {
             try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession();
                  ByteArrayOutputStream sent = new ByteArrayOutputStream();
                  ByteArrayOutputStream out = new ByteArrayOutputStream() {
+                     private long writeCount = 0L;
+
                      @Override
                      public void write(int b) {
                          super.write(b);
+                         updateWriteCount(1L);
                          pipedCount.release(1);
                      }
 
                      @Override
                      public void write(byte[] b, int off, int len) {
                          super.write(b, off, len);
+                         updateWriteCount(len);
                          pipedCount.release(len);
                      }
+
+                     private void updateWriteCount(long delta) {
+                         writeCount += delta;
+                         outputDebugMessage("OUT write count=%d", writeCount);
+                     }
                  }) {
                 session.addPasswordIdentity(getCurrentTestName());
                 session.auth().verify(5L, TimeUnit.SECONDS);
@@ -436,17 +454,26 @@ public class KeyReExchangeTest extends BaseTestSupport {
             try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession();
                  ByteArrayOutputStream sent = new ByteArrayOutputStream();
                  ByteArrayOutputStream out = new ByteArrayOutputStream() {
+                     private long writeCount = 0L;
+
                      @Override
                      public void write(int b) {
                          super.write(b);
+                         updateWriteCount(1L);
                          pipedCount.release(1);
                      }
 
                      @Override
                      public void write(byte[] b, int off, int len) {
                          super.write(b, off, len);
+                         updateWriteCount(len);
                          pipedCount.release(len);
                      }
+
+                     private void updateWriteCount(long delta) {
+                         writeCount += delta;
+                         outputDebugMessage("OUT write count=%d", writeCount);
+                     }
                  }) {
                 session.addPasswordIdentity(getCurrentTestName());
                 session.auth().verify(5L, TimeUnit.SECONDS);
@@ -557,17 +584,26 @@ public class KeyReExchangeTest extends BaseTestSupport {
             try (ClientSession session = client.connect(getCurrentTestName(), TEST_LOCALHOST, port).verify(7L, TimeUnit.SECONDS).getSession();
                  ByteArrayOutputStream sent = new ByteArrayOutputStream();
                  ByteArrayOutputStream out = new ByteArrayOutputStream() {
+                     private long writeCount = 0L;
+
                      @Override
                      public void write(int b) {
                          super.write(b);
+                         updateWriteCount(1L);
                          pipedCount.release(1);
                      }
 
                      @Override
                      public void write(byte[] b, int off, int len) {
                          super.write(b, off, len);
+                         updateWriteCount(len);
                          pipedCount.release(len);
                      }
+
+                     private void updateWriteCount(long delta) {
+                         writeCount += delta;
+                         outputDebugMessage("OUT write count=%d", writeCount);
+                     }
                  }) {
                 session.addPasswordIdentity(getCurrentTestName());
                 session.auth().verify(5L, TimeUnit.SECONDS);

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d99d4806/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java b/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java
index e45c51c..8bf16db 100644
--- a/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java
+++ b/sshd-core/src/test/java/org/apache/sshd/server/ServerTest.java
@@ -167,7 +167,8 @@ public class ServerTest extends BaseTestSupport {
                 assertFalse("Authentication unexpectedly successful", authFuture.isSuccess());
             } while (authFuture.getException() == null);
 
-            assertNotNull("Missing auth future exception", authFuture.getException());
+            Throwable t = authFuture.getException();
+            assertNotNull("Missing auth future exception", t);
             assertTrue("Number trials (" + nbTrials + ") below min.=" + MAX_AUTH_REQUESTS, nbTrials > MAX_AUTH_REQUESTS);
         } finally {
             client.stop();

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/d99d4806/sshd-core/src/test/java/org/apache/sshd/util/test/OutputCountTrackingOutputStream.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/test/java/org/apache/sshd/util/test/OutputCountTrackingOutputStream.java b/sshd-core/src/test/java/org/apache/sshd/util/test/OutputCountTrackingOutputStream.java
index 0cb6ec4..b4eb84c 100644
--- a/sshd-core/src/test/java/org/apache/sshd/util/test/OutputCountTrackingOutputStream.java
+++ b/sshd-core/src/test/java/org/apache/sshd/util/test/OutputCountTrackingOutputStream.java
@@ -35,13 +35,13 @@ public class OutputCountTrackingOutputStream extends FilterOutputStream {
 
     @Override
     public void write(int b) throws IOException {
-        super.write(b);
+        out.write(b);
         updateWriteCount(1L);
     }
 
     @Override
     public void write(byte[] b, int off, int len) throws IOException {
-        super.write(b, off, len);
+        out.write(b, off, len); // bypass super: its array write loops over the single-byte write(int), which would count each byte twice
         updateWriteCount(len);
     }