You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wicket.apache.org by re...@apache.org on 2022/02/05 15:46:57 UTC

[wicket] branch improvement/reiern70/WICKET-6954-ping-pong-keep-alive created (now d705894)

This is an automated email from the ASF dual-hosted git repository.

reiern70 pushed a change to branch improvement/reiern70/WICKET-6954-ping-pong-keep-alive
in repository https://gitbox.apache.org/repos/asf/wicket.git.


      at d705894  [WICKET-6954] initial work

This branch includes the following new commits:

     new 96e0e01  [WICKET-6954] initial work
     new d705894  [WICKET-6954] initial work

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[wicket] 01/02: [WICKET-6954] initial work

Posted by re...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

reiern70 pushed a commit to branch improvement/reiern70/WICKET-6954-ping-pong-keep-alive
in repository https://gitbox.apache.org/repos/asf/wicket.git

commit 96e0e0170f8fb2881fbd11c39dcaf45d5eaead31
Author: reiern70 <re...@gmail.com>
AuthorDate: Sat Feb 5 10:26:23 2022 -0500

    [WICKET-6954] initial work
---
 .../wicket/protocol/ws/WebSocketSettings.java      | 229 +++++++++++++++++++++
 .../ws/api/AbstractWebSocketProcessor.java         |  10 +
 .../protocol/ws/api/BaseWebSocketBehavior.java     |  28 +++
 .../protocol/ws/api/IWebSocketConnection.java      |  48 +++++
 .../protocol/ws/api/IWebSocketProcessor.java       |   5 +
 .../ws/api/res/js/wicket-websocket-jquery.js       |  25 +++
 .../ws/api/res/js/wicket-websocket-setup.js.tmpl   |   5 +-
 .../ws/javax/JavaxWebSocketConnection.java         |  56 +++++
 .../protocol/ws/javax/JavaxWebSocketProcessor.java |  16 ++
 9 files changed, 421 insertions(+), 1 deletion(-)

diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
index 7aca394..dd59f49 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
@@ -41,6 +41,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import jakarta.servlet.http.HttpServletRequest;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Date;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -140,6 +146,41 @@ public class WebSocketSettings
 	private IWebSocketConnectionFilter connectionFilter;
 
 	/**
+	 * Whether the ping-pong heartbeat is used. Disabled by default. The value is also exported
+	 * to the client-side setup script (see {@code BaseWebSocketBehavior}).
+	 */
+	private boolean useHeartBeat = false;
+
+	/**
+	 * Whether the client tries to reconnect after a ping-pong failure. Disabled by default.
+	 */
+	private boolean reconnectOnFailure = false;
+
+	/**
+	 * How many times a ping that failed with an {@link IOException} is retried before the
+	 * connection is terminated.
+	 */
+	private int maxPingRetries = 3;
+
+	/**
+	 * Interval between two heartbeat rounds, in milliseconds. Defaults to 15 seconds.
+	 */
+	private long heartBeatPace = Duration.ofSeconds(15).toMillis();
+
+	/**
+	 * Maximum network latency assumed, in milliseconds. Defaults to 2 seconds. It is added to
+	 * {@link #heartBeatPace} to obtain the timeout after which a peer is considered gone.
+	 */
+	private long networkLatencyThreshold = Duration.ofSeconds(2).toMillis();
+
+	/**
+	 * The executor that runs the tasks pinging remote peers and the tasks terminating
+	 * connections whose peers did not pong back in time.
+	 */
+	private Executor heartBeatsExecutor = new HeartBeatsExecutor();
+
+	// Internal timer driving the heartbeat rounds; created by startHeartBeatTimer(String)
+	// and cancelled by stopHeartBeatTimer().
+	private Timer heartBeatsTimer;
+
+	/**
 	 * A {@link org.apache.wicket.protocol.ws.api.IWebSocketSessionConfigurer} that allows to configure
 	 * {@link org.apache.wicket.protocol.ws.api.IWebSocketSession}s.
 	 */
@@ -424,6 +465,167 @@ public class WebSocketSettings
 		return securePort.get();
 	}
 
