You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2017/12/20 09:07:22 UTC
mina-sshd git commit: [SSHD-779] Attach pending packets
write-completion listeners to their respective write-future(s) outside the
encode locking block
Repository: mina-sshd
Updated Branches:
refs/heads/master 063038150 -> 8136bf615
[SSHD-779] Attach pending packets write-completion listeners to their respective write-future(s) outside the encode locking block
Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/8136bf61
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/8136bf61
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/8136bf61
Branch: refs/heads/master
Commit: 8136bf615f63213af50f47584b08e618202008d5
Parents: 0630381
Author: Goldstein Lyor <ly...@c-b4.com>
Authored: Sun Dec 17 07:59:04 2017 +0200
Committer: Goldstein Lyor <ly...@c-b4.com>
Committed: Wed Dec 20 11:06:47 2017 +0200
----------------------------------------------------------------------
.../helpers/AbstractConnectionService.java | 15 ++---
.../common/session/helpers/AbstractSession.java | 58 +++++++++++++++-----
2 files changed, 52 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8136bf61/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
index bcde65b..8f79530 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
@@ -34,6 +34,7 @@ import java.util.function.IntUnaryOperator;
import org.apache.sshd.agent.common.AgentForwardSupport;
import org.apache.sshd.agent.common.DefaultAgentForwardSupport;
import org.apache.sshd.client.channel.AbstractClientChannel;
+import org.apache.sshd.client.future.OpenFuture;
import org.apache.sshd.common.Closeable;
import org.apache.sshd.common.FactoryManager;
import org.apache.sshd.common.NamedFactory;
@@ -622,7 +623,8 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
}
int channelId = registerChannel(channel);
- channel.open(sender, rwsize, rmpsize, buffer).addListener(future -> {
+ OpenFuture openFuture = channel.open(sender, rwsize, rmpsize, buffer);
+ openFuture.addListener(future -> {
try {
if (future.isOpened()) {
Window window = channel.getLocalWindow();
@@ -683,7 +685,7 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
* @param buffer The request {@link Buffer}
* @throws Exception If failed to process the request
*/
- protected void globalRequest(Buffer buffer) throws Exception {
+ protected IoWriteFuture globalRequest(Buffer buffer) throws Exception {
String req = buffer.getString();
boolean wantReply = buffer.getBoolean();
if (log.isDebugEnabled()) {
@@ -715,18 +717,17 @@ public abstract class AbstractConnectionService<S extends AbstractSession>
this, handler.getClass().getSimpleName(), req, wantReply, result);
}
} else {
- sendGlobalResponse(buffer, req, result, wantReply);
- return;
+ return sendGlobalResponse(buffer, req, result, wantReply);
}
}
}
- handleUnknownRequest(buffer, req, wantReply);
+ return handleUnknownRequest(buffer, req, wantReply);
}
- protected void handleUnknownRequest(Buffer buffer, String req, boolean wantReply) throws IOException {
+ protected IoWriteFuture handleUnknownRequest(Buffer buffer, String req, boolean wantReply) throws IOException {
log.warn("handleUnknownRequest({}) unknown global request: {}", this, req);
- sendGlobalResponse(buffer, req, RequestHandler.Result.Unsupported, wantReply);
+ return sendGlobalResponse(buffer, req, RequestHandler.Result.Unsupported, wantReply);
}
protected IoWriteFuture sendGlobalResponse(Buffer buffer, String req, RequestHandler.Result result, boolean wantReply) throws IOException {
http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/8136bf61/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
index 940b830..474eda6 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractSession.java
@@ -66,6 +66,7 @@ import org.apache.sshd.common.forward.PortForwardingEventListener;
import org.apache.sshd.common.future.DefaultKeyExchangeFuture;
import org.apache.sshd.common.future.DefaultSshFuture;
import org.apache.sshd.common.future.KeyExchangeFuture;
+import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.io.IoSession;
import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.kex.AbstractKexFactoryManager;
@@ -844,20 +845,25 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
}
signalSessionEvent(SessionListener.Event.KeyEstablished);
+ Collection<? extends Map.Entry<? extends SshFutureListener<IoWriteFuture>, IoWriteFuture>> pendingWrites;
synchronized (pendingPackets) {
- if (!pendingPackets.isEmpty()) {
- if (log.isDebugEnabled()) {
- log.debug("handleNewKeys({}) Dequeing {} pending packets", this, pendingPackets.size());
- }
- synchronized (encodeLock) {
- for (PendingWriteFuture future = pendingPackets.poll();
- future != null;
- future = pendingPackets.poll()) {
- doWritePacket(future.getBuffer()).addListener(future);
- }
+ pendingWrites = sendPendingPackets(pendingPackets);
+ kexState.set(KexState.DONE);
+ }
+
+ int pendingCount = pendingWrites.size();
+ if (pendingCount > 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("handleNewKeys({}) sent {} pending packets", this, pendingCount);
+ }
+
+ for (Map.Entry<? extends SshFutureListener<IoWriteFuture>, IoWriteFuture> pe : pendingWrites) {
+ SshFutureListener<IoWriteFuture> listener = pe.getKey();
+ IoWriteFuture future = pe.getValue();
+ if (listener != null) {
+ future.addListener(listener);
}
}
- kexState.set(KexState.DONE);
}
synchronized (lock) {
@@ -865,6 +871,25 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
}
}
+ protected List<Pair<PendingWriteFuture, IoWriteFuture>> sendPendingPackets(Queue<PendingWriteFuture> packetsQueue) throws IOException {
+ if (GenericUtils.isEmpty(packetsQueue)) {
+ return Collections.emptyList();
+ }
+
+ int numPending = packetsQueue.size();
+ List<Pair<PendingWriteFuture, IoWriteFuture>> pendingWrites = new ArrayList<>(numPending);
+ synchronized (encodeLock) {
+ for (PendingWriteFuture future = pendingPackets.poll();
+ future != null;
+ future = pendingPackets.poll()) {
+ IoWriteFuture writeFuture = doWritePacket(future.getBuffer());
+ pendingWrites.add(new Pair<>(future, writeFuture));
+ }
+ }
+
+ return pendingWrites;
+ }
+
protected void validateKexState(int cmd, KexState expected) {
KexState actual = kexState.get();
if (!expected.equals(actual)) {
@@ -1034,11 +1059,15 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
@Override
public <T extends Service> T getService(Class<T> clazz) {
- for (Service s : getServices()) {
+ Collection<? extends Service> registeredServices = getServices();
+ ValidateUtils.checkState(GenericUtils.isNotEmpty(registeredServices), "No registered services to look for %s", clazz.getSimpleName());
+
+ for (Service s : registeredServices) {
if (clazz.isInstance(s)) {
return clazz.cast(s);
}
}
+
throw new IllegalStateException("Attempted to access unknown service " + clazz.getSimpleName());
}
@@ -1084,7 +1113,7 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
}
future.setValue(t);
}, timeout, unit);
- future.addListener(future1 -> sched.cancel(false));
+ future.addListener(f -> sched.cancel(false));
return writeFuture;
}
@@ -1176,13 +1205,14 @@ public abstract class AbstractSession extends AbstractKexFactoryManager implemen
}
Object result;
+ boolean traceEnabled = log.isTraceEnabled();
synchronized (requestLock) {
try {
writePacket(buffer);
synchronized (requestResult) {
while (isOpen() && (maxWaitMillis > 0L) && (requestResult.get() == null)) {
- if (log.isTraceEnabled()) {
+ if (traceEnabled) {
log.trace("request({})[{}] remaining wait={}", this, request, maxWaitMillis);
}