You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@myfaces.apache.org by ta...@apache.org on 2022/10/19 10:30:19 UTC

[myfaces] branch main updated: MYFACES-4445: websockets refactoring (#348)

This is an automated email from the ASF dual-hosted git repository.

tandraschko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/myfaces.git


The following commit(s) were added to refs/heads/main by this push:
     new 90ab05fa5 MYFACES-4445: websockets refactoring (#348)
90ab05fa5 is described below

commit 90ab05fa528fe412e6d8f81ed2810876de9ec1e8
Author: Milan Siebenbürger <mi...@gmail.com>
AuthorDate: Wed Oct 19 12:30:13 2022 +0200

    MYFACES-4445: websockets refactoring (#348)
    
    - reworked channel token generation - a new token is generated only if needed (i.e. for a new session/view), and there is just one token for application scope
    - fixed usage of user-specified channel - it now also works in application scope calls
    - changed default max idle timeout for websockets from 300000 ms to 0 (no idle timeout) - according to omnifaces/mojarra spec
    - specified default websocket scope - according to documentation (application scope by default; session scope as default when the user attribute is specified)
---
 .../java/org/apache/myfaces/push/EndpointImpl.java |  27 +-
 .../myfaces/push/WebsocketComponentRenderer.java   |  77 +++++-
 .../apache/myfaces/push/cdi/PushContextImpl.java   |  73 +-----
 .../myfaces/push/cdi/WebsocketScopeManager.java    |  14 +-
 .../myfaces/push/cdi/WebsocketSessionManager.java  | 282 +++++++++++++++++----
 5 files changed, 338 insertions(+), 135 deletions(-)

diff --git a/impl/src/main/java/org/apache/myfaces/push/EndpointImpl.java b/impl/src/main/java/org/apache/myfaces/push/EndpointImpl.java
index 5c9e1a970..be7a54dc6 100644
--- a/impl/src/main/java/org/apache/myfaces/push/EndpointImpl.java
+++ b/impl/src/main/java/org/apache/myfaces/push/EndpointImpl.java
@@ -62,11 +62,24 @@ public class EndpointImpl extends Endpoint
         if (Boolean.TRUE.equals(config.getUserProperties().get(WebsocketConfigurator.WEBSOCKET_VALID)) &&
                 sessionManager.addOrUpdateSession(channelToken, session))
         {
+            // default value 0, could be reconfigured if needed
             session.setMaxIdleTimeout((Long) config.getUserProperties().getOrDefault(
-                    WebsocketConfigurator.MAX_IDLE_TIMEOUT, 300000L));
+                    WebsocketConfigurator.MAX_IDLE_TIMEOUT, 0));
 
             Serializable user = (Serializable) session.getUserProperties().get(WebsocketConfigurator.WEBSOCKET_USER);
 
+            if (LOG.isLoggable(Level.FINE))
+            {
+                LOG.log(Level.FINE, "EndPointImpl.onOpen (channel = {0}, token = {1}, user = {2})",
+                        new Object[] {channel, channelToken, user});
+            }
+
+            // register user
+            if (user != null)
+            {
+                sessionManager.registerUser(user, channel, channelToken);
+            }
+
             beanManager.get().getEvent()
                     .select(WebsocketEvent.Opened.Literal.INSTANCE)
                     .fire(new WebsocketEvent(channel, user, null));
@@ -97,6 +110,11 @@ public class EndpointImpl extends Endpoint
         String channelToken = session.getQueryString();
 
         Serializable user = (Serializable) session.getUserProperties().get(WebsocketConfigurator.WEBSOCKET_USER);
+        if (LOG.isLoggable(Level.FINE))
+        {
+            LOG.log(Level.FINE, "EndPointImpl.onClose (channel = {0}, token = {1}, user = {2})",
+                    new Object[] {channel, channelToken, user});
+        }
 
         if (!beanManager.isInitialized())
         {
@@ -115,7 +133,12 @@ public class EndpointImpl extends Endpoint
         }
 
         WebsocketSessionManager sessionManager = CDIUtils.get(beanManager.get(), WebsocketSessionManager.class);
-        sessionManager.removeSession(channelToken);
+        sessionManager.removeSession(channelToken, session);
+        // deregister user
+        if (user != null)
+        {
+            sessionManager.deregisterUser(user, channel, channelToken);
+        }
 
         beanManager.get().getEvent()
                 .select(WebsocketEvent.Closed.Literal.INSTANCE)
diff --git a/impl/src/main/java/org/apache/myfaces/push/WebsocketComponentRenderer.java b/impl/src/main/java/org/apache/myfaces/push/WebsocketComponentRenderer.java
index 5aab3f419..cd7972ac1 100644
--- a/impl/src/main/java/org/apache/myfaces/push/WebsocketComponentRenderer.java
+++ b/impl/src/main/java/org/apache/myfaces/push/WebsocketComponentRenderer.java
@@ -22,6 +22,9 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import jakarta.enterprise.inject.spi.BeanManager;
 import jakarta.faces.FacesWrapper;
 import jakarta.faces.component.UIComponent;
@@ -40,6 +43,7 @@ import org.apache.myfaces.cdi.util.CDIUtils;
 import org.apache.myfaces.push.cdi.WebsocketChannelMetadata;
 import org.apache.myfaces.push.cdi.WebsocketChannelTokenBuilder;
 import org.apache.myfaces.push.cdi.WebsocketScopeManager;
+import org.apache.myfaces.push.cdi.WebsocketSessionManager;
 import org.apache.myfaces.renderkit.html.util.ClientBehaviorRendererUtils;
 import org.apache.myfaces.renderkit.html.util.HTML;
 import org.apache.myfaces.renderkit.html.util.HtmlRendererUtils;
@@ -51,6 +55,7 @@ import org.apache.myfaces.renderkit.html.util.ResourceUtils;
 @ListenerFor(systemEventClass = PostAddToViewEvent.class)
 public class WebsocketComponentRenderer extends Renderer implements ComponentSystemEventListener
 {
+    private static final Logger LOG = Logger.getLogger(WebsocketComponentRenderer.class.getName());
 
     @Override
     public void processEvent(ComponentSystemEvent event)
@@ -125,16 +130,61 @@ public class WebsocketComponentRenderer extends Renderer implements ComponentSys
 
         // Create channel token 
         // TODO: Use ResponseStateManager to create the token
-        String scope = component.getScope() == null
-                ? WebsocketScopeManager.SCOPE_APPLICATION
-                : component.getScope();
+        // The default scope is application. When the user attribute is specified, then the default scope is session.
+        String scope = component.getScope();
+        if (scope == null)
+        {
+            scope = (component.getUser() == null ?
+                    WebsocketScopeManager.SCOPE_APPLICATION : WebsocketScopeManager.SCOPE_SESSION);
+        }
+
         WebsocketChannelMetadata metadata = new WebsocketChannelMetadata(
                 channel, scope, component.getUser(), component.isConnected());
 
         WebsocketScopeManager scopeManager = CDIUtils.get(beanManager, WebsocketScopeManager.class);
 
+        // try to find an existing channelToken
         String channelToken = null;
-        // Force a new channelToken if "connected" property is set to false, because in that case websocket creation 
+        if (scopeManager.getScope(scope, true).isChannelAvailable(channel))
+        {
+            if (component.getUser() == null)
+            {
+                List<String> channelTokenList = scopeManager.getScope (scope, true).getChannelTokens(channel);
+                if (LOG.isLoggable(Level.FINE))
+                {
+                    LOG.log(Level.FINE, "WebsocketComponentRenderer.encodeEnd: for channel = {0} found : ",
+                            channel);
+                    channelTokenList.forEach (p -> LOG.log(Level.FINE, "  {0}", p));
+                }
+                // should be just one
+                if (channelTokenList.size() == 1)
+                {
+                    channelToken = channelTokenList.get(0);
+                }
+            }
+            else
+            {
+                List<String> channelTokenList = scopeManager.getScope (scope, true)
+                        .getChannelTokens(channel, component.getUser());
+                if (LOG.isLoggable(Level.FINE))
+                {
+                    LOG.log(Level.FINE,
+                            "WebsocketComponentRenderer.encodeEnd: for channel = {0}, user = {1} found : ",
+                            new Object[] {channel, component.getUser()});
+                    channelTokenList.forEach (p -> LOG.log(Level.FINE, "  {0}", p));
+                }
+                // should be just one for combination channel / user
+                if (channelTokenList.size() == 1)
+                {
+                    channelToken = channelTokenList.get(0);
+                }
+
+            }
+        }
+
+        // Create channel token if needed
+        // TODO: Use ResponseStateManager to create the token
+        // Force a new channelToken if "connected" property is set to false, because in that case websocket creation
         if (!component.isConnected())
         {
             // This bean is required because you always need to register the token, so it can be properly destroyed
@@ -144,15 +194,18 @@ public class WebsocketComponentRenderer extends Renderer implements ComponentSys
         {
             // No channel token found for that combination, create a new token for this view
             channelToken = channelTokenBuilder.createChannelToken(facesContext, channel);
-            
-            // Register channel in view scope to chain discard view algorithm using @PreDestroy
-            scopeManager.getViewScope(true).registerToken(channelToken, metadata);
-            
-            // Register channel in session scope to allow validation on handshake  WebsocketConfigurator)
-            scopeManager.getSessionScope(true).registerToken(channelToken, metadata);
-        }        
+            scopeManager.getScope(scope, true).registerWebsocketSession(channelToken, metadata);
+        }
+
+        // Register channel in view scope to chain discard view algorithm using @PreDestroy
+        scopeManager.getViewScope(true).registerToken(channelToken, metadata);
+
+        // Register channel in session scope to allow validation on handshake (WebsocketConfigurator)
+        scopeManager.getSessionScope(true).registerToken(channelToken, metadata);
 
-        scopeManager.getScope(scope, true).registerWebsocketSession(channelToken, metadata);
+        // Prepare channelToken to websocket sessionMap (real session will be connected later)
+        WebsocketSessionManager sessionManager = CDIUtils.get(beanManager, WebsocketSessionManager.class);
+        sessionManager.registerSessionToken(channelToken);
 
         writer.startElement(HTML.SCRIPT_ELEM, component);
         HtmlRendererUtils.renderScriptType(facesContext, writer);
diff --git a/impl/src/main/java/org/apache/myfaces/push/cdi/PushContextImpl.java b/impl/src/main/java/org/apache/myfaces/push/cdi/PushContextImpl.java
index 36d367440..2294e64a1 100644
--- a/impl/src/main/java/org/apache/myfaces/push/cdi/PushContextImpl.java
+++ b/impl/src/main/java/org/apache/myfaces/push/cdi/PushContextImpl.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -46,17 +47,10 @@ public class PushContextImpl implements PushContext
         this.sessionManager = CDIUtils.get(beanManager, WebsocketSessionManager.class);
     }
 
-    public String getChannel()
-    {
-        return channel;
-    }
-
     @Override
     public Set<Future<Void>> send(Object message)
     {
         //1. locate the channel and define the context
-        String channel = getChannel();
-
         WebsocketScopeManager.AbstractScope applicationScope = scopeManager.getApplicationScope(false);
         WebsocketScopeManager.AbstractScope viewScope = null;
         WebsocketScopeManager.AbstractScope sessionScope = null;
@@ -86,12 +80,12 @@ public class PushContextImpl implements PushContext
             // Use view scope for context
             channelTokens = viewScope.getChannelTokens(channel);
         }
-        else if (sessionScope != null && sessionScope.isChannelAvailable(getChannel()))
+        else if (sessionScope != null && sessionScope.isChannelAvailable(channel))
         {
             // Use session scope for context
             channelTokens = sessionScope.getChannelTokens(channel);
         }
-        else if (applicationScope != null && applicationScope.isChannelAvailable(getChannel()))
+        else if (applicationScope.isChannelAvailable(channel))
         {
             // Use application scope for context
             channelTokens = applicationScope.getChannelTokens(channel);
@@ -131,64 +125,23 @@ public class PushContextImpl implements PushContext
     @Override
     public <S extends Serializable> Map<S, Set<Future<Void>>> send(Object message, Collection<S> users)
     {
-        //1. locate the channel and define the context
-        String channel = getChannel();
 
-        WebsocketScopeManager.AbstractScope applicationScope = scopeManager.getApplicationScope(false);
-        WebsocketScopeManager.AbstractScope viewScope = null;
-        WebsocketScopeManager.AbstractScope sessionScope = null;
+        Map<S, Set<Future<Void>>> resultsByUser = new HashMap<>(users.size());
 
-        if (CDIUtils.isRequestScopeActive(beanManager))
+        for (S user : users)
         {
-            if (CDIUtils.isSessionScopeActive(beanManager))
-            {
-                sessionScope = scopeManager.getSessionScope(false);
-                if (CDIUtils.isViewScopeActive(beanManager))
-                {
-                    viewScope = scopeManager.getViewScope(false);
-                }
-            }
-        }
-        
-        if (applicationScope == null)
-        {
-            // No base bean to push message
-            return Collections.emptyMap();
-        }
+            Set<String> channelTokenSet = sessionManager.getChannelTokensForUser(user, channel);
+            Set<Future<Void>> results = new HashSet<>(channelTokenSet.size());
 
-        Map<S, Set<Future<Void>>> result = new HashMap<>();
-
-        if (viewScope != null && viewScope.isChannelAvailable(channel))
-        {
-            // Use view scope for context
-            for (S user : users)
-            {
-                result.put(user, send(viewScope.getChannelTokens(channel, user), message));
-            }
-        }
-        else if (sessionScope != null && sessionScope.isChannelAvailable(getChannel()))
-        {
-            // Use session scope for context
-            for (S user : users)
-            {
-                result.put(user, send(sessionScope.getChannelTokens(channel, user), message));
-            }
-        }
-        else if (applicationScope != null && applicationScope.isChannelAvailable(getChannel()))
-        {
-            // Use application scope for context
-            for (S user : users)
+            for (String channelToken : channelTokenSet)
             {
-                result.put(user, send(applicationScope.getChannelTokens(channel, user), message));
+                results.addAll(sessionManager.send(channelToken, message));
             }
+
+            resultsByUser.put(user, results);
         }
-        else
-        {
-            throw new FacesException("CDI bean not found for push message");
-        }
-        
-        //2. send the message
-        return result;
+
+        return resultsByUser;
     }
     
     private Set<Future<Void>> send(List<String> channelTokens, Object message)
diff --git a/impl/src/main/java/org/apache/myfaces/push/cdi/WebsocketScopeManager.java b/impl/src/main/java/org/apache/myfaces/push/cdi/WebsocketScopeManager.java
index 1c6a1cb22..a08c77db6 100644
--- a/impl/src/main/java/org/apache/myfaces/push/cdi/WebsocketScopeManager.java
+++ b/impl/src/main/java/org/apache/myfaces/push/cdi/WebsocketScopeManager.java
@@ -103,9 +103,13 @@ public class WebsocketScopeManager
             // When current session scope is about to be destroyed, deregister all session scope channels and
             // explicitly close any open web sockets associated with it to avoid stale websockets.
             // If any, also deregister session users.
-            for (String token : tokens.keySet())
+            for (Map.Entry<String, WebsocketChannelMetadata> entry : tokens.entrySet())
             {
-                sessionManager.removeSession(token);
+                // remove channelToken - only if it is session scope
+                if (WebsocketScopeManager.SCOPE_SESSION.equals(entry.getValue().getScope()))
+                {
+                    sessionManager.removeChannelToken(entry.getKey());
+                }
             }
 
             // we dont need to destroy child sockets ("view")
@@ -174,12 +178,6 @@ public class WebsocketScopeManager
                 }
             }
 
-            // remove sessions
-            for (String token : tokens.keySet())
-            {
-                sessionManager.removeSession(token);
-            }
-
             channelTokens.clear();
             tokens.clear();
        }