+	/**
+	 * Enables or disables the ping-pong heartbeat.
+	 *
+	 * @param useHeartBeat
+	 *      {@code true} to use the heartbeat
+	 */
+	public void setUseHeartBeat(boolean useHeartBeat)
+	{
+		this.useHeartBeat = useHeartBeat;
+	}
+
+	/**
+	 * @return {@code true} if the ping-pong heartbeat is used
+	 */
+	public boolean isUseHeartBeat()
+	{
+		return useHeartBeat;
+	}
+
+	/**
+	 * Sets whether the client tries to reconnect after a ping-pong failure.
+	 *
+	 * @param reconnectOnFailure
+	 *      {@code true} to let the client reconnect
+	 */
+	public void setReconnectOnFailure(boolean reconnectOnFailure)
+	{
+		this.reconnectOnFailure = reconnectOnFailure;
+	}
+
+	/**
+	 * @return {@code true} if the client tries to reconnect after a ping-pong failure
+	 */
+	public boolean isReconnectOnFailure()
+	{
+		return reconnectOnFailure;
+	}
+
+	/**
+	 * @return the interval between two heartbeat rounds, in milliseconds
+	 */
+	public long getHeartBeatPace()
+	{
+		return heartBeatPace;
+	}
+
+	/**
+	 * Sets the interval between two heartbeat rounds.
+	 *
+	 * @param heartBeatPace
+	 *      the interval in milliseconds
+	 */
+	public void setHeartBeatPace(long heartBeatPace)
+	{
+		this.heartBeatPace = heartBeatPace;
+	}
+
+	/**
+	 * Sets the interval between two heartbeat rounds. The duration is stored in milliseconds.
+	 *
+	 * @param heartBeatPace
+	 *      the interval; must not be {@code null}
+	 */
+	public void setHeartBeatPace(Duration heartBeatPace)
+	{
+		this.heartBeatPace = heartBeatPace.toMillis();
+	}
+
+	/**
+	 * @return the maximum network latency assumed, in milliseconds
+	 */
+	public long getNetworkLatencyThreshold()
+	{
+		return networkLatencyThreshold;
+	}
+
+	/**
+	 * Sets the maximum network latency assumed.
+	 *
+	 * @param networkLatencyThreshold
+	 *      the threshold in milliseconds
+	 */
+	public void setNetworkLatencyThreshold(long networkLatencyThreshold)
+	{
+		this.networkLatencyThreshold = networkLatencyThreshold;
+	}
+
+	/**
+	 * Sets the maximum network latency assumed. The duration is stored in milliseconds.
+	 *
+	 * @param networkLatencyThreshold
+	 *      the threshold; must not be {@code null}
+	 */
+	public void setNetworkLatencyThreshold(Duration networkLatencyThreshold)
+	{
+		this.networkLatencyThreshold = networkLatencyThreshold.toMillis();
+	}
+
+	/**
+	 * Sets the executor that runs the ping tasks and the connection-termination tasks of the
+	 * heartbeat.
+	 *
+	 * @param heartBeatsExecutor
+	 *      the executor to use
+	 */
+	public void setHeartBeatsExecutor(Executor heartBeatsExecutor)
+	{
+		this.heartBeatsExecutor = heartBeatsExecutor;
+	}
+
+	/**
+	 * Sets how many times a ping that failed with an {@link IOException} is retried before the
+	 * connection is terminated.
+	 *
+	 * @param maxPingRetries
+	 *      the number of retries
+	 */
+	public void setMaxPingRetries(int maxPingRetries)
+	{
+		this.maxPingRetries = maxPingRetries;
+	}
+
+
+	/**
+	 * Starts the daemon timer that runs a heartbeat round every {@link #heartBeatPace}
+	 * milliseconds, the first one immediately. Does nothing when the heartbeat is disabled.
+	 *
+	 * @param application
+	 *      the key used to look up the application (via {@code Application.get(String)})
+	 *      whose connections are checked
+	 */
+	public void startHeartBeatTimer(String application)
+	{
+		if (useHeartBeat == false)
+		{
+			LOG.debug("useHeartBeat is set to false. Thus we won't start heartbeat's sending thread");
+			// heartbeat is disabled: do not create the timer at all
+			return;
+		}
+
+		if (LOG.isDebugEnabled())
+		{
+			LOG.debug("Starting thread pushing heart beats");
+		}
+
+		TimerTask timerTask = new TimerTask()
+		{
+			@Override
+			public void run()
+			{
+				try
+				{
+					sendHeartBeats(application);
+				}
+				catch (Exception e)
+				{
+					// an exception escaping run() would kill the timer thread for good
+					LOG.error("Error while checking sessions", e);
+				}
+			}
+		};
+
+		if (this.heartBeatsTimer != null)
+		{
+			// a timer left over from a previous start must not keep firing next to the new one
+			this.heartBeatsTimer.cancel();
+		}
+		this.heartBeatsTimer = new Timer(true);
+		// heartBeatPace is already expressed in milliseconds, so it is used as the period as is
+		this.heartBeatsTimer.schedule(timerTask, new Date(), heartBeatPace);
+	}
+
+	/**
+	 * Cancels the heartbeat timer, if one was created. Safe to call when the timer was never
+	 * started.
+	 */
+	public void stopHeartBeatTimer()
+	{
+		if (LOG.isDebugEnabled())
+		{
+			LOG.debug("Stopping thread pushing hart beats");
+		}
+		if (this.heartBeatsTimer != null)
+		{
+			this.heartBeatsTimer.cancel();
+		}
+	}
+
+	/**
+	 * Runs one heartbeat round over all connections registered for the application:
+	 * connections flagged as alive are pinged, connections flagged as not alive are terminated
+	 * once their last sign of life is older than {@code heartBeatPace + networkLatencyThreshold}
+	 * milliseconds. All work is handed to {@link #heartBeatsExecutor}.
+	 *
+	 * @param application
+	 *      the application whose connections are checked
+	 */
+	private void sendHeartBeats(Application application)
+	{
+		final long timeout = heartBeatPace + networkLatencyThreshold;
+		for (IWebSocketConnection connection : connectionRegistry.getConnections(application))
+		{
+			// NOTE(review): nothing visible in this change flags a connection as not alive after
+			// a ping was sent, so this branch is only reached if other code calls
+			// setAlive(false) - confirm.
+			if (connection.isAlive() == false)
+			{
+				// elapsed time is "now - lastTimeAlive"; with the operands the other way round
+				// the difference is never positive and the timeout can never trigger
+				if (System.currentTimeMillis() - connection.getLastTimeAlive() > timeout)
+				{
+					heartBeatsExecutor.run(() -> terminateConnection(connection));
+				}
+			}
+			else
+			{
+				heartBeatsExecutor.run(() -> ping(connection, maxPingRetries));
+			}
+		}
+	}
+
+	/**
+	 * Looks up the application by its key and runs one heartbeat round for it.
+	 *
+	 * @param application
+	 *      the key passed to {@code Application.get(String)}
+	 */
+	private void sendHeartBeats(String application)
+	{
+		sendHeartBeats(Application.get(application));
+	}
+
+	/**
+	 * Pings the remote peer of the connection. A ping failing with an {@link IOException} is
+	 * retried up to {@code pingRetryCounter} more times; when the last attempt fails as well
+	 * the connection is terminated.
+	 *
+	 * @param connection
+	 *      the connection to ping
+	 * @param pingRetryCounter
+	 *      the number of retries allowed after the first failed attempt; zero or a negative
+	 *      value means no retry
+	 */
+	private void ping(IWebSocketConnection connection, final int pingRetryCounter)
+	{
+		int retriesLeft = pingRetryCounter;
+		while (true)
+		{
+			try
+			{
+				connection.ping();
+				return;
+			}
+			catch (IOException e)
+			{
+				LOG.debug("Ping of connection with ID {} failed", connection.getKey(), e);
+				// "<= 0" (and not "== 0") so that a negative retry count cannot loop forever
+				if (retriesLeft <= 0)
+				{
+					// ping failed enough times kill connection
+					terminateConnection(connection);
+					return;
+				}
+				retriesLeft--;
+			}
+		}
+	}
+
+	/**
+	 * Flags the connection as not alive, terminates it and removes it from the connection
+	 * registry.
+	 *
+	 * @param connection
+	 *      the connection to terminate
+	 */
+	private void terminateConnection(IWebSocketConnection connection)
+	{
+		connection.setAlive(false);
+		if (LOG.isDebugEnabled())
+		{
+			// NOTE(review): this method is also used when a pong did not arrive in time, in
+			// which case the "failed {} times" wording does not describe what happened.
+			LOG.debug("Terminating connection with ID {} because ping of remote peer failed {} times",
+					connection.getKey(), maxPingRetries);
+		}
+		connection.terminate("Failed to ping remote peer");
+		connectionRegistry.removeConnection(connection.getApplication(), connection.getSessionId(), connection.getKey());
+	}
+
 	/**
 	 * Simple executor that runs the tasks in the caller thread.
 	 */
