You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2020/04/12 16:44:42 UTC

[mina-sshd] branch master updated: [SSHD-966] Separated pending packets enqueue to separated method in AbstractSession

This is an automated email from the ASF dual-hosted git repository.

lgoldstein pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git


The following commit(s) were added to refs/heads/master by this push:
     new fd0599c  [SSHD-966] Separated pending packets enqueue to separated method in AbstractSession
fd0599c is described below

commit fd0599c387fbfb1d538cc7fe6b7334ef274f5803
Author: Lyor Goldstein <lg...@apache.org>
AuthorDate: Sun Mar 8 20:44:02 2020 +0200

    [SSHD-966] Separated pending packets enqueue to separated method in AbstractSession
---
 .../common/session/helpers/AbstractSession.java    | 69 ++++++++++++++++------
 1 file changed, 50 insertions(+), 19 deletions(-)

diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
index 5c95ab3..6718034 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
@@ -755,9 +755,9 @@ public abstract class AbstractSession extends SessionHelper {
         int numPending = packetsQueue.size();
         List<SimpleImmutableEntry<PendingWriteFuture, IoWriteFuture>> pendingWrites = new ArrayList<>(numPending);
         synchronized (encodeLock) {
-            for (PendingWriteFuture future = pendingPackets.poll();
+            for (PendingWriteFuture future = packetsQueue.poll();
                     future != null;
-                    future = pendingPackets.poll()) {
+                    future = packetsQueue.poll()) {
                 IoWriteFuture writeFuture = doWritePacket(future.getBuffer());
                 pendingWrites.add(new SimpleImmutableEntry<>(future, writeFuture));
             }
@@ -853,23 +853,9 @@ public abstract class AbstractSession extends SessionHelper {
     @Override
     public IoWriteFuture writePacket(Buffer buffer) throws IOException {
         // While exchanging key, queue high level packets
-        if (!KexState.DONE.equals(kexState.get())) {
-            byte[] bufData = buffer.array();
-            int cmd = bufData[buffer.rpos()] & 0xFF;
-            if (cmd > SshConstants.SSH_MSG_KEX_LAST) {
-                String cmdName = SshConstants.getCommandMessageName(cmd);
-                boolean debugEnabled = log.isDebugEnabled();
-                synchronized (pendingPackets) {
-                    if (!KexState.DONE.equals(kexState.get())) {
-                        if (pendingPackets.isEmpty() && debugEnabled) {
-                            log.debug("writePacket({})[{}] Start flagging packets as pending until key exchange is done", this, cmdName);
-                        }
-                        PendingWriteFuture future = new PendingWriteFuture(cmdName, buffer);
-                        pendingPackets.add(future);
-                        return future;
-                    }
-                }
-            }
+        PendingWriteFuture future = enqueuePendingPacket(buffer);
+        if (future != null) {
+            return future;
         }
 
         try {
@@ -889,6 +875,51 @@ public abstract class AbstractSession extends SessionHelper {
         }
     }
 
+    /**
+     * Checks if key-exchange is done - if so, or if the packet is related to
+     * the key-exchange protocol, then allows the packet to go through;
+     * otherwise enqueues it to be sent once key-exchange has completed.
+     *
+     * @param buffer The {@link Buffer} containing the packet to be sent
+     * @return A {@link PendingWriteFuture} if the packet was enqueued,
+     * {@code null} if the packet can go through.
+     */
+    protected PendingWriteFuture enqueuePendingPacket(Buffer buffer) {
+        if (KexState.DONE.equals(kexState.get())) {
+            return null;
+        }
+
+        byte[] bufData = buffer.array();
+        int cmd = bufData[buffer.rpos()] & 0xFF; // byte at rpos() masked to an unsigned value, used as the command code
+        if (cmd <= SshConstants.SSH_MSG_KEX_LAST) {
+            return null;
+        }
+
+        String cmdName = SshConstants.getCommandMessageName(cmd);
+        PendingWriteFuture future;
+        int numPending;
+        synchronized (pendingPackets) {
+            if (KexState.DONE.equals(kexState.get())) { // same check as above, repeated while holding the pendingPackets lock
+                return null;
+            }
+
+            future = new PendingWriteFuture(cmdName, buffer);
+            pendingPackets.add(future);
+            numPending = pendingPackets.size();
+        }
+
+        if (log.isDebugEnabled()) {
+            if (numPending == 1) {
+                log.debug("enqueuePendingPacket({})[{}] Start flagging packets as pending until key exchange is done", this, cmdName);
+            } else {
+                log.debug("enqueuePendingPacket({})[{}] enqueued until key exchange is done (pending={})", this, cmdName, numPending);
+
+            }
+        }
+
+        return future;
+    }
+
     // NOTE: must acquire encodeLock when calling this method
     protected Buffer resolveOutputPacket(Buffer buffer) throws IOException {
         Buffer ignoreBuf = null;