You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@myfaces.apache.org by me...@apache.org on 2022/10/06 12:17:30 UTC

[myfaces] branch 2.3-next updated: MYFACES-4445: websockets refactoring (#327)

This is an automated email from the ASF dual-hosted git repository.

melloware pushed a commit to branch 2.3-next
in repository https://gitbox.apache.org/repos/asf/myfaces.git


The following commit(s) were added to refs/heads/2.3-next by this push:
     new fe3227b57 MYFACES-4445: websockets refactoring (#327)
fe3227b57 is described below

commit fe3227b570b7ef34b492187355bbdf1bc4e3dbea
Author: milansie <43...@users.noreply.github.com>
AuthorDate: Thu Oct 6 14:17:25 2022 +0200

    MYFACES-4445: websockets refactoring (#327)
    
    - reworked channel token generation - a new token is generated only if needed (i.e. new session/view), with just one token for application scope
    - fixed usage of user-specified channel - now also works in application-scope calls
    - changed default max idle timeout for websockets - according to omnifaces/mojarra spec
    - specified default websocket scope - according to documentation (session scope for user-specified websocket as default)
    
    Co-authored-by: siebenburger <mi...@aura.cz>
---
 .../java/org/apache/myfaces/push/EndpointImpl.java |  27 +-
 .../myfaces/push/WebsocketComponentRenderer.java   |  75 +++++-
 .../apache/myfaces/push/cdi/PushContextImpl.java   |  73 +-----
 .../myfaces/push/cdi/WebsocketScopeManager.java    |   9 +-
 .../myfaces/push/cdi/WebsocketSessionManager.java  | 276 +++++++++++++++++----
 5 files changed, 329 insertions(+), 131 deletions(-)

diff --git a/impl/src/main/java/org/apache/myfaces/push/EndpointImpl.java b/impl/src/main/java/org/apache/myfaces/push/EndpointImpl.java
index b17786f9a..5a34b1982 100644
--- a/impl/src/main/java/org/apache/myfaces/push/EndpointImpl.java
+++ b/impl/src/main/java/org/apache/myfaces/push/EndpointImpl.java
@@ -77,11 +77,24 @@ public class EndpointImpl extends Endpoint
         if (Boolean.TRUE.equals(config.getUserProperties().get(WebsocketConfigurator.WEBSOCKET_VALID)) &&
                 sessionManager.addOrUpdateSession(channelToken, session))
         {
+            // default value 0, could be reconfigured if needed
             session.setMaxIdleTimeout((Long) config.getUserProperties().getOrDefault(
-                    WebsocketConfigurator.MAX_IDLE_TIMEOUT, 300000L));
+                    WebsocketConfigurator.MAX_IDLE_TIMEOUT, 0));
 
             Serializable user = (Serializable) session.getUserProperties().get(WebsocketConfigurator.WEBSOCKET_USER);
 
+            if (LOG.isLoggable(Level.FINE))
+            {
+                LOG.log(Level.FINE, "EndPointImpl.onOpen (channel = {0}, token = {1}, user = {2})",
+                        new Object[] {channel, channelToken, user});
+            }
+
+            // register user
+            if (user != null)
+            {
+                sessionManager.registerUser(user, channel, channelToken);
+            }
+
             beanManager.get().fireEvent(new WebsocketEvent(channel, user, null), OPENED);
             
             session.getUserProperties().put(
@@ -110,6 +123,11 @@ public class EndpointImpl extends Endpoint
         String channelToken = session.getQueryString();
 
         Serializable user = (Serializable) session.getUserProperties().get(WebsocketConfigurator.WEBSOCKET_USER);
+        if (LOG.isLoggable(Level.FINE))
+        {
+            LOG.log(Level.FINE, "EndPointImpl.onClose (channel = {0}, token = {1}, user = {2})",
+                    new Object[] {channel, channelToken, user});
+        }
 
         if (!beanManager.isInitialized())
         {
@@ -128,7 +146,12 @@ public class EndpointImpl extends Endpoint
         }
 
         WebsocketSessionManager sessionManager = CDIUtils.get(beanManager.get(), WebsocketSessionManager.class);
-        sessionManager.removeSession(channelToken);
+        sessionManager.removeSession(channelToken, session);
+        // deregister user
+        if (user != null)
+        {
+            sessionManager.deregisterUser(user, channel, channelToken);
+        }
         
         beanManager.get().fireEvent(
                 new WebsocketEvent(channel, user, closeReason.getCloseCode()), CLOSED);
diff --git a/impl/src/main/java/org/apache/myfaces/push/WebsocketComponentRenderer.java b/impl/src/main/java/org/apache/myfaces/push/WebsocketComponentRenderer.java
index 248d593ca..0e872efaa 100644
--- a/impl/src/main/java/org/apache/myfaces/push/WebsocketComponentRenderer.java
+++ b/impl/src/main/java/org/apache/myfaces/push/WebsocketComponentRenderer.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 import javax.enterprise.inject.spi.BeanManager;
 import javax.faces.FacesWrapper;
 import javax.faces.component.UIComponent;
@@ -40,6 +42,7 @@ import org.apache.myfaces.cdi.util.CDIUtils;
 import org.apache.myfaces.push.cdi.WebsocketChannelMetadata;
 import org.apache.myfaces.push.cdi.WebsocketChannelTokenBuilder;
 import org.apache.myfaces.push.cdi.WebsocketScopeManager;
+import org.apache.myfaces.push.cdi.WebsocketSessionManager;
 import org.apache.myfaces.renderkit.html.util.ClientBehaviorRendererUtils;
 import org.apache.myfaces.renderkit.html.util.HTML;
 import org.apache.myfaces.renderkit.html.util.ResourceUtils;
@@ -54,6 +57,7 @@ import org.apache.myfaces.renderkit.html.util.ResourceUtils;
 @ListenerFor(systemEventClass = PostAddToViewEvent.class)
 public class WebsocketComponentRenderer extends Renderer implements ComponentSystemEventListener
 {
+    private static final Logger LOG = Logger.getLogger(WebsocketComponentRenderer.class.getName());
 
     @Override
     public void processEvent(ComponentSystemEvent event)
@@ -128,14 +132,60 @@ public class WebsocketComponentRenderer extends Renderer implements ComponentSys
 
         // Create channel token 
         // TODO: Use ResponseStateManager to create the token
-        String scope = component.getScope() == null ? "application" : component.getScope();
+        // The default scope is application. When the user attribute is specified, then the default scope is session.
+        String scope = component.getScope();
+        if (scope == null)
+        {
+            scope = (component.getUser() == null ? "application" : "session");
+        }
+
         WebsocketChannelMetadata metadata = new WebsocketChannelMetadata(
                 channel, scope, component.getUser(), component.isConnected());
 
         WebsocketScopeManager scopeManager = CDIUtils.get(beanManager, WebsocketScopeManager.class);
-        
+
+        // try to find an existing channelToken
         String channelToken = null;
-        // Force a new channelToken if "connected" property is set to false, because in that case websocket creation 
+        if (scopeManager.getScope(scope, true).isChannelAvailable(channel))
+        {
+            if (component.getUser() == null)
+            {
+                List<String> channelTokenList = scopeManager.getScope (scope, true).getChannelTokens(channel);
+                if (LOG.isLoggable(Level.FINE))
+                {
+                    LOG.log(Level.FINE, "WebsocketComponentRenderer.encodeEnd: for channel = {0} found : ",
+                            channel);
+                    channelTokenList.forEach (p -> LOG.log(Level.FINE, "  {0}", p));
+                }
+                // should be just one
+                if (channelTokenList.size() == 1)
+                {
+                    channelToken = channelTokenList.get(0);
+                }
+            }
+            else
+            {
+                List<String> channelTokenList = scopeManager.getScope (scope, true)
+                        .getChannelTokens(channel, component.getUser());
+                if (LOG.isLoggable(Level.FINE))
+                {
+                    LOG.log(Level.FINE,
+                            "WebsocketComponentRenderer.encodeEnd: for channel = {0}, user = {1} found : ",
+                            new Object[] {channel, component.getUser()});
+                    channelTokenList.forEach (p -> LOG.log(Level.FINE, "  {0}", p));
+                }
+                // should be just one for combination channel / user
+                if (channelTokenList.size() == 1)
+                {
+                    channelToken = channelTokenList.get(0);
+                }
+
+            }
+        }
+
+        // Create channel token if needed
+        // TODO: Use ResponseStateManager to create the token
+        // Force a new channelToken if "connected" property is set to false, because in that case websocket creation
         if (!component.isConnected())
         {
             // This bean is required because you always need to register the token, so it can be properly destroyed
@@ -145,15 +195,18 @@ public class WebsocketComponentRenderer extends Renderer implements ComponentSys
         {
             // No channel token found for that combination, create a new token for this view
             channelToken = channelTokenBuilder.createChannelToken(facesContext, channel);
-            
-            // Register channel in view scope to chain discard view algorithm using @PreDestroy
-            scopeManager.getViewScope(true).registerToken(channelToken, metadata);
-            
-            // Register channel in session scope to allow validation on handshake  WebsocketConfigurator)
-            scopeManager.getSessionScope(true).registerToken(channelToken, metadata);
-        }  
+            scopeManager.getScope(scope, true).registerWebsocketSession(channelToken, metadata);
+        }
+
+        // Register channel in view scope to chain discard view algorithm using @PreDestroy
+        scopeManager.getViewScope(true).registerToken(channelToken, metadata);
+
+        // Register channel in session scope to allow validation on handshake  WebsocketConfigurator)
+        scopeManager.getSessionScope(true).registerToken(channelToken, metadata);
 
-        scopeManager.getScope(scope, true).registerWebsocketSession(channelToken, metadata);
+        // Prepare channelToken to websocket sessionMap (real session will be connected later)
+        WebsocketSessionManager sessionManager = CDIUtils.get(beanManager, WebsocketSessionManager.class);
+        sessionManager.registerSessionToken(channelToken);
 
         writer.startElement(HTML.SCRIPT_ELEM, component);
         writer.writeAttribute(HTML.SCRIPT_TYPE_ATTR, HTML.SCRIPT_TYPE_TEXT_JAVASCRIPT, null);
diff --git a/impl/src/main/java/org/apache/myfaces/push/cdi/PushContextImpl.java b/impl/src/main/java/org/apache/myfaces/push/cdi/PushContextImpl.java
index ffadad2de..45ac72e4b 100644
--- a/impl/src/main/java/org/apache/myfaces/push/cdi/PushContextImpl.java
+++ b/impl/src/main/java/org/apache/myfaces/push/cdi/PushContextImpl.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -46,17 +47,10 @@ public class PushContextImpl implements PushContext
         this.sessionManager = CDIUtils.get(beanManager, WebsocketSessionManager.class);
     }
 
-    public String getChannel()
-    {
-        return channel;
-    }
-
     @Override
     public Set<Future<Void>> send(Object message)
     {
         //1. locate the channel and define the context
-        String channel = getChannel();
-
         WebsocketScopeManager.AbstractScope applicationScope = scopeManager.getApplicationScope(false);
         WebsocketScopeManager.AbstractScope viewScope = null;
         WebsocketScopeManager.AbstractScope sessionScope = null;
@@ -86,12 +80,12 @@ public class PushContextImpl implements PushContext
             // Use view scope for context
             channelTokens = viewScope.getChannelTokens(channel);
         }
-        else if (sessionScope != null && sessionScope.isChannelAvailable(getChannel()))
+        else if (sessionScope != null && sessionScope.isChannelAvailable(channel))
         {
             // Use session scope for context
             channelTokens = sessionScope.getChannelTokens(channel);
         }
-        else if (applicationScope != null && applicationScope.isChannelAvailable(getChannel()))
+        else if (applicationScope.isChannelAvailable(channel))
         {
             // Use application scope for context
             channelTokens = applicationScope.getChannelTokens(channel);
@@ -131,64 +125,23 @@ public class PushContextImpl implements PushContext
     @Override
     public <S extends Serializable> Map<S, Set<Future<Void>>> send(Object message, Collection<S> users)
     {
-        //1. locate the channel and define the context
-        String channel = getChannel();
 
-        WebsocketScopeManager.AbstractScope applicationScope = scopeManager.getApplicationScope(false);
-        WebsocketScopeManager.AbstractScope viewScope = null;
-        WebsocketScopeManager.AbstractScope sessionScope = null;
+        Map<S, Set<Future<Void>>> resultsByUser = new HashMap<>(users.size());
 
-        if (CDIUtils.isRequestScopeActive(beanManager))
+        for (S user : users)
         {
-            if (CDIUtils.isSessionScopeActive(beanManager))
-            {
-                sessionScope = scopeManager.getSessionScope(false);
-                if (CDIUtils.isViewScopeActive(beanManager))
-                {
-                    viewScope = scopeManager.getViewScope(false);
-                }
-            }
-        }
-        
-        if (applicationScope == null)
-        {
-            // No base bean to push message
-            return Collections.emptyMap();
-        }
+            Set<String> channelTokenSet = sessionManager.getChannelTokensForUser(user, channel);
+            Set<Future<Void>> results = new HashSet<>(channelTokenSet.size());
 
-        Map<S, Set<Future<Void>>> result = new HashMap<>();
-
-        if (viewScope != null && viewScope.isChannelAvailable(channel))
-        {
-            // Use view scope for context
-            for (S user : users)
-            {
-                result.put(user, send(viewScope.getChannelTokens(channel, user), message));
-            }
-        }
-        else if (sessionScope != null && sessionScope.isChannelAvailable(getChannel()))
-        {
-            // Use session scope for context
-            for (S user : users)
-            {
-                result.put(user, send(sessionScope.getChannelTokens(channel, user), message));
-            }
-        }
-        else if (applicationScope != null && applicationScope.isChannelAvailable(getChannel()))
-        {
-            // Use application scope for context
-            for (S user : users)
+            for (String channelToken : channelTokenSet)
             {
-                result.put(user, send(applicationScope.getChannelTokens(channel, user), message));
+                results.addAll(sessionManager.send(channelToken, message));
             }
+
+            resultsByUser.put(user, results);
         }
-        else
-        {
-            throw new FacesException("CDI bean not found for push message");
-        }
-        
-        //2. send the message
-        return result;
+
+        return resultsByUser;
     }
     
     private Set<Future<Void>> send(List<String> channelTokens, Object message)
diff --git a/impl/src/main/java/org/apache/myfaces/push/cdi/WebsocketScopeManager.java b/impl/src/main/java/org/apache/myfaces/push/cdi/WebsocketScopeManager.java
index 72c948abb..06cd6b41c 100644
--- a/impl/src/main/java/org/apache/myfaces/push/cdi/WebsocketScopeManager.java
+++ b/impl/src/main/java/org/apache/myfaces/push/cdi/WebsocketScopeManager.java
@@ -105,7 +105,8 @@ public class WebsocketScopeManager
             // If any, also deregister session users.
             for (String token : tokens.keySet())
             {
-                sessionManager.removeSession(token);
+                // remove channelToken
+                sessionManager.removeChannelToken(token);
             }
 
             // we dont need to destroy child sockets ("view")
@@ -174,12 +175,6 @@ public class WebsocketScopeManager
                 }
             }
 
-            // remove sessions
-            for (String token : tokens.keySet())
-            {
-                sessionManager.removeSession(token);
-            }
-
             channelTokens.clear();
             tokens.clear();
        }
diff --git a/impl/src/main/java/org/apache/myfaces/push/cdi/WebsocketSessionManager.java b/impl/src/main/java/org/apache/myfaces/push/cdi/WebsocketSessionManager.java
index 96a720458..e0eda3921 100644
--- a/impl/src/main/java/org/apache/myfaces/push/cdi/WebsocketSessionManager.java
+++ b/impl/src/main/java/org/apache/myfaces/push/cdi/WebsocketSessionManager.java
@@ -21,17 +21,28 @@ package org.apache.myfaces.push.cdi;
 
 import javax.annotation.PostConstruct;
 import javax.enterprise.context.ApplicationScoped;
+import java.io.IOException;
+import java.io.Serializable;
 import java.lang.ref.Reference;
 import java.lang.ref.SoftReference;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
+import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.stream.Collectors;
 import javax.faces.context.ExternalContext;
+import javax.websocket.CloseReason;
 import javax.websocket.Session;
 import org.apache.myfaces.config.MyfacesConfig;
 import org.apache.myfaces.push.WebsocketSessionClusterSerializedRestore;
@@ -39,12 +50,20 @@ import org.apache.myfaces.push.Json;
 import org.apache.myfaces.util.lang.ConcurrentLRUCache;
 import org.apache.myfaces.util.lang.Lazy;
 
+import static javax.websocket.CloseReason.CloseCodes.NORMAL_CLOSURE;
+
 @ApplicationScoped
 public class WebsocketSessionManager
 {
-    private Lazy<ConcurrentLRUCache<String, Reference<Session>>> sessionMap;
+    private Lazy<ConcurrentLRUCache<String, Collection<Reference<Session>>>> sessionMap;
+
+    private Lazy<ConcurrentHashMap<UserChannelKey, Set<String>>> userMap;
     private Queue<String> restoreQueue;
 
+    private static final CloseReason REASON_EXPIRED = new CloseReason(NORMAL_CLOSURE, "Expired");
+
+    private static final Logger LOG = Logger.getLogger(WebsocketSessionManager.class.getName());
+
     @PostConstruct
     public void init()
     {
@@ -54,17 +73,63 @@ public class WebsocketSessionManager
             return new ConcurrentLRUCache<>((size * 4 + 3) / 3, size);
         });
         restoreQueue = new ConcurrentLinkedQueue<>();
+        userMap = new Lazy<>(ConcurrentHashMap::new);
     }
 