@@ -487,6 +689,33 @@ public class WebSocketSettings
 		}
 	}
 
+	/**
+	 * Executor used for the heartbeat tasks (pinging peers, terminating dead connections). It
+	 * delegates to a {@link java.util.concurrent.Executor}, by default a pool of 1 to 8 threads
+	 * with direct hand-off.
+	 */
+	public static class HeartBeatsExecutor implements Executor
+	{
+
+		private final java.util.concurrent.Executor executor;
+
+
+		/**
+		 * Creates an executor backed by a thread pool with 1 core thread, at most 8 threads
+		 * and a keep-alive time of 60 seconds.
+		 */
+		public HeartBeatsExecutor()
+		{
+			// A SynchronousQueue never buffers: once all 8 threads are busy a further task would
+			// be rejected with a RejectedExecutionException (default AbortPolicy), aborting the
+			// rest of the heartbeat round. CallerRunsPolicy runs such a task in the submitting
+			// thread instead, which also throttles the submission.
+			this(new ThreadPoolExecutor(1, 8,
+					60L, TimeUnit.SECONDS,
+					new SynchronousQueue<>(),
+					new ThreadFactory(),
+					new ThreadPoolExecutor.CallerRunsPolicy()));
+		}
+
+		/**
+		 * Creates an executor that delegates to the given one.
+		 *
+		 * @param executor
+		 *      the executor the tasks are handed to
+		 */
+		public HeartBeatsExecutor(java.util.concurrent.Executor executor)
+		{
+			this.executor = executor;
+
+		}
+
+		@Override
+		public void run(final Runnable command)
+		{
+			executor.execute(command);
+		}
+	}
+
 	public static class ThreadFactory implements java.util.concurrent.ThreadFactory
 	{
 		private final AtomicInteger counter = new AtomicInteger();
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
index 59e49d1..0290544 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
@@ -16,6 +16,8 @@
  */
 package org.apache.wicket.protocol.ws.api;
 
+import java.nio.ByteBuffer;
+
 import jakarta.servlet.http.HttpServletRequest;
 import jakarta.servlet.http.HttpSession;
 
@@ -147,6 +149,14 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor
 		this.connectionFilter = webSocketSettings.getConnectionFilter();
 	}
 
