You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wicket.apache.org by re...@apache.org on 2022/04/05 16:46:25 UTC

[wicket] 01/01: {WICKET-6969} allow asynchronous pushing of messages.

This is an automated email from the ASF dual-hosted git repository.

reiern70 pushed a commit to branch feature/reiern70/WICKET-6969
in repository https://gitbox.apache.org/repos/asf/wicket.git

commit 021b63d8752da8d679a2f5f0ee0f89b74f8b03af
Author: reiern70 <re...@gmail.com>
AuthorDate: Tue Apr 5 10:06:11 2022 -0600

    {WICKET-6969} allow asynchronous pushing of messages.
---
 .../wicket/protocol/ws/WebSocketSettings.java      | 31 ++++++++---
 .../ws/api/AbstractWebSocketConnection.java        | 10 +++-
 .../ws/api/AbstractWebSocketProcessor.java         |  8 +--
 .../protocol/ws/api/IWebSocketConnection.java      | 21 ++++++--
 .../protocol/ws/api/IWebSocketRequestHandler.java  | 54 +++++++++++++++++++
 .../protocol/ws/api/WebSocketPushBroadcaster.java  |  9 +++-
 .../protocol/ws/api/WebSocketRequestHandler.java   | 61 ++++++++++++++++++++++
 .../wicket/protocol/ws/api/WebSocketResponse.java  | 23 ++++++--
 .../ws/util/tester/TestWebSocketConnection.java    |  5 +-
 .../ws/util/tester/TestWebSocketProcessor.java     |  6 +++
 10 files changed, 206 insertions(+), 22 deletions(-)

diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
index df78a18bbf..e151b1f3d3 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
@@ -158,11 +158,18 @@ public class WebSocketSettings
 	 */
 	private Function<Integer, Boolean> notifyOnCloseEvent = (code) -> true;
 