diff --git a/impl/src/main/java/org/apache/myfaces/push/cdi/WebsocketSessionManager.java b/impl/src/main/java/org/apache/myfaces/push/cdi/WebsocketSessionManager.java
index 96d9479bb..61caf12f6 100644
--- a/impl/src/main/java/org/apache/myfaces/push/cdi/WebsocketSessionManager.java
+++ b/impl/src/main/java/org/apache/myfaces/push/cdi/WebsocketSessionManager.java
@@ -21,17 +21,30 @@ package org.apache.myfaces.push.cdi;
 
 import jakarta.annotation.PostConstruct;
 import jakarta.enterprise.context.ApplicationScoped;
+
+import java.io.IOException;
+import java.io.Serializable;
 import java.lang.ref.Reference;
 import java.lang.ref.SoftReference;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
+import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
 import jakarta.faces.context.ExternalContext;
+import jakarta.websocket.CloseReason;
 import jakarta.websocket.Session;
 
 import org.apache.myfaces.push.WebsocketSessionClusterSerializedRestore;
@@ -40,12 +53,20 @@ import org.apache.myfaces.push.Json;
 import org.apache.myfaces.util.lang.ConcurrentLRUCache;
 import org.apache.myfaces.util.lang.Lazy;
 
+import static jakarta.websocket.CloseReason.CloseCodes.NORMAL_CLOSURE;
+
 @ApplicationScoped
 public class WebsocketSessionManager
 {
-    private Lazy<ConcurrentLRUCache<String, Reference<Session>>> sessionMap;
+    private Lazy<ConcurrentLRUCache<String, Collection<Reference<Session>>>> sessionMap;
+
+    private Lazy<ConcurrentHashMap<UserChannelKey, Set<String>>> userMap;
     private Queue<String> restoreQueue;
 
+    private static final CloseReason REASON_EXPIRED = new CloseReason(NORMAL_CLOSURE, "Expired");
+
+    private static final Logger LOG = Logger.getLogger(WebsocketSessionManager.class.getName());
+
     @PostConstruct
     public void init()
     {
@@ -55,17 +76,63 @@ public class WebsocketSessionManager
             return new ConcurrentLRUCache<>((size * 4 + 3) / 3, size);
         });
         restoreQueue = new ConcurrentLinkedQueue<>();