+
+	/**
+	 * Forwards a pong received from the remote peer to the connection registered for this
+	 * processor. A pong for a connection that is no longer registered is ignored.
+	 *
+	 * @param byteBuffer
+	 *      the application data carried by the pong message
+	 */
+	@Override
+	public void onPong(ByteBuffer byteBuffer) {
+		IKey key = getRegistryKey();
+		IWebSocketConnection webSocketConnection = connectionRegistry.getConnection(getApplication(), getSessionId(), key);
+		// the connection may already have been removed from the registry (e.g. terminated by
+		// the heartbeat) when a late pong arrives
+		if (webSocketConnection != null) {
+			webSocketConnection.onPong(byteBuffer);
+		}
+	}
+
 	@Override
 	public void onMessage(final String message)
 	{
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/BaseWebSocketBehavior.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/BaseWebSocketBehavior.java
index 4e37f54..a79f8e7 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/BaseWebSocketBehavior.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/BaseWebSocketBehavior.java
@@ -162,6 +162,14 @@ public class BaseWebSocketBehavior extends Behavior
 		Integer securePort = getSecurePort(webSocketSettings);
 		variables.put("securePort", securePort);
 
+		variables.put("useHeartBeat", isUseHeartBeat(webSocketSettings));
+
+		variables.put("reconnectOnFailure", isReconnectOnFailure(webSocketSettings));
+
+		variables.put("heartBeatPace", getHeartBeatPace(webSocketSettings));
+
+		variables.put("networkLatencyThreshold", getNetworkLatencyThreshold(webSocketSettings));
+
 		CharSequence contextPath = getContextPath(webSocketSettings);
 		Args.notNull(contextPath, "contextPath");
 		variables.put("contextPath", contextPath);
@@ -178,6 +186,26 @@ public class BaseWebSocketBehavior extends Behavior
 		return variables;
 	}
 
