You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2020/04/12 16:44:42 UTC
[mina-sshd] branch master updated: [SSHD-966] Separated pending
packets enqueue to separated method in AbstractSession
This is an automated email from the ASF dual-hosted git repository.
lgoldstein pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
The following commit(s) were added to refs/heads/master by this push:
new fd0599c [SSHD-966] Separated pending packets enqueue to separated method in AbstractSession
fd0599c is described below
commit fd0599c387fbfb1d538cc7fe6b7334ef274f5803
Author: Lyor Goldstein <lg...@apache.org>
AuthorDate: Sun Mar 8 20:44:02 2020 +0200
[SSHD-966] Separated pending packets enqueue to separated method in AbstractSession
---
.../common/session/helpers/AbstractSession.java | 69 ++++++++++++++++------
1 file changed, 50 insertions(+), 19 deletions(-)
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
index 5c95ab3..6718034 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
@@ -755,9 +755,9 @@ public abstract class AbstractSession extends SessionHelper {
int numPending = packetsQueue.size();
List<SimpleImmutableEntry<PendingWriteFuture, IoWriteFuture>> pendingWrites = new ArrayList<>(numPending);
synchronized (encodeLock) {
- for (PendingWriteFuture future = pendingPackets.poll();
+ for (PendingWriteFuture future = packetsQueue.poll();
future != null;
- future = pendingPackets.poll()) {
+ future = packetsQueue.poll()) {
IoWriteFuture writeFuture = doWritePacket(future.getBuffer());
pendingWrites.add(new SimpleImmutableEntry<>(future, writeFuture));
}
@@ -853,23 +853,9 @@ public abstract class AbstractSession extends SessionHelper {
@Override
public IoWriteFuture writePacket(Buffer buffer) throws IOException {
// While exchanging key, queue high level packets
- if (!KexState.DONE.equals(kexState.get())) {
- byte[] bufData = buffer.array();
- int cmd = bufData[buffer.rpos()] & 0xFF;
- if (cmd > SshConstants.SSH_MSG_KEX_LAST) {
- String cmdName = SshConstants.getCommandMessageName(cmd);
- boolean debugEnabled = log.isDebugEnabled();
- synchronized (pendingPackets) {
- if (!KexState.DONE.equals(kexState.get())) {
- if (pendingPackets.isEmpty() && debugEnabled) {
- log.debug("writePacket({})[{}] Start flagging packets as pending until key exchange is done", this, cmdName);
- }
- PendingWriteFuture future = new PendingWriteFuture(cmdName, buffer);
- pendingPackets.add(future);
- return future;
- }
- }
- }
+ PendingWriteFuture future = enqueuePendingPacket(buffer);
+ if (future != null) {
+ return future;
}
try {
@@ -889,6 +875,51 @@ public abstract class AbstractSession extends SessionHelper {
}
}
+ /**
+ * Checks if key-exchange is done - if so, or the packet is related to the
+ * key-exchange protocol, then allows the packet to go through, otherwise
+ * enqueues it to be sent when key-exchange completed
+ *
+ * @param buffer The {@link Buffer} containing the packet to be sent
+ * @return A {@link PendingWriteFuture} if enqueued, {@code null} if
+ * packet can go through.
+ */
+ protected PendingWriteFuture enqueuePendingPacket(Buffer buffer) {
+ if (KexState.DONE.equals(kexState.get())) {
+ return null;
+ }
+
+ byte[] bufData = buffer.array();
+ int cmd = bufData[buffer.rpos()] & 0xFF;
+ if (cmd <= SshConstants.SSH_MSG_KEX_LAST) {
+ return null;
+ }
+
+ String cmdName = SshConstants.getCommandMessageName(cmd);
+ PendingWriteFuture future;
+ int numPending;
+ synchronized (pendingPackets) {
+ if (KexState.DONE.equals(kexState.get())) {
+ return null;
+ }
+
+ future = new PendingWriteFuture(cmdName, buffer);
+ pendingPackets.add(future);
+ numPending = pendingPackets.size();
+ }
+
+ if (log.isDebugEnabled()) {
+ if (numPending == 1) {
+ log.debug("enqueuePendingPacket({})[{}] Start flagging packets as pending until key exchange is done", this, cmdName);
+ } else {
+ log.debug("enqueuePendingPacket({})[{}] enqueued until key exchange is done (pending={})", this, cmdName, numPending);
+
+ }
+ }
+
+ return future;
+ }
+
// NOTE: must acquire encodeLock when calling this method
protected Buffer resolveOutputPacket(Buffer buffer) throws IOException {
Buffer ignoreBuf = null;