+        userMap = new Lazy<>(ConcurrentHashMap::new);
     }
 
-    public ConcurrentLRUCache<String, Reference<Session>> getSessionMap()
+    public ConcurrentLRUCache<String, Collection<Reference<Session>>> getSessionMap()
     {
         return sessionMap.get();
     }
 
+    public ConcurrentMap<UserChannelKey, Set<String>> getUserMap()
+    {
+        return userMap.get();
+    }
+
+    public void registerSessionToken(String channelToken)
+    {
+        if (this.getSessionMap().get(channelToken) == null)
+        {
+            this.getSessionMap().put(channelToken, new ConcurrentLinkedQueue<>());
+        }
+    }
+
+    public void registerUser(Serializable user, String channel, String channelToken)
+    {
+        UserChannelKey userChannelKey = new UserChannelKey(user, channel);
+
+        Set<String> channelTokenSet = getUserMap().computeIfAbsent(userChannelKey, k -> new HashSet<>(1));
+        channelTokenSet.add(channelToken);
+    }
+
+    public void deregisterUser(Serializable user, String channel, String channelToken)
+    {
+        UserChannelKey userChannelKey = new UserChannelKey(user, channel);
+
+        synchronized (getUserMap())
+        {
+            Set<String> channelTokenSet = getUserMap().get(userChannelKey);
+            if (channelTokenSet != null)
+            {
+                channelTokenSet.remove(channelToken);
+                if (channelTokenSet.isEmpty())
+                {
+                    getUserMap().remove(userChannelKey);
+                }
+            }
+        }
+    }
+
+    public Set<String> getChannelTokensForUser(Serializable user, String channel)
+    {
+        UserChannelKey userChannelKey = new UserChannelKey(user, channel);
+        return getUserMap().get(userChannelKey);
+    }
+
     public void initSessionMap(ExternalContext context)
     {
         int size = MyfacesConfig.getCurrentInstance(context).getWebsocketMaxConnections();
-        ConcurrentLRUCache<String, Reference<Session>> newSessionMap
+        ConcurrentLRUCache<String, Collection<Reference<Session>>> newSessionMap
                 = new ConcurrentLRUCache<>((size * 4 + 3) / 3, size);
         
         synchronized (sessionMap)
@@ -75,13 +142,20 @@ public class WebsocketSessionManager
                 // If a Session has been restored, it could be already a lruCache instantiated, so in this case
                 // we need to fill the new one with the old instances, but only the instances that are active
                 // at the moment.
-                Set<Map.Entry<String, Reference<Session>>> entries = sessionMap.get()
+                Set<Map.Entry<String, Collection<Reference<Session>>>> entries = sessionMap.get()
                         .getLatestAccessedItems(MyfacesConfig.WEBSOCKET_MAX_CONNECTIONS_DEFAULT).entrySet();
-                for (Map.Entry<String, Reference<Session>> entry : entries)
+                for (Map.Entry<String, Collection<Reference<Session>>> entry : entries)
                 {
-                    if (entry.getValue() != null && entry.getValue().get() != null && entry.getValue().get().isOpen())
+                    Collection<Reference<Session>> referenceCollection = entry.getValue();
+                    if (referenceCollection != null)
                     {
-                        newSessionMap.put(entry.getKey(), entry.getValue());
+                        Collection<Reference<Session>> newReferenceCollection =
+                                referenceCollection
+                                        .stream()
+                                        .filter(p -> p.get() != null && p.get().isOpen())
+                                        .distinct()
+                                        .collect(Collectors.toCollection(ConcurrentLinkedQueue::new));
+                        newSessionMap.put(entry.getKey(), newReferenceCollection);
                     }
                 }
             }