+	/**
+	 * Supplies the value of the {@code useHeartBeat} variable of the setup script. By default
+	 * it is taken from the settings.
+	 *
+	 * @param webSocketSettings
+	 *      the web socket settings
+	 * @return {@code true} if the ping-pong heartbeat is used
+	 */
+	protected boolean isUseHeartBeat(WebSocketSettings webSocketSettings)
+	{
+		return webSocketSettings.isUseHeartBeat();
+	}
+
+	/**
+	 * Supplies the value of the {@code reconnectOnFailure} variable of the setup script. By
+	 * default it is taken from the settings.
+	 *
+	 * @param webSocketSettings
+	 *      the web socket settings
+	 * @return {@code true} if the client tries to reconnect after a ping-pong failure
+	 */
+	protected boolean isReconnectOnFailure(WebSocketSettings webSocketSettings)
+	{
+		return webSocketSettings.isReconnectOnFailure();
+	}
+
+	/**
+	 * Supplies the value of the {@code heartBeatPace} variable of the setup script. By default
+	 * it is taken from the settings.
+	 *
+	 * @param webSocketSettings
+	 *      the web socket settings
+	 * @return the interval between two heartbeat rounds, in milliseconds
+	 */
+	protected long getHeartBeatPace(WebSocketSettings webSocketSettings)
+	{
+		return webSocketSettings.getHeartBeatPace();
+	}
+
+	/**
+	 * Supplies the value of the {@code networkLatencyThreshold} variable of the setup script.
+	 * By default it is taken from the settings.
+	 *
+	 * @param webSocketSettings
+	 *      the web socket settings
+	 * @return the maximum network latency assumed, in milliseconds
+	 */
+	protected long getNetworkLatencyThreshold(WebSocketSettings webSocketSettings)
+	{
+		return webSocketSettings.getNetworkLatencyThreshold();
+	}
+
 	protected Integer getPort(WebSocketSettings webSocketSettings)
 	{
 		return webSocketSettings.getPort();
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java
index 82cf6e9..54f6027 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java
@@ -17,6 +17,7 @@
 package org.apache.wicket.protocol.ws.api;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.wicket.Application;
 import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage;
@@ -30,6 +31,53 @@ import org.apache.wicket.protocol.ws.api.registry.IKey;
 public interface IWebSocketConnection
 {
 	/**
+	 * Returns the last time the connection was known to be alive, as established by the
+	 * ping/pong mechanism. The heartbeat check compares the value against
+	 * {@code System.currentTimeMillis()}.
+	 *
+	 * @return the last time the connection was checked to be alive
+	 */
+	long getLastTimeAlive();
+
+	/**
+	 * Tells whether the connection is considered alive by the ping/pong mechanism.
+	 *
+	 * @return {@code true} if the connection is alive, {@code false} otherwise
+	 */
+	boolean isAlive();
+
+	/**
+	 * Sets whether the connection is considered alive.
+	 *
+	 * @param alive
+	 *      {@code true} if the connection is alive
+	 */
+	void setAlive(boolean alive);
+
+	/**
+	 * Terminates the connection.
+	 *
+	 * @param reason
+	 *      the reason for terminating the connection
+	 */
+	void terminate(String reason);
+
+	/**
+	 * Sends a ping message to the remote peer.
+	 *
+	 * @throws IOException if something went wrong with the ping
+	 */
+	void ping() throws IOException;
+
+
+	/**
+	 * Sends an unsolicited pong message to the remote peer, in order to serve as a
+	 * unidirectional heartbeat for the session.
+	 *
+	 * @throws IOException if something went wrong with the pong
+	 */
+	void pong() throws IOException;
+
+	/**
+	 * Called when the remote peer answers a ping with a pong message.
+	 *
+	 * @param byteBuffer
+	 *      the application data carried by the pong message
+	 */
+	void onPong(ByteBuffer byteBuffer);
+
+	/**
 	 * @return {@code true} when the underlying native web socket
 	 *      connection is still open.
 	 */
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketProcessor.java
index 74b34d1..5aa9391 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketProcessor.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketProcessor.java
@@ -16,8 +16,11 @@
  */
 package org.apache.wicket.protocol.ws.api;
 
+import java.nio.ByteBuffer;
+
 import org.apache.wicket.protocol.http.WebApplication;
 import org.apache.wicket.protocol.ws.WebSocketSettings;
+import org.apache.wicket.protocol.ws.api.registry.IKey;
 
 /**
  * Processes web socket messages.
@@ -41,6 +44,8 @@ public interface IWebSocketProcessor
 		WebSocketSettings.Holder.get(application).getSocketSessionConfigurer().configureSession(webSocketSession);
 	}
 
+	void onPong(ByteBuffer byteBuffer);
+
 	/**
 	 * Called when a text message arrives from the client
 	 *
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-jquery.js b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-jquery.js
index 93b7386..55086dc 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-jquery.js
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-jquery.js
@@ -92,8 +92,15 @@
 
 				self.ws.onopen = function (evt) {
 					Wicket.Event.publish(topics.Opened, evt);
+					// arm the heartbeat watchdog as soon as the connection is open
+					if (Wicket.WebSocket.useHeartBeat) {
+						self.heartbeat();
+					}
 				};
 
+				// Re-arm the watchdog on every ping from the server. `on()` is an API of
+				// event-emitter style web socket clients; the standard browser WebSocket has
+				// no such method, so it is only used when present instead of throwing a
+				// TypeError and aborting the rest of the initialization.
+				// NOTE(review): browsers do not expose ping frames to scripts, so there the
+				// watchdog is never re-armed and fires after the first timeout - confirm the
+				// intended design.
+				if (Wicket.WebSocket.useHeartBeat && typeof self.ws.on === "function") {
+					// wrap the call so that `this` inside heartbeat() is the Wicket.WebSocket
+					// instance and not the native socket
+					self.ws.on("ping", function () {
+						self.heartbeat();
+					});
+				}
+
 				self.ws.onmessage = function (event) {
 
 					var message = event.data;
@@ -139,6 +146,24 @@
 			}
 		},
 
+		/**
+		 * (Re)arms the heartbeat watchdog. If it is not re-armed within
+		 * heartBeatPace + networkLatencyThreshold milliseconds the connection is dropped
+		 * and, when reconnectOnFailure is set, a new default connection is created.
+		 */
+		heartbeat: function () {
+			clearTimeout(this.pingTimeout);
+
+			// Delay should be equal to the interval at which your server
+			// sends out pings plus a conservative assumption of the latency.
+			this.pingTimeout = setTimeout(() => {
+					var ws = this.ws;
+					if (ws) {
+						// Prefer `terminate()`, which immediately destroys the connection,
+						// where the socket implementation offers it. The standard browser
+						// WebSocket only has `close()`; calling a missing `terminate()`
+						// would throw and skip the reconnect below.
+						if (typeof ws.terminate === "function") {
+							ws.terminate();
+						} else {
+							ws.close();
+						}
+					}
+					// try to reconnect to server
+					if (Wicket.WebSocket.reconnectOnFailure)
+					{
+						Wicket.WebSocket.INSTANCE = null;
+						Wicket.WebSocket.createDefaultConnection();
+					}
+				}, Wicket.WebSocket.heartBeatPace + Wicket.WebSocket.networkLatencyThreshold);
+		},
+
 		send: function (text) {
 			if (this.ws && text) {
 				Wicket.Log.info('[WebSocket.send] Sending: ' + text);
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-setup.js.tmpl b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-setup.js.tmpl
index 98e9bc8..8fa142a 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-setup.js.tmpl
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-setup.js.tmpl
@@ -4,7 +4,10 @@
 	if (typeof(Wicket.WebSocket.appName) === "undefined") {
 		jQuery.extend(Wicket.WebSocket, { pageId: ${pageId},  context: '${context}', resourceName: '${resourceName}', connectionToken: '${connectionToken}',
 			baseUrl: '${baseUrl}', contextPath: '${contextPath}', appName: '${applicationName}',
-			port: ${port}, securePort: ${securePort}, filterPrefix: '${filterPrefix}', sessionId: '${sessionId}' });
+			port: ${port}, securePort: ${securePort}, filterPrefix: '${filterPrefix}', sessionId: '${sessionId}',
+			// heartbeat settings; heartBeatPace and networkLatencyThreshold are in milliseconds
+			useHeartBeat: ${useHeartBeat}, reconnectOnFailure: ${reconnectOnFailure},
+			heartBeatPace: ${heartBeatPace}, networkLatencyThreshold: ${networkLatencyThreshold} });
+
 		Wicket.WebSocket.createDefaultConnection();
 	}
 })();
diff --git a/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketConnection.java b/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketConnection.java
index b9f6419..4d690ec 100644
--- a/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketConnection.java
+++ b/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketConnection.java
@@ -18,6 +18,8 @@ package org.apache.wicket.protocol.ws.javax;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import jakarta.websocket.CloseReason;
 import jakarta.websocket.Session;
@@ -40,6 +42,9 @@ public class JavaxWebSocketConnection extends AbstractWebSocketConnection
 
 	private final Session session;
 
+	// Whether the connection is considered alive; switched to true by the constructor.
+	private final AtomicBoolean alive = new AtomicBoolean(false);
+	// Time (System.currentTimeMillis()) at which the connection was last flagged as alive.
+	private final AtomicLong lastTimeAlive = new AtomicLong(System.currentTimeMillis());
+
 	/**
 	 * Constructor.
 	 *
@@ -50,6 +55,57 @@ public class JavaxWebSocketConnection extends AbstractWebSocketConnection
 	{
 		super(webSocketProcessor);
 		this.session = Args.notNull(session, "session");
+		setAlive(true);
+	}
+
+	/**
+	 * @return the time, as returned by {@code System.currentTimeMillis()}, at which the
+	 *      connection was last flagged as alive
+	 */
+	@Override
+	public long getLastTimeAlive()
+	{
+		return lastTimeAlive.get();
+	}
+
+	/**
+	 * @return {@code true} if the connection is currently flagged as alive
+	 */
+	@Override
+	public boolean isAlive()
+	{
+		return alive.get();
+	}
+
+	/**
+	 * Flags the connection as alive or not alive. Flagging it as alive also records the
+	 * current time as the last time the connection was alive.
+	 *
+	 * @param alive
+	 *      {@code true} if the connection is alive
+	 */
+	@Override
+	public void setAlive(boolean alive)
+	{
+		if (alive)
+		{
+			// if the connection is alive we record the timestamp.
+			this.lastTimeAlive.set(System.currentTimeMillis());
+		}
+		this.alive.set(alive);
+	}
+
+	/**
+	 * Terminates the connection by closing it with the {@code GOING_AWAY} close code.
+	 *
+	 * @param reason
+	 *      the reason passed on to {@code close}
+	 */
+	@Override
+	public synchronized void terminate(String reason)
+	{
+		close(CloseReason.CloseCodes.GOING_AWAY.getCode(), reason);
+	}
+
+	/**
+	 * Sends a ping carrying a single byte ({@code 0x0A}) of application data to the remote
+	 * endpoint of the session.
+	 *
+	 * @throws IOException if sending the ping failed
+	 */
+	@Override
+	public void ping() throws IOException
+	{
+		ByteBuffer buf = ByteBuffer.wrap(new byte[]{0xA});
+		session.getBasicRemote().sendPing(buf);
+	}
+
+	/**
+	 * Sends an unsolicited pong carrying a single byte ({@code 0x0A}) of application data to
+	 * the remote endpoint of the session.
+	 *
+	 * @throws IOException if sending the pong failed
+	 */
+	@Override
+	public void pong() throws IOException
+	{
+		ByteBuffer buf = ByteBuffer.wrap(new byte[]{0xA});
+		session.getBasicRemote().sendPong(buf);
+	}
+
+	/**
+	 * Flags the connection as alive because the remote peer answered with a pong.
+	 *
+	 * @param byteBuffer
+	 *      the application data carried by the pong message; not used here
+	 */
+	@Override
+	public void onPong(ByteBuffer byteBuffer)
+	{
+		// we received pong answer from remote peer. Thus, connection is alive
+		setAlive(true);
 	}
 
 	@Override
diff --git a/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketProcessor.java
index c393517..cd3adf8 100644
--- a/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketProcessor.java
+++ b/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketProcessor.java
@@ -20,12 +20,15 @@ import java.nio.ByteBuffer;
 
 import jakarta.websocket.EndpointConfig;
 import jakarta.websocket.MessageHandler;
+import jakarta.websocket.PongMessage;
 import jakarta.websocket.Session;
 
 import org.apache.wicket.protocol.http.WebApplication;
 import org.apache.wicket.protocol.ws.WebSocketSettings;
 import org.apache.wicket.protocol.ws.api.AbstractWebSocketProcessor;
 import org.apache.wicket.protocol.ws.api.IWebSocketSession;
+import org.apache.wicket.protocol.ws.api.message.TextMessage;
+import org.apache.wicket.protocol.ws.api.registry.IKey;
 
 /**
  * An {@link org.apache.wicket.protocol.ws.api.IWebSocketProcessor processor} that integrates with
@@ -53,6 +56,7 @@ public class JavaxWebSocketProcessor extends AbstractWebSocketProcessor
 
 		session.addMessageHandler(new StringMessageHandler());
 		session.addMessageHandler(new BinaryMessageHandler());
+		session.addMessageHandler(new PongMessageMessageHandler());
 	}
 
 	@Override
@@ -60,6 +64,18 @@ public class JavaxWebSocketProcessor extends AbstractWebSocketProcessor
 	{
 	}
 
+
+	/**
+	 * Message handler that forwards the application data of every pong message received on
+	 * the session to {@code onPong(ByteBuffer)} of the enclosing processor.
+	 * <p>
+	 * NOTE(review): the local variable {@code key} in {@code onMessage} is never read.
+	 */
+	private class PongMessageMessageHandler implements MessageHandler.Whole<PongMessage>
+	{
+		@Override
+		public void onMessage(PongMessage message)
+		{
+			IKey key = getRegistryKey();
+			JavaxWebSocketProcessor.this.onPong(message.getApplicationData());
+		}
+	}
+
+
 	private class StringMessageHandler implements MessageHandler.Whole<String>
 	{
 		@Override

[wicket] 02/02: [WICKET-6954] initial work

Posted by re...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

reiern70 pushed a commit to branch improvement/reiern70/WICKET-6954-ping-pong-keep-alive
in repository https://gitbox.apache.org/repos/asf/wicket.git

commit d705894bd2b3956e3abf6786076fd4de1ee8f5a3
Author: reiern70 <re...@gmail.com>
AuthorDate: Sat Feb 5 10:46:39 2022 -0500

    [WICKET-6954] initial work
---
 .../wicket/protocol/ws/WebSocketSettings.java      | 18 ++++++-
 .../ws/api/AbstractWebSocketProcessor.java         | 19 ++++++--
 .../protocol/ws/api/IWebSocketProcessor.java       |  5 ++
 .../ws/api/message/PongMessageMessage.java         | 55 ++++++++++++++++++++++
 .../protocol/ws/javax/JavaxWebSocketProcessor.java |  1 -
 5 files changed, 92 insertions(+), 6 deletions(-)

diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
index dd59f49..27237a0 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
@@ -94,6 +94,7 @@ public class WebSocketSettings
 	private final AtomicInteger port = new AtomicInteger();
 	private final AtomicInteger securePort = new AtomicInteger();
 
+
 	/**
 	 * Holds this WebSocketSettings in the Application's metadata.
 	 * This way wicket-core module doesn't have reference to wicket-native-websocket.
@@ -177,7 +178,12 @@ public class WebSocketSettings
 	 */
 	private Executor heartBeatsExecutor = new HeartBeatsExecutor();
 
-	// internal heartbeats timer.
+	/**
+	 * Whether a message is broadcast (to the page or resource the connection belongs to)
+	 * whenever a pong message is received from the remote peer. Disabled by default.
+	 */
+	private boolean sendMessagesOnPong = false;
+
+	// internal heartbeat's timer.
 	private Timer heartBeatsTimer;
 
 	/**
@@ -589,6 +595,16 @@ public class WebSocketSettings
 		}
 	}
 
+	/**
+	 * Sets whether a message is broadcast whenever a pong message is received.
+	 *
+	 * @param sendMessagesOnPong
+	 *      {@code true} to broadcast a message on every pong
+	 */
+	public void setSendMessagesOnPong(boolean sendMessagesOnPong)
+	{
+		this.sendMessagesOnPong = sendMessagesOnPong;
+	}
+
+	/**
+	 * @return {@code true} if a message is broadcast whenever a pong message is received
+	 */
+	public boolean isSendMessagesOnPong()
+	{
+		return sendMessagesOnPong;
+	}
+
 	private void sendHeartBeats(String application)
 	{
 		sendHeartBeats(Application.get(application));
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
index 0290544..d5aef01 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
@@ -47,6 +47,7 @@ import org.apache.wicket.protocol.ws.api.message.ConnectedMessage;
 import org.apache.wicket.protocol.ws.api.message.ErrorMessage;
 import org.apache.wicket.protocol.ws.api.message.IWebSocketMessage;
 import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage;
+import org.apache.wicket.protocol.ws.api.message.PongMessageMessage;
 import org.apache.wicket.protocol.ws.api.message.TextMessage;
 import org.apache.wicket.protocol.ws.api.registry.IKey;
 import org.apache.wicket.protocol.ws.api.registry.IWebSocketConnectionRegistry;
@@ -149,12 +150,22 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor
 		this.connectionFilter = webSocketSettings.getConnectionFilter();
 	}
 
-
+	/**
+	 * Forwards a pong received from the remote peer to the connection registered for this
+	 * processor and, if enabled in the settings, broadcasts a {@link PongMessageMessage}.
+	 * Nothing happens when no connection is registered.
+	 */
 	@Override
-	public void onPong(ByteBuffer byteBuffer) {
+	public void onPong(ByteBuffer byteBuffer)
+	{
 		IKey key = getRegistryKey();
-		IWebSocketConnection webSocketConnection = connectionRegistry.getConnection(getApplication(), getSessionId(), key);
-		webSocketConnection.onPong(byteBuffer);
+		WebApplication application = getApplication();
+		String sessionId = getSessionId();
+		IWebSocketConnection webSocketConnection = connectionRegistry.getConnection(application, sessionId, key);
+		// no registered connection: the pong is silently ignored
+		if (webSocketConnection != null)
+		{
+			webSocketConnection.onPong(byteBuffer);
+			if (webSocketSettings.isSendMessagesOnPong())
+			{
+				// if we want to deliver messages on pong deliver them
+				// (PongMessageMessage rejects a null byteBuffer)
+				broadcastMessage(new PongMessageMessage(application, sessionId, key, byteBuffer));
+			}
+		}
 	}
 
 	@Override
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketProcessor.java
index 5aa9391..708002f 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketProcessor.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketProcessor.java
@@ -44,6 +44,11 @@ public interface IWebSocketProcessor
 		WebSocketSettings.Holder.get(application).getSocketSessionConfigurer().configureSession(webSocketSession);
 	}
 
+	/**
+	 * Called when the remote peer answers a ping with a pong message.
+	 *
+	 * @param byteBuffer
+	 *      the application data carried by the pong message
+	 */
 	void onPong(ByteBuffer byteBuffer);
 
 	/**
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/PongMessageMessage.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/PongMessageMessage.java
new file mode 100644
index 0000000..0708f82
--- /dev/null
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/PongMessageMessage.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wicket.protocol.ws.api.message;
+
+import java.nio.ByteBuffer;
+
+import org.apache.wicket.Application;
+import org.apache.wicket.protocol.ws.api.registry.IKey;
+import org.apache.wicket.util.lang.Args;
+
+/**
+ * A {@link IWebSocketMessage message} carrying the application data of a pong message
+ * received from the remote peer.
+ *
+ * NOTE(review): the version in the tag below looks copied from another message class;
+ * confirm the release this class first ships in.
+ *
+ * @since 6.0
+ */
+public class PongMessageMessage extends AbstractClientMessage
+{
+	// application data of the pong message; never null (checked in the constructor)
+	private final ByteBuffer byteBuffer;
+
+	/**
+	 * Constructor.
+	 *
+	 * @param application
+	 *      the Wicket application
+	 * @param sessionId
+	 *      the id of the http session
+	 * @param key
+	 *      the page id or resource name
+	 * @param byteBuffer
+	 *      the application data of the pong message sent by the remote peer; must not be
+	 *      {@code null}
+	 */
+	public PongMessageMessage(Application application, String sessionId, IKey key, ByteBuffer byteBuffer)
+	{
+		super(application, sessionId, key);
+		this.byteBuffer = Args.notNull(byteBuffer, "byteBuffer");
+	}
+
+	/**
+	 * @return the application data of the pong message; the buffer is returned as is, not
+	 *      copied
+	 */
+	public ByteBuffer getByteBuffer()
+	{
+		return byteBuffer;
+	}
+}
diff --git a/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketProcessor.java
index cd3adf8..85f78ea 100644
--- a/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketProcessor.java
+++ b/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketProcessor.java
@@ -70,7 +70,6 @@ public class JavaxWebSocketProcessor extends AbstractWebSocketProcessor
 		@Override
 		public void onMessage(PongMessage message)
 		{
-			IKey key = getRegistryKey();
 			JavaxWebSocketProcessor.this.onPong(message.getApplicationData());
 		}
 	}