-	public boolean shouldNotifyOnCloseEvent(int closeCode) {
+	/**
+	 * Flag that enables asynchronous push of messages. It is disabled (false) by default.
+	 */
+	private boolean asynchronousPush = false;
+
+	public boolean shouldNotifyOnCloseEvent(int closeCode)
+	{
 		return notifyOnCloseEvent == null || notifyOnCloseEvent.apply(closeCode);
 	}
 
-	public void setNotifyOnCloseEvent(Function<Integer, Boolean> notifyOnCloseEvent) {
+	public void setNotifyOnCloseEvent(Function<Integer, Boolean> notifyOnCloseEvent)
+	{
 		this.notifyOnCloseEvent = notifyOnCloseEvent;
 	}
 
@@ -174,11 +181,13 @@ public class WebSocketSettings
 	 */
 	private Function<Throwable, Boolean> notifyOnErrorEvent = (throwable) -> true;
 
-	public boolean shouldNotifyOnErrorEvent(Throwable throwable) {
+	public boolean shouldNotifyOnErrorEvent(Throwable throwable)
+	{
 		return notifyOnErrorEvent == null || notifyOnErrorEvent.apply(throwable);
 	}
 
-	public void setNotifyOnErrorEvent(Function<Throwable, Boolean> notifyOnErrorEvent) {
+	public void setNotifyOnErrorEvent(Function<Throwable, Boolean> notifyOnErrorEvent)
+	{
 		this.notifyOnErrorEvent = notifyOnErrorEvent;
 	}
 
@@ -303,9 +312,9 @@ public class WebSocketSettings
 	 *              The active web socket connection
 	 * @return the response object that should be used to write the response back to the client
 	 */
-	public WebResponse newWebSocketResponse(IWebSocketConnection connection)
+	public WebResponse newWebSocketResponse(IWebSocketConnection connection, boolean asynchronousPush)
 	{
-		return new WebSocketResponse(connection);
+		return new WebSocketResponse(connection, asynchronousPush);
 	}
 
 	/**
@@ -497,4 +506,14 @@ public class WebSocketSettings
 			return new Thread(r, "Wicket-WebSocket-HttpRequest-Thread-" + counter.getAndIncrement());
 		}
 	}
+
+	public void setAsynchronousPush(boolean asynchronousPush)
+	{
+		this.asynchronousPush = asynchronousPush;
+	}
+
+	public boolean isAsynchronousPush()
+	{
+		return asynchronousPush;
+	}
 }
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketConnection.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketConnection.java
index 3cd0f627d4..c9b64029b7 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketConnection.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketConnection.java
@@ -21,6 +21,8 @@ import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage;
 import org.apache.wicket.protocol.ws.api.registry.IKey;
 import org.apache.wicket.util.lang.Args;
 
+import java.util.concurrent.Future;
+
 /**
  * Abstract class handling the Web Socket broadcast messages.
  */
@@ -50,7 +52,13 @@ public abstract class AbstractWebSocketConnection implements IWebSocketConnectio
 	@Override
 	public void sendMessage(IWebSocketPushMessage message)
 	{
-		webSocketProcessor.broadcastMessage(message, this);
+		webSocketProcessor.broadcastMessage(message, this, false);
+	}
+
+	@Override
+	public void sendMessageAsync(IWebSocketPushMessage message)
+	{
+		webSocketProcessor.broadcastMessage(message, this, true);
 	}
 
 	@Override
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
index bf5a6a812d..4159465451 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
@@ -184,7 +184,7 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor
 			}
 		}
 
-		broadcastMessage(new ConnectedMessage(getApplication(), getSessionId(), key), connection);
+		broadcastMessage(new ConnectedMessage(getApplication(), getSessionId(), key), connection, webSocketSettings.isAsynchronousPush());
 	}
 
 	@Override
@@ -210,7 +210,7 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor
 	{
 		IKey key = getRegistryKey();
 		IWebSocketConnection connection = connectionRegistry.getConnection(application, sessionId, key);
-		broadcastMessage(message, connection);
+		broadcastMessage(message, connection, webSocketSettings.isAsynchronousPush());
 	}
 
 	/**
@@ -225,7 +225,7 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor
 	 * @param message
 	 *      the message to broadcast
 	 */
-	public final void broadcastMessage(final IWebSocketMessage message, IWebSocketConnection connection)
+	public final void broadcastMessage(final IWebSocketMessage message, IWebSocketConnection connection, boolean asynchronousPush)
 	{
 		if (connection != null && (connection.isOpen() || isSpecialMessage(message)))
 		{
@@ -233,7 +233,7 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor
 			Session oldSession = ThreadContext.getSession();
 			RequestCycle oldRequestCycle = ThreadContext.getRequestCycle();
 
-			WebResponse webResponse = webSocketSettings.newWebSocketResponse(connection);
+			WebResponse webResponse = webSocketSettings.newWebSocketResponse(connection, asynchronousPush);
 			try
 			{
 				WebSocketRequestMapper requestMapper = new WebSocketRequestMapper(application.getRootRequestMapper());
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java
index 7275559bd5..762025602c 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java
@@ -71,11 +71,11 @@ public interface IWebSocketConnection
      *
      * @param message
      *      the text message
-     * @param timeOut
+     * @param timeout
      *      the timeout for operation
      * @return a {@link java.util.concurrent.Future} representing the send operation
      */
-    Future<Void> sendMessageAsync(String message, long timeOut);
+    Future<Void> sendMessageAsync(String message, long timeout);
 
 	/**
 	 * Sends a binary message to the client.
@@ -113,11 +113,11 @@ public interface IWebSocketConnection
      *      the offset to read from
      * @param length
      *      how much data to read
-     * @param timeOut
-     *      *      the timeout for operation
+     * @param timeout
+     *      the timeout for operation
      * @return a {@link java.util.concurrent.Future} representing the send operation
      */
-    Future<Void> sendMessageAsync(byte[] message, int offset, int length, long timeOut);
+    Future<Void> sendMessageAsync(byte[] message, int offset, int length, long timeout);
 
 	/**
 	 * Broadcasts a push message to the wicket page (and it's components) associated with this
@@ -130,6 +130,17 @@ public interface IWebSocketConnection
 	 */
 	void sendMessage(IWebSocketPushMessage message);
 
+	/**
+	 * Broadcasts a push message to the wicket page (and its components) associated with this
+	 * connection. The components can then send messages or component updates to client by adding
+	 * them to the target. Pushing to client is done asynchronously.
+	 *
+	 * @param message
+	 *     the push message to send
+	 *
+	 */
+	void sendMessageAsync(IWebSocketPushMessage message);
+
 	/**
 	 * @return The application for which this WebSocket connection is registered
 	 */
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketRequestHandler.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketRequestHandler.java
index 8d49155c28..eae4504d04 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketRequestHandler.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketRequestHandler.java
@@ -19,6 +19,8 @@ package org.apache.wicket.protocol.ws.api;
 import org.apache.wicket.core.request.handler.IPartialPageRequestHandler;
 import org.apache.wicket.request.ILoggableRequestHandler;
 
+import java.util.concurrent.Future;
+
 /**
  * An interface for outbound communication with web socket clients
  *
@@ -34,6 +36,28 @@ public interface IWebSocketRequestHandler extends IPartialPageRequestHandler, IL
 	 */
 	void push(CharSequence message);
 
+	/**
+	 * Pushes a text message to the client in an asynchronous way.
+	 *
+	 * @param message
+	 *      the text message to push to the client if the web socket connection is open
+	 * @return
+	 *      a {@link java.util.concurrent.Future} representing the send operation. Or null if connection is closed.
+	 */
+	Future<Void> pushAsync(CharSequence message);
+
+	/**
+	 * Pushes a text message to the client in an asynchronous way.
+	 *
+	 * @param message
+	 *      the text message to push to the client if the web socket connection is open
+	 * @param timeout
+	 *      the timeout for operation
+	 * @return
+	 * 		a {@link java.util.concurrent.Future} representing the send operation. Or null if connection is closed.
+	 */
+	Future<Void> pushAsync(CharSequence message, long timeout);
+
 	/**
 	 * Pushes a binary message to the client.
 	 *
@@ -45,4 +69,34 @@ public interface IWebSocketRequestHandler extends IPartialPageRequestHandler, IL
 	 *      how many bytes to read from the message
 	 */
 	void push(byte[] message, int offset, int length);
+
+	/**
+	 * Pushes a binary message to the client in an asynchronous way.
+	 *
+	 * @param message
+	 *      the binary message to push to the client if the web socket connection is open
+	 * @param offset
+	 *      the offset to start to read from the message
+	 * @param length
+	 *      how many bytes to read from the message
+	 * @return
+	 * 		a {@link java.util.concurrent.Future} representing the send operation. Or null if connection is closed.
+	 */
+	Future<Void> pushAsync(byte[] message, int offset, int length);
+
+	/**
+	 * Pushes a binary message to the client in an asynchronous way.
+	 *
+	 * @param message
+	 *      the binary message to push to the client if the web socket connection is open
+	 * @param offset
+	 *      the offset to start to read from the message
+	 * @param length
+	 *      how many bytes to read from the message
+	 * @param timeout
+	 *      the timeout for operation
+	 * @return
+	 * 		a {@link java.util.concurrent.Future} representing the send operation. Or null if connection is closed.
+	 */
+	Future<Void> pushAsync(byte[] message, int offset, int length, long timeout);
 }
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketPushBroadcaster.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketPushBroadcaster.java
index c5f6d78e02..23a71476f9 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketPushBroadcaster.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketPushBroadcaster.java
@@ -189,7 +189,14 @@ public class WebSocketPushBroadcaster
 				@Override
 				public void run()
 				{
-					wsConnection.sendMessage(message);
+					if (webSocketSettings.isAsynchronousPush())
+					{
+						wsConnection.sendMessageAsync(message);
+					}
+					else
+					{
+						wsConnection.sendMessage(message);
+					}
 				}
 			});
 		}
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketRequestHandler.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketRequestHandler.java
index 7ce1f620b5..58d9f4d280 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketRequestHandler.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketRequestHandler.java
@@ -19,6 +19,7 @@ package org.apache.wicket.protocol.ws.api;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.Future;
 
 import org.apache.wicket.Component;
 import org.apache.wicket.Page;