@@ -101,14 +175,22 @@ public class WebsocketSessionManager
     
     public boolean addOrUpdateSession(String channelToken, Session session)
     {
-        Reference oldInstance = getSessionMap().get(channelToken);
-        if (oldInstance == null)
+        if (LOG.isLoggable(Level.FINE))
         {
-            getSessionMap().put(channelToken, new SoftReference<>(session));
+            LOG.log (Level.FINE, "WebsocketSessionManager: addOrUpdateSession for channelToken = {0}, " +
+                    "session.id = {1}", new Object[] {channelToken ,session.getId()});
         }
-        else if (!session.equals(oldInstance.get()))
+        Collection<Reference<Session>> sessions = this.getSessionMap().get(channelToken);
+        if (sessions == null)
         {
-            getSessionMap().put(channelToken, new SoftReference<>(session));
+            registerSessionToken(channelToken);
+        }
+        Optional<Reference<Session>> referenceOptional =
+                sessions.stream().filter(p -> Objects.equals(p.get(), session)).findFirst();
+
+        if (!referenceOptional.isPresent())
+        {
+            return sessions.add(new SoftReference<>(session));
         }
         return true;
     }
@@ -122,37 +204,89 @@ public class WebsocketSessionManager
      * @param channelToken
      * @return 
      */
