You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wicket.apache.org by re...@apache.org on 2022/02/05 15:46:58 UTC

[wicket] 01/02: [WICKET-6954] initial work

This is an automated email from the ASF dual-hosted git repository.

reiern70 pushed a commit to branch improvement/reiern70/WICKET-6954-ping-pong-keep-alive
in repository https://gitbox.apache.org/repos/asf/wicket.git

commit 96e0e0170f8fb2881fbd11c39dcaf45d5eaead31
Author: reiern70 <re...@gmail.com>
AuthorDate: Sat Feb 5 10:26:23 2022 -0500

    [WICKET-6954] initial work
---
 .../wicket/protocol/ws/WebSocketSettings.java      | 229 +++++++++++++++++++++
 .../ws/api/AbstractWebSocketProcessor.java         |  10 +
 .../protocol/ws/api/BaseWebSocketBehavior.java     |  28 +++
 .../protocol/ws/api/IWebSocketConnection.java      |  48 +++++
 .../protocol/ws/api/IWebSocketProcessor.java       |   5 +
 .../ws/api/res/js/wicket-websocket-jquery.js       |  25 +++
 .../ws/api/res/js/wicket-websocket-setup.js.tmpl   |   5 +-
 .../ws/javax/JavaxWebSocketConnection.java         |  56 +++++
 .../protocol/ws/javax/JavaxWebSocketProcessor.java |  16 ++
 9 files changed, 421 insertions(+), 1 deletion(-)

diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
index 7aca394..dd59f49 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
@@ -41,6 +41,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import jakarta.servlet.http.HttpServletRequest;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Date;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -140,6 +146,41 @@ public class WebSocketSettings
 	private IWebSocketConnectionFilter connectionFilter;
 
 	/**
+	 * Boolean used to determine if ping-pong heart beat will be used.
+	 */
+	private boolean useHeartBeat = false;
+
+	/**
+	 * Flag used to determine if client will try to reconnect in case of ping-pong failure
+	 */
+	private boolean reconnectOnFailure = false;
+
+	/**
+	 * In case a ping to the remote client fails immediately, this determines how many times the
+	 * ping will be retried before the connection is terminated.
+	 */
+	private int maxPingRetries = 3;
+
+	/**
+	 * Period, in milliseconds, at which the heartbeat timer fires.
+	 */
+	private long heartBeatPace = Duration.ofSeconds(15).toMillis();
+
+	/**
+	 * The max threshold assumed for network latency, in milliseconds.
+	 */
+	private long networkLatencyThreshold = Duration.ofSeconds(2).toMillis();
+
+	/**
+	 * The executor that handles delivering pings to remote peers and checking if peers have
+	 * correctly ponged back (and terminates connections in case not).
+	 */
+	private Executor heartBeatsExecutor = new HeartBeatsExecutor();
+
+	// internal heartbeats timer.
+	private Timer heartBeatsTimer;
+
+	/**
 	 * A {@link org.apache.wicket.protocol.ws.api.IWebSocketSessionConfigurer} that allows to configure
 	 * {@link org.apache.wicket.protocol.ws.api.IWebSocketSession}s.
 	 */
@@ -424,6 +465,167 @@ public class WebSocketSettings
 		return securePort.get();
 	}
 