@@ -77,6 +78,36 @@ public class WebSocketRequestHandler extends AbstractPartialPageRequestHandler i
 		}
 	}
 
+	@Override
+	public Future<Void> pushAsync(CharSequence message, long timeout)
+	{
+		if (connection.isOpen())
+		{
+			Args.notNull(message, "message");
+			return connection.sendMessageAsync(message.toString(), timeout);
+		}
+		else
+		{
+			LOG.warn("The websocket connection is already closed. Cannot push the text message '{}'", message);
+		}
+		return null;
+	}
+
+	@Override
+	public Future<Void> pushAsync(CharSequence message)
+	{
+		if (connection.isOpen())
+		{
+			Args.notNull(message, "message");
+			return connection.sendMessageAsync(message.toString());
+		}
+		else
+		{
+			LOG.warn("The websocket connection is already closed. Cannot push the text message '{}'", message);
+		}
+		return null;
+	}
+
 	@Override
 	public void push(byte[] message, int offset, int length)
 	{
@@ -97,6 +128,36 @@ public class WebSocketRequestHandler extends AbstractPartialPageRequestHandler i
 		}
 	}
 
+	@Override
+	public Future<Void> pushAsync(byte[] message, int offset, int length)
+	{
+		if (connection.isOpen())
+		{
+			Args.notNull(message, "message");
+			return connection.sendMessageAsync(message, offset, length);
+		}
+		else
+		{
+			LOG.warn("The websocket connection is already closed. Cannot push the binary message '{}'", message);
+		}
+		return null;
+	}
+
+	@Override
+	public Future<Void> pushAsync(byte[] message, int offset, int length, long timeout)
+	{
+		if (connection.isOpen())
+		{
+			Args.notNull(message, "message");
+			return connection.sendMessageAsync(message, offset, length, timeout);
+		}
+		else
+		{
+			LOG.warn("The websocket connection is already closed. Cannot push the binary message '{}'", message);
+		}
+		return null;
+	}
+
 	/**
 	 * @return if <code>true</code> then EMPTY partial updates will se send. If <code>false</code> then EMPTY
 	 *    partial updates will be skipped. A possible use case is: a page receives and a push event but no one is
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java
index af1c938afc..9cb330355b 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java
@@ -47,9 +47,12 @@ public class WebSocketResponse extends WebResponse
 
 	private boolean isRedirect = false;
 
-	public WebSocketResponse(final IWebSocketConnection conn)
+	private final boolean asynchronous;
+
+	public WebSocketResponse(final IWebSocketConnection conn, boolean asynchronous)
 	{
 		this.connection = conn;
+		this.asynchronous = asynchronous;
 	}
 
 	@Override
@@ -87,13 +90,27 @@ public class WebSocketResponse extends WebResponse
 			{
 				if (text != null)
 				{
-					connection.sendMessage(text.toString());
+					if (asynchronous)
+					{
+						connection.sendMessageAsync(text.toString());
+					}
+					else
+					{
+						connection.sendMessage(text.toString());
+					}
 					text = null;
 				}
 				else if (binary != null)
 				{
 					byte[] bytes = binary.toByteArray();
-					connection.sendMessage(bytes, 0, bytes.length);
+					if (asynchronous)
+					{
+						connection.sendMessageAsync(bytes, 0, bytes.length);
+					}
+					else
+					{
+						connection.sendMessage(bytes, 0, bytes.length);
+					}
 					binary.close();
 					binary = null;
 				}
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java
index 28dce651a5..ed4bd3748e 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java
@@ -22,6 +22,7 @@ import java.util.concurrent.Future;
 import org.apache.wicket.Application;
 import org.apache.wicket.protocol.http.WebApplication;
 import org.apache.wicket.protocol.ws.api.IWebSocketConnection;
+import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage;
 import org.apache.wicket.protocol.ws.api.registry.IKey;
 
 /**
@@ -92,14 +93,14 @@ abstract class TestWebSocketConnection implements IWebSocketConnection
     }
 
     @Override
-    public Future<Void> sendMessageAsync(byte[] message, int offset, int length, long timeOut)
+    public Future<Void> sendMessageAsync(byte[] message, int offset, int length, long timeout)
     {
         checkOpenness();
         onOutMessage(message, offset, length);
         return null;
     }
 
-    /**
+	/**
 	 * A callback method that is called when a text message should be send to the client
 	 *
 	 * @param message
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java
index cc121e6b7a..013ecefc6c 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java
@@ -158,6 +158,12 @@ abstract class TestWebSocketProcessor extends AbstractWebSocketProcessor
 			{
 				TestWebSocketProcessor.this.broadcastMessage(message);
 			}
+
+			@Override
+			public void sendMessageAsync(IWebSocketPushMessage message)
+			{
+				TestWebSocketProcessor.this.broadcastMessage(message);
+			}
 		});
 	}