You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wicket.apache.org by re...@apache.org on 2022/04/05 16:46:24 UTC

[wicket] branch feature/reiern70/WICKET-6969 created (now 021b63d875)

This is an automated email from the ASF dual-hosted git repository.

reiern70 pushed a change to branch feature/reiern70/WICKET-6969
in repository https://gitbox.apache.org/repos/asf/wicket.git


      at 021b63d875 {WICKET-6969} allow asynchronous pushing of messages.

This branch includes the following new commits:

     new 021b63d875 {WICKET-6969} allow asynchronous pushing of messages.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[wicket] 01/01: {WICKET-6969} allow asynchronous pushing of messages.

Posted by re...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

reiern70 pushed a commit to branch feature/reiern70/WICKET-6969
in repository https://gitbox.apache.org/repos/asf/wicket.git

commit 021b63d8752da8d679a2f5f0ee0f89b74f8b03af
Author: reiern70 <re...@gmail.com>
AuthorDate: Tue Apr 5 10:06:11 2022 -0600

    {WICKET-6969} allow asynchronous pushing of messages.
---
 .../wicket/protocol/ws/WebSocketSettings.java      | 31 ++++++++---
 .../ws/api/AbstractWebSocketConnection.java        | 10 +++-
 .../ws/api/AbstractWebSocketProcessor.java         |  8 +--
 .../protocol/ws/api/IWebSocketConnection.java      | 21 ++++++--
 .../protocol/ws/api/IWebSocketRequestHandler.java  | 54 +++++++++++++++++++
 .../protocol/ws/api/WebSocketPushBroadcaster.java  |  9 +++-
 .../protocol/ws/api/WebSocketRequestHandler.java   | 61 ++++++++++++++++++++++
 .../wicket/protocol/ws/api/WebSocketResponse.java  | 23 ++++++--
 .../ws/util/tester/TestWebSocketConnection.java    |  5 +-
 .../ws/util/tester/TestWebSocketProcessor.java     |  6 +++
 10 files changed, 206 insertions(+), 22 deletions(-)

diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
index df78a18bbf..e151b1f3d3 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java
@@ -158,11 +158,18 @@ public class WebSocketSettings
 	 */
 	private Function<Integer, Boolean> notifyOnCloseEvent = (code) -> true;
 