+	public void setUseHeartBeat(boolean useHeartBeat)
+	{
+		this.useHeartBeat = useHeartBeat;
+	}
+
+	public boolean isUseHeartBeat()
+	{
+		return useHeartBeat;
+	}
+
+	public void setReconnectOnFailure(boolean reconnectOnFailure)
+	{
+		this.reconnectOnFailure = reconnectOnFailure;
+	}
+
+	public boolean isReconnectOnFailure()
+	{
+		return reconnectOnFailure;
+	}
+
+	public long getHeartBeatPace()
+	{
+		return heartBeatPace;
+	}
+
+	public void setHeartBeatPace(long heartBeatPace)
+	{
+		this.heartBeatPace = heartBeatPace;
+	}
+
+	public void setHeartBeatPace(Duration heartBeatPace)
+	{
+		this.heartBeatPace = heartBeatPace.toMillis();
+	}
+
+	public long getNetworkLatencyThreshold()
+	{
+		return networkLatencyThreshold;
+	}
+
+	public void setNetworkLatencyThreshold(long networkLatencyThreshold)
+	{
+		this.networkLatencyThreshold = networkLatencyThreshold;
+	}
+
+	public void setNetworkLatencyThreshold(Duration networkLatencyThreshold)
+	{
+		this.networkLatencyThreshold = networkLatencyThreshold.toMillis();
+	}
+
+	public void setHeartBeatsExecutor(Executor heartBeatsExecutor)
+	{
+		this.heartBeatsExecutor = heartBeatsExecutor;
+	}
+
+	public void setMaxPingRetries(int maxPingRetries)
+	{
+		this.maxPingRetries = maxPingRetries;
+	}
+
+
+	/**
+	 * Starts the daemon timer that sends heart beats every {@code heartBeatPace} milliseconds; no-op when disabled.
+	 * @param application the application key, as accepted by {@code Application.get(String)}
+	 */
+	public void startHeartBeatTimer(String application)
+	{
+		if (useHeartBeat == false)
+		{
+			LOG.debug("useClientSideHeartBeat is set to false. Thus we won't start heartbeat's sending thread");
+			return;
+		}
+		LOG.debug("Starting thread pushing hart beats");
+		TimerTask timerTask = new TimerTask()
+		{
+			@Override
+			public void run()
+			{
+				try
+				{
+					sendHeartBeats(application);
+				}
+				catch (Exception e)
+				{
+					LOG.error("Error while checking sessions", e);
+				}
+			}
+		};
+		this.heartBeatsTimer = new Timer(true);
+		// heartBeatPace is already in milliseconds, so it is passed as the period without conversion
+		this.heartBeatsTimer.schedule(timerTask, new Date(), heartBeatPace);
+	}
+
+	/** Cancels the heart beat timer, if one has been created. */
+	public void stopHeartBeatTimer()
+	{
+		LOG.debug("Stopping thread pushing hart beats");
+		final Timer timer = this.heartBeatsTimer;
+		if (timer == null)
+		{
+			return;
+		}
+		timer.cancel();
+	}
+
+	/** Pings each connection flagged alive; terminates a connection flagged dead once its pong is overdue. */
+	private void sendHeartBeats(Application application)
+	{
+		for (IWebSocketConnection connection: connectionRegistry.getConnections(application))
+		{
+			if (connection.isAlive())
+			{
+				heartBeatsExecutor.run(() -> ping(connection, maxPingRetries));
+			}
+			// elapsed time is "now - last time alive"; the reverse order is never positive, so
+			// the threshold could never be exceeded
+			else if (System.currentTimeMillis() - connection.getLastTimeAlive() > (heartBeatPace + networkLatencyThreshold))
+			{
+				heartBeatsExecutor.run(() -> terminateConnection(connection));
+			}
+		}
+	}
+
+	/** Resolves the application by its key and sends heart beats to its connections. */
+	private void sendHeartBeats(String application)
+	{
+		sendHeartBeats(Application.get(application));
+	}
+
+	/** Pings the connection; each I/O failure uses one retry, and with {@code pingRetryCounter} at zero or below it is terminated. */
+	private void ping(IWebSocketConnection connection, final int pingRetryCounter)
+	{
+		try
+		{
+			connection.ping();
+		}
+		catch (IOException e)
+		{
+			if (pingRetryCounter <= 0)
+			{
+				terminateConnection(connection);
+			}
+			else
+			{
+				ping(connection, pingRetryCounter - 1);
+			}
+		}
+	}
+
+	private void terminateConnection(IWebSocketConnection connection)
+	{
+		connection.setAlive(false);
+		if (LOG.isDebugEnabled())
+		{
+			LOG.debug("Terminating connection with ID {} because ping of remote peer failed {} times",
+					connection.getKey(), maxPingRetries);
+		}
+		connection.terminate("Failed to ping remote peer");
+		connectionRegistry.removeConnection(connection.getApplication(), connection.getSessionId(), connection.getKey());
+	}
+
 	/**
 	 * Simple executor that runs the tasks in the caller thread.
 	 */
@@ -487,6 +689,33 @@ public class WebSocketSettings
 		}
 	}
 
