You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2019/02/28 07:00:55 UTC
[mina-sshd] 01/06: [SSHD-782] Moved heartbeat support to
AbstractConnectionService
This is an automated email from the ASF dual-hosted git repository.
lgoldstein pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git
commit c374791ff85b0eff2b6c7d8e5cbcb077967a49d1
Author: Lyor Goldstein <lg...@apache.org>
AuthorDate: Tue Feb 26 11:53:53 2019 +0200
[SSHD-782] Moved heartbeat support to AbstractConnectionService
---
.../client/session/ClientConnectionService.java | 80 +++++++++++++---------
.../session/helpers/AbstractConnectionService.java | 51 +++++++++++++-
2 files changed, 98 insertions(+), 33 deletions(-)
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java
index 497e61e..06c6000 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java
@@ -31,6 +31,7 @@ import org.apache.sshd.common.SshException;
import org.apache.sshd.common.io.AbstractIoWriteFuture;
import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.session.helpers.AbstractConnectionService;
+import org.apache.sshd.common.util.GenericUtils;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.server.x11.X11ForwardSupport;
@@ -42,11 +43,18 @@ import org.apache.sshd.server.x11.X11ForwardSupport;
public class ClientConnectionService
extends AbstractConnectionService
implements ClientSessionHolder {
-
- private ScheduledFuture<?> heartBeat;
+ protected final String heartbeatRequest;
+ protected final long heartbeatInterval;
+ /** Non-null only if using the "keep-alive" request mechanism */
+ protected ScheduledFuture<?> clientHeartbeat;
public ClientConnectionService(AbstractClientSession s) throws SshException {
super(s);
+
+ heartbeatRequest = s.getStringProperty(
+ ClientFactoryManager.HEARTBEAT_REQUEST, ClientFactoryManager.DEFAULT_KEEP_ALIVE_HEARTBEAT_STRING);
+ heartbeatInterval = s.getLongProperty(
+ ClientFactoryManager.HEARTBEAT_INTERVAL, ClientFactoryManager.DEFAULT_HEARTBEAT_INTERVAL);
}
@Override
@@ -65,60 +73,70 @@ public class ClientConnectionService
if (!session.isAuthenticated()) {
throw new IllegalStateException("Session is not authenticated");
}
- startHeartBeat();
+ super.start();
}
@Override
- protected void preClose() {
- stopHeartBeat();
- super.preClose();
- }
+ protected synchronized ScheduledFuture<?> startHeartBeat() {
+ if ((heartbeatInterval > 0L) && GenericUtils.isNotEmpty(heartbeatRequest)) {
+ stopHeartBeat();
- protected synchronized void startHeartBeat() {
- stopHeartBeat();
- ClientSession session = getClientSession();
- long interval = session.getLongProperty(ClientFactoryManager.HEARTBEAT_INTERVAL, ClientFactoryManager.DEFAULT_HEARTBEAT_INTERVAL);
- if (interval > 0L) {
+ ClientSession session = getClientSession();
FactoryManager manager = session.getFactoryManager();
ScheduledExecutorService service = manager.getScheduledExecutorService();
- heartBeat = service.scheduleAtFixedRate(this::sendHeartBeat, interval, interval, TimeUnit.MILLISECONDS);
+ clientHeartbeat = service.scheduleAtFixedRate(
+ this::sendHeartBeat, heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS);
if (log.isDebugEnabled()) {
- log.debug("startHeartbeat - started at interval={}", interval);
+ log.debug("startHeartbeat({}) - started at interval={} with request={}",
+ session, heartbeatInterval, heartbeatRequest);
}
+
+ return clientHeartbeat;
+ } else {
+ return super.startHeartBeat();
}
}
+ @Override
protected synchronized void stopHeartBeat() {
- if (heartBeat != null) {
- heartBeat.cancel(true);
- heartBeat = null;
+ try {
+ super.stopHeartBeat();
+ } finally {
+ // Not cancelled here: relies on super.stopHeartBeat() cancelling this same future - NOTE(review): confirm the superclass actually stores it
+ if (clientHeartbeat != null) {
+ clientHeartbeat = null;
+ }
}
}
- /**
- * Sends a heartbeat message
- * @return The {@link IoWriteFuture} that can be used to wait for the
- * message write completion
- */
+ @Override
protected IoWriteFuture sendHeartBeat() {
+ if (clientHeartbeat == null) {
+ return super.sendHeartBeat();
+ }
+
ClientSession session = getClientSession();
- String request = session.getStringProperty(ClientFactoryManager.HEARTBEAT_REQUEST, ClientFactoryManager.DEFAULT_KEEP_ALIVE_HEARTBEAT_STRING);
try {
- Buffer buf = session.createBuffer(SshConstants.SSH_MSG_GLOBAL_REQUEST, request.length() + Byte.SIZE);
- buf.putString(request);
+ Buffer buf = session.createBuffer(
+ SshConstants.SSH_MSG_GLOBAL_REQUEST, heartbeatRequest.length() + Byte.SIZE);
+ buf.putString(heartbeatRequest);
buf.putBoolean(false);
IoWriteFuture future = session.writePacket(buf);
future.addListener(this::futureDone);
return future;
- } catch (IOException e) {
- getSession().exceptionCaught(e);
+ } catch (IOException | RuntimeException e) {
+ session.exceptionCaught(e);
if (log.isDebugEnabled()) {
- log.debug("Error (" + e.getClass().getSimpleName() + ") sending keepalive message=" + request + ": " + e.getMessage());
+ log.debug("sendHeartBeat({}) failed ({}) to send request={}: {}",
+ session, e.getClass().getSimpleName(), heartbeatRequest, e.getMessage());
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("sendHeartBeat(" + session + ") exception details", e);
}
- Throwable t = e;
- return new AbstractIoWriteFuture(request, null) {
+
+ return new AbstractIoWriteFuture(heartbeatRequest, null) {
{
- setValue(t);
+ setValue(e);
}
};
}
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
index fcac6f2..4aacf7d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -52,6 +53,7 @@ import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
import org.apache.sshd.common.io.AbstractIoWriteFuture;
import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.session.ConnectionService;
+import org.apache.sshd.common.session.Session;
import org.apache.sshd.common.session.UnknownChannelReferenceHandler;
import org.apache.sshd.common.util.EventListenerUtils;
import org.apache.sshd.common.util.GenericUtils;
@@ -96,6 +98,8 @@ public abstract class AbstractConnectionService
*/
protected final AtomicInteger nextChannelId = new AtomicInteger(0);
+ private ScheduledFuture<?> heartBeat;
+
private final AtomicReference<AgentForwardSupport> agentForwardHolder = new AtomicReference<>();
private final AtomicReference<X11ForwardSupport> x11ForwardHolder = new AtomicReference<>();
private final AtomicReference<ForwardingFilter> forwarderHolder = new AtomicReference<>();
@@ -108,7 +112,8 @@ public abstract class AbstractConnectionService
protected AbstractConnectionService(AbstractSession session) {
sessionInstance = Objects.requireNonNull(session, "No session");
- listenerProxy = EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, getClass().getClassLoader(), listeners);
+ listenerProxy = EventListenerUtils.proxyWrapper(
+ PortForwardingEventListener.class, getClass().getClassLoader(), listeners);
}
@Override
@@ -170,7 +175,48 @@ public abstract class AbstractConnectionService
@Override
public void start() {
- // do nothing
+ startHeartBeat();
+ }
+
+ protected synchronized ScheduledFuture<?> startHeartBeat() {
+ // TODO SSHD-782
+ return null;
+ }
+
+ /**
+ * Sends a heartbeat message/packet
+ *
+ * @return The {@link IoWriteFuture} that can be used to wait for the
+ * message write completion - {@code null} if no heartbeat sent
+ */
+ protected IoWriteFuture sendHeartBeat() {
+ // TODO SSHD-782
+ return null;
+ }
+
+ protected synchronized void stopHeartBeat() {
+ boolean debugEnabled = log.isDebugEnabled();
+ Session session = getSession();
+ if (heartBeat == null) {
+ if (debugEnabled) {
+ log.debug("stopHeartBeat({}) no heartbeat to stop", session);
+ }
+ return;
+ }
+
+ if (debugEnabled) {
+ log.debug("stopHeartBeat({}) stopping", session);
+ }
+
+ try {
+ heartBeat.cancel(true);
+ } finally {
+ heartBeat = null;
+ }
+
+ if (debugEnabled) {
+ log.debug("stopHeartBeat({}) stopped", session);
+ }
}
@Override
@@ -195,6 +241,7 @@ public abstract class AbstractConnectionService
@Override
protected void preClose() {
+ stopHeartBeat();
this.listeners.clear();
this.managersHolder.clear();
super.preClose();