You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@myfaces.apache.org by ta...@apache.org on 2022/10/19 10:30:19 UTC
[myfaces] branch main updated: MYFACES-4445: websockets refactoring (#348)
This is an automated email from the ASF dual-hosted git repository.
tandraschko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/myfaces.git
The following commit(s) were added to refs/heads/main by this push:
new 90ab05fa5 MYFACES-4445: websockets refactoring (#348)
90ab05fa5 is described below
commit 90ab05fa528fe412e6d8f81ed2810876de9ec1e8
Author: Milan Siebenbürger <mi...@gmail.com>
AuthorDate: Wed Oct 19 12:30:13 2022 +0200
MYFACES-4445: websockets refactoring (#348)
- reworked channel token generation - new token is generated only if needed (ie new session/view), just one token for application scope
- fixed usage of user-specified channel - now also works even in application scope calls
- changed default max idle timeout for websockets - according to omnifaces/mojarra spec
- specified default websocket scope - according to documentation (session scope for user-specified websocket as default)
---
.../java/org/apache/myfaces/push/EndpointImpl.java | 27 +-
.../myfaces/push/WebsocketComponentRenderer.java | 77 +++++-
.../apache/myfaces/push/cdi/PushContextImpl.java | 73 +-----
.../myfaces/push/cdi/WebsocketScopeManager.java | 14 +-
.../myfaces/push/cdi/WebsocketSessionManager.java | 282 +++++++++++++++++----
5 files changed, 338 insertions(+), 135 deletions(-)
diff --git a/impl/src/main/java/org/apache/myfaces/push/EndpointImpl.java b/impl/src/main/java/org/apache/myfaces/push/EndpointImpl.java
index 5c9e1a970..be7a54dc6 100644
--- a/impl/src/main/java/org/apache/myfaces/push/EndpointImpl.java
+++ b/impl/src/main/java/org/apache/myfaces/push/EndpointImpl.java
@@ -62,11 +62,24 @@ public class EndpointImpl extends Endpoint
if (Boolean.TRUE.equals(config.getUserProperties().get(WebsocketConfigurator.WEBSOCKET_VALID)) &&
sessionManager.addOrUpdateSession(channelToken, session))
{
+ // default value 0, could be reconfigured if needed
session.setMaxIdleTimeout((Long) config.getUserProperties().getOrDefault(
- WebsocketConfigurator.MAX_IDLE_TIMEOUT, 300000L));
+ WebsocketConfigurator.MAX_IDLE_TIMEOUT, 0));
Serializable user = (Serializable) session.getUserProperties().get(WebsocketConfigurator.WEBSOCKET_USER);
+ if (LOG.isLoggable(Level.FINE))
+ {
+ LOG.log(Level.FINE, "EndPointImpl.onOpen (channel = {0}, token = {1}, user = {2})",
+ new Object[] {channel, channelToken, user});
+ }
+
+ // register user
+ if (user != null)
+ {
+ sessionManager.registerUser(user, channel, channelToken);
+ }
+
beanManager.get().getEvent()
.select(WebsocketEvent.Opened.Literal.INSTANCE)
.fire(new WebsocketEvent(channel, user, null));
@@ -97,6 +110,11 @@ public class EndpointImpl extends Endpoint
String channelToken = session.getQueryString();
Serializable user = (Serializable) session.getUserProperties().get(WebsocketConfigurator.WEBSOCKET_USER);
+ if (LOG.isLoggable(Level.FINE))
+ {
+ LOG.log(Level.FINE, "EndPointImpl.onClose (channel = {0}, token = {1}, user = {2})",
+ new Object[] {channel, channelToken, user});
+ }
if (!beanManager.isInitialized())
{
@@ -115,7 +133,12 @@ public class EndpointImpl extends Endpoint
}
WebsocketSessionManager sessionManager = CDIUtils.get(beanManager.get(), WebsocketSessionManager.class);
- sessionManager.removeSession(channelToken);
+ sessionManager.removeSession(channelToken, session);
+ // deregister user
+ if (user != null)
+ {
+ sessionManager.deregisterUser(user, channel, channelToken);
+ }
beanManager.get().getEvent()
.select(WebsocketEvent.Closed.Literal.INSTANCE)
diff --git a/impl/src/main/java/org/apache/myfaces/push/WebsocketComponentRenderer.java b/impl/src/main/java/org/apache/myfaces/push/WebsocketComponentRenderer.java
index 5aab3f419..cd7972ac1 100644
--- a/impl/src/main/java/org/apache/myfaces/push/WebsocketComponentRenderer.java
+++ b/impl/src/main/java/org/apache/myfaces/push/WebsocketComponentRenderer.java
@@ -22,6 +22,9 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import jakarta.enterprise.inject.spi.BeanManager;
import jakarta.faces.FacesWrapper;
import jakarta.faces.component.UIComponent;
@@ -40,6 +43,7 @@ import org.apache.myfaces.cdi.util.CDIUtils;
import org.apache.myfaces.push.cdi.WebsocketChannelMetadata;
import org.apache.myfaces.push.cdi.WebsocketChannelTokenBuilder;
import org.apache.myfaces.push.cdi.WebsocketScopeManager;
+import org.apache.myfaces.push.cdi.WebsocketSessionManager;
import org.apache.myfaces.renderkit.html.util.ClientBehaviorRendererUtils;
import org.apache.myfaces.renderkit.html.util.HTML;
import org.apache.myfaces.renderkit.html.util.HtmlRendererUtils;
@@ -51,6 +55,7 @@ import org.apache.myfaces.renderkit.html.util.ResourceUtils;
@ListenerFor(systemEventClass = PostAddToViewEvent.class)
public class WebsocketComponentRenderer extends Renderer implements ComponentSystemEventListener
{
+ private static final Logger LOG = Logger.getLogger(WebsocketComponentRenderer.class.getName());
@Override
public void processEvent(ComponentSystemEvent event)
@@ -125,16 +130,61 @@ public class WebsocketComponentRenderer extends Renderer implements ComponentSys
// Create channel token
// TODO: Use ResponseStateManager to create the token
- String scope = component.getScope() == null
- ? WebsocketScopeManager.SCOPE_APPLICATION
- : component.getScope();
+ // The default scope is application. When the user attribute is specified, then the default scope is session.
+ String scope = component.getScope();
+ if (scope == null)
+ {
+ scope = (component.getUser() == null ?
+ WebsocketScopeManager.SCOPE_APPLICATION : WebsocketScopeManager.SCOPE_SESSION);
+ }
+
WebsocketChannelMetadata metadata = new WebsocketChannelMetadata(
channel, scope, component.getUser(), component.isConnected());
WebsocketScopeManager scopeManager = CDIUtils.get(beanManager, WebsocketScopeManager.class);
+ // try to find an existing channelToken
String channelToken = null;
- // Force a new channelToken if "connected" property is set to false, because in that case websocket creation
+ if (scopeManager.getScope(scope, true).isChannelAvailable(channel))
+ {
+ if (component.getUser() == null)
+ {
+ List<String> channelTokenList = scopeManager.getScope (scope, true).getChannelTokens(channel);
+ if (LOG.isLoggable(Level.FINE))
+ {
+ LOG.log(Level.FINE, "WebsocketComponentRenderer.encodeEnd: for channel = {0} found : ",
+ channel);
+ channelTokenList.forEach (p -> LOG.log(Level.FINE, " {0}", p));
+ }
+ // should be just one
+ if (channelTokenList.size() == 1)
+ {
+ channelToken = channelTokenList.get(0);
+ }
+ }
+ else
+ {
+ List<String> channelTokenList = scopeManager.getScope (scope, true)
+ .getChannelTokens(channel, component.getUser());
+ if (LOG.isLoggable(Level.FINE))
+ {
+ LOG.log(Level.FINE,
+ "WebsocketComponentRenderer.encodeEnd: for channel = {0}, user = {1} found : ",
+ new Object[] {channel, component.getUser()});
+ channelTokenList.forEach (p -> LOG.log(Level.FINE, " {0}", p));
+ }
+ // should be just one for combination channel / user
+ if (channelTokenList.size() == 1)
+ {
+ channelToken = channelTokenList.get(0);
+ }
+
+ }
+ }
+
+ // Create channel token if needed
+ // TODO: Use ResponseStateManager to create the token
+ // Force a new channelToken if "connected" property is set to false, because in that case websocket creation
if (!component.isConnected())
{
// This bean is required because you always need to register the token, so it can be properly destroyed
@@ -144,15 +194,18 @@ public class WebsocketComponentRenderer extends Renderer implements ComponentSys
{
// No channel token found for that combination, create a new token for this view
channelToken = channelTokenBuilder.createChannelToken(facesContext, channel);
-
- // Register channel in view scope to chain discard view algorithm using @PreDestroy
- scopeManager.getViewScope(true).registerToken(channelToken, metadata);
-
- // Register channel in session scope to allow validation on handshake WebsocketConfigurator)
- scopeManager.getSessionScope(true).registerToken(channelToken, metadata);
- }
+ scopeManager.getScope(scope, true).registerWebsocketSession(channelToken, metadata);
+ }
+
+ // Register channel in view scope to chain discard view algorithm using @PreDestroy
+ scopeManager.getViewScope(true).registerToken(channelToken, metadata);
+
+ // Register channel in session scope to allow validation on handshake WebsocketConfigurator)
+ scopeManager.getSessionScope(true).registerToken(channelToken, metadata);
- scopeManager.getScope(scope, true).registerWebsocketSession(channelToken, metadata);
+ // Prepare channelToken to websocket sessionMap (real session will be connected later)
+ WebsocketSessionManager sessionManager = CDIUtils.get(beanManager, WebsocketSessionManager.class);
+ sessionManager.registerSessionToken(channelToken);
writer.startElement(HTML.SCRIPT_ELEM, component);
HtmlRendererUtils.renderScriptType(facesContext, writer);
diff --git a/impl/src/main/java/org/apache/myfaces/push/cdi/PushContextImpl.java b/impl/src/main/java/org/apache/myfaces/push/cdi/PushContextImpl.java
index 36d367440..2294e64a1 100644
--- a/impl/src/main/java/org/apache/myfaces/push/cdi/PushContextImpl.java
+++ b/impl/src/main/java/org/apache/myfaces/push/cdi/PushContextImpl.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -46,17 +47,10 @@ public class PushContextImpl implements PushContext
this.sessionManager = CDIUtils.get(beanManager, WebsocketSessionManager.class);
}
- public String getChannel()
- {
- return channel;
- }
-
@Override
public Set<Future<Void>> send(Object message)
{
//1. locate the channel and define the context
- String channel = getChannel();
-
WebsocketScopeManager.AbstractScope applicationScope = scopeManager.getApplicationScope(false);
WebsocketScopeManager.AbstractScope viewScope = null;
WebsocketScopeManager.AbstractScope sessionScope = null;
@@ -86,12 +80,12 @@ public class PushContextImpl implements PushContext
// Use view scope for context
channelTokens = viewScope.getChannelTokens(channel);
}
- else if (sessionScope != null && sessionScope.isChannelAvailable(getChannel()))
+ else if (sessionScope != null && sessionScope.isChannelAvailable(channel))
{
// Use session scope for context
channelTokens = sessionScope.getChannelTokens(channel);
}
- else if (applicationScope != null && applicationScope.isChannelAvailable(getChannel()))
+ else if (applicationScope.isChannelAvailable(channel))
{
// Use application scope for context
channelTokens = applicationScope.getChannelTokens(channel);
@@ -131,64 +125,23 @@ public class PushContextImpl implements PushContext
@Override
public <S extends Serializable> Map<S, Set<Future<Void>>> send(Object message, Collection<S> users)
{
- //1. locate the channel and define the context
- String channel = getChannel();
- WebsocketScopeManager.AbstractScope applicationScope = scopeManager.getApplicationScope(false);
- WebsocketScopeManager.AbstractScope viewScope = null;
- WebsocketScopeManager.AbstractScope sessionScope = null;
+ Map<S, Set<Future<Void>>> resultsByUser = new HashMap<>(users.size());
- if (CDIUtils.isRequestScopeActive(beanManager))
+ for (S user : users)
{
- if (CDIUtils.isSessionScopeActive(beanManager))
- {
- sessionScope = scopeManager.getSessionScope(false);
- if (CDIUtils.isViewScopeActive(beanManager))
- {
- viewScope = scopeManager.getViewScope(false);
- }
- }
- }
-
- if (applicationScope == null)
- {
- // No base bean to push message
- return Collections.emptyMap();
- }
+ Set<String> channelTokenSet = sessionManager.getChannelTokensForUser(user, channel);
+ Set<Future<Void>> results = new HashSet<>(channelTokenSet.size());
- Map<S, Set<Future<Void>>> result = new HashMap<>();
-
- if (viewScope != null && viewScope.isChannelAvailable(channel))
- {
- // Use view scope for context
- for (S user : users)
- {
- result.put(user, send(viewScope.getChannelTokens(channel, user), message));
- }
- }
- else if (sessionScope != null && sessionScope.isChannelAvailable(getChannel()))
- {
- // Use session scope for context
- for (S user : users)
- {
- result.put(user, send(sessionScope.getChannelTokens(channel, user), message));
- }
- }
- else if (applicationScope != null && applicationScope.isChannelAvailable(getChannel()))
- {
- // Use application scope for context
- for (S user : users)
+ for (String channelToken : channelTokenSet)
{
- result.put(user, send(applicationScope.getChannelTokens(channel, user), message));
+ results.addAll(sessionManager.send(channelToken, message));
}
+
+ resultsByUser.put(user, results);
}
- else
- {
- throw new FacesException("CDI bean not found for push message");
- }
-
- //2. send the message
- return result;
+
+ return resultsByUser;
}
private Set<Future<Void>> send(List<String> channelTokens, Object message)
diff --git a/impl/src/main/java/org/apache/myfaces/push/cdi/WebsocketScopeManager.java b/impl/src/main/java/org/apache/myfaces/push/cdi/WebsocketScopeManager.java
index 1c6a1cb22..a08c77db6 100644
--- a/impl/src/main/java/org/apache/myfaces/push/cdi/WebsocketScopeManager.java
+++ b/impl/src/main/java/org/apache/myfaces/push/cdi/WebsocketScopeManager.java
@@ -103,9 +103,13 @@ public class WebsocketScopeManager
// When current session scope is about to be destroyed, deregister all session scope channels and
// explicitly close any open web sockets associated with it to avoid stale websockets.
// If any, also deregister session users.
- for (String token : tokens.keySet())
+ for (Map.Entry<String, WebsocketChannelMetadata> entry : tokens.entrySet())
{
- sessionManager.removeSession(token);
+ // remove channelToken - only if it is session scope
+ if (WebsocketScopeManager.SCOPE_SESSION.equals(entry.getValue().getScope()))
+ {
+ sessionManager.removeChannelToken(entry.getKey());
+ }
}
// we dont need to destroy child sockets ("view")
@@ -174,12 +178,6 @@ public class WebsocketScopeManager
}
}
- // remove sessions
- for (String token : tokens.keySet())
- {
- sessionManager.removeSession(token);
- }
-
channelTokens.clear();
tokens.clear();
}
diff --git a/impl/src/main/java/org/apache/myfaces/push/cdi/WebsocketSessionManager.java b/impl/src/main/java/org/apache/myfaces/push/cdi/WebsocketSessionManager.java
index 96d9479bb..61caf12f6 100644
--- a/impl/src/main/java/org/apache/myfaces/push/cdi/WebsocketSessionManager.java
+++ b/impl/src/main/java/org/apache/myfaces/push/cdi/WebsocketSessionManager.java
@@ -21,17 +21,30 @@ package org.apache.myfaces.push.cdi;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
+
+import java.io.IOException;
+import java.io.Serializable;
import java.lang.ref.Reference;
import java.lang.ref.SoftReference;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
+import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
import jakarta.faces.context.ExternalContext;
+import jakarta.websocket.CloseReason;
import jakarta.websocket.Session;
import org.apache.myfaces.push.WebsocketSessionClusterSerializedRestore;
@@ -40,12 +53,20 @@ import org.apache.myfaces.push.Json;
import org.apache.myfaces.util.lang.ConcurrentLRUCache;
import org.apache.myfaces.util.lang.Lazy;
+import static jakarta.websocket.CloseReason.CloseCodes.NORMAL_CLOSURE;
+
@ApplicationScoped
public class WebsocketSessionManager
{
- private Lazy<ConcurrentLRUCache<String, Reference<Session>>> sessionMap;
+ private Lazy<ConcurrentLRUCache<String, Collection<Reference<Session>>>> sessionMap;
+
+ private Lazy<ConcurrentHashMap<UserChannelKey, Set<String>>> userMap;
private Queue<String> restoreQueue;
+ private static final CloseReason REASON_EXPIRED = new CloseReason(NORMAL_CLOSURE, "Expired");
+
+ private static final Logger LOG = Logger.getLogger(WebsocketSessionManager.class.getName());
+
@PostConstruct
public void init()
{
@@ -55,17 +76,63 @@ public class WebsocketSessionManager
return new ConcurrentLRUCache<>((size * 4 + 3) / 3, size);
});
restoreQueue = new ConcurrentLinkedQueue<>();
+ userMap = new Lazy<>(ConcurrentHashMap::new);
}
- public ConcurrentLRUCache<String, Reference<Session>> getSessionMap()
+ public ConcurrentLRUCache<String, Collection<Reference<Session>>> getSessionMap()
{
return sessionMap.get();
}
+ public ConcurrentMap<UserChannelKey, Set<String>> getUserMap()
+ {
+ return userMap.get();
+ }
+
+ public void registerSessionToken(String channelToken)
+ {
+ if (this.getSessionMap().get(channelToken) == null)
+ {
+ this.getSessionMap().put(channelToken, new ConcurrentLinkedQueue<>());
+ }
+ }
+
+ public void registerUser(Serializable user, String channel, String channelToken)
+ {
+ UserChannelKey userChannelKey = new UserChannelKey(user, channel);
+
+ Set<String> channelTokenSet = getUserMap().computeIfAbsent(userChannelKey, k -> new HashSet<>(1));
+ channelTokenSet.add(channelToken);
+ }
+
+ public void deregisterUser(Serializable user, String channel, String channelToken)
+ {
+ UserChannelKey userChannelKey = new UserChannelKey(user, channel);
+
+ synchronized (getUserMap())
+ {
+ Set<String> channelTokenSet = getUserMap().get(userChannelKey);
+ if (channelTokenSet != null)
+ {
+ channelTokenSet.remove(channelToken);
+ if (channelTokenSet.isEmpty())
+ {
+ getUserMap().remove(userChannelKey);
+ }
+ }
+ }
+ }
+
+ public Set<String> getChannelTokensForUser(Serializable user, String channel)
+ {
+ UserChannelKey userChannelKey = new UserChannelKey(user, channel);
+ return getUserMap().get(userChannelKey);
+ }
+
public void initSessionMap(ExternalContext context)
{
int size = MyfacesConfig.getCurrentInstance(context).getWebsocketMaxConnections();
- ConcurrentLRUCache<String, Reference<Session>> newSessionMap
+ ConcurrentLRUCache<String, Collection<Reference<Session>>> newSessionMap
= new ConcurrentLRUCache<>((size * 4 + 3) / 3, size);
synchronized (sessionMap)
@@ -75,13 +142,20 @@ public class WebsocketSessionManager
// If a Session has been restored, it could be already a lruCache instantiated, so in this case
// we need to fill the new one with the old instances, but only the instances that are active
// at the moment.
- Set<Map.Entry<String, Reference<Session>>> entries = sessionMap.get()
+ Set<Map.Entry<String, Collection<Reference<Session>>>> entries = sessionMap.get()
.getLatestAccessedItems(MyfacesConfig.WEBSOCKET_MAX_CONNECTIONS_DEFAULT).entrySet();
- for (Map.Entry<String, Reference<Session>> entry : entries)
+ for (Map.Entry<String, Collection<Reference<Session>>> entry : entries)
{
- if (entry.getValue() != null && entry.getValue().get() != null && entry.getValue().get().isOpen())
+ Collection<Reference<Session>> referenceCollection = entry.getValue();
+ if (referenceCollection != null)
{
- newSessionMap.put(entry.getKey(), entry.getValue());
+ Collection<Reference<Session>> newReferenceCollection =
+ referenceCollection
+ .stream()
+ .filter(p -> p.get() != null && p.get().isOpen())
+ .distinct()
+ .collect(Collectors.toCollection(ConcurrentLinkedQueue::new));
+ newSessionMap.put(entry.getKey(), newReferenceCollection);
}
}
}
@@ -101,14 +175,22 @@ public class WebsocketSessionManager
public boolean addOrUpdateSession(String channelToken, Session session)
{
- Reference oldInstance = getSessionMap().get(channelToken);
- if (oldInstance == null)
+ if (LOG.isLoggable(Level.FINE))
{
- getSessionMap().put(channelToken, new SoftReference<>(session));
+ LOG.log (Level.FINE, "WebsocketSessionManager: addOrUpdateSession for channelToken = {0}, " +
+ "session.id = {1}", new Object[] {channelToken ,session.getId()});
}
- else if (!session.equals(oldInstance.get()))
+ Collection<Reference<Session>> sessions = this.getSessionMap().get(channelToken);
+ if (sessions == null)
{
- getSessionMap().put(channelToken, new SoftReference<>(session));
+ registerSessionToken(channelToken);
+ }
+ Optional<Reference<Session>> referenceOptional =
+ sessions.stream().filter(p -> Objects.equals(p.get(), session)).findFirst();
+
+ if (!referenceOptional.isPresent())
+ {
+ return sessions.add(new SoftReference<>(session));
}
return true;
}
@@ -122,37 +204,89 @@ public class WebsocketSessionManager
* @param channelToken
* @return
*/
- public boolean removeSession(String channelToken)
+ public void removeSession(String channelToken, Session session)
{
+ if (LOG.isLoggable(Level.FINE))
+ {
+ LOG.log (Level.FINE, "WebsocketSessionManager: removeSession for channelToken = {0}, " +
+ "session.id = {1}", new Object[] {channelToken ,session.getId()});
+ }
+ Collection<Reference<Session>> collection = getSessionMap().get(channelToken);
+ Optional<Reference<Session>> referenceOptional =
+ collection.stream().filter(p -> Objects.equals(p.get(), session)).findFirst();
+ referenceOptional.ifPresent(collection::remove);
+ }
+
+ /**
+ * Remove the channelToken and close all sessions associated with it. Happens, when session scope
+ * or view scope is destroyed.
+ * @param channelToken
+ */
+ public void removeChannelToken(String channelToken)
+ {
+ // close all sessions associated with this channelToken
+ Collection<Reference<Session>> sessions = getSessionMap().get(channelToken);
+
+ if (sessions != null)
+ {
+ for (Reference<Session> sessionReference : sessions)
+ {
+ Session session = sessionReference.get();
+ if (session != null && session.isOpen())
+ {
+ try
+ {
+ session.close(REASON_EXPIRED);
+ }
+ catch (IOException ignore)
+ {
+ // ignored
+ }
+ }
+ }
+ }
+
getSessionMap().remove(channelToken);
- return false;
}
-
-
+
protected Set<Future<Void>> send(String channelToken, Object message)
{
// Before send, we need to check
synchronizeSessionInstances();
Set<Future<Void>> results = new HashSet<>(1);
- Reference<Session> sessionRef = (channelToken != null) ? getSessionMap().get(channelToken) : null;
+ Collection<Reference<Session>> sessions = (channelToken != null) ? getSessionMap().get(channelToken) : null;
- if (sessionRef != null && sessionRef.get() != null)
+ if (sessions != null && !sessions.isEmpty())
{
String json = Json.encode(message);
- Session session = sessionRef.get();
- if (session.isOpen())
- {
- send(session, json, results, 0);
- }
- else
- {
- //If session is not open, remove the session, because a websocket session after is closed cannot
- //be alive.
- getSessionMap().remove(channelToken);
- }
+
+ sessions.forEach (
+ sessionRef ->
+ {
+ if (sessionRef != null && sessionRef.get() != null)
+ {
+ Session session = sessionRef.get();
+ if (session.isOpen())
+ {
+ send(session, json, results, 0);
+ }
+ else
+ {
+ //If session is not open, remove the session, because a websocket
+ // session after is closed cannot
+ //be alive.
+ removeSession(channelToken, session);
+ }
+ }
+ }
+ );
+ return results;
+ }
+ else
+ {
+ return Collections.emptySet();
}
- return results;
}
private final String WARNING_TOMCAT_WEB_SOCKET_BOMBED =
@@ -204,7 +338,7 @@ public class WebsocketSessionManager
&& illegalStateException.getMessage().contains("[TEXT_FULL_WRITING]");
}
- private void synchronizeSessionInstances()
+ public void synchronizeSessionInstances()
{
Queue<String> queue = getRestoredQueue();
// The queue is always empty, unless a deserialization of Session instances happen. If that happens,
@@ -214,34 +348,42 @@ public class WebsocketSessionManager
if (!queue.isEmpty())
{
// It is necessary to have at least 1 registered Session instance to call getOpenSessions() and get all
- // instances associated to jakarta.faces.push Endpoint.
- Map<String, Reference<Session>> map = getSessionMap().getLatestAccessedItems(1);
+ // instances associated to javax.faces.push Endpoint.
+ Map<String, Collection<Reference<Session>>> map = getSessionMap().getLatestAccessedItems(1);
if (map != null && !map.isEmpty())
{
- Reference<Session> ref = map.values().iterator().next();
- if (ref != null)
- {
- Session s = ref.get();
- if (s != null)
- {
- Set<Session> set = s.getOpenSessions();
-
- for (Iterator<Session> it = set.iterator(); it.hasNext();)
+
+ Collection<Reference<Session>> collectionRef = map.values().iterator().next();
+
+ collectionRef.forEach( ref ->
{
- Session instance = it.next();
- WebsocketSessionClusterSerializedRestore r =
- (WebsocketSessionClusterSerializedRestore) instance.getUserProperties().get(
- WebsocketSessionClusterSerializedRestore.WEBSOCKET_SESSION_SERIALIZED_RESTORE);
- if (r != null && r.isDeserialized())
+ if (ref != null)
+ {
+ Session s = ref.get();
+ if (s != null)
{
- addOrUpdateSession(r.getChannelToken(), s);
+ Set<Session> set = s.getOpenSessions();
+
+ for (Iterator<Session> it = set.iterator(); it.hasNext(); )
+ {
+ Session instance = it.next();
+ WebsocketSessionClusterSerializedRestore r =
+ (WebsocketSessionClusterSerializedRestore) instance.
+ getUserProperties().get(
+ WebsocketSessionClusterSerializedRestore.WEBSOCKET_SESSION_SERIALIZED_RESTORE
+ );
+ if (r != null && r.isDeserialized())
+ {
+ addOrUpdateSession(r.getChannelToken(), s);
+ }
+ }
+
+ // Remove one element from the queue
+ queue.poll();
}
}
-
- // Remove one element from the queue
- queue.poll();
}
- }
+ );
}
}
}
@@ -250,5 +392,39 @@ public class WebsocketSessionManager
{
return restoreQueue;
}
-
+
+
+ private class UserChannelKey implements Serializable
+ {
+
+ private final Serializable user;
+ private final String channel;
+ public UserChannelKey(Serializable user, String channel)
+ {
+ this.user = user;
+ this.channel = channel;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+ UserChannelKey that = (UserChannelKey) o;
+ return Objects.equals(user, that.user) && Objects.equals(channel, that.channel);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(user, channel);
+ }
+ }
+
}