+	/** {@link Executor} that hands heart beat tasks over to a {@code java.util.concurrent.Executor}. */
+	public static class HeartBeatsExecutor implements Executor
+	{
+		private final java.util.concurrent.Executor executor;
+
+		public HeartBeatsExecutor()
+		{
+			// SynchronousQueue never buffers: when all 8 threads are busy, run the task in the caller rather than reject it
+			this(new ThreadPoolExecutor(1, 8,
+					60L, TimeUnit.SECONDS,
+					new SynchronousQueue<>(),
+					new ThreadFactory(),
+					new ThreadPoolExecutor.CallerRunsPolicy()));
+		}
+
+		public HeartBeatsExecutor(java.util.concurrent.Executor executor)
+		{
+			this.executor = executor;
+		}
+
+		@Override
+		public void run(final Runnable command)
+		{
+			executor.execute(command);
+		}
+	}
+
 	public static class ThreadFactory implements java.util.concurrent.ThreadFactory
 	{
 		private final AtomicInteger counter = new AtomicInteger();
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
index 59e49d1..0290544 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
@@ -16,6 +16,8 @@
  */
 package org.apache.wicket.protocol.ws.api;
 
+import java.nio.ByteBuffer;
+
 import jakarta.servlet.http.HttpServletRequest;
 import jakarta.servlet.http.HttpSession;
 
@@ -147,6 +149,14 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor
 		this.connectionFilter = webSocketSettings.getConnectionFilter();
 	}
 
+	@Override
+	public void onPong(ByteBuffer byteBuffer) {
+		IWebSocketConnection webSocketConnection = connectionRegistry.getConnection(getApplication(), getSessionId(), getRegistryKey());
+		if (webSocketConnection != null) { // nothing to mark alive when no connection is registered for this key
+			webSocketConnection.onPong(byteBuffer);
+		}
+	}
+
 	@Override
 	public void onMessage(final String message)
 	{
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/BaseWebSocketBehavior.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/BaseWebSocketBehavior.java
index 4e37f54..a79f8e7 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/BaseWebSocketBehavior.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/BaseWebSocketBehavior.java
@@ -162,6 +162,14 @@ public class BaseWebSocketBehavior extends Behavior
 		Integer securePort = getSecurePort(webSocketSettings);
 		variables.put("securePort", securePort);
 
+		variables.put("useHeartBeat", isUseHeartBeat(webSocketSettings));
+
+		variables.put("reconnectOnFailure", isReconnectOnFailure(webSocketSettings));
+
+		variables.put("heartBeatPace", getHeartBeatPace(webSocketSettings));
+
+		variables.put("networkLatencyThreshold", getNetworkLatencyThreshold(webSocketSettings));
+
 		CharSequence contextPath = getContextPath(webSocketSettings);
 		Args.notNull(contextPath, "contextPath");
 		variables.put("contextPath", contextPath);
@@ -178,6 +186,26 @@ public class BaseWebSocketBehavior extends Behavior
 		return variables;
 	}
 
+	protected boolean isUseHeartBeat(WebSocketSettings webSocketSettings)
+	{
+		return webSocketSettings.isUseHeartBeat();
+	}
+
+	protected boolean isReconnectOnFailure(WebSocketSettings webSocketSettings)
+	{
+		return webSocketSettings.isReconnectOnFailure();
+	}
+
+	protected long getHeartBeatPace(WebSocketSettings webSocketSettings)
+	{
+		return webSocketSettings.getHeartBeatPace();
+	}
+
+	protected long getNetworkLatencyThreshold(WebSocketSettings webSocketSettings)
+	{
+		return webSocketSettings.getNetworkLatencyThreshold();
+	}
+
 	protected Integer getPort(WebSocketSettings webSocketSettings)
 	{
 		return webSocketSettings.getPort();
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java
index 82cf6e9..54f6027 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java
@@ -17,6 +17,7 @@
 package org.apache.wicket.protocol.ws.api;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.wicket.Application;
 import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage;
@@ -30,6 +31,53 @@ import org.apache.wicket.protocol.ws.api.registry.IKey;
 public interface IWebSocketConnection
 {
 	/**
+	 *
+	 * @return the last time connection was checked to be alive (checked via ping/pong mechanism)
+	 */
+	long getLastTimeAlive();
+
+	/**
+	 *
+	 * @return {@code true} if connection is alive (checked via ping/pong mechanism). {@code false} otherwise
+	 */
+	boolean isAlive();
+
+	/**
+	 * Allows setting whether connection is alive or not.
+	 *
+	 * @param alive Alive
+	 */
+	void setAlive(boolean alive);
+
+	/**
+	 * Terminates the connection.
+	 *
+	 * @param reason The reason to terminate connection.
+	 */
+	void terminate(String reason);
+
+	/**
+	 * Sends a ping message to the remote peer.
+	 *
+	 * @throws IOException if something went wrong with ping
+	 */
+	void ping() throws IOException;
+
+
+	/**
+	 * Sends an unsolicited pong message to the remote peer; it can serve as a unidirectional
+	 * heartbeat for the session.
+	 */
+	void pong() throws IOException;
+
+	/**
+	 * Called when remote peer answers to ping with pong message.
+	 *
+	 * @param byteBuffer Contains application specific content
+	 */
+	void onPong(ByteBuffer byteBuffer);
+
+	/**
 	 * @return {@code true} when the underlying native web socket
 	 *      connection is still open.
 	 */
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketProcessor.java
index 74b34d1..5aa9391 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketProcessor.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketProcessor.java
@@ -16,8 +16,11 @@
  */
 package org.apache.wicket.protocol.ws.api;
 
+import java.nio.ByteBuffer;
+
 import org.apache.wicket.protocol.http.WebApplication;
 import org.apache.wicket.protocol.ws.WebSocketSettings;
+import org.apache.wicket.protocol.ws.api.registry.IKey;
 
 /**
  * Processes web socket messages.
@@ -41,6 +44,8 @@ public interface IWebSocketProcessor
 		WebSocketSettings.Holder.get(application).getSocketSessionConfigurer().configureSession(webSocketSession);
 	}
 
+	void onPong(ByteBuffer byteBuffer);
+
 	/**
 	 * Called when a text message arrives from the client
 	 *
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-jquery.js b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-jquery.js
index 93b7386..55086dc 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-jquery.js
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-jquery.js
@@ -92,8 +92,15 @@
 
 				self.ws.onopen = function (evt) {
 					Wicket.Event.publish(topics.Opened, evt);
+					if (Wicket.WebSocket.useHeartBeat) {
+						self.heartbeat();
+					}
 				};
 
+				if (WWS.useHeartBeat) {
+					self.ws.on("ping", self.heartbeat)
+				}
+
 				self.ws.onmessage = function (event) {
 
 					var message = event.data;
@@ -139,6 +146,24 @@
 			}
 		},
 
+		heartbeat: function () {
+			clearTimeout(this.pingTimeout);
+
+			// Use `WebSocket#terminate()`, which immediately destroys the connection,
+			// instead of `WebSocket#close()`, which waits for the close timer.
+			// Delay should be equal to the interval at which your server
+			// sends out pings plus a conservative assumption of the latency.
+			this.pingTimeout = setTimeout(() => {
+					this.ws.terminate();
+					// try to reconnect to server
+					if (Wicket.WebSocket.reconnectOnFailure)
+					{
+						Wicket.WebSocket.INSTANCE = null;
+						Wicket.WebSocket.createDefaultConnection();
+					}
+				}, Wicket.WebSocket.heartBeatPace + Wicket.WebSocket.networkLatencyThreshold);
+		},
+
 		send: function (text) {
 			if (this.ws && text) {
 				Wicket.Log.info('[WebSocket.send] Sending: ' + text);
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-setup.js.tmpl b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-setup.js.tmpl
index 98e9bc8..8fa142a 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-setup.js.tmpl
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-setup.js.tmpl
@@ -4,7 +4,10 @@
 	if (typeof(Wicket.WebSocket.appName) === "undefined") {
 		jQuery.extend(Wicket.WebSocket, { pageId: ${pageId},  context: '${context}', resourceName: '${resourceName}', connectionToken: '${connectionToken}',
 			baseUrl: '${baseUrl}', contextPath: '${contextPath}', appName: '${applicationName}',
-			port: ${port}, securePort: ${securePort}, filterPrefix: '${filterPrefix}', sessionId: '${sessionId}' });
+			port: ${port}, securePort: ${securePort}, filterPrefix: '${filterPrefix}', sessionId: '${sessionId}',
+			useHeartBeat: ${useHeartBeat}, reconnectOnFailure: ${reconnectOnFailure},
+			heartBeatPace: ${heartBeatPace}, networkLatencyThreshold: ${networkLatencyThreshold});
+
 		Wicket.WebSocket.createDefaultConnection();
 	}
 })();
diff --git a/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketConnection.java b/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketConnection.java
index b9f6419..4d690ec 100644
--- a/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketConnection.java
+++ b/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketConnection.java
@@ -18,6 +18,8 @@ package org.apache.wicket.protocol.ws.javax;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import jakarta.websocket.CloseReason;
 import jakarta.websocket.Session;
@@ -40,6 +42,9 @@ public class JavaxWebSocketConnection extends AbstractWebSocketConnection
 
 	private final Session session;
 
+	private final AtomicBoolean alive = new AtomicBoolean(false);
+	private final AtomicLong lastTimeAlive = new AtomicLong(System.currentTimeMillis());
+
 	/**
 	 * Constructor.
 	 *
@@ -50,6 +55,57 @@ public class JavaxWebSocketConnection extends AbstractWebSocketConnection
 	{
 		super(webSocketProcessor);
 		this.session = Args.notNull(session, "session");
+		setAlive(true);
+	}
+
+	@Override
+	public long getLastTimeAlive()
+	{
+		return lastTimeAlive.get();
+	}
+
+	@Override
+	public boolean isAlive()
+	{
+		return alive.get();
+	}
+
+	@Override
+	public void setAlive(boolean alive)
+	{
+		if (alive)
+		{
+			// the connection was confirmed alive: remember when that happened
+			this.lastTimeAlive.set(System.currentTimeMillis());
+		}
+		this.alive.set(alive);
+	}
+
+	@Override
+	public synchronized void terminate(String reason)
+	{
+		close(CloseReason.CloseCodes.GOING_AWAY.getCode(), reason);
+	}
+
+	@Override
+	public void ping() throws IOException
+	{
+		ByteBuffer buf = ByteBuffer.wrap(new byte[]{0xA});
+		session.getBasicRemote().sendPing(buf);
+	}
+
+	@Override
+	public void pong() throws IOException
+	{
+		ByteBuffer buf = ByteBuffer.wrap(new byte[]{0xA});
+		session.getBasicRemote().sendPong(buf);
+	}
+
+	@Override
+	public void onPong(ByteBuffer byteBuffer)
+	{
+		// we received pong answer from remote peer. Thus, connection is alive
+		setAlive(true);
 	}
 
 	@Override
diff --git a/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketProcessor.java
index c393517..cd3adf8 100644
--- a/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketProcessor.java
+++ b/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketProcessor.java
@@ -20,12 +20,15 @@ import java.nio.ByteBuffer;
 
 import jakarta.websocket.EndpointConfig;
 import jakarta.websocket.MessageHandler;
+import jakarta.websocket.PongMessage;
 import jakarta.websocket.Session;
 
 import org.apache.wicket.protocol.http.WebApplication;
 import org.apache.wicket.protocol.ws.WebSocketSettings;
 import org.apache.wicket.protocol.ws.api.AbstractWebSocketProcessor;
 import org.apache.wicket.protocol.ws.api.IWebSocketSession;
+import org.apache.wicket.protocol.ws.api.message.TextMessage;
+import org.apache.wicket.protocol.ws.api.registry.IKey;
 
 /**
  * An {@link org.apache.wicket.protocol.ws.api.IWebSocketProcessor processor} that integrates with
@@ -53,6 +56,7 @@ public class JavaxWebSocketProcessor extends AbstractWebSocketProcessor
 
 		session.addMessageHandler(new StringMessageHandler());
 		session.addMessageHandler(new BinaryMessageHandler());
+		session.addMessageHandler(new PongMessageMessageHandler());
 	}
 
 	@Override
@@ -60,6 +64,18 @@ public class JavaxWebSocketProcessor extends AbstractWebSocketProcessor
 	{
 	}
 
+
+	/** Forwards the application data of every received pong message to {@code onPong(ByteBuffer)}. */
+	private class PongMessageMessageHandler implements MessageHandler.Whole<PongMessage>
+	{
+		@Override
+		public void onMessage(PongMessage message)
+		{
+			JavaxWebSocketProcessor.this.onPong(message.getApplicationData());
+		}
+	}
+
+
 	private class StringMessageHandler implements MessageHandler.Whole<String>
 	{
 		@Override