You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by be...@apache.org on 2010/03/13 00:04:58 UTC

svn commit: r922450 - in /mina/sandbox/vysper/trunk/server/core/src: main/java/org/apache/vysper/xmpp/delivery/failure/ main/java/org/apache/vysper/xmpp/delivery/inbound/ main/java/org/apache/vysper/xmpp/server/ main/java/org/apache/vysper/xmpp/state/r...

Author: berndf
Date: Fri Mar 12 23:04:58 2010
New Revision: 922450

URL: http://svn.apache.org/viewvc?rev=922450&view=rev
Log:
VYSPER-179: relay messages either to the highest-priority sessions only, or to all sessions with non-negative priority (default), see http://xmpp.org/internet-drafts/draft-ietf-xmpp-3921bis-05.html#rules-barejid-resource-message
also includes improvements in delivery error handling

Modified:
    mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/DeliveryFailureStrategy.java
    mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/IgnoreFailureStrategy.java
    mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategy.java
    mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInboundStanzaRelay.java
    mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java
    mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistry.java
    mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringStanzaRelayTestCase.java
    mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistryTestCase.java

Modified: mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/DeliveryFailureStrategy.java
URL: http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/DeliveryFailureStrategy.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
--- mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/DeliveryFailureStrategy.java (original)
+++ mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/DeliveryFailureStrategy.java Fri Mar 12 23:04:58 2010
@@ -22,6 +22,8 @@ package org.apache.vysper.xmpp.delivery.
 import org.apache.vysper.xmpp.stanza.Stanza;
 import org.apache.vysper.xmpp.delivery.failure.DeliveryException;
 
+import java.util.List;
+
 /**
  * there are many reasons why a stanza may fail to deliver: remote server not answering, local addressee has
  * become unavailable, the server has no more resources to process etc.
@@ -39,6 +41,6 @@ public interface DeliveryFailureStrategy
      * @param deliveryException - optional: exception which occured during the failed delivery
      * @throws org.apache.vysper.xmpp.delivery.failure.DeliveryException - exception which occured during failure strategy execution.
      */
-    public void process(Stanza failedToDeliverStanza, DeliveryException deliveryException) throws DeliveryException;
+    public void process(Stanza failedToDeliverStanza, List<DeliveryException> deliveryException) throws DeliveryException;
 
 }

Modified: mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/IgnoreFailureStrategy.java
URL: http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/IgnoreFailureStrategy.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
--- mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/IgnoreFailureStrategy.java (original)
+++ mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/IgnoreFailureStrategy.java Fri Mar 12 23:04:58 2010
@@ -19,10 +19,10 @@
  */
 package org.apache.vysper.xmpp.delivery.failure;
 
-import org.apache.vysper.xmpp.delivery.failure.DeliveryFailureStrategy;
-import org.apache.vysper.xmpp.delivery.failure.DeliveryException;
 import org.apache.vysper.xmpp.stanza.Stanza;
 
+import java.util.List;
+
 /**
  *
  * @author The Apache MINA Project (dev@mina.apache.org)
@@ -31,7 +31,7 @@ public class IgnoreFailureStrategy imple
 
     public final static IgnoreFailureStrategy IGNORE_FAILURE_STRATEGY = new IgnoreFailureStrategy();
 
-    public void process(Stanza failedToDeliverStanza, DeliveryException deliveryException) {
+    public void process(Stanza failedToDeliverStanza, List<DeliveryException> deliveryException) throws DeliveryException {
         // do nothing
     }
 }

Modified: mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategy.java
URL: http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategy.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
--- mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategy.java (original)
+++ mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategy.java Fri Mar 12 23:04:58 2010
@@ -26,6 +26,9 @@ import static org.apache.vysper.xmpp.sta
 import org.apache.vysper.xmpp.addressing.Entity;
 import org.apache.vysper.compliance.SpecCompliance;
 import org.apache.vysper.compliance.SpecCompliant;
+
+import java.util.List;
+
 import static org.apache.vysper.compliance.SpecCompliant.ComplianceStatus.*;
 import static org.apache.vysper.compliance.SpecCompliant.ComplianceCoverage.*;
 
@@ -46,7 +49,7 @@ public class ReturnErrorToSenderFailureS
         @SpecCompliant(spec="rfc3921bis-08", section = "8.3.2", status = NOT_STARTED, coverage = UNKNOWN),
         @SpecCompliant(spec="rfc3921bis-08", section = "4.3", status = NOT_STARTED, coverage = UNKNOWN)
     })
-    public void process(Stanza failedToDeliverStanza, DeliveryException deliveryException) throws DeliveryException {
+    public void process(Stanza failedToDeliverStanza, List<DeliveryException> deliveryExceptions) throws DeliveryException {
 
         StanzaErrorCondition stanzaErrorCondition = StanzaErrorCondition.SERVICE_UNAVAILABLE;
         StanzaErrorType errorType = StanzaErrorType.CANCEL;
@@ -59,7 +62,11 @@ public class ReturnErrorToSenderFailureS
             return; // do not answer these
         }
 
-        if (deliveryException != null) {
+        if (deliveryExceptions == null) {
+            XMPPCoreStanza error = XMPPCoreStanza.getWrapper(ServerErrorResponses.getInstance().getStanzaError(stanzaErrorCondition, failedCoreStanza, errorType, "stanza could not be delivered", "en", null));
+            stanzaRelay.relay(error.getTo(), error, IgnoreFailureStrategy.IGNORE_FAILURE_STRATEGY);
+        } else if (deliveryExceptions.size() == 1) {
+            DeliveryException deliveryException = deliveryExceptions.get(0);
             if (deliveryException instanceof LocalRecipientOfflineException) {
                 // TODO implement 8.2.3 here
                 stanzaErrorCondition = StanzaErrorCondition.RECIPIENT_UNAVAILABLE;
@@ -73,11 +80,11 @@ public class ReturnErrorToSenderFailureS
                 if (failedCoreStanza instanceof PresenceStanza) {
                     final PresenceStanzaType presenceStanzaType = ((PresenceStanza) failedCoreStanza).getPresenceType();
                     if (presenceStanzaType == null ||
-                        presenceStanzaType == SUBSCRIBED ||
-                        presenceStanzaType == UNSUBSCRIBE ||
-                        presenceStanzaType == UNSUBSCRIBED ||
-                        presenceStanzaType == UNAVAILABLE ||
-                        presenceStanzaType == ERROR) {
+                            presenceStanzaType == SUBSCRIBED ||
+                            presenceStanzaType == UNSUBSCRIBE ||
+                            presenceStanzaType == UNSUBSCRIBED ||
+                            presenceStanzaType == UNAVAILABLE ||
+                            presenceStanzaType == ERROR) {
                         return; // silently ignore
                     }
                     // TODO what happens with PROBE? 8.1 is silent here, but see 4.3
@@ -92,9 +99,8 @@ public class ReturnErrorToSenderFailureS
                     }
                 }
             }
+        } else if (deliveryExceptions.size() > 1) {
+            throw new RuntimeException("cannot return to sender for multiple failed deliveries");
         }
-
-        XMPPCoreStanza error = XMPPCoreStanza.getWrapper(ServerErrorResponses.getInstance().getStanzaError(stanzaErrorCondition, failedCoreStanza, errorType, "stanza could not be delivered", "en", null));
-        stanzaRelay.relay(error.getTo(), error, IgnoreFailureStrategy.IGNORE_FAILURE_STRATEGY);
     }
 }

Modified: mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInboundStanzaRelay.java
URL: http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInboundStanzaRelay.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
--- mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInboundStanzaRelay.java (original)
+++ mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInboundStanzaRelay.java Fri Mar 12 23:04:58 2010
@@ -20,9 +20,7 @@
 package org.apache.vysper.xmpp.delivery.inbound;
 
 import org.apache.vysper.storage.StorageProviderRegistry;
-import org.apache.vysper.storage.jcr.JcrStorageProviderRegistry;
 import org.apache.vysper.xmpp.addressing.Entity;
-import org.apache.vysper.xmpp.addressing.EntityImpl;
 import org.apache.vysper.xmpp.authorization.AccountManagement;
 import org.apache.vysper.xmpp.protocol.SessionStateHolder;
 import org.apache.vysper.xmpp.protocol.StanzaHandler;
@@ -50,6 +48,7 @@ import org.apache.vysper.compliance.Spec
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -58,6 +57,7 @@ import java.util.concurrent.ThreadPoolEx
 import java.util.concurrent.TimeUnit;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * relays all 'incoming' stanzas to internal sessions, acts as a 'stage' by using a ThreadPoolExecutor
@@ -130,14 +130,14 @@ public class DeliveringInboundStanzaRela
 
         public RelayResult call() {
             RelayResult relayResult = deliver();
-            if (relayResult == null || relayResult.isRelayed()) return relayResult;
+            if (relayResult == null || !relayResult.hasProcessingErrors()) return relayResult;
             return runFailureStrategy(relayResult);
         }
 
         private RelayResult runFailureStrategy(RelayResult relayResult) {
             if (deliveryFailureStrategy != null) {
                 try {
-                    deliveryFailureStrategy.process(stanza, relayResult.getProcessingError());
+                    deliveryFailureStrategy.process(stanza, relayResult.getProcessingErrors());
                 } catch (DeliveryException e) {
                     return new RelayResult(e);
                 } catch (RuntimeException e) {
@@ -196,7 +196,8 @@ public class DeliveringInboundStanzaRela
                 switch (messageStanzaType) {
                     case CHAT:
                     case NORMAL:
-                        return relayToBestSession(false);
+                        return serverRuntimeContext.getServerFeatures().isDeliveringMessageToHighestPriorityResourcesOnly() ?
+                                relayToBestSessions(false) : relayToAllSessions(0);
                     
                     case ERROR:
                         // silently ignore
@@ -229,7 +230,7 @@ public class DeliveringInboundStanzaRela
                 // TODO cannot deliver presence with type  AVAIL or UNAVAIL: silently ignore
                 // TODO cannot deliver presence with type  SUBSCRIBE: see 3921bis section 3.1.3
                 // TODO cannot deliver presence with type  (UN)SUBSCRIBED, UNSUBSCRIBE: silently ignore
-                return relayToBestSession(false);
+                return relayToBestSessions(false);
             } else if (MessageStanza.isOfType(stanza)) {
                 MessageStanza messageStanza = (MessageStanza)xmppStanza;
                 MessageStanzaType messageStanzaType = messageStanza.getMessageType();
@@ -238,11 +239,11 @@ public class DeliveringInboundStanzaRela
                                                    messageStanzaType == MessageStanzaType.NORMAL;
                 // TODO cannot deliver ERROR: silently ignore
                 // TODO cannot deliver GROUPCHAT: service n/a
-                return relayToBestSession(fallbackToBareJIDAllowed);
+                return relayToBestSessions(fallbackToBareJIDAllowed);
 
             } else if (IQStanza.isOfType(stanza)) {
                 // TODO no resource matches: service n/a
-                return relayToBestSession(false);
+                return relayToBestSessions(false);
             }
  
             // for any other type of stanza 
@@ -262,54 +263,67 @@ public class DeliveringInboundStanzaRela
             }
         }
 
-        protected RelayResult relayToBestSession(final boolean fallbackToBareJIDAllowed) {
-            SessionContext receivingSession = resourceRegistry.getHighestPrioSession(receiver, PRIO_THRESHOLD);
+        protected RelayResult relayToBestSessions(final boolean fallbackToBareJIDAllowed) {
+            List<SessionContext> receivingSessions = resourceRegistry.getHighestPrioSessions(receiver, PRIO_THRESHOLD);
 
-            if (receivingSession == null && receiver.isResourceSet() && fallbackToBareJIDAllowed) {
+            if (receivingSessions.size() == 0 && receiver.isResourceSet() && fallbackToBareJIDAllowed) {
                 // no concrete session for this resource has been found
                 // fall back to bare JID
-                receivingSession = resourceRegistry.getHighestPrioSession(receiver.getBareJID(), PRIO_THRESHOLD);
+                receivingSessions = resourceRegistry.getHighestPrioSessions(receiver.getBareJID(), PRIO_THRESHOLD);
             }
 
-            if (receivingSession == null) {
+            if (receivingSessions.size() == 0) {
                 return relayNotPossible();
             }
-            
-            if (receivingSession.getState() != SessionState.AUTHENTICATED) {
-                return new RelayResult(new DeliveryException("no relay to non-authenticated sessions"));
-            }
-            try {
-                StanzaHandler stanzaHandler = receivingSession.getServerRuntimeContext().getHandler(stanza);
-                INBOUND_STANZA_PROTOCOL_WORKER.processStanza(receivingSession, sessionStateHolder, stanza, stanzaHandler);
-            } catch (Exception e) {
-                return new RelayResult(new DeliveryException(e));
+
+            RelayResult relayResult = new RelayResult();
+            for (SessionContext receivingSession : receivingSessions) {
+                if (receivingSession.getState() != SessionState.AUTHENTICATED) {
+                    relayResult.addProcessingError(new DeliveryException("no relay to non-authenticated sessions"));
+                    continue;
+                }
+                try {
+                    StanzaHandler stanzaHandler = receivingSession.getServerRuntimeContext().getHandler(stanza);
+                    INBOUND_STANZA_PROTOCOL_WORKER.processStanza(receivingSession, sessionStateHolder, stanza, stanzaHandler);
+                } catch (Exception e) {
+                    relayResult.addProcessingError(new DeliveryException(e)); // keep the real cause instead of a misleading fixed message
+                    continue;
+                }
+
+            }
-            return new RelayResult(); // return success result
+            return relayResult;
         }
         
         protected RelayResult relayToAllSessions() {
-            // the individual results are currently only recorded pro forma
-            List<RelayResult> relayResults = new ArrayList<RelayResult>();
-            
-            List<SessionContext> receivingSessions = resourceRegistry.getSessions(receiver);
+            return relayToAllSessions(null);
+        }
+
+        protected RelayResult relayToAllSessions(Integer prioThreshold) {
+            // null threshold: every session of the receiver; otherwise only sessions with priority at or above the threshold
+            List<SessionContext> receivingSessions = prioThreshold == null ?
+                                                            resourceRegistry.getSessions(receiver) :
+                                                            resourceRegistry.getSessions(receiver, prioThreshold);
 
             if (receivingSessions.size() > 1) {
                  logger.warn("multiplexing: {} sessions will be processing {} ", receivingSessions.size(), stanza);
              }
+
+            RelayResult relayResult = new RelayResult();
+
              for (SessionContext sessionContext : receivingSessions) {
                  if (sessionContext.getState() != SessionState.AUTHENTICATED) {
-                     relayResults.add(new RelayResult(new DeliveryException("no relay to non-authenticated sessions")));
+                     relayResult.addProcessingError(new DeliveryException("no relay to non-authenticated sessions"));
                      continue;
                  }
                  try {
                      StanzaHandler stanzaHandler = sessionContext.getServerRuntimeContext().getHandler(stanza);
                      INBOUND_STANZA_PROTOCOL_WORKER.processStanza(sessionContext, sessionStateHolder, stanza, stanzaHandler);
                  } catch (Exception e) {
-                     relayResults.add(new RelayResult(new DeliveryException(e)));
+                     relayResult.addProcessingError(new DeliveryException(e));
                  }
              }
-                
-             return new RelayResult(); // return success result
+
+            return relayResult; // return success result
          }
     }
 
@@ -327,24 +341,36 @@ public class DeliveringInboundStanzaRela
     }
 
     private static class RelayResult {
-        private DeliveryException processingError;
-        private boolean relayed;
+        private List<DeliveryException> processingErrors = null;
+        private AtomicInteger relayed = new AtomicInteger(0);
 
-        public RelayResult(DeliveryException processingError) {
-            this.processingError = processingError;
-            this.relayed = false;
+        public RelayResult() {
+            // empty
         }
 
-        public RelayResult() {
-            this.relayed = true;
+        public RelayResult(DeliveryException processingError) {
+            addProcessingError(processingError);
         }
 
-        public DeliveryException getProcessingError() {
-            return processingError;
+        /*package*/ void addProcessingError(DeliveryException processingError) { // records one failed delivery
+            if (processingErrors == null) processingErrors = new ArrayList<DeliveryException>(); // create the list lazily on the first error
+            processingErrors.add(processingError);
+        }
 
         public boolean isRelayed() {
-            return relayed;
+            return relayed.get() > 0;
+        }
+
+        public List<DeliveryException> getProcessingErrors() {
+            if (processingErrors == null) {
+                return Collections.<DeliveryException>emptyList();
+            } else {
+                return Collections.unmodifiableList(processingErrors);
+            }
+        }
+
+        public boolean hasProcessingErrors() {
+            return processingErrors != null && processingErrors.size() > 0;
         }
     }
 

Modified: mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java
URL: http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
--- mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java (original)
+++ mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java Fri Mar 12 23:04:58 2010
@@ -56,6 +56,16 @@ public class ServerFeatures {
      */
     private int authenticationRetries = 3;
 
+    /**
+     * decide to which client's resources a message will be delivered:
+     * (a) to the highest-priority available resource(s)
+     * (b) to all available resources with non-negative presence priority 
+     *
+     * see http://xmpp.org/internet-drafts/draft-ietf-xmpp-3921bis-05.html#rules-barejid-resource-message
+     *
+     */
+    private boolean deliverMessageToHighestPriorityResourcesOnly = false; 
+
     public ServerFeatures() {
         // default constructor
     }
@@ -103,4 +113,12 @@ public class ServerFeatures {
     public void setRelayingToFederationServers(boolean relayToFederationServers) {
         this.relayToFederationServers = relayToFederationServers;
     }
+
+    public boolean isDeliveringMessageToHighestPriorityResourcesOnly() {
+        return deliverMessageToHighestPriorityResourcesOnly;
+    }
+
+    public void setDeliverMessageToHighestPriorityResourcesOnly(boolean deliverMessageToHighestPriorityResourcesOnly) {
+        this.deliverMessageToHighestPriorityResourcesOnly = deliverMessageToHighestPriorityResourcesOnly;
+    }
 }

Modified: mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistry.java
URL: http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistry.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
--- mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistry.java (original)
+++ mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistry.java Fri Mar 12 23:04:58 2010
@@ -278,21 +278,44 @@ public class ResourceRegistry {
 		return sessionContexts;
 	}
 
+    /**
+     * retrieves all sessions whose priority is equal to or above the given threshold
+     *
+     * @param entity all sessions for the bare JID will be considered.
+     * @param prioThreshold only sessions having the same or a higher priority will be returned. a common value
+     * for the threshold is 0 (zero), which is also the default when the param is NULL.
+     * @return the sessions matching the given (bare) JID with the same or a higher priority
+     */
+    public List<SessionContext> getSessions(Entity entity, Integer prioThreshold) {
+        if (prioThreshold == null) prioThreshold = 0;
+        List<SessionContext> results = new ArrayList<SessionContext>();
+
+        List<String> boundResourceIds = getBoundResources(entity, true);
+		for (String resourceId : boundResourceIds) {
+            SessionData sessionData = boundResources.get(resourceId);
+            if (sessionData == null) continue;
+
+            if (sessionData.priority >= prioThreshold) {
+                results.add(sessionData.context);
+            }
+        }
+        return results;
+	}
     
     /**
-     * retrieves the highest priorizes session for this entity. 
+     * retrieves the highest prioritized session(s) for this entity.
      * 
      * @param entity if this is not a bare JID, only the session for the JID's resource part will be returned, without
-     * looking at other sessions for the resource's bare JID. otherwise, in case of a full JID, it will return the 
-     * highest priorized session.
-     * @param prioThreshold if not NULL, only resources will be returned having same or higher priority. a common value 
-     * for the threshold is 0 (zero). 
-     * @return for a bare JID, it will return the hightest priorized session. for a full JID, it will return the 
+     * looking at other sessions for the resource's bare JID. otherwise, in case of a full JID, it will return the
+     * highest prioritized sessions.
+     * @param prioThreshold if not NULL, only resources will be returned having same or higher priority. a common value
+     * for the threshold is 0 (zero).
+     * @return for a bare JID, it will return the highest prioritized sessions. for a full JID, it will return the
      * related session.
      */
-    public SessionContext getHighestPrioSession(Entity entity, Integer prioThreshold) {
-        Integer currentPrio = Integer.MIN_VALUE;
-        SessionData result = null;
+    public List<SessionContext> getHighestPrioSessions(Entity entity, Integer prioThreshold) {
+        Integer currentPrio = prioThreshold == null ? Integer.MIN_VALUE : prioThreshold;
+        List<SessionContext> results = new ArrayList<SessionContext>();
 
         boolean isResourceSet = entity.isResourceSet();
 
@@ -301,20 +324,24 @@ public class ResourceRegistry {
             SessionData sessionData = boundResources.get(resourceId);
             if (sessionData == null) continue;
             
-            if (isResourceSet) return sessionData.context; // no prio checks, take the first proper one
+            if (isResourceSet) {
+                // if resource id matches, there can only be one result
+                // this overrides even parameter prio threshold
+                results.clear();
+                results.add(sessionData.context);
+                return results;
+            }
             
             if (sessionData.priority > currentPrio) {
+                results.clear(); // discard all accumulated lower prio sessions
                 currentPrio = sessionData.priority;
-                result = sessionData;
+                results.add(sessionData.context);
+            } else if(sessionData.priority.intValue() == currentPrio.intValue()) {
+                results.add(sessionData.context);
             }
         }
 
-        if (prioThreshold != null && prioThreshold > currentPrio) {
-            // no session over threshold 
-            return null;
-        }
-
-		return result == null ? null : result.context;
+        return results;
 	}
 
     /**

Modified: mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringStanzaRelayTestCase.java
URL: http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringStanzaRelayTestCase.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
--- mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringStanzaRelayTestCase.java (original)
+++ mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringStanzaRelayTestCase.java Fri Mar 12 23:04:58 2010
@@ -29,10 +29,12 @@ import org.apache.vysper.xmpp.authorizat
 import org.apache.vysper.xmpp.authorization.AccountManagement;
 import org.apache.vysper.xmpp.delivery.failure.DeliveryException;
 import org.apache.vysper.xmpp.delivery.failure.IgnoreFailureStrategy;
+import org.apache.vysper.xmpp.server.DefaultServerRuntimeContext;
 import org.apache.vysper.xmpp.server.SessionState;
 import org.apache.vysper.xmpp.server.TestSessionContext;
 import org.apache.vysper.xmpp.stanza.Stanza;
 import org.apache.vysper.xmpp.stanza.StanzaBuilder;
+import org.apache.vysper.xmpp.state.resourcebinding.BindException;
 import org.apache.vysper.xmpp.state.resourcebinding.ResourceRegistry;
 
 /**
@@ -98,4 +100,86 @@ public class DeliveringStanzaRelayTestCa
             throw e;
         }
     }
+
+    public void testRelayToTwoRecepients_DeliverToALL() throws EntityFormatException, XMLSemanticError, DeliveryException, BindException {
+        DefaultServerRuntimeContext serverRuntimeContext = new DefaultServerRuntimeContext(null, null);
+        
+        // !! DeliverMessageToHighestPriorityResourcesOnly = FALSE
+        serverRuntimeContext.getServerFeatures().setDeliverMessageToHighestPriorityResourcesOnly(false);
+
+        stanzaRelay.setServerRuntimeContext(serverRuntimeContext);
+
+        EntityImpl fromEntity = EntityImpl.parse("userFrom@vysper.org");
+
+        EntityImpl toEntity = EntityImpl.parse("userTo@vysper.org");
+
+
+        TestSessionContext sessionContextToEntity_1_prio3 = createSessionForTo(toEntity, 3); // NON-NEGATIVE
+        TestSessionContext sessionContextToEntity_2_prio0 = createSessionForTo(toEntity, 0); // NON-NEGATIVE
+        TestSessionContext sessionContextToEntity_3_prio3 = createSessionForTo(toEntity, 3); // NON-NEGATIVE
+        TestSessionContext sessionContextToEntity_4_prioMinus = createSessionForTo(toEntity, -1); // not receiving, negative
+
+        Stanza stanza = StanzaBuilder.createMessageStanza(fromEntity, toEntity, "en", "Hello").build();
+
+        try {
+            stanzaRelay.relay(toEntity, stanza, new IgnoreFailureStrategy());
+            Stanza recordedStanza_1 = sessionContextToEntity_1_prio3.getNextRecordedResponse(100);
+            assertNotNull("stanza 1 delivered", recordedStanza_1);
+            Stanza recordedStanza_2 = sessionContextToEntity_2_prio0.getNextRecordedResponse(100);
+            assertNotNull("stanza 2 delivered", recordedStanza_2);
+            Stanza recordedStanza_3 = sessionContextToEntity_3_prio3.getNextRecordedResponse(100);
+            assertNotNull("stanza 3 delivered", recordedStanza_3);
+            Stanza recordedStanza_4 = sessionContextToEntity_4_prioMinus.getNextRecordedResponse(100);
+            assertNull("stanza 4 not delivered", recordedStanza_4);
+        } catch (DeliveryException e) {
+            throw e;
+        }
+
+    }
+
+    public void testRelayToTwoRecepients_DeliverToHIGHEST() throws EntityFormatException, XMLSemanticError, DeliveryException, BindException {
+        DefaultServerRuntimeContext serverRuntimeContext = new DefaultServerRuntimeContext(null, null);
+
+        // !! DeliverMessageToHighestPriorityResourcesOnly = TRUE
+        serverRuntimeContext.getServerFeatures().setDeliverMessageToHighestPriorityResourcesOnly(true);
+
+        stanzaRelay.setServerRuntimeContext(serverRuntimeContext);
+
+        EntityImpl fromEntity = EntityImpl.parse("userFrom@vysper.org");
+
+        EntityImpl toEntity = EntityImpl.parse("userTo@vysper.org");
+
+
+        TestSessionContext sessionContextToEntity_1_prio3 = createSessionForTo(toEntity, 3); // HIGHEST PRIO
+        TestSessionContext sessionContextToEntity_2_prio0 = createSessionForTo(toEntity, 1); // not receiving
+        TestSessionContext sessionContextToEntity_3_prio3 = createSessionForTo(toEntity, 3); // HIGHEST PRIO
+        TestSessionContext sessionContextToEntity_4_prioMinus = createSessionForTo(toEntity, -1); // not receiving
+
+        Stanza stanza = StanzaBuilder.createMessageStanza(fromEntity, toEntity, "en", "Hello").build();
+
+        try {
+            stanzaRelay.relay(toEntity, stanza, new IgnoreFailureStrategy());
+            Stanza recordedStanza_1 = sessionContextToEntity_1_prio3.getNextRecordedResponse(100);
+            assertNotNull("stanza 1 delivered", recordedStanza_1);
+            Stanza recordedStanza_2 = sessionContextToEntity_2_prio0.getNextRecordedResponse(100);
+            assertNull("stanza 2 not delivered", recordedStanza_2);
+            Stanza recordedStanza_3 = sessionContextToEntity_3_prio3.getNextRecordedResponse(100);
+            assertNotNull("stanza 3 delivered", recordedStanza_3);
+            Stanza recordedStanza_4 = sessionContextToEntity_4_prioMinus.getNextRecordedResponse(100);
+            assertNull("stanza 4 not delivered", recordedStanza_4);
+        } catch (DeliveryException e) {
+            throw e;
+        }
+
+    }
+
+    private TestSessionContext createSessionForTo(EntityImpl toEntity, final int priority) {
+        TestSessionContext sessionContextToEntity = TestSessionContext.createSessionContext(toEntity);
+        sessionContextToEntity.setSessionState(SessionState.AUTHENTICATED);
+        String toEntityRes = resourceRegistry.bindSession(sessionContextToEntity);
+        resourceRegistry.setResourcePriority(toEntityRes, priority);
+        return sessionContextToEntity;
+    }
+
+
 }

Modified: mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistryTestCase.java
URL: http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistryTestCase.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
--- mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistryTestCase.java (original)
+++ mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistryTestCase.java Fri Mar 12 23:04:58 2010
@@ -93,20 +93,37 @@ public class ResourceRegistryTestCase ex
         assertEquals(2, resourceList.size());
         assertTrue(sessionList.contains(sessionContext1));
         assertTrue(sessionList.contains(sessionContext2));
-        
-        SessionContext hightestPrioSession = resourceRegistry.getHighestPrioSession(entity, null);
-        assertSame(resourceRegistry.getSessionContext(resourceId2), hightestPrioSession);
-        
+
+        List<SessionContext> highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity, null);
+        assertEquals(1, highestPrioSessions.size());
+        SessionContext highestPrioSession = highestPrioSessions.get(0);
+        assertSame(resourceRegistry.getSessionContext(resourceId2), highestPrioSession);
+
         resourceRegistry.setResourcePriority(resourceId1, 2); // make this highes prio
-        hightestPrioSession = resourceRegistry.getHighestPrioSession(entity, null);
-        assertSame(resourceRegistry.getSessionContext(resourceId1), hightestPrioSession);
+        highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity, null);
+        assertEquals(1, highestPrioSessions.size());
+        highestPrioSession = highestPrioSessions.get(0);
+        assertSame(resourceRegistry.getSessionContext(resourceId1), highestPrioSession);
+
+        highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity, 2); // still highest prio
+        assertEquals(1, highestPrioSessions.size());
+        highestPrioSession = highestPrioSessions.get(0);
+        assertSame(resourceRegistry.getSessionContext(resourceId1), highestPrioSession);
+        
+        highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity, 3); // now, all prios are below threshold
+        assertEquals(0, highestPrioSessions.size());
+
+        resourceRegistry.setResourcePriority(resourceId1, 4); // both are same
+        resourceRegistry.setResourcePriority(resourceId2, 4); // both are same
+        highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity, 3);
+        assertEquals(2, highestPrioSessions.size());
+        highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity, 4);
+        assertEquals(2, highestPrioSessions.size());
+        highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity, 5);
+        assertEquals(0, highestPrioSessions.size());
+
+        sessionContext1.getServerRuntimeContext().getServerFeatures().setDeliverMessageToHighestPriorityResourcesOnly(false);
 
-        hightestPrioSession = resourceRegistry.getHighestPrioSession(entity, 2); // still highest prio
-        assertSame(resourceRegistry.getSessionContext(resourceId1), hightestPrioSession);
-        
-        hightestPrioSession = resourceRegistry.getHighestPrioSession(entity, 3); // now, all prios are below threshold
-        assertNull(hightestPrioSession);
-        
     }
 
     public void testAddOneEntityMultipleResources_TolerateResourceIds() throws EntityFormatException {