You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by lg...@apache.org on 2019/02/28 07:00:55 UTC

[mina-sshd] 01/06: [SSHD-782] Moved heartbeat support to AbstractConnectionService

This is an automated email from the ASF dual-hosted git repository.

lgoldstein pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mina-sshd.git

commit c374791ff85b0eff2b6c7d8e5cbcb077967a49d1
Author: Lyor Goldstein <lg...@apache.org>
AuthorDate: Tue Feb 26 11:53:53 2019 +0200

    [SSHD-782] Moved heartbeat support to AbstractConnectionService
---
 .../client/session/ClientConnectionService.java    | 80 +++++++++++++---------
 .../session/helpers/AbstractConnectionService.java | 51 +++++++++++++-
 2 files changed, 98 insertions(+), 33 deletions(-)

diff --git a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java
index 497e61e..06c6000 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/session/ClientConnectionService.java
@@ -31,6 +31,7 @@ import org.apache.sshd.common.SshException;
 import org.apache.sshd.common.io.AbstractIoWriteFuture;
 import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.session.helpers.AbstractConnectionService;
+import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.server.x11.X11ForwardSupport;
 
@@ -42,11 +43,18 @@ import org.apache.sshd.server.x11.X11ForwardSupport;
 public class ClientConnectionService
         extends AbstractConnectionService
         implements ClientSessionHolder {
-
-    private ScheduledFuture<?> heartBeat;
+    protected final String heartbeatRequest;
+    protected final long heartbeatInterval;
+    /** Non-null only if using the &quot;keep-alive&quot; request mechanism */
+    protected ScheduledFuture<?> clientHeartbeat;
 
     public ClientConnectionService(AbstractClientSession s) throws SshException {
         super(s);
+
+        heartbeatRequest = s.getStringProperty(
+            ClientFactoryManager.HEARTBEAT_REQUEST, ClientFactoryManager.DEFAULT_KEEP_ALIVE_HEARTBEAT_STRING);
+        heartbeatInterval = s.getLongProperty(
+            ClientFactoryManager.HEARTBEAT_INTERVAL, ClientFactoryManager.DEFAULT_HEARTBEAT_INTERVAL);
     }
 
     @Override
@@ -65,60 +73,70 @@ public class ClientConnectionService
         if (!session.isAuthenticated()) {
             throw new IllegalStateException("Session is not authenticated");
         }
-        startHeartBeat();
+        super.start();
     }
 
     @Override
-    protected void preClose() {
-        stopHeartBeat();
-        super.preClose();
-    }
+    protected synchronized ScheduledFuture<?> startHeartBeat() {
+        if ((heartbeatInterval > 0L) && GenericUtils.isNotEmpty(heartbeatRequest)) {
+            stopHeartBeat();
 
-    protected synchronized void startHeartBeat() {
-        stopHeartBeat();
-        ClientSession session = getClientSession();
-        long interval = session.getLongProperty(ClientFactoryManager.HEARTBEAT_INTERVAL, ClientFactoryManager.DEFAULT_HEARTBEAT_INTERVAL);
-        if (interval > 0L) {
+            ClientSession session = getClientSession();
             FactoryManager manager = session.getFactoryManager();
             ScheduledExecutorService service = manager.getScheduledExecutorService();
-            heartBeat = service.scheduleAtFixedRate(this::sendHeartBeat, interval, interval, TimeUnit.MILLISECONDS);
+            clientHeartbeat = service.scheduleAtFixedRate(
+                this::sendHeartBeat, heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS);
             if (log.isDebugEnabled()) {
-                log.debug("startHeartbeat - started at interval={}", interval);
+                log.debug("startHeartbeat({}) - started at interval={} with request={}",
+                    session, heartbeatInterval, heartbeatRequest);
             }
+
+            return clientHeartbeat;
+        } else {
+            return super.startHeartBeat();
         }
     }
 
+    @Override
     protected synchronized void stopHeartBeat() {
-        if (heartBeat != null) {
-            heartBeat.cancel(true);
-            heartBeat = null;
+        try {
+            super.stopHeartBeat();
+        } finally {
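+            // Intended to be the same reference as the superclass heartbeat future (hence no cancel here) - NOTE(review): nothing in this patch assigns the superclass future, so confirm clientHeartbeat does not need an explicit cancel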
+            if (clientHeartbeat != null) {
+                clientHeartbeat = null;
+            }
         }
     }
 
-    /**
-     * Sends a heartbeat message
-     * @return The {@link IoWriteFuture} that can be used to wait for the
-     * message write completion
-     */
+    @Override
     protected IoWriteFuture sendHeartBeat() {
+        if (clientHeartbeat == null) {
+            return super.sendHeartBeat();
+        }
+
         ClientSession session = getClientSession();
-        String request = session.getStringProperty(ClientFactoryManager.HEARTBEAT_REQUEST, ClientFactoryManager.DEFAULT_KEEP_ALIVE_HEARTBEAT_STRING);
         try {
-            Buffer buf = session.createBuffer(SshConstants.SSH_MSG_GLOBAL_REQUEST, request.length() + Byte.SIZE);
-            buf.putString(request);
+            Buffer buf = session.createBuffer(
+                SshConstants.SSH_MSG_GLOBAL_REQUEST, heartbeatRequest.length() + Byte.SIZE);
+            buf.putString(heartbeatRequest);
             buf.putBoolean(false);
             IoWriteFuture future = session.writePacket(buf);
             future.addListener(this::futureDone);
             return future;
-        } catch (IOException e) {
-            getSession().exceptionCaught(e);
+        } catch (IOException | RuntimeException e) {
+            session.exceptionCaught(e);
             if (log.isDebugEnabled()) {
-                log.debug("Error (" + e.getClass().getSimpleName() + ") sending keepalive message=" + request + ": " + e.getMessage());
+                log.debug("sendHeartBeat({}) failed ({}) to send request={}: {}",
+                    session, e.getClass().getSimpleName(), heartbeatRequest, e.getMessage());
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("sendHeartBeat(" + session + ") exception details", e);
             }
-            Throwable t = e;
-            return new AbstractIoWriteFuture(request, null) {
+
+            return new AbstractIoWriteFuture(heartbeatRequest, null) {
                 {
-                    setValue(t);
+                    setValue(e);
                 }
             };
         }
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
index fcac6f2..4aacf7d 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/session/helpers/AbstractConnectionService.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -52,6 +53,7 @@ import org.apache.sshd.common.forward.PortForwardingEventListenerManager;
 import org.apache.sshd.common.io.AbstractIoWriteFuture;
 import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.session.ConnectionService;
+import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.session.UnknownChannelReferenceHandler;
 import org.apache.sshd.common.util.EventListenerUtils;
 import org.apache.sshd.common.util.GenericUtils;
@@ -96,6 +98,8 @@ public abstract class AbstractConnectionService
      */
     protected final AtomicInteger nextChannelId = new AtomicInteger(0);
 
+    private ScheduledFuture<?> heartBeat;
+
     private final AtomicReference<AgentForwardSupport> agentForwardHolder = new AtomicReference<>();
     private final AtomicReference<X11ForwardSupport> x11ForwardHolder = new AtomicReference<>();
     private final AtomicReference<ForwardingFilter> forwarderHolder = new AtomicReference<>();
@@ -108,7 +112,8 @@ public abstract class AbstractConnectionService
 
     protected AbstractConnectionService(AbstractSession session) {
         sessionInstance = Objects.requireNonNull(session, "No session");
-        listenerProxy = EventListenerUtils.proxyWrapper(PortForwardingEventListener.class, getClass().getClassLoader(), listeners);
+        listenerProxy = EventListenerUtils.proxyWrapper(
+            PortForwardingEventListener.class, getClass().getClassLoader(), listeners);
     }
 
     @Override
@@ -170,7 +175,48 @@ public abstract class AbstractConnectionService
 
     @Override
     public void start() {
-        // do nothing
+        startHeartBeat();
+    }
+
+    protected synchronized ScheduledFuture<?> startHeartBeat() {
+        // TODO SSHD-782
+        return null;
+    }
+
+    /**
+     * Sends a heartbeat message/packet
+     *
+     * @return The {@link IoWriteFuture} that can be used to wait for the
+     * message write completion - {@code null} if no heartbeat sent
+     */
+    protected IoWriteFuture sendHeartBeat() {
+        // TODO SSHD-782
+        return null;
+    }
+
+    protected synchronized void stopHeartBeat() {
+        boolean debugEnabled = log.isDebugEnabled();
+        Session session = getSession();
+        if (heartBeat == null) {
+            if (debugEnabled) {
+                log.debug("stopHeartBeat({}) no heartbeat to stop", session);
+            }
+            return;
+        }
+
+        if (debugEnabled) {
+            log.debug("stopHeartBeat({}) stopping", session);
+        }
+
+        try {
+            heartBeat.cancel(true);
+        } finally {
+            heartBeat = null;
+        }
+
+        if (debugEnabled) {
+            log.debug("stopHeartBeat({}) stopped", session);
+        }
     }
 
     @Override
@@ -195,6 +241,7 @@ public abstract class AbstractConnectionService
 
     @Override
     protected void preClose() {
+        stopHeartBeat();
         this.listeners.clear();
         this.managersHolder.clear();
         super.preClose();