-    public boolean removeSession(String channelToken)
+    public void removeSession(String channelToken, Session session)
     {
+        if (LOG.isLoggable(Level.FINE))
+        {
+            LOG.log (Level.FINE, "WebsocketSessionManager: removeSession for channelToken = {0}, " +
+                    "session.id = {1}", new Object[] {channelToken ,session.getId()});
+        }
+        Collection<Reference<Session>> collection = getSessionMap().get(channelToken);
+        Optional<Reference<Session>> referenceOptional =
+                collection.stream().filter(p -> Objects.equals(p.get(), session)).findFirst();
+        referenceOptional.ifPresent(collection::remove);
+    }
+
+    /**
+     * Remove the channelToken and close all sessions associated with it. Happens, when session scope
+     * or view scope is destroyed.
+     * @param channelToken
+     */
+    public void removeChannelToken(String channelToken)
+    {
+        // close all sessions associated with this channelToken
+        Collection<Reference<Session>> sessions = getSessionMap().get(channelToken);
+
+        if (sessions != null)
+        {
+            for (Reference<Session> sessionReference : sessions)
+            {
+                Session session = sessionReference.get();
+                if (session != null && session.isOpen())
+                {
+                    try
+                    {
+                        session.close(REASON_EXPIRED);
+                    }
+                    catch (IOException ignore)
+                    {
+                        // ignored
+                    }
+                }
+            }
+        }
+
         getSessionMap().remove(channelToken);
-        return false;
     }
-    
-    
+
     protected Set<Future<Void>> send(String channelToken, Object message)
     {
         // Before send, we need to check 
         synchronizeSessionInstances();
 
         Set<Future<Void>> results = new HashSet<>(1);
-        Reference<Session> sessionRef = (channelToken != null) ? getSessionMap().get(channelToken) : null;
+        Collection<Reference<Session>> sessions = (channelToken != null) ? getSessionMap().get(channelToken) : null;
 
-        if (sessionRef != null && sessionRef.get() != null)
+        if (sessions != null && !sessions.isEmpty())
         {
             String json = Json.encode(message);
-            Session session = sessionRef.get();
-            if (session.isOpen())
-            {
-                send(session, json, results, 0);
-            }
-            else
-            {
-                //If session is not open, remove the session, because a websocket session after is closed cannot
-                //be alive.
-                getSessionMap().remove(channelToken);
-            }
+
+            sessions.forEach (
+                    sessionRef ->
+                    {
+                        if (sessionRef != null && sessionRef.get() != null)
+                        {
+                            Session session = sessionRef.get();
+                            if (session.isOpen())
+                            {
+                                send(session, json, results, 0);
+                            }
+                            else
+                            {
+                                //If session is not open, remove the session, because a websocket
+                                // session after is closed cannot
+                                //be alive.
+                                removeSession(channelToken, session);
+                            }
+                        }
+                    }
+            );
+            return results;
+        }
+        else
+        {
+            return Collections.emptySet();
         }
-        return results;
     }
 
     private final String WARNING_TOMCAT_WEB_SOCKET_BOMBED =
@@ -204,7 +338,7 @@ public class WebsocketSessionManager
                 && illegalStateException.getMessage().contains("[TEXT_FULL_WRITING]");
     }
     
-    private void synchronizeSessionInstances()
+    public void synchronizeSessionInstances()
     {
         Queue<String> queue = getRestoredQueue();
         // The queue is always empty, unless a deserialization of Session instances happen. If that happens, 
@@ -214,34 +348,42 @@ public class WebsocketSessionManager
         if (!queue.isEmpty())
         {
             // It is necessary to have at least 1 registered Session instance to call getOpenSessions() and get all
-            // instances associated to jakarta.faces.push Endpoint.
-            Map<String, Reference<Session>> map = getSessionMap().getLatestAccessedItems(1);
+            // instances associated to jakarta.faces.push Endpoint.
+            Map<String, Collection<Reference<Session>>> map = getSessionMap().getLatestAccessedItems(1);
             if (map != null && !map.isEmpty())
             {
-                Reference<Session> ref = map.values().iterator().next();
-                if (ref != null)
-                {
-                    Session s = ref.get();
-                    if (s != null)
-                    {
-                        Set<Session> set = s.getOpenSessions();
-                        
-                        for (Iterator<Session> it = set.iterator(); it.hasNext();)
+
+                Collection<Reference<Session>> collectionRef = map.values().iterator().next();
+
+                collectionRef.forEach( ref ->
                         {
-                            Session instance = it.next();
-                            WebsocketSessionClusterSerializedRestore r = 
-                                    (WebsocketSessionClusterSerializedRestore) instance.getUserProperties().get(
-                                        WebsocketSessionClusterSerializedRestore.WEBSOCKET_SESSION_SERIALIZED_RESTORE);
-                            if (r != null && r.isDeserialized())
+                        if (ref != null)
+                        {
+                            Session s = ref.get();
+                            if (s != null)
                             {
-                                addOrUpdateSession(r.getChannelToken(), s);
+                                Set<Session> set = s.getOpenSessions();
+
+                                for (Iterator<Session> it = set.iterator(); it.hasNext(); )
+                                {
+                                    Session instance = it.next();
+                                    WebsocketSessionClusterSerializedRestore r =
+                                            (WebsocketSessionClusterSerializedRestore) instance.
+                                                    getUserProperties().get(
+                            WebsocketSessionClusterSerializedRestore.WEBSOCKET_SESSION_SERIALIZED_RESTORE
+                                                    );
+                                    if (r != null && r.isDeserialized())
+                                    {
+                                        addOrUpdateSession(r.getChannelToken(), s);
+                                    }
+                                }
+
+                                // Remove one element from the queue
+                                queue.poll();
                             }
                         }
-                        
-                        // Remove one element from the queue
-                        queue.poll();
                     }
-                }
+                );
             }
         }
     }
@@ -250,5 +392,39 @@ public class WebsocketSessionManager
     {
         return restoreQueue;
     }
-        
+
+
+    private class UserChannelKey implements Serializable
+    {
+
+        private final Serializable user;
+        private final String channel;
+        public UserChannelKey(Serializable user, String channel)
+        {
+            this.user = user;
+            this.channel = channel;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o)
+            {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass())
+            {
+                return false;
+            }
+            UserChannelKey that = (UserChannelKey) o;
+            return Objects.equals(user, that.user) && Objects.equals(channel, that.channel);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(user, channel);
+        }
+    }
+
 }