You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wicket.apache.org by re...@apache.org on 2022/02/07 03:03:30 UTC
[wicket] 01/01: [WICKET-6954] implement server side heart-beat + client reconnection in case of inactivity
This is an automated email from the ASF dual-hosted git repository.
reiern70 pushed a commit to branch improvement/reiern70/WICKET-6954-ping-pong-keep-alive-9-x
in repository https://gitbox.apache.org/repos/asf/wicket.git
commit 81e97270ad15923f922e8eaf971ef61e8afbeb18
Author: reiern70 <re...@gmail.com>
AuthorDate: Sat Feb 5 07:26:23 2022 -0500
[WICKET-6954] implement server side heart-beat + client reconnection in case of inactivity
---
.../examples/websocket/JSR356Application.java | 11 +-
.../websocket/WebSocketBehaviorDemoPage.java | 10 +-
.../wicket/protocol/ws/WebSocketSettings.java | 165 ++++++++++++++++++++-
.../ws/api/AbstractWebSocketProcessor.java | 30 +++-
.../protocol/ws/api/BaseWebSocketBehavior.java | 28 ++++
.../protocol/ws/api/IWebSocketConnection.java | 50 +++++++
.../protocol/ws/api/IWebSocketProcessor.java | 10 ++
.../protocol/ws/api/message/ConnectedMessage.java | 17 +++
...nnectedMessage.java => PongMessageMessage.java} | 29 +++-
.../ws/api/res/js/wicket-websocket-jquery.js | 32 ++++
.../ws/api/res/js/wicket-websocket-setup.js.tmpl | 5 +-
.../protocol/ws/timer/AbstractHeartBeatTimer.java | 92 ++++++++++++
.../ws/timer/HeartBeatWithReconnectTimer.java | 77 ++++++++++
.../protocol/ws/timer/PingPongHeartBeatTimer.java | 105 +++++++++++++
.../ws/util/tester/TestWebSocketConnection.java | 53 ++++++-
.../ws/util/tester/TestWebSocketProcessor.java | 18 ++-
.../protocol/ws/util/tester/WebSocketTester.java | 22 +++
.../util/tester/WebSocketTesterProcessorTest.java | 5 +
.../ws/javax/JavaxWebSocketConnection.java | 80 +++++++++-
.../protocol/ws/javax/JavaxWebSocketProcessor.java | 19 ++-
20 files changed, 834 insertions(+), 24 deletions(-)
diff --git a/wicket-examples/src/main/java/org/apache/wicket/examples/websocket/JSR356Application.java b/wicket-examples/src/main/java/org/apache/wicket/examples/websocket/JSR356Application.java
index 8bc6e9d..1a6462e 100644
--- a/wicket-examples/src/main/java/org/apache/wicket/examples/websocket/JSR356Application.java
+++ b/wicket-examples/src/main/java/org/apache/wicket/examples/websocket/JSR356Application.java
@@ -23,8 +23,8 @@ import org.apache.wicket.protocol.http.WebApplication;
import org.apache.wicket.protocol.https.HttpsConfig;
import org.apache.wicket.protocol.https.HttpsMapper;
import org.apache.wicket.protocol.ws.WebSocketSettings;
-import org.apache.wicket.protocol.ws.api.IWebSocketSession;
-import org.apache.wicket.protocol.ws.api.IWebSocketSessionConfigurer;
+import org.apache.wicket.protocol.ws.timer.HeartBeatWithReconnectTimer;
+import org.apache.wicket.protocol.ws.timer.PingPongHeartBeatTimer;
import org.apache.wicket.request.Request;
import org.apache.wicket.request.Response;
import org.slf4j.Logger;
@@ -43,6 +43,7 @@ public class JSR356Application extends WicketExampleApplication
private static final Logger LOGGER = LoggerFactory.getLogger(JSR356Application.class);
private ScheduledExecutorService scheduledExecutorService;
+ private HeartBeatWithReconnectTimer heartBeatWithReconnectTimer;
@Override
public Class<HomePage> getHomePage()
@@ -83,6 +84,10 @@ public class JSR356Application extends WicketExampleApplication
webSocketSettings.setSecurePort(8443);
}
+ webSocketSettings.setUseHeartBeat(true);
+ webSocketSettings.setReconnectOnFailure(true);
+ heartBeatWithReconnectTimer = new HeartBeatWithReconnectTimer(webSocketSettings);
+ heartBeatWithReconnectTimer.start(this);
// The websocket example loads JS from ajax.googleapis.com, which is not allowed by the CSP.
// This now serves as an example on how to disable CSP
getCspSettings().blocking().disabled();
@@ -97,7 +102,7 @@ public class JSR356Application extends WicketExampleApplication
@Override
protected void onDestroy() {
scheduledExecutorService.shutdownNow();
-
+ heartBeatWithReconnectTimer.stop();
super.onDestroy();
}
diff --git a/wicket-examples/src/main/java/org/apache/wicket/examples/websocket/WebSocketBehaviorDemoPage.java b/wicket-examples/src/main/java/org/apache/wicket/examples/websocket/WebSocketBehaviorDemoPage.java
index c6bff98..76d0781 100644
--- a/wicket-examples/src/main/java/org/apache/wicket/examples/websocket/WebSocketBehaviorDemoPage.java
+++ b/wicket-examples/src/main/java/org/apache/wicket/examples/websocket/WebSocketBehaviorDemoPage.java
@@ -92,12 +92,14 @@ public class WebSocketBehaviorDemoPage extends WicketExamplePage
private static final long serialVersionUID = 1L;
@Override
- protected void onConnect(ConnectedMessage message)
- {
+ protected void onConnect(ConnectedMessage message) {
super.onConnect(message);
- ScheduledExecutorService service = JSR356Application.get().getScheduledExecutorService();
- ChartUpdater.start(message, service);
+ if (!message.isReconnected())
+ {
+ ScheduledExecutorService service = JSR356Application.get().getScheduledExecutorService();
+ ChartUpdater.start(message, service);
+ }
}
});
add(downloadingContainer.setOutputMarkupPlaceholderTag(true).setVisible(false));
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
index df78a18..b0d579d 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
@@ -40,7 +40,7 @@ import org.apache.wicket.util.string.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.servlet.http.HttpServletRequest;
+import java.time.Duration;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -48,6 +48,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import javax.servlet.http.HttpServletRequest;
+
/**
* Web Socket related settings.
*
@@ -88,6 +90,7 @@ public class WebSocketSettings
private final AtomicInteger port = new AtomicInteger();
private final AtomicInteger securePort = new AtomicInteger();
+
/**
* Holds this WebSocketSettings in the Application's metadata.
* This way wicket-core module doesn't have reference to wicket-native-websocket.
@@ -140,6 +143,49 @@ public class WebSocketSettings
private IWebSocketConnectionFilter connectionFilter;
/**
+ * Boolean used to determine if the heart beat (with optional client reconnection) will be used.
+ */
+ private boolean useHeartBeat = false;
+
+
+ /**
+ * Boolean used to determine if ping-pong heart beat will be used.
+ */
+ private boolean usePingPongHeartBeat = false;
+
+ /**
+ * Flag used to determine if client will try to reconnect in case of ping-pong failure
+ */
+ private boolean reconnectOnFailure = false;
+
+ /**
+ * In case the ping or the remote client immediately fails, this determines how many times the ping
+ * will be retried before the connection is terminated.
+ */
+ private int maxPingRetries = 3;
+
+ /**
+ * Period (in milliseconds) at which the heartbeat timer fires.
+ */
+ private long heartBeatPace = Duration.ofSeconds(15).toMillis();
+
+ /**
+ * The maximum threshold (in milliseconds) assumed for network latency.
+ */
+ private long networkLatencyThreshold = Duration.ofSeconds(2).toMillis();
+
+ /**
+ * The executor that handles delivering pings to remote peers and checking if peers have
+ * correctly ponged back (and terminates connections in case not).
+ */
+ private Executor heartBeatsExecutor = new HeartBeatsExecutor();
+
+ /**
+ * Whether messages are broadcast when receiving pong messages
+ */
+ private boolean sendMessagesOnPong = false;
+
+ /**
* A {@link org.apache.wicket.protocol.ws.api.IWebSocketSessionConfigurer} that allows to configure
* {@link org.apache.wicket.protocol.ws.api.IWebSocketSession}s.
*/
@@ -424,6 +470,96 @@ public class WebSocketSettings
return securePort.get();
}
+ public void setUseHeartBeat(boolean useHeartBeat)
+ {
+ this.useHeartBeat = useHeartBeat;
+ }
+
+ public boolean isUseHeartBeat()
+ {
+ return useHeartBeat;
+ }
+
+ public void setReconnectOnFailure(boolean reconnectOnFailure)
+ {
+ this.reconnectOnFailure = reconnectOnFailure;
+ }
+
+ public boolean isReconnectOnFailure()
+ {
+ return reconnectOnFailure;
+ }
+
+ public long getHeartBeatPace()
+ {
+ return heartBeatPace;
+ }
+
+ public void setHeartBeatPace(long heartBeatPace)
+ {
+ this.heartBeatPace = heartBeatPace;
+ }
+
+ public void setHeartBeatPace(Duration heartBeatPace)
+ {
+ this.heartBeatPace = heartBeatPace.toMillis();
+ }
+
+ public long getNetworkLatencyThreshold()
+ {
+ return networkLatencyThreshold;
+ }
+
+ public void setNetworkLatencyThreshold(long networkLatencyThreshold)
+ {
+ this.networkLatencyThreshold = networkLatencyThreshold;
+ }
+
+ public void setNetworkLatencyThreshold(Duration networkLatencyThreshold)
+ {
+ this.networkLatencyThreshold = networkLatencyThreshold.toMillis();
+ }
+
+ public void setHeartBeatsExecutor(Executor heartBeatsExecutor)
+ {
+ this.heartBeatsExecutor = heartBeatsExecutor;
+ }
+
+ public void setMaxPingRetries(int maxPingRetries)
+ {
+ this.maxPingRetries = maxPingRetries;
+ }
+
+ public void setSendMessagesOnPong(boolean sendMessagesOnPong)
+ {
+ this.sendMessagesOnPong = sendMessagesOnPong;
+ }
+
+ public boolean isSendMessagesOnPong()
+ {
+ return sendMessagesOnPong;
+ }
+
+ public Executor getHeartBeatsExecutor()
+ {
+ return heartBeatsExecutor;
+ }
+
+ public int getMaxPingRetries()
+ {
+ return maxPingRetries;
+ }
+
+ public void setUsePingPongHeartBeat(boolean usePingPongHeartBeat)
+ {
+ this.usePingPongHeartBeat = usePingPongHeartBeat;
+ }
+
+ public boolean isUsePingPongHeartBeat()
+ {
+ return usePingPongHeartBeat;
+ }
+
/**
* Simple executor that runs the tasks in the caller thread.
*/
@@ -487,6 +623,33 @@ public class WebSocketSettings
}
}
+ public static class HeartBeatsExecutor implements Executor
+ {
+
+ private final java.util.concurrent.Executor executor;
+
+
+ public HeartBeatsExecutor()
+ {
+ this(new ThreadPoolExecutor(1, 8,
+ 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new ThreadFactory()));
+ }
+
+ public HeartBeatsExecutor(java.util.concurrent.Executor executor)
+ {
+ this.executor = executor;
+
+ }
+
+ @Override
+ public void run(final Runnable command)
+ {
+ executor.execute(command);
+ }
+ }
+
public static class ThreadFactory implements java.util.concurrent.ThreadFactory
{
private final AtomicInteger counter = new AtomicInteger();
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
index bf5a6a8..4a18ce4 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
@@ -16,6 +16,8 @@
*/
package org.apache.wicket.protocol.ws.api;
+import java.nio.ByteBuffer;
+
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
@@ -45,6 +47,7 @@ import org.apache.wicket.protocol.ws.api.message.ConnectedMessage;
import org.apache.wicket.protocol.ws.api.message.ErrorMessage;
import org.apache.wicket.protocol.ws.api.message.IWebSocketMessage;
import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage;
+import org.apache.wicket.protocol.ws.api.message.PongMessageMessage;
import org.apache.wicket.protocol.ws.api.message.TextMessage;
import org.apache.wicket.protocol.ws.api.registry.IKey;
import org.apache.wicket.protocol.ws.api.registry.IWebSocketConnectionRegistry;
@@ -81,7 +84,6 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor
* A pageId indicating that the endpoint is WebSocketResource
*/
static final int NO_PAGE_ID = -1;
- static final String NO_PAGE_CLASS = "_NO_PAGE";
private final WebRequest webRequest;
private final int pageId;
@@ -149,6 +151,24 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor
}
@Override
+ public void onPong(ByteBuffer byteBuffer)
+ {
+ IKey key = getRegistryKey();
+ WebApplication application = getApplication();
+ String sessionId = getSessionId();
+ IWebSocketConnection webSocketConnection = connectionRegistry.getConnection(application, sessionId, key);
+ if (webSocketConnection != null)
+ {
+ webSocketConnection.onPong(byteBuffer);
+ if (webSocketSettings.isSendMessagesOnPong())
+ {
+ // if we want to deliver messages on pong deliver them
+ broadcastMessage(new PongMessageMessage(application, sessionId, key, byteBuffer));
+ }
+ }
+ }
+
+ @Override
public void onMessage(final String message)
{
broadcastMessage(new TextMessage(getApplication(), getSessionId(), getRegistryKey(), message));
@@ -168,7 +188,7 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor
* the web socket connection to use to communicate with the client
* @see #onOpen(Object)
*/
- protected final void onConnect(final IWebSocketConnection connection) {
+ protected final void onConnect(final IWebSocketConnection connection, boolean reconnected) {
IKey key = getRegistryKey();
connectionRegistry.setConnection(getApplication(), getSessionId(), key, connection);
@@ -184,7 +204,7 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor
}
}
- broadcastMessage(new ConnectedMessage(getApplication(), getSessionId(), key), connection);
+ broadcastMessage(new ConnectedMessage(getApplication(), getSessionId(), key, reconnected), connection);
}
@Override
@@ -202,7 +222,9 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor
{
if (webSocketSettings.shouldNotifyOnErrorEvent(t)) {
IKey key = getRegistryKey();
- broadcastMessage(new ErrorMessage(getApplication(), getSessionId(), key, t));
+ IWebSocketConnection connection = connectionRegistry.getConnection(application, sessionId, key);
+ ErrorMessage message = new ErrorMessage(application, sessionId, key, t);
+ broadcastMessage(message, connection);
}
}
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/BaseWebSocketBehavior.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/BaseWebSocketBehavior.java
index 6bf3b4a..a386056 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/BaseWebSocketBehavior.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/BaseWebSocketBehavior.java
@@ -161,6 +161,14 @@ public class BaseWebSocketBehavior extends Behavior
Integer securePort = getSecurePort(webSocketSettings);
variables.put("securePort", securePort);
+ variables.put("useHeartBeat", isUseHeartBeat(webSocketSettings));
+
+ variables.put("reconnectOnFailure", isReconnectOnFailure(webSocketSettings));
+
+ variables.put("heartBeatPace", getHeartBeatPace(webSocketSettings));
+
+ variables.put("networkLatencyThreshold", getNetworkLatencyThreshold(webSocketSettings));
+
CharSequence contextPath = getContextPath(webSocketSettings);
Args.notNull(contextPath, "contextPath");
variables.put("contextPath", contextPath);
@@ -177,6 +185,26 @@ public class BaseWebSocketBehavior extends Behavior
return variables;
}
+ protected boolean isUseHeartBeat(WebSocketSettings webSocketSettings)
+ {
+ return webSocketSettings.isUseHeartBeat();
+ }
+
+ protected boolean isReconnectOnFailure(WebSocketSettings webSocketSettings)
+ {
+ return webSocketSettings.isReconnectOnFailure();
+ }
+
+ protected long getHeartBeatPace(WebSocketSettings webSocketSettings)
+ {
+ return webSocketSettings.getHeartBeatPace();
+ }
+
+ protected long getNetworkLatencyThreshold(WebSocketSettings webSocketSettings)
+ {
+ return webSocketSettings.getNetworkLatencyThreshold();
+ }
+
protected Integer getPort(WebSocketSettings webSocketSettings)
{
return webSocketSettings.getPort();
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java
index 82cf6e9..b09cb98 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java
@@ -17,6 +17,7 @@
package org.apache.wicket.protocol.ws.api;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.wicket.Application;
import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage;
@@ -30,6 +31,53 @@ import org.apache.wicket.protocol.ws.api.registry.IKey;
public interface IWebSocketConnection
{
/**
+ *
+ * @return the last time connection was checked to be alive (checked via ping/pong mechanism)
+ */
+ long getLastTimeAlive();
+
+ /**
+ *
+ * @return {@code true} if connection is alive (checked via ping/pong mechanism). {@code false} otherwise
+ */
+ boolean isAlive();
+
+ /**
+ * Allows setting whether connection is alive or not.
+ *
+ * @param alive Alive
+ */
+ void setAlive(boolean alive);
+
+ /**
+ * Terminates the connection.
+ *
+ * @param reason The reason to terminate connection.
+ */
+ void terminate(String reason);
+
+ /**
+ * Sends a ping message to the remote peer.
+ *
+ * @throws IOException if something went wrong with ping
+ */
+ void ping() throws IOException;
+
+
+ /**
+ * Allows the developer to send an unsolicited Pong message in order to serve as a
+ * unidirectional heartbeat for the session.
+ */
+ void pong() throws IOException;
+
+ /**
+ * Called when remote peer answers to ping with pong message.
+ *
+ * @param byteBuffer Contains application specific content
+ */
+ void onPong(ByteBuffer byteBuffer);
+
+ /**
* @return {@code true} when the underlying native web socket
* connection is still open.
*/
@@ -69,6 +117,8 @@ public interface IWebSocketConnection
*/
IWebSocketConnection sendMessage(byte[] message, int offset, int length) throws IOException;
+ IWebSocketConnection sendMessage(byte[] message) throws IOException;
+
/**
* Broadcasts a push message to the wicket page (and it's components) associated with this
* connection. The components can then send messages or component updates to client by adding
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketProcessor.java
index 74b34d1..708002f 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketProcessor.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketProcessor.java
@@ -16,8 +16,11 @@
*/
package org.apache.wicket.protocol.ws.api;
+import java.nio.ByteBuffer;
+
import org.apache.wicket.protocol.http.WebApplication;
import org.apache.wicket.protocol.ws.WebSocketSettings;
+import org.apache.wicket.protocol.ws.api.registry.IKey;
/**
* Processes web socket messages.
@@ -42,6 +45,13 @@ public interface IWebSocketProcessor
}
/**
+ * Called when remote peer answers to ping with pong message.
+ *
+ * @param byteBuffer Contains application specific content
+ */
+ void onPong(ByteBuffer byteBuffer);
+
+ /**
* Called when a text message arrives from the client
*
* @param message
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/ConnectedMessage.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/ConnectedMessage.java
index 8e0485f..b69b8ae 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/ConnectedMessage.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/ConnectedMessage.java
@@ -27,9 +27,26 @@ import org.apache.wicket.protocol.ws.api.registry.IKey;
*/
public class ConnectedMessage extends AbstractClientMessage
{
+ private final boolean reconnected;
+
+ public ConnectedMessage(Application application, String sessionId, IKey key, boolean reconnected)
+ {
+ super(application, sessionId, key);
+ this.reconnected = reconnected;
+ }
+
public ConnectedMessage(Application application, String sessionId, IKey key)
{
super(application, sessionId, key);
+ reconnected = false;
+ }
+
+ /**
+ * @return {@code true} if connection happened because client initiated a reconnection
+ */
+ public boolean isReconnected()
+ {
+ return reconnected;
}
@Override
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/ConnectedMessage.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/PongMessageMessage.java
similarity index 59%
copy from wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/ConnectedMessage.java
copy to wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/PongMessageMessage.java
index 8e0485f..0708f82 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/ConnectedMessage.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/PongMessageMessage.java
@@ -16,25 +16,40 @@
*/
package org.apache.wicket.protocol.ws.api.message;
+import java.nio.ByteBuffer;
+
import org.apache.wicket.Application;
import org.apache.wicket.protocol.ws.api.registry.IKey;
+import org.apache.wicket.util.lang.Args;
/**
- * A {@link IWebSocketMessage message} when a client creates web socket
- * connection.
+ * A {@link IWebSocketMessage message} with Pong message data
*
* @since 6.0
*/
-public class ConnectedMessage extends AbstractClientMessage
+public class PongMessageMessage extends AbstractClientMessage
{
- public ConnectedMessage(Application application, String sessionId, IKey key)
+ private final ByteBuffer byteBuffer;
+
+ /**
+ *
+ * @param application
+ * the Wicket application
+ * @param sessionId
+ * the id of the http session
+ * @param key
+ * the page id or resource name
+ * @param byteBuffer
+ * the message sent from the client
+ */
+ public PongMessageMessage(Application application, String sessionId, IKey key, ByteBuffer byteBuffer)
{
super(application, sessionId, key);
+ this.byteBuffer = Args.notNull(byteBuffer, "byteBuffer");
}
- @Override
- public final String toString()
+ public ByteBuffer getByteBuffer()
{
- return "Client is connected";
+ return byteBuffer;
}
}
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-jquery.js b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-jquery.js
index 93b7386..75cf05c 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-jquery.js
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-jquery.js
@@ -85,17 +85,32 @@
url += '&context=' + encodeURIComponent(WWS.context);
}
+ // this flag is used at server side to send reconnect event
+ if (WWS.reconnect) {
+ url += '&reconnect=' + true
+ }
+
url += '&wicket-ajax-baseurl=' + encodeURIComponent(WWS.baseUrl);
url += '&wicket-app-name=' + encodeURIComponent(WWS.appName);
+ console.log(url);
+
self.ws = new WebSocket(url);
self.ws.onopen = function (evt) {
Wicket.Event.publish(topics.Opened, evt);
+ if (Wicket.WebSocket.useHeartBeat) {
+ self.heartbeat();
+ }
};
self.ws.onmessage = function (event) {
+ if (WWS.useHeartBeat) {
+ // reset the heartbeat timeout on any incoming message
+ self.heartbeat();
+ }
+
var message = event.data;
if (typeof(message) === 'string' && message.indexOf('<ajax-response>') > -1) {
Wicket.channelManager.schedule(Wicket.WebSocket.MESSAGE_CHANNEL, Wicket.bind(function () {
@@ -120,6 +135,7 @@
self.ws.onclose = function (evt) {
if (self.ws) {
self.ws.close();
+ clearTimeout(self.pingTimeout);
self.ws = null;
Wicket.Event.publish(topics.Closed, evt);
}
@@ -139,6 +155,22 @@
}
},
+ heartbeat: function () {
+ clearTimeout(this.pingTimeout);
+ // Set a timeout that closes the socket if no message arrives before it expires
+ this.pingTimeout = setTimeout(() => {
+ this.ws.close();
+ // try to reconnect to server
+ if (Wicket.WebSocket.reconnectOnFailure)
+ {
+ Wicket.Log.debug("Trying to reconnect to server");
+ Wicket.WebSocket.INSTANCE = null;
+ Wicket.WebSocket.reconnect = true;
+ Wicket.WebSocket.createDefaultConnection();
+ }
+ }, Wicket.WebSocket.heartBeatPace + Wicket.WebSocket.networkLatencyThreshold);
+ },
+
send: function (text) {
if (this.ws && text) {
Wicket.Log.info('[WebSocket.send] Sending: ' + text);
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-setup.js.tmpl b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-setup.js.tmpl
index 98e9bc8..ee55ba5 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-setup.js.tmpl
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-setup.js.tmpl
@@ -4,7 +4,10 @@
if (typeof(Wicket.WebSocket.appName) === "undefined") {
jQuery.extend(Wicket.WebSocket, { pageId: ${pageId}, context: '${context}', resourceName: '${resourceName}', connectionToken: '${connectionToken}',
baseUrl: '${baseUrl}', contextPath: '${contextPath}', appName: '${applicationName}',
- port: ${port}, securePort: ${securePort}, filterPrefix: '${filterPrefix}', sessionId: '${sessionId}' });
+ port: ${port}, securePort: ${securePort}, filterPrefix: '${filterPrefix}', sessionId: '${sessionId}',
+ useHeartBeat: ${useHeartBeat}, reconnectOnFailure: ${reconnectOnFailure},
+ heartBeatPace: ${heartBeatPace}, networkLatencyThreshold: ${networkLatencyThreshold}});
+
Wicket.WebSocket.createDefaultConnection();
}
})();
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/timer/AbstractHeartBeatTimer.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/timer/AbstractHeartBeatTimer.java
new file mode 100644
index 0000000..4829329
--- /dev/null
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/timer/AbstractHeartBeatTimer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wicket.protocol.ws.timer;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.wicket.Application;
+import org.apache.wicket.protocol.ws.WebSocketSettings;
+import org.apache.wicket.protocol.ws.api.IWebSocketConnection;
+import org.apache.wicket.protocol.ws.concurrent.Executor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractHeartBeatTimer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractHeartBeatTimer.class);
+
+ protected final WebSocketSettings webSocketSettings;
+
+ // internal heartbeat's timer.
+ private Timer heartBeatsTimer;
+
+ public AbstractHeartBeatTimer(WebSocketSettings webSocketSettings)
+ {
+ this.webSocketSettings = webSocketSettings;
+ }
+
+ public final void start(Application application)
+ {
+ if (isTimerEnabled() == false)
+ {
+ return;
+ }
+
+ if (LOG.isInfoEnabled())
+ {
+ LOG.info("Starting thread pushing heart beats");
+ }
+
+ TimerTask timerTask = new TimerTask()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ sendHeartBeats(application);
+ }
+ catch (Exception e)
+ {
+ LOG.error("Error while checking connections", e);
+ }
+ }
+ };
+
+ this.heartBeatsTimer = new Timer(true);
+ this.heartBeatsTimer.schedule(timerTask, new Date(), webSocketSettings.getHeartBeatPace());
+ }
+
+ protected abstract boolean isTimerEnabled();
+
+ public final void stop()
+ {
+ if (LOG.isInfoEnabled())
+ {
+ LOG.info("Stopping thread pushing heart beats");
+ }
+ if (this.heartBeatsTimer != null)
+ {
+ this.heartBeatsTimer.cancel();
+ }
+ }
+
+ protected abstract void sendHeartBeats(Application application);
+}
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/timer/HeartBeatWithReconnectTimer.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/timer/HeartBeatWithReconnectTimer.java
new file mode 100644
index 0000000..6b05fb5
--- /dev/null
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/timer/HeartBeatWithReconnectTimer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wicket.protocol.ws.timer;
+
+import java.io.IOException;
+
+import org.apache.wicket.Application;
+import org.apache.wicket.protocol.ws.WebSocketSettings;
+import org.apache.wicket.protocol.ws.api.IWebSocketConnection;
+import org.apache.wicket.protocol.ws.concurrent.Executor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Heart beat timer that pushes a one byte binary message to every registered
+ * connection on each tick. It is active only when
+ * {@link WebSocketSettings#isUseHeartBeat()} returns {@code true}.
+ */
+public class HeartBeatWithReconnectTimer extends AbstractHeartBeatTimer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HeartBeatWithReconnectTimer.class);
+
+    /**
+     * @param webSocketSettings
+     *            the settings the timer reads its configuration from
+     */
+    public HeartBeatWithReconnectTimer(WebSocketSettings webSocketSettings)
+    {
+        super(webSocketSettings);
+    }
+
+    @Override
+    protected boolean isTimerEnabled()
+    {
+        if (webSocketSettings.isUseHeartBeat() == false)
+        {
+            if (LOG.isInfoEnabled())
+            {
+                LOG.info("useHeartBeat is set to false. Thus we won't start heartbeat's sending thread");
+            }
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Hands one heart beat task per registered connection to the heart beats executor.
+     */
+    @Override
+    protected void sendHeartBeats(Application application)
+    {
+        final Executor heartBeatsExecutor = webSocketSettings.getHeartBeatsExecutor();
+        final int maxPingRetries = webSocketSettings.getMaxPingRetries();
+        for (IWebSocketConnection connection : webSocketSettings.getConnectionRegistry().getConnections(application))
+        {
+            heartBeatsExecutor.run(() -> ping(connection, maxPingRetries));
+        }
+    }
+
+    /**
+     * Sends the heart beat message, trying again after an {@link IOException} until
+     * the retries are used up.
+     *
+     * @param connection
+     *            the connection to send the heart beat to
+     * @param pingRetryCounter
+     *            how many more attempts may follow a failed one
+     */
+    private void ping(IWebSocketConnection connection, final int pingRetryCounter)
+    {
+        try
+        {
+            // we just send a binary message
+            connection.sendMessage(new byte[]{10});
+            // if client does not receive message it might try to reconnect
+            // depending on settings
+        }
+        catch (IOException e)
+        {
+            // without this guard a permanently broken connection recursed until the stack overflowed
+            if (pingRetryCounter <= 0)
+            {
+                if (LOG.isDebugEnabled())
+                {
+                    LOG.debug("Giving up sending heart beat to connection with ID {}", connection.getKey(), e);
+                }
+                return;
+            }
+            ping(connection, pingRetryCounter - 1);
+        }
+    }
+}
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/timer/PingPongHeartBeatTimer.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/timer/PingPongHeartBeatTimer.java
new file mode 100644
index 0000000..ae24ac4
--- /dev/null
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/timer/PingPongHeartBeatTimer.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wicket.protocol.ws.timer;
+
+import java.io.IOException;
+
+import org.apache.wicket.Application;
+import org.apache.wicket.protocol.ws.WebSocketSettings;
+import org.apache.wicket.protocol.ws.api.IWebSocketConnection;
+import org.apache.wicket.protocol.ws.concurrent.Executor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Heart beat timer that pings every connection flagged as alive and terminates
+ * connections that are flagged as not alive and have been so for longer than the
+ * heart beat pace plus the network latency threshold. It is active only when
+ * {@link WebSocketSettings#isUsePingPongHeartBeat()} returns {@code true}.
+ */
+public class PingPongHeartBeatTimer extends AbstractHeartBeatTimer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PingPongHeartBeatTimer.class);
+
+    /**
+     * @param webSocketSettings
+     *            the settings the timer reads its configuration from
+     */
+    public PingPongHeartBeatTimer(WebSocketSettings webSocketSettings)
+    {
+        super(webSocketSettings);
+    }
+
+    @Override
+    protected boolean isTimerEnabled()
+    {
+        if (webSocketSettings.isUsePingPongHeartBeat() == false)
+        {
+            if (LOG.isInfoEnabled())
+            {
+                LOG.info("usePingPongHeartBeat is set to false. Thus we won't start heartbeat's sending thread");
+            }
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Walks over the registered connections and schedules, on the heart beats executor,
+     * either a ping or the termination of the connection.
+     */
+    @Override
+    protected void sendHeartBeats(Application application)
+    {
+        final long heartBeatPace = webSocketSettings.getHeartBeatPace();
+        final long networkLatencyThreshold = webSocketSettings.getNetworkLatencyThreshold();
+        final Executor heartBeatsExecutor = webSocketSettings.getHeartBeatsExecutor();
+        final int maxPingRetries = webSocketSettings.getMaxPingRetries();
+        for (IWebSocketConnection connection : webSocketSettings.getConnectionRegistry().getConnections(application))
+        {
+            // connection did not receive the PONG from its peer: terminate it once it is overdue
+            if (connection.isAlive() == false)
+            {
+                // elapsed time is "now - last alive"; the reversed subtraction was never
+                // positive, so no connection was ever terminated here
+                final long silentFor = System.currentTimeMillis() - connection.getLastTimeAlive();
+                if (silentFor > (heartBeatPace + networkLatencyThreshold))
+                {
+                    heartBeatsExecutor.run(() -> terminateConnection(connection));
+                }
+            }
+            else
+            {
+                heartBeatsExecutor.run(() -> ping(connection, maxPingRetries));
+            }
+        }
+    }
+
+    /**
+     * Pings the connection, trying again after an {@link IOException}; once the retries
+     * are used up the connection is terminated.
+     *
+     * @param connection
+     *            the connection to ping
+     * @param pingRetryCounter
+     *            how many more attempts may follow a failed one
+     */
+    private void ping(IWebSocketConnection connection, final int pingRetryCounter)
+    {
+        try
+        {
+            connection.ping();
+        }
+        catch (IOException e)
+        {
+            // "<= 0" rather than "== 0": a negative retry setting must not recurse without end
+            if (pingRetryCounter <= 0)
+            {
+                // ping failed enough times kill connection
+                terminateConnection(connection);
+            }
+            else
+            {
+                ping(connection, pingRetryCounter - 1);
+            }
+        }
+    }
+
+    /**
+     * Flags the connection as not alive, terminates it and removes it from the
+     * connection registry.
+     */
+    private void terminateConnection(IWebSocketConnection connection)
+    {
+        connection.setAlive(false);
+        if (LOG.isInfoEnabled())
+        {
+            LOG.info("Terminating connection with ID {} because ping of remote peer failed {} times",
+                connection.getKey(), webSocketSettings.getMaxPingRetries());
+        }
+        connection.terminate("Failed to ping remote peer");
+        webSocketSettings.getConnectionRegistry().removeConnection(connection.getApplication(), connection.getSessionId(), connection.getKey());
+    }
+}
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java
index 1923f49..f9c83cd 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java
@@ -17,6 +17,7 @@
package org.apache.wicket.protocol.ws.util.tester;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.wicket.Application;
import org.apache.wicket.protocol.http.WebApplication;
@@ -43,6 +44,41 @@ abstract class TestWebSocketConnection implements IWebSocketConnection
}
@Override
+ public long getLastTimeAlive() {
+ // test stub: no liveness timestamp is kept, a fixed 0 is reported
+ return 0;
+ }
+
+ @Override
+ public boolean isAlive() {
+ // test stub: always reports false
+ return false;
+ }
+
+ @Override
+ public void setAlive(boolean alive) {
+ // intentionally a no-op in the test connection
+
+ }
+
+ @Override
+ public void ping() throws IOException {
+ // intentionally a no-op in the test connection
+
+ }
+
+ @Override
+ public void pong() throws IOException {
+ // intentionally a no-op in the test connection
+
+ }
+
+ @Override
+ public void onPong(ByteBuffer byteBuffer) {
+ // intentionally a no-op in the test connection; the buffer is ignored
+
+ }
+
+ @Override
+ public void terminate(String reason) {
+ // NOTE: the given reason is not forwarded; the connection is always closed
+ // with code -1 and the fixed text below
+ close(-1, "abnormally closed");
+ }
+
+ @Override
public boolean isOpen()
{
return isOpen;
@@ -70,6 +106,13 @@ abstract class TestWebSocketConnection implements IWebSocketConnection
return this;
}
+ @Override
+ public IWebSocketConnection sendMessage(byte[] message) throws IOException {
+ // checkOpenness() runs before the message is delivered
+ checkOpenness();
+ // deliver the whole array through the binary callback
+ onOutMessage(message);
+ return this;
+ }
+
/**
* A callback method that is called when a text message should be send to the client
*
@@ -79,7 +122,7 @@ abstract class TestWebSocketConnection implements IWebSocketConnection
protected abstract void onOutMessage(String message);
/**
- * A callback method that is called when a text message should be send to the client
+ * A callback method that is called when a binary message should be sent to the client
*
* @param message
* the binary message to deliver to the client
@@ -90,6 +133,14 @@ abstract class TestWebSocketConnection implements IWebSocketConnection
*/
protected abstract void onOutMessage(byte[] message, int offset, int length);
+ /**
+ * A callback method that is called when a binary message should be sent to the client
+ *
+ * @param message
+ * the binary message to deliver to the client
+ */
+ protected abstract void onOutMessage(byte[] message);
+
private void checkOpenness()
{
if (isOpen() == false)
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java
index cc121e6..78994e2 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java
@@ -16,6 +16,8 @@
*/
package org.apache.wicket.protocol.ws.util.tester;
+import java.io.IOException;
+
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
@@ -24,6 +26,7 @@ import org.apache.wicket.Page;
import org.apache.wicket.protocol.http.WebApplication;
import org.apache.wicket.protocol.http.mock.MockHttpServletRequest;
import org.apache.wicket.protocol.ws.api.AbstractWebSocketProcessor;
+import org.apache.wicket.protocol.ws.api.IWebSocketConnection;
import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage;
import org.apache.wicket.request.http.WebRequest;
import org.apache.wicket.util.lang.Args;
@@ -154,11 +157,16 @@ abstract class TestWebSocketProcessor extends AbstractWebSocketProcessor
}
@Override
+ protected void onOutMessage(byte[] message) {
+ // hand the binary message over to the enclosing processor's callback
+ TestWebSocketProcessor.this.onOutMessage(message);
+ }
+
+ @Override
public void sendMessage(IWebSocketPushMessage message)
{
// push messages are broadcast through the enclosing processor
TestWebSocketProcessor.this.broadcastMessage(message);
}
- });
+ }, false);
}
/**
@@ -180,4 +188,12 @@ abstract class TestWebSocketProcessor extends AbstractWebSocketProcessor
* the length of bytes to read from the binary message
*/
protected abstract void onOutMessage(byte[] message, int offset, int length);
+
+ /**
+ * A callback method that is called when a binary message is written, as a whole array,
+ * to the TestWebSocketConnection
+ *
+ * @param message
+ * the binary message to deliver to the client
+ */
+ protected abstract void onOutMessage(byte[] message);
}
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTester.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTester.java
index df67eac..472a2c9 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTester.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTester.java
@@ -72,6 +72,12 @@ public class WebSocketTester
{
WebSocketTester.this.onOutMessage(message, offset, length);
}
+
+ @Override
+ protected void onOutMessage(byte[] message)
+ {
+ // forward the binary message to the tester's own callback
+ WebSocketTester.this.onOutMessage(message);
+ }
};
socketProcessor.onOpen(null);
}
@@ -112,6 +118,12 @@ public class WebSocketTester
{
WebSocketTester.this.onOutMessage(message, offset, length);
}
+
+ @Override
+ protected void onOutMessage(byte[] message)
+ {
+ // forward the binary message to the tester's own callback
+ WebSocketTester.this.onOutMessage(message);
+ }
};
socketProcessor.onOpen(null);
}
@@ -205,4 +217,14 @@ public class WebSocketTester
protected void onOutMessage(byte[] message, int offset, int length)
{
}
+
+ /**
+ * A callback method which may be overridden to receive binary messages pushed by the server.
+ * The default implementation does nothing.
+ *
+ * @param message
+ * the pushed binary message from the server
+ */
+ protected void onOutMessage(byte[] message)
+ {
+ }
}
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/test/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTesterProcessorTest.java b/wicket-native-websocket/wicket-native-websocket-core/src/test/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTesterProcessorTest.java
index 0b807fb..fcdb180 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/test/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTesterProcessorTest.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/test/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTesterProcessorTest.java
@@ -59,6 +59,11 @@ public class WebSocketTesterProcessorTest
{
messageReceived.set(true);
}
+
+ @Override
+ protected void onOutMessage(byte[] message) {
+ // record that a binary message reached the tester
+ messageReceived.set(true);
+ }
}
WicketTester tester;
diff --git a/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketConnection.java b/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketConnection.java
index f7e4ca4..3fc9817 100644
--- a/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketConnection.java
+++ b/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketConnection.java
@@ -18,6 +18,8 @@ package org.apache.wicket.protocol.ws.javax;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import javax.websocket.CloseReason;
import javax.websocket.Session;
@@ -40,6 +42,9 @@ public class JavaxWebSocketConnection extends AbstractWebSocketConnection
private final Session session;
+ // liveness flag; starts as false and is changed through setAlive(boolean)
+ private final AtomicBoolean alive = new AtomicBoolean(false);
+ // timestamp in milliseconds, initialised at construction and refreshed by setAlive(true)
+ private final AtomicLong lastTimeAlive = new AtomicLong(System.currentTimeMillis());
+
/**
* Constructor.
*
@@ -50,6 +55,69 @@ public class JavaxWebSocketConnection extends AbstractWebSocketConnection
{
super(webSocketProcessor);
this.session = Args.notNull(session, "session");
+ setAlive(true);
+ }
+
+ @Override
+ public long getLastTimeAlive()
+ {
+ // millisecond timestamp last stored by setAlive(true), or the construction time
+ return lastTimeAlive.get();
+ }
+
+ @Override
+ public boolean isAlive()
+ {
+ // current value of the liveness flag
+ return alive.get();
+ }
+
+ /**
+  * Updates the liveness flag. Flagging the connection as alive also refreshes the
+  * timestamp returned by {@link #getLastTimeAlive()}.
+  */
+ @Override
+ public void setAlive(boolean alive)
+ {
+     if (!alive)
+     {
+         // nothing to timestamp, only lower the flag
+         this.alive.set(false);
+         return;
+     }
+     // timestamp first, flag second
+     lastTimeAlive.set(System.currentTimeMillis());
+     this.alive.set(true);
+ }
+
+ /**
+  * Closes the connection with the "closed abnormally" close code and the given reason.
+  */
+ @Override
+ public synchronized void terminate(String reason)
+ {
+     final int abnormalCloseCode = CloseReason.CloseCodes.CLOSED_ABNORMALLY.getCode();
+     close(abnormalCloseCode, reason);
+ }
+
+ /**
+  * Sends a ping carrying a single byte (0xA) to the remote peer.
+  */
+ @Override
+ public void ping() throws IOException
+ {
+     if (LOG.isDebugEnabled())
+     {
+         LOG.debug("Pinging connection {}", getKey());
+     }
+     final byte[] payload = {0xA};
+     final ByteBuffer pingData = ByteBuffer.wrap(payload);
+     session.getBasicRemote().sendPing(pingData);
+ }
+
+ /**
+  * Sends a pong carrying a single byte (0xA) to the remote peer.
+  */
+ @Override
+ public void pong() throws IOException
+ {
+     if (LOG.isDebugEnabled())
+     {
+         // message text corrected: it used to read "pon"
+         LOG.debug("Sending unidirectional pong for connection {}", getKey());
+     }
+     ByteBuffer buf = ByteBuffer.wrap(new byte[]{0xA});
+     session.getBasicRemote().sendPong(buf);
+ }
+
+ /**
+  * Called when a pong arrives from the remote peer; flags the connection as alive.
+  *
+  * @param byteBuffer
+  *            the pong payload, used only for debug logging
+  */
+ @Override
+ public void onPong(ByteBuffer byteBuffer)
+ {
+     if (LOG.isDebugEnabled())
+     {
+         // array() throws for read-only or direct buffers, so copy the readable bytes
+         // through a duplicate, which also leaves the caller's buffer position untouched
+         final ByteBuffer view = byteBuffer.duplicate();
+         final byte[] contents = new byte[view.remaining()];
+         view.get(contents);
+         LOG.debug("Pong receive for {} with contents {}", getKey(), contents);
+     }
+     // we received pong answer from remote peer. Thus, connection is alive
+     setAlive(true);
+ }
@Override
@@ -66,7 +134,8 @@ public class JavaxWebSocketConnection extends AbstractWebSocketConnection
try
{
session.close(new CloseReason(new CloseCode(code), reason));
- } catch (IOException iox)
+ }
+ catch (IOException iox)
{
LOG.error("An error occurred while closing WebSocket session", iox);
}
@@ -93,6 +162,15 @@ public class JavaxWebSocketConnection extends AbstractWebSocketConnection
return this;
}
+ /**
+  * Sends the whole array as one binary message, after checkClosed() has run.
+  *
+  * @param message
+  *            the bytes to send
+  * @return this connection
+  */
+ @Override
+ public IWebSocketConnection sendMessage(byte[] message) throws IOException
+ {
+     checkClosed();
+
+     final ByteBuffer payload = ByteBuffer.wrap(message);
+     session.getBasicRemote().sendBinary(payload);
+     return this;
+ }
+
private void checkClosed()
{
if (!isOpen())
diff --git a/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketProcessor.java
index 4d2e052..29fa130 100644
--- a/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketProcessor.java
+++ b/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketProcessor.java
@@ -17,15 +17,19 @@
package org.apache.wicket.protocol.ws.javax;
import java.nio.ByteBuffer;
+import java.util.List;
import javax.websocket.EndpointConfig;
import javax.websocket.MessageHandler;
+import javax.websocket.PongMessage;
import javax.websocket.Session;
import org.apache.wicket.protocol.http.WebApplication;
import org.apache.wicket.protocol.ws.WebSocketSettings;
import org.apache.wicket.protocol.ws.api.AbstractWebSocketProcessor;
import org.apache.wicket.protocol.ws.api.IWebSocketSession;
+import org.apache.wicket.protocol.ws.api.message.TextMessage;
+import org.apache.wicket.protocol.ws.api.registry.IKey;
/**
* An {@link org.apache.wicket.protocol.ws.api.IWebSocketProcessor processor} that integrates with
@@ -49,10 +53,12 @@ public class JavaxWebSocketProcessor extends AbstractWebSocketProcessor
{
super(new JavaxUpgradeHttpRequest(session, endpointConfig), application);
- onConnect(new JavaxWebSocketConnection(session, this));
+ List<String> reconnect = session.getRequestParameterMap().get("reconnect");
+ onConnect(new JavaxWebSocketConnection(session, this), reconnect != null && !reconnect.isEmpty());
session.addMessageHandler(new StringMessageHandler());
session.addMessageHandler(new BinaryMessageHandler());
+ session.addMessageHandler(new PongMessageMessageHandler());
}
@Override
@@ -60,6 +66,17 @@ public class JavaxWebSocketProcessor extends AbstractWebSocketProcessor
{
}
+
+ /**
+  * Message handler that passes the application data of an incoming pong on to
+  * the processor's onPong callback.
+  */
+ private class PongMessageMessageHandler implements MessageHandler.Whole<PongMessage>
+ {
+     @Override
+     public void onMessage(PongMessage message)
+     {
+         final ByteBuffer applicationData = message.getApplicationData();
+         JavaxWebSocketProcessor.this.onPong(applicationData);
+     }
+ }
+
+
private class StringMessageHandler implements MessageHandler.Whole<String>
{
@Override