-    public ConcurrentLRUCache<String, Reference<Session>> getSessionMap()
+    public ConcurrentLRUCache<String, Collection<Reference<Session>>> getSessionMap()
     {
         return sessionMap.get();
     }
 
+    public ConcurrentMap<UserChannelKey, Set<String>> getUserMap()
+    {
+        return userMap.get();
+    }
+
+    public void registerSessionToken(String channelToken)
+    {
+        if (this.getSessionMap().get(channelToken) == null)
+        {
+            this.getSessionMap().put(channelToken, new ConcurrentLinkedQueue<>());
+        }
+    }
+
+    public void registerUser(Serializable user, String channel, String channelToken)
+    {
+        UserChannelKey userChannelKey = new UserChannelKey(user, channel);
+
+        Set<String> channelTokenSet = getUserMap().computeIfAbsent(userChannelKey, k -> new HashSet<>(1));
+        channelTokenSet.add(channelToken);
+    }
+
+    public void deregisterUser(Serializable user, String channel, String channelToken)
+    {
+        UserChannelKey userChannelKey = new UserChannelKey(user, channel);
+
+        synchronized (getUserMap())
+        {
+            Set<String> channelTokenSet = getUserMap().get(userChannelKey);
+            if (channelTokenSet != null)
+            {
+                channelTokenSet.remove(channelToken);
+                if (channelTokenSet.isEmpty())
+                {
+                    getUserMap().remove(userChannelKey);
+                }
+            }
+        }
+    }
+
+    public Set<String> getChannelTokensForUser(Serializable user, String channel)
+    {
+        UserChannelKey userChannelKey = new UserChannelKey(user, channel);
+        return getUserMap().get(userChannelKey);
+    }
+
     public void initSessionMap(ExternalContext context)
     {
         int size = MyfacesConfig.getCurrentInstance(context).getWebsocketMaxConnections();
-        ConcurrentLRUCache<String, Reference<Session>> newSessionMap
+        ConcurrentLRUCache<String, Collection<Reference<Session>>> newSessionMap
                 = new ConcurrentLRUCache<>((size * 4 + 3) / 3, size);
         
         synchronized (sessionMap)
@@ -74,13 +139,20 @@ public class WebsocketSessionManager
                 // If a Session has been restored, it could be already a lruCache instantiated, so in this case
                 // we need to fill the new one with the old instances, but only the instances that are active
                 // at the moment.
-                Set<Map.Entry<String, Reference<Session>>> entries = sessionMap.get()
+                Set<Map.Entry<String, Collection<Reference<Session>>>> entries = sessionMap.get()
                         .getLatestAccessedItems(MyfacesConfig.WEBSOCKET_MAX_CONNECTIONS_DEFAULT).entrySet();
-                for (Map.Entry<String, Reference<Session>> entry : entries)
+                for (Map.Entry<String, Collection<Reference<Session>>> entry : entries)
                 {
-                    if (entry.getValue() != null && entry.getValue().get() != null && entry.getValue().get().isOpen())
+                    Collection<Reference<Session>> referenceCollection = entry.getValue();
+                    if (referenceCollection != null)
                     {
-                        newSessionMap.put(entry.getKey(), entry.getValue());
+                        Collection<Reference<Session>> newReferenceCollection =
+                                referenceCollection
+                                        .stream()
+                                        .filter(p -> p.get() != null && p.get().isOpen())
+                                        .distinct()
+                                        .collect(Collectors.toCollection(ConcurrentLinkedQueue::new));
+                        newSessionMap.put(entry.getKey(), newReferenceCollection);
                     }
                 }
             }
@@ -100,14 +172,22 @@ public class WebsocketSessionManager
     
     public boolean addOrUpdateSession(String channelToken, Session session)
     {
-        Reference oldInstance = getSessionMap().get(channelToken);
-        if (oldInstance == null)
+        if (LOG.isLoggable(Level.FINE))
+        {
+            LOG.log (Level.FINE, "WebsocketSessionManager: addOrUpdateSession for channelToken = {0}, " +
+                    "session.id = {1}", new Object[] {channelToken ,session.getId()});
+        }
+        Collection<Reference<Session>> sessions = this.getSessionMap().get(channelToken);
+        if (sessions == null)
         {
-            getSessionMap().put(channelToken, new SoftReference<>(session));
+            registerSessionToken(channelToken);
         }
-        else if (!session.equals(oldInstance.get()))
+        Optional<Reference<Session>> referenceOptional =
+                sessions.stream().filter(p -> Objects.equals(p.get(), session)).findFirst();
+
+        if (!referenceOptional.isPresent())
         {
-            getSessionMap().put(channelToken, new SoftReference<>(session));
+            return sessions.add(new SoftReference<>(session));
         }
         return true;
     }
@@ -121,37 +201,89 @@ public class WebsocketSessionManager
      * @param channelToken
      * @return 
      */
-    public boolean removeSession(String channelToken)
+    public void removeSession(String channelToken, Session session)
+    {
+        if (LOG.isLoggable(Level.FINE))
+        {
+            LOG.log (Level.FINE, "WebsocketSessionManager: removeSession for channelToken = {0}, " +
+                    "session.id = {1}", new Object[] {channelToken ,session.getId()});
+        }
+        Collection<Reference<Session>> collection = getSessionMap().get(channelToken);
+        Optional<Reference<Session>> referenceOptional =
+                collection.stream().filter(p -> Objects.equals(p.get(), session)).findFirst();
+        referenceOptional.ifPresent(collection::remove);
+    }
+
+    /**
+     * Remove the channelToken and close all sessions associated with it. Happens, when session scope
+     * or view scope is destroyed.
+     * @param channelToken
+     */
+    public void removeChannelToken(String channelToken)
     {
+        // close all sessions associated with this channelToken
+        Collection<Reference<Session>> sessions = getSessionMap().get(channelToken);
+
+        if (sessions != null)
+        {
+            for (Reference<Session> sessionReference : sessions)
+            {
+                Session session = sessionReference.get();
+                if (session != null && session.isOpen())
+                {
+                    try
+                    {
+                        session.close(REASON_EXPIRED);
+                    }
+                    catch (IOException ignore)
+                    {
+                        // ignored
+                    }
+                }
+            }
+        }
+
         getSessionMap().remove(channelToken);
-        return false;
     }
-    
-    
+
     protected Set<Future<Void>> send(String channelToken, Object message)
     {
         // Before send, we need to check 
         synchronizeSessionInstances();
 
         Set<Future<Void>> results = new HashSet<>(1);
-        Reference<Session> sessionRef = (channelToken != null) ? getSessionMap().get(channelToken) : null;
+        Collection<Reference<Session>> sessions = (channelToken != null) ? getSessionMap().get(channelToken) : null;
 
-        if (sessionRef != null && sessionRef.get() != null)
+        if (sessions != null && !sessions.isEmpty())
         {
             String json = Json.encode(message);
-            Session session = sessionRef.get();
-            if (session.isOpen())
-            {
-                send(session, json, results, 0);
-            }
-            else
-            {
-                //If session is not open, remove the session, because a websocket session after is closed cannot
-                //be alive.
-                getSessionMap().remove(channelToken);
-            }
+
+            sessions.forEach (
+                    sessionRef ->
+                    {
+                        if (sessionRef != null && sessionRef.get() != null)
+                        {
+                            Session session = sessionRef.get();
+                            if (session.isOpen())
+                            {
+                                send(session, json, results, 0);
+                            }
+                            else
+                            {
+                                //If session is not open, remove the session, because a websocket
+                                // session after is closed cannot
+                                //be alive.
+                                removeSession(channelToken, session);
+                            }
+                        }
+                    }
+            );
+            return results;
+        }
+        else
+        {
+            return Collections.emptySet();
         }
-        return results;
     }
 
     private final String WARNING_TOMCAT_WEB_SOCKET_BOMBED =
@@ -203,7 +335,7 @@ public class WebsocketSessionManager
                 && illegalStateException.getMessage().contains("[TEXT_FULL_WRITING]");
     }
     
-    private void synchronizeSessionInstances()
+    public void synchronizeSessionInstances()
     {
         Queue<String> queue = getRestoredQueue();
         // The queue is always empty, unless a deserialization of Session instances happen. If that happens, 
@@ -214,33 +346,41 @@ public class WebsocketSessionManager
         {
             // It is necessary to have at least 1 registered Session instance to call getOpenSessions() and get all
             // instances associated to javax.faces.push Endpoint.
-            Map<String, Reference<Session>> map = getSessionMap().getLatestAccessedItems(1);
+            Map<String, Collection<Reference<Session>>> map = getSessionMap().getLatestAccessedItems(1);
             if (map != null && !map.isEmpty())
             {
-                Reference<Session> ref = map.values().iterator().next();
-                if (ref != null)
-                {
-                    Session s = ref.get();
-                    if (s != null)
-                    {
-                        Set<Session> set = s.getOpenSessions();
-                        
-                        for (Iterator<Session> it = set.iterator(); it.hasNext();)
+
+                Collection<Reference<Session>> collectionRef = map.values().iterator().next();
+
+                collectionRef.forEach( ref ->
                         {
-                            Session instance = it.next();
-                            WebsocketSessionClusterSerializedRestore r = 
-                                    (WebsocketSessionClusterSerializedRestore) instance.getUserProperties().get(
-                                        WebsocketSessionClusterSerializedRestore.WEBSOCKET_SESSION_SERIALIZED_RESTORE);
-                            if (r != null && r.isDeserialized())
+                        if (ref != null)
+                        {
+                            Session s = ref.get();
+                            if (s != null)
                             {
-                                addOrUpdateSession(r.getChannelToken(), s);
+                                Set<Session> set = s.getOpenSessions();
+
+                                for (Iterator<Session> it = set.iterator(); it.hasNext(); )
+                                {
+                                    Session instance = it.next();
+                                    WebsocketSessionClusterSerializedRestore r =
+                                            (WebsocketSessionClusterSerializedRestore) instance.
+                                                    getUserProperties().get(
+                            WebsocketSessionClusterSerializedRestore.WEBSOCKET_SESSION_SERIALIZED_RESTORE
+                                                    );
+                                    if (r != null && r.isDeserialized())
+                                    {
+                                        addOrUpdateSession(r.getChannelToken(), s);
+                                    }
+                                }
+
+                                // Remove one element from the queue
+                                queue.poll();
                             }
                         }
-                        
-                        // Remove one element from the queue
-                        queue.poll();
                     }
-                }
+                );
             }
         }
     }
@@ -249,5 +389,39 @@ public class WebsocketSessionManager
     {
         return restoreQueue;
     }
+
+
+    private class UserChannelKey implements Serializable
+    {
+
+        private final Serializable user;
+        private final String channel;
+        public UserChannelKey(Serializable user, String channel)
+        {
+            this.user = user;
+            this.channel = channel;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o)
+            {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass())
+            {
+                return false;
+            }
+            UserChannelKey that = (UserChannelKey) o;
+            return Objects.equals(user, that.user) && Objects.equals(channel, that.channel);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(user, channel);
+        }
+    }
         
 }