You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wicket.apache.org by re...@apache.org on 2022/04/05 16:46:25 UTC
[wicket] 01/01: {WICKET-6969} allow asynchronous pushing of messages.
This is an automated email from the ASF dual-hosted git repository.
reiern70 pushed a commit to branch feature/reiern70/WICKET-6969
in repository https://gitbox.apache.org/repos/asf/wicket.git
commit 021b63d8752da8d679a2f5f0ee0f89b74f8b03af
Author: reiern70 <re...@gmail.com>
AuthorDate: Tue Apr 5 10:06:11 2022 -0600
{WICKET-6969} allow asynchronous pushing of messages.
---
.../wicket/protocol/ws/WebSocketSettings.java | 31 ++++++++---
.../ws/api/AbstractWebSocketConnection.java | 10 +++-
.../ws/api/AbstractWebSocketProcessor.java | 8 +--
.../protocol/ws/api/IWebSocketConnection.java | 21 ++++++--
.../protocol/ws/api/IWebSocketRequestHandler.java | 54 +++++++++++++++++++
.../protocol/ws/api/WebSocketPushBroadcaster.java | 9 +++-
.../protocol/ws/api/WebSocketRequestHandler.java | 61 ++++++++++++++++++++++
.../wicket/protocol/ws/api/WebSocketResponse.java | 23 ++++++--
.../ws/util/tester/TestWebSocketConnection.java | 5 +-
.../ws/util/tester/TestWebSocketProcessor.java | 6 +++
10 files changed, 206 insertions(+), 22 deletions(-)
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
index df78a18bbf..e151b1f3d3 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
@@ -158,11 +158,18 @@ public class WebSocketSettings
*/
private Function<Integer, Boolean> notifyOnCloseEvent = (code) -> true;
- public boolean shouldNotifyOnCloseEvent(int closeCode) {
+ /**
+ * Flag that enables asynchronous push. It is disabled ({@code false}) by default.
+ */
+ private boolean asynchronousPush = false;
+
+ public boolean shouldNotifyOnCloseEvent(int closeCode)
+ {
return notifyOnCloseEvent == null || notifyOnCloseEvent.apply(closeCode);
}
- public void setNotifyOnCloseEvent(Function<Integer, Boolean> notifyOnCloseEvent) {
+ public void setNotifyOnCloseEvent(Function<Integer, Boolean> notifyOnCloseEvent)
+ {
this.notifyOnCloseEvent = notifyOnCloseEvent;
}
@@ -174,11 +181,13 @@ public class WebSocketSettings
*/
private Function<Throwable, Boolean> notifyOnErrorEvent = (throwable) -> true;
- public boolean shouldNotifyOnErrorEvent(Throwable throwable) {
+ public boolean shouldNotifyOnErrorEvent(Throwable throwable)
+ {
return notifyOnErrorEvent == null || notifyOnErrorEvent.apply(throwable);
}
- public void setNotifyOnErrorEvent(Function<Throwable, Boolean> notifyOnErrorEvent) {
+ public void setNotifyOnErrorEvent(Function<Throwable, Boolean> notifyOnErrorEvent)
+ {
this.notifyOnErrorEvent = notifyOnErrorEvent;
}
@@ -303,9 +312,9 @@ public class WebSocketSettings
* The active web socket connection
* @return the response object that should be used to write the response back to the client
*/
- public WebResponse newWebSocketResponse(IWebSocketConnection connection)
+ public WebResponse newWebSocketResponse(IWebSocketConnection connection, boolean asynchronousPush)
{
- return new WebSocketResponse(connection);
+ return new WebSocketResponse(connection, asynchronousPush);
}
/**
@@ -497,4 +506,14 @@ public class WebSocketSettings
return new Thread(r, "Wicket-WebSocket-HttpRequest-Thread-" + counter.getAndIncrement());
}
}
+
+ public void setAsynchronousPush(boolean asynchronousPush)
+ {
+ this.asynchronousPush = asynchronousPush;
+ }
+
+ public boolean isAsynchronousPush()
+ {
+ return asynchronousPush;
+ }
}
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketConnection.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketConnection.java
index 3cd0f627d4..c9b64029b7 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketConnection.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketConnection.java
@@ -21,6 +21,8 @@ import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage;
import org.apache.wicket.protocol.ws.api.registry.IKey;
import org.apache.wicket.util.lang.Args;
+import java.util.concurrent.Future;
+
/**
* Abstract class handling the Web Socket broadcast messages.
*/
@@ -50,7 +52,13 @@ public abstract class AbstractWebSocketConnection implements IWebSocketConnectio
@Override
public void sendMessage(IWebSocketPushMessage message)
{
- webSocketProcessor.broadcastMessage(message, this);
+ webSocketProcessor.broadcastMessage(message, this, false);
+ }
+
+ @Override
+ public void sendMessageAsync(IWebSocketPushMessage message)
+ {
+ webSocketProcessor.broadcastMessage(message, this, true);
}
@Override
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
index bf5a6a812d..4159465451 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
@@ -184,7 +184,7 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor
}
}
- broadcastMessage(new ConnectedMessage(getApplication(), getSessionId(), key), connection);
+ broadcastMessage(new ConnectedMessage(getApplication(), getSessionId(), key), connection, webSocketSettings.isAsynchronousPush());
}
@Override
@@ -210,7 +210,7 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor
{
IKey key = getRegistryKey();
IWebSocketConnection connection = connectionRegistry.getConnection(application, sessionId, key);
- broadcastMessage(message, connection);
+ broadcastMessage(message, connection, webSocketSettings.isAsynchronousPush());
}
/**
@@ -225,7 +225,7 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor
* @param message
* the message to broadcast
*/
- public final void broadcastMessage(final IWebSocketMessage message, IWebSocketConnection connection)
+ public final void broadcastMessage(final IWebSocketMessage message, IWebSocketConnection connection, boolean asynchronousPush)
{
if (connection != null && (connection.isOpen() || isSpecialMessage(message)))
{
@@ -233,7 +233,7 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor
Session oldSession = ThreadContext.getSession();
RequestCycle oldRequestCycle = ThreadContext.getRequestCycle();
- WebResponse webResponse = webSocketSettings.newWebSocketResponse(connection);
+ WebResponse webResponse = webSocketSettings.newWebSocketResponse(connection, asynchronousPush);
try
{
WebSocketRequestMapper requestMapper = new WebSocketRequestMapper(application.getRootRequestMapper());
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java
index 7275559bd5..762025602c 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java
@@ -71,11 +71,11 @@ public interface IWebSocketConnection
*
* @param message
* the text message
- * @param timeOut
+ * @param timeout
* the timeout for operation
* @return a {@link java.util.concurrent.Future} representing the send operation
*/
- Future<Void> sendMessageAsync(String message, long timeOut);
+ Future<Void> sendMessageAsync(String message, long timeout);
/**
* Sends a binary message to the client.
@@ -113,11 +113,11 @@ public interface IWebSocketConnection
* the offset to read from
* @param length
* how much data to read
- * @param timeOut
- * * the timeout for operation
+ * @param timeout
+ * the timeout for operation
* @return a {@link java.util.concurrent.Future} representing the send operation
*/
- Future<Void> sendMessageAsync(byte[] message, int offset, int length, long timeOut);
+ Future<Void> sendMessageAsync(byte[] message, int offset, int length, long timeout);
/**
* Broadcasts a push message to the wicket page (and it's components) associated with this
@@ -130,6 +130,17 @@ public interface IWebSocketConnection
*/
void sendMessage(IWebSocketPushMessage message);
+ /**
+ * Broadcasts a push message to the wicket page (and its components) associated with this
+ * connection. The components can then send messages or component updates to client by adding
+ * them to the target. Pushing to client is done asynchronously.
+ *
+ * @param message
+ * the push message to send
+ *
+ */
+ void sendMessageAsync(IWebSocketPushMessage message);
+
/**
* @return The application for which this WebSocket connection is registered
*/
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketRequestHandler.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketRequestHandler.java
index 8d49155c28..eae4504d04 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketRequestHandler.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketRequestHandler.java
@@ -19,6 +19,8 @@ package org.apache.wicket.protocol.ws.api;
import org.apache.wicket.core.request.handler.IPartialPageRequestHandler;
import org.apache.wicket.request.ILoggableRequestHandler;
+import java.util.concurrent.Future;
+
/**
* An interface for outbound communication with web socket clients
*
@@ -34,6 +36,28 @@ public interface IWebSocketRequestHandler extends IPartialPageRequestHandler, IL
*/
void push(CharSequence message);
+ /**
+ * Pushes a text message to the client in an asynchronous way.
+ *
+ * @param message
+ * the text message to push to the client if the web socket connection is open
+ * @return
+ * a {@link java.util.concurrent.Future} representing the send operation. Or null if connection is closed.
+ */
+ Future<Void> pushAsync(CharSequence message);
+
+ /**
+ * Pushes a text message to the client in an asynchronous way.
+ *
+ * @param message
+ * the text message to push to the client if the web socket connection is open
+ * @param timeout
+ * the timeout for operation
+ * @return
+ * a {@link java.util.concurrent.Future} representing the send operation. Or null if connection is closed.
+ */
+ Future<Void> pushAsync(CharSequence message, long timeout);
+
/**
* Pushes a binary message to the client.
*
@@ -45,4 +69,34 @@ public interface IWebSocketRequestHandler extends IPartialPageRequestHandler, IL
* how many bytes to read from the message
*/
void push(byte[] message, int offset, int length);
+
+ /**
+ * Pushes a binary message to the client in an asynchronous way.
+ *
+ * @param message
+ * the binary message to push to the client if the web socket connection is open
+ * @param offset
+ * the offset to start to read from the message
+ * @param length
+ * how many bytes to read from the message
+ * @return
+ * a {@link java.util.concurrent.Future} representing the send operation. Or null if connection is closed.
+ */
+ Future<Void> pushAsync(byte[] message, int offset, int length);
+
+ /**
+ * Pushes a binary message to the client in an asynchronous way.
+ *
+ * @param message
+ * the binary message to push to the client if the web socket connection is open
+ * @param offset
+ * the offset to start to read from the message
+ * @param length
+ * how many bytes to read from the message
+ * @param timeout
+ * the timeout for operation
+ * @return
+ * a {@link java.util.concurrent.Future} representing the send operation. Or null if connection is closed.
+ */
+ Future<Void> pushAsync(byte[] message, int offset, int length, long timeout);
}
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketPushBroadcaster.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketPushBroadcaster.java
index c5f6d78e02..23a71476f9 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketPushBroadcaster.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketPushBroadcaster.java
@@ -189,7 +189,14 @@ public class WebSocketPushBroadcaster
@Override
public void run()
{
- wsConnection.sendMessage(message);
+ if (webSocketSettings.isAsynchronousPush())
+ {
+ wsConnection.sendMessageAsync(message);
+ }
+ else
+ {
+ wsConnection.sendMessage(message);
+ }
}
});
}
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketRequestHandler.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketRequestHandler.java
index 7ce1f620b5..58d9f4d280 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketRequestHandler.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketRequestHandler.java
@@ -19,6 +19,7 @@ package org.apache.wicket.protocol.ws.api;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
+import java.util.concurrent.Future;
import org.apache.wicket.Component;
import org.apache.wicket.Page;
@@ -77,6 +78,36 @@ public class WebSocketRequestHandler extends AbstractPartialPageRequestHandler i
}
}
+ @Override
+ public Future<Void> pushAsync(CharSequence message, long timeout)
+ {
+ if (connection.isOpen())
+ {
+ Args.notNull(message, "message");
+ return connection.sendMessageAsync(message.toString(), timeout);
+ }
+ else
+ {
+ LOG.warn("The websocket connection is already closed. Cannot push the text message '{}'", message);
+ }
+ return null;
+ }
+
+ @Override
+ public Future<Void> pushAsync(CharSequence message)
+ {
+ if (connection.isOpen())
+ {
+ Args.notNull(message, "message");
+ return connection.sendMessageAsync(message.toString());
+ }
+ else
+ {
+ LOG.warn("The websocket connection is already closed. Cannot push the text message '{}'", message);
+ }
+ return null;
+ }
+
@Override
public void push(byte[] message, int offset, int length)
{
@@ -97,6 +128,36 @@ public class WebSocketRequestHandler extends AbstractPartialPageRequestHandler i
}
}
+ @Override
+ public Future<Void> pushAsync(byte[] message, int offset, int length)
+ {
+ if (connection.isOpen())
+ {
+ Args.notNull(message, "message");
+ return connection.sendMessageAsync(message, offset, length);
+ }
+ else
+ {
+ LOG.warn("The websocket connection is already closed. Cannot push the binary message '{}'", message);
+ }
+ return null;
+ }
+
+ @Override
+ public Future<Void> pushAsync(byte[] message, int offset, int length, long timeout)
+ {
+ if (connection.isOpen())
+ {
+ Args.notNull(message, "message");
+ // Forward the caller's timeout; the three-argument overload would silently drop it.
+ return connection.sendMessageAsync(message, offset, length, timeout);
+ }
+
+ // Closed connection: nothing can be sent, so log it and return null as the interface documents.
+ LOG.warn("The websocket connection is already closed. Cannot push the binary message '{}'", message);
+ return null;
+ }
+
/**
* @return if <code>true</code> then EMPTY partial updates will se send. If <code>false</code> then EMPTY
* partial updates will be skipped. A possible use case is: a page receives and a push event but no one is
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java
index af1c938afc..9cb330355b 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java
@@ -47,9 +47,12 @@ public class WebSocketResponse extends WebResponse
private boolean isRedirect = false;
- public WebSocketResponse(final IWebSocketConnection conn)
+ private final boolean asynchronous;
+
+ public WebSocketResponse(final IWebSocketConnection conn, boolean asynchronous)
{
this.connection = conn;
+ this.asynchronous = asynchronous;
}
@Override
@@ -87,13 +90,27 @@ public class WebSocketResponse extends WebResponse
{
if (text != null)
{
- connection.sendMessage(text.toString());
+ if (asynchronous)
+ {
+ connection.sendMessageAsync(text.toString());
+ }
+ else
+ {
+ connection.sendMessage(text.toString());
+ }
text = null;
}
else if (binary != null)
{
byte[] bytes = binary.toByteArray();
- connection.sendMessage(bytes, 0, bytes.length);
+ if (asynchronous)
+ {
+ connection.sendMessageAsync(bytes, 0, bytes.length);
+ }
+ else
+ {
+ connection.sendMessage(bytes, 0, bytes.length);
+ }
binary.close();
binary = null;
}
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java
index 28dce651a5..ed4bd3748e 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java
@@ -22,6 +22,7 @@ import java.util.concurrent.Future;
import org.apache.wicket.Application;
import org.apache.wicket.protocol.http.WebApplication;
import org.apache.wicket.protocol.ws.api.IWebSocketConnection;
+import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage;
import org.apache.wicket.protocol.ws.api.registry.IKey;
/**
@@ -92,14 +93,14 @@ abstract class TestWebSocketConnection implements IWebSocketConnection
}
@Override
- public Future<Void> sendMessageAsync(byte[] message, int offset, int length, long timeOut)
+ public Future<Void> sendMessageAsync(byte[] message, int offset, int length, long timeout)
{
checkOpenness();
onOutMessage(message, offset, length);
return null;
}
- /**
+ /**
* A callback method that is called when a text message should be send to the client
*
* @param message
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java
index cc121e6b7a..013ecefc6c 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java
@@ -158,6 +158,12 @@ abstract class TestWebSocketProcessor extends AbstractWebSocketProcessor
{
TestWebSocketProcessor.this.broadcastMessage(message);
}
+
+ @Override
+ public void sendMessageAsync(IWebSocketPushMessage message)
+ {
+ TestWebSocketProcessor.this.broadcastMessage(message);
+ }
});
}