-	public boolean shouldNotifyOnCloseEvent(int closeCode) {
+	/**
+	 * Flag that allows to use asynchronous push. By default, it is set to false.
+	 */
+	private boolean asynchronousPush = false;
+
+	public boolean shouldNotifyOnCloseEvent(int closeCode)
+	{
 		return notifyOnCloseEvent == null || notifyOnCloseEvent.apply(closeCode);
 	}
 
-	public void setNotifyOnCloseEvent(Function<Integer, Boolean> notifyOnCloseEvent) {
+	public void setNotifyOnCloseEvent(Function<Integer, Boolean> notifyOnCloseEvent)
+	{
 		this.notifyOnCloseEvent = notifyOnCloseEvent;
 	}
 
@@ -174,11 +181,13 @@ public class WebSocketSettings
 	 */
 	private Function<Throwable, Boolean> notifyOnErrorEvent = (throwable) -> true;
 
-	public boolean shouldNotifyOnErrorEvent(Throwable throwable) {
+	public boolean shouldNotifyOnErrorEvent(Throwable throwable)
+	{
 		return notifyOnErrorEvent == null || notifyOnErrorEvent.apply(throwable);
 	}
 
-	public void setNotifyOnErrorEvent(Function<Throwable, Boolean> notifyOnErrorEvent) {
+	public void setNotifyOnErrorEvent(Function<Throwable, Boolean> notifyOnErrorEvent)
+	{
 		this.notifyOnErrorEvent = notifyOnErrorEvent;
 	}
 
@@ -303,9 +312,9 @@ public class WebSocketSettings
 	 *              The active web socket connection
 	 * @return the response object that should be used to write the response back to the client
 	 */
-	public WebResponse newWebSocketResponse(IWebSocketConnection connection)
+	public WebResponse newWebSocketResponse(IWebSocketConnection connection, boolean asynchronousPush)
 	{
-		return new WebSocketResponse(connection);
+		return new WebSocketResponse(connection, asynchronousPush);
 	}
 
 	/**
@@ -497,4 +506,14 @@ public class WebSocketSettings
 			return new Thread(r, "Wicket-WebSocket-HttpRequest-Thread-" + counter.getAndIncrement());
 		}
 	}
+
+	public void setAsynchronousPush(boolean asynchronousPush)
+	{
+		this.asynchronousPush = asynchronousPush;
+	}
+
+	public boolean isAsynchronousPush()
+	{
+		return asynchronousPush;
+	}
 }
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketConnection.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketConnection.java
index 3cd0f627d4..c9b64029b7 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketConnection.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketConnection.java
@@ -21,6 +21,8 @@ import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage;
 import org.apache.wicket.protocol.ws.api.registry.IKey;
 import org.apache.wicket.util.lang.Args;
 
+import java.util.concurrent.Future;
+
 /**
  * Abstract class handling the Web Socket broadcast messages.
  */
@@ -50,7 +52,13 @@ public abstract class AbstractWebSocketConnection implements IWebSocketConnectio
 	@Override
 	public void sendMessage(IWebSocketPushMessage message)
 	{
-		webSocketProcessor.broadcastMessage(message, this);
+		webSocketProcessor.broadcastMessage(message, this, false);
+	}
+
+	@Override
+	public void sendMessageAsync(IWebSocketPushMessage message)
+	{
+		webSocketProcessor.broadcastMessage(message, this, true);
 	}
 
 	@Override
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
index bf5a6a812d..4159465451 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java
@@ -184,7 +184,7 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor
 			}
 		}
 
-		broadcastMessage(new ConnectedMessage(getApplication(), getSessionId(), key), connection);
+		broadcastMessage(new ConnectedMessage(getApplication(), getSessionId(), key), connection, webSocketSettings.isAsynchronousPush());
 	}
 
 	@Override
@@ -210,7 +210,7 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor
 	{
 		IKey key = getRegistryKey();
 		IWebSocketConnection connection = connectionRegistry.getConnection(application, sessionId, key);
-		broadcastMessage(message, connection);
+		broadcastMessage(message, connection, webSocketSettings.isAsynchronousPush());
 	}
 
 	/**
@@ -225,7 +225,7 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor
 	 * @param message
 	 *      the message to broadcast
 	 */
-	public final void broadcastMessage(final IWebSocketMessage message, IWebSocketConnection connection)
+	public final void broadcastMessage(final IWebSocketMessage message, IWebSocketConnection connection, boolean asynchronousPush)
 	{
 		if (connection != null && (connection.isOpen() || isSpecialMessage(message)))
 		{
@@ -233,7 +233,7 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor
 			Session oldSession = ThreadContext.getSession();
 			RequestCycle oldRequestCycle = ThreadContext.getRequestCycle();
 
-			WebResponse webResponse = webSocketSettings.newWebSocketResponse(connection);
+			WebResponse webResponse = webSocketSettings.newWebSocketResponse(connection, asynchronousPush);
 			try
 			{
 				WebSocketRequestMapper requestMapper = new WebSocketRequestMapper(application.getRootRequestMapper());
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java
index 7275559bd5..762025602c 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java
@@ -71,11 +71,11 @@ public interface IWebSocketConnection
      *
      * @param message
      *      the text message
-     * @param timeOut
+     * @param timeout
      *      the timeout for operation
      * @return a {@link java.util.concurrent.Future} representing the send operation
      */
-    Future<Void> sendMessageAsync(String message, long timeOut);
+    Future<Void> sendMessageAsync(String message, long timeout);
 
 	/**
 	 * Sends a binary message to the client.
@@ -113,11 +113,11 @@ public interface IWebSocketConnection
      *      the offset to read from
      * @param length
      *      how much data to read
-     * @param timeOut
-     *      *      the timeout for operation
+     * @param timeout
+     *      the timeout for operation
      * @return a {@link java.util.concurrent.Future} representing the send operation
      */
-    Future<Void> sendMessageAsync(byte[] message, int offset, int length, long timeOut);
+    Future<Void> sendMessageAsync(byte[] message, int offset, int length, long timeout);
 
 	/**
 	 * Broadcasts a push message to the wicket page (and it's components) associated with this
@@ -130,6 +130,17 @@ public interface IWebSocketConnection
 	 */
 	void sendMessage(IWebSocketPushMessage message);
 
+	/**
+	 * Broadcasts a push message to the wicket page (and it's components) associated with this
+	 * connection. The components can then send messages or component updates to client by adding
+	 * them to the target. Pushing to client is done asynchronously.
+	 *
+	 * @param message
+	 *     the push message to send
+	 *
+	 */
+	void sendMessageAsync(IWebSocketPushMessage message);
+
 	/**
 	 * @return The application for which this WebSocket connection is registered
 	 */
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketRequestHandler.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketRequestHandler.java
index 8d49155c28..eae4504d04 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketRequestHandler.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketRequestHandler.java
@@ -19,6 +19,8 @@ package org.apache.wicket.protocol.ws.api;
 import org.apache.wicket.core.request.handler.IPartialPageRequestHandler;
 import org.apache.wicket.request.ILoggableRequestHandler;
 
+import java.util.concurrent.Future;
+
 /**
  * An interface for outbound communication with web socket clients
  *
@@ -34,6 +36,28 @@ public interface IWebSocketRequestHandler extends IPartialPageRequestHandler, IL
 	 */
 	void push(CharSequence message);
 
+	/**
+	 * Pushes a text message to the client in an asynchronous way.
+	 *
+	 * @param message
+	 *      the text message to push to the client if the web socket connection is open
+	 * @return
+	 *      a {@link java.util.concurrent.Future} representing the send operation. Or null if connection is closed.
+	 */
+	Future<Void> pushAsync(CharSequence message);
+
+	/**
+	 * Pushes a text message to the client in an asynchronous way.
+	 *
+	 * @param message
+	 *      the text message to push to the client if the web socket connection is open
+	 * @param timeout
+	 *      the timeout for operation
+	 * @return
+	 * 		a {@link java.util.concurrent.Future} representing the send operation. Or null if connection is closed.
+	 */
+	Future<Void> pushAsync(CharSequence message, long timeout);
+
 	/**
 	 * Pushes a binary message to the client.
 	 *
@@ -45,4 +69,34 @@ public interface IWebSocketRequestHandler extends IPartialPageRequestHandler, IL
 	 *      how many bytes to read from the message
 	 */
 	void push(byte[] message, int offset, int length);
+
+	/**
+	 * Pushes a binary message to the client.
+	 *
+	 * @param message
+	 *      the binary message to push to the client if the web socket connection is open
+	 * @param offset
+	 *      the offset to start to read from the message
+	 * @param length
+	 *      how many bytes to read from the message
+	 * @return
+	 * 		a {@link java.util.concurrent.Future} representing the send operation. Or null if connection is closed.
+	 */
+	Future<Void> pushAsync(byte[] message, int offset, int length);
+
+	/**
+	 * Pushes a binary message to the client.
+	 *
+	 * @param message
+	 *      the binary message to push to the client if the web socket connection is open
+	 * @param offset
+	 *      the offset to start to read from the message
+	 * @param length
+	 *      how many bytes to read from the message
+	 * @param timeout
+	 *      the timeout for operation
+	 * @return
+	 * 		a {@link java.util.concurrent.Future} representing the send operation. Or null if connection is closed.
+	 */
+	Future<Void> pushAsync(byte[] message, int offset, int length, long timeout);
 }
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketPushBroadcaster.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketPushBroadcaster.java
index c5f6d78e02..23a71476f9 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketPushBroadcaster.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketPushBroadcaster.java
@@ -189,7 +189,14 @@ public class WebSocketPushBroadcaster
 				@Override
 				public void run()
 				{
-					wsConnection.sendMessage(message);
+					if (webSocketSettings.isAsynchronousPush())
+					{
+						wsConnection.sendMessageAsync(message);
+					}
+					else
+					{
+						wsConnection.sendMessage(message);
+					}
 				}
 			});
 		}
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketRequestHandler.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketRequestHandler.java
index 7ce1f620b5..58d9f4d280 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketRequestHandler.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketRequestHandler.java
@@ -19,6 +19,7 @@ package org.apache.wicket.protocol.ws.api;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.Future;
 
 import org.apache.wicket.Component;
 import org.apache.wicket.Page;
@@ -77,6 +78,36 @@ public class WebSocketRequestHandler extends AbstractPartialPageRequestHandler i
 		}
 	}
 
+	@Override
+	public Future<Void> pushAsync(CharSequence message, long timeout)
+	{
+		if (connection.isOpen())
+		{
+			Args.notNull(message, "message");
+			return connection.sendMessageAsync(message.toString(), timeout);
+		}
+		else
+		{
+			LOG.warn("The websocket connection is already closed. Cannot push the text message '{}'", message);
+		}
+		return null;
+	}
+
+	@Override
+	public Future<Void> pushAsync(CharSequence message)
+	{
+		if (connection.isOpen())
+		{
+			Args.notNull(message, "message");
+			return connection.sendMessageAsync(message.toString());
+		}
+		else
+		{
+			LOG.warn("The websocket connection is already closed. Cannot push the text message '{}'", message);
+		}
+		return null;
+	}
+
 	@Override
 	public void push(byte[] message, int offset, int length)
 	{
@@ -97,6 +128,36 @@ public class WebSocketRequestHandler extends AbstractPartialPageRequestHandler i
 		}
 	}
 
+	@Override
+	public Future<Void> pushAsync(byte[] message, int offset, int length)
+	{
+		if (connection.isOpen())
+		{
+			Args.notNull(message, "message");
+			return connection.sendMessageAsync(message, offset, length);
+		}
+		else
+		{
+			LOG.warn("The websocket connection is already closed. Cannot push the binary message '{}'", message);
+		}
+		return null;
+	}
+
+	@Override
+	public Future<Void> pushAsync(byte[] message, int offset, int length, long timeout)
+	{
+		if (connection.isOpen())
+		{
+			Args.notNull(message, "message");
+			return connection.sendMessageAsync(message, offset, length);
+		}
+		else
+		{
+			LOG.warn("The websocket connection is already closed. Cannot push the binary message '{}'", message);
+		}
+		return null;
+	}
+
 	/**
 	 * @return if <code>true</code> then EMPTY partial updates will se send. If <code>false</code> then EMPTY
 	 *    partial updates will be skipped. A possible use case is: a page receives and a push event but no one is
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java
index af1c938afc..9cb330355b 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java
@@ -47,9 +47,12 @@ public class WebSocketResponse extends WebResponse
 
 	private boolean isRedirect = false;
 
-	public WebSocketResponse(final IWebSocketConnection conn)
+	private final boolean asynchronous;
+
+	public WebSocketResponse(final IWebSocketConnection conn, boolean asynchronous)
 	{
 		this.connection = conn;
+		this.asynchronous = asynchronous;
 	}
 
 	@Override
@@ -87,13 +90,27 @@ public class WebSocketResponse extends WebResponse
 			{
 				if (text != null)
 				{
-					connection.sendMessage(text.toString());
+					if (asynchronous)
+					{
+						connection.sendMessageAsync(text.toString());
+					}
+					else
+					{
+						connection.sendMessage(text.toString());
+					}
 					text = null;
 				}
 				else if (binary != null)
 				{
 					byte[] bytes = binary.toByteArray();
-					connection.sendMessage(bytes, 0, bytes.length);
+					if (asynchronous)
+					{
+						connection.sendMessageAsync(bytes, 0, bytes.length);
+					}
+					else
+					{
+						connection.sendMessage(bytes, 0, bytes.length);
+					}
 					binary.close();
 					binary = null;
 				}
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java
index 28dce651a5..ed4bd3748e 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java
@@ -22,6 +22,7 @@ import java.util.concurrent.Future;
 import org.apache.wicket.Application;
 import org.apache.wicket.protocol.http.WebApplication;
 import org.apache.wicket.protocol.ws.api.IWebSocketConnection;
+import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage;
 import org.apache.wicket.protocol.ws.api.registry.IKey;
 
 /**
@@ -92,14 +93,14 @@ abstract class TestWebSocketConnection implements IWebSocketConnection
     }
 
     @Override
-    public Future<Void> sendMessageAsync(byte[] message, int offset, int length, long timeOut)
+    public Future<Void> sendMessageAsync(byte[] message, int offset, int length, long timeout)
     {
         checkOpenness();
         onOutMessage(message, offset, length);
         return null;
     }
 
-    /**
+	/**
 	 * A callback method that is called when a text message should be send to the client
 	 *
 	 * @param message
diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java
index cc121e6b7a..013ecefc6c 100644
--- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java
+++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java
@@ -158,6 +158,12 @@ abstract class TestWebSocketProcessor extends AbstractWebSocketProcessor
 			{
 				TestWebSocketProcessor.this.broadcastMessage(message);
 			}
+
+			@Override
+			public void sendMessageAsync(IWebSocketPushMessage message)
+			{
+				TestWebSocketProcessor.this.broadcastMessage(message);
+			}
 		});
 	}