You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2017/12/20 09:07:22 UTC

mina-sshd git commit: [SSHD-779] Attach pending packets write-completion listeners to their respective write-future(s) outside the encode locking block

Repository: mina-sshd
Updated Branches:
  refs/heads/master 063038150 -> 8136bf615


[SSHD-779] Attach pending packets write-completion listeners to their respective write-future(s) outside the encode locking block


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/8136bf61
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/8136bf61
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/8136bf61

Branch: refs/heads/master
Commit: 8136bf615f63213af50f47584b08e618202008d5
Parents: 0630381
Author: Goldstein Lyor <ly...@c-b4.com>
Authored: Sun Dec 17 07:59:04 2017 +0200
Committer: Goldstein Lyor <ly...@c-b4.com>
Committed: Wed Dec 20 11:06:47 2017 +0200

----------------------------------------------------------------------
 .../helpers/AbstractConnectionService.java      | 15 ++---
 .../common/session/helpers/AbstractSession.java | 58 +++++++++++++++-----
 2 files changed, 52 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8136bf61/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
index bcde65b..8f79530 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
@@ -34,6 +34,7 @@ import java.util.function.IntUnaryOperator;
 import org.apache.sshd.agent.common.AgentForwardSupport;
 import org.apache.sshd.agent.common.DefaultAgentForwardSupport;
 import org.apache.sshd.client.channel.AbstractClientChannel;
+import org.apache.sshd.client.future.OpenFuture;
 import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.NamedFactory;
@@ -622,7 +623,8 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
         }
 
         int channelId = registerChannel(channel);
-        channel.open(sender, rwsize, rmpsize, buffer).addListener(future -> {
+        OpenFuture openFuture = channel.open(sender, rwsize, rmpsize, buffer);
+        openFuture.addListener(future -> {
             try {
                 if (future.isOpened()) {
                     Window window = channel.getLocalWindow();
@@ -683,7 +685,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
      * @param buffer The request {@link Buffer}
      * @throws Exception If failed to process the request
      */
-    protected void globalRequest(Buffer buffer) throws Exception {
+    protected IoWriteFuture globalRequest(Buffer buffer) throws Exception {
         String req = buffer.getString();
         boolean wantReply = buffer.getBoolean();
         if (log.isDebugEnabled()) {
@@ -715,18 +717,17 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
                                   this, handler.getClass().getSimpleName(), req, wantReply, result);
                     }
                 } else {
-                    sendGlobalResponse(buffer, req, result, wantReply);
-                    return;
+                    return sendGlobalResponse(buffer, req, result, wantReply);
                 }
             }
         }
 
-        handleUnknownRequest(buffer, req, wantReply);
+        return handleUnknownRequest(buffer, req, wantReply);
     }
 
-    protected void handleUnknownRequest(Buffer buffer, String req, boolean wantReply) throws IOException {
+    protected IoWriteFuture handleUnknownRequest(Buffer buffer, String req, boolean wantReply) throws IOException {
         log.warn("handleUnknownRequest({}) unknown global request: {}", this, req);
-        sendGlobalResponse(buffer, req, RequestHandler.Result.Unsupported, wantReply);
+        return sendGlobalResponse(buffer, req, RequestHandler.Result.Unsupported, wantReply);
     }
 
     protected IoWriteFuture sendGlobalResponse(Buffer buffer, String req, RequestHandler.Result result, boolean wantReply) throws IOException {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8136bf61/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
index 940b830..474eda6 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
@@ -66,6 +66,7 @@ import org.apache.sshd.common.forward.PortForwardingEventListener;
 import org.apache.sshd.common.future.DefaultKeyExchangeFuture;
 import org.apache.sshd.common.future.DefaultSshFuture;
 import org.apache.sshd.common.future.KeyExchangeFuture;
+import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.io.IoSession;
 import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.kex.AbstractKexFactoryManager;
@@ -844,20 +845,25 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
         }
 
         signalSessionEvent(SessionListener.Event.KeyEstablished);
+        Collection<? extends Map.Entry<? extends SshFutureListener<IoWriteFuture>, IoWriteFuture>> pendingWrites;
         synchronized (pendingPackets) {
-            if (!pendingPackets.isEmpty()) {
-                if (log.isDebugEnabled()) {
-                    log.debug("handleNewKeys({}) Dequeing {} pending packets", this, pendingPackets.size());
-                }
-                synchronized (encodeLock) {
-                    for (PendingWriteFuture future = pendingPackets.poll();
-                            future != null;
-                            future = pendingPackets.poll()) {
-                        doWritePacket(future.getBuffer()).addListener(future);
-                    }
+            pendingWrites = sendPendingPackets(pendingPackets);
+            kexState.set(KexState.DONE);
+        }
+
+        int pendingCount = pendingWrites.size();
+        if (pendingCount > 0) {
+            if (log.isDebugEnabled()) {
+                log.debug("handleNewKeys({}) sent {} pending packets", this, pendingCount);
+            }
+
+            for (Map.Entry<? extends SshFutureListener<IoWriteFuture>, IoWriteFuture> pe : pendingWrites) {
+                SshFutureListener<IoWriteFuture> listener = pe.getKey();
+                IoWriteFuture future = pe.getValue();
+                if (listener != null) {
+                    future.addListener(listener);
                 }
             }
-            kexState.set(KexState.DONE);
         }
 
         synchronized (lock) {
@@ -865,6 +871,25 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
         }
     }
 
+    protected List<Pair<PendingWriteFuture, IoWriteFuture>> sendPendingPackets(Queue<PendingWriteFuture> packetsQueue) throws IOException {
+        if (GenericUtils.isEmpty(packetsQueue)) {
+            return Collections.emptyList(); // nothing queued - nothing written, nothing to report
+        }
+
+        int numPending = packetsQueue.size(); // used only to pre-size the result list
+        List<Pair<PendingWriteFuture, IoWriteFuture>> pendingWrites = new ArrayList<>(numPending);
+        synchronized (encodeLock) { // only the writes happen under the encode lock - the caller attaches the listeners afterwards
+            for (PendingWriteFuture future = packetsQueue.poll(); // drain the queue that was passed in, not the pendingPackets field
+                    future != null;
+                    future = packetsQueue.poll()) {
+                IoWriteFuture writeFuture = doWritePacket(future.getBuffer());
+                pendingWrites.add(new Pair<>(future, writeFuture)); // pair each pending future with the write future of its packet
+            }
+        }
+
+        return pendingWrites;
+    }
+
     protected void validateKexState(int cmd, KexState expected) {
         KexState actual = kexState.get();
         if (!expected.equals(actual)) {
@@ -1034,11 +1059,15 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
 
     @Override
     public <T extends Service> T getService(Class<T> clazz) {
-        for (Service s : getServices()) {
+        Collection<? extends Service> registeredServices = getServices();
+        ValidateUtils.checkState(GenericUtils.isNotEmpty(registeredServices), "No registered services to look for %s", clazz.getSimpleName());
+
+        for (Service s : registeredServices) {
             if (clazz.isInstance(s)) {
                 return clazz.cast(s);
             }
         }
+
         throw new IllegalStateException("Attempted to access unknown service " + clazz.getSimpleName());
     }
 
@@ -1084,7 +1113,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
             }
             future.setValue(t);
         }, timeout, unit);
-        future.addListener(future1 -> sched.cancel(false));
+        future.addListener(f -> sched.cancel(false));
         return writeFuture;
     }
 
@@ -1176,13 +1205,14 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
         }
 
         Object result;
+        boolean traceEnabled = log.isTraceEnabled();
         synchronized (requestLock) {
             try {
                 writePacket(buffer);
 
                 synchronized (requestResult) {
                     while (isOpen() && (maxWaitMillis > 0L) && (requestResult.get() == null)) {
-                        if (log.isTraceEnabled()) {
+                        if (traceEnabled) {
                             log.trace("request({})[{}] remaining wait={}", this, request, maxWaitMillis);
                         }