You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by be...@apache.org on 2010/03/13 00:04:58 UTC
svn commit: r922450 - in /mina/sandbox/vysper/trunk/server/core/src:
main/java/org/apache/vysper/xmpp/delivery/failure/
main/java/org/apache/vysper/xmpp/delivery/inbound/
main/java/org/apache/vysper/xmpp/server/
main/java/org/apache/vysper/xmpp/state/r...
Author: berndf
Date: Fri Mar 12 23:04:58 2010
New Revision: 922450
URL: http://svn.apache.org/viewvc?rev=922450&view=rev
Log:
VYSPER-179: relay message to either only highest prio sessions, or to all non-negative sessions (default), see http://xmpp.org/internet-drafts/draft-ietf-xmpp-3921bis-05.html#rules-barejid-resource-message
includes improvements in delivery error handling
Modified:
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/DeliveryFailureStrategy.java
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/IgnoreFailureStrategy.java
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategy.java
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInboundStanzaRelay.java
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java
mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistry.java
mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringStanzaRelayTestCase.java
mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistryTestCase.java
Modified: mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/DeliveryFailureStrategy.java
URL: http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/DeliveryFailureStrategy.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
--- mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/DeliveryFailureStrategy.java (original)
+++ mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/DeliveryFailureStrategy.java Fri Mar 12 23:04:58 2010
@@ -22,6 +22,8 @@ package org.apache.vysper.xmpp.delivery.
import org.apache.vysper.xmpp.stanza.Stanza;
import org.apache.vysper.xmpp.delivery.failure.DeliveryException;
+import java.util.List;
+
/**
* there are many reasons why a stanza may fail to deliver: remote server not answering, local addressee has
* become unavailable, the server has no more resources to process etc.
@@ -39,6 +41,6 @@ public interface DeliveryFailureStrategy
* @param deliveryException - optional: exception which occured during the failed delivery
* @throws org.apache.vysper.xmpp.delivery.failure.DeliveryException - exception which occured during failure strategy execution.
*/
- public void process(Stanza failedToDeliverStanza, DeliveryException deliveryException) throws DeliveryException;
+ public void process(Stanza failedToDeliverStanza, List<DeliveryException> deliveryException) throws DeliveryException;
}
Modified: mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/IgnoreFailureStrategy.java
URL: http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/IgnoreFailureStrategy.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
--- mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/IgnoreFailureStrategy.java (original)
+++ mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/IgnoreFailureStrategy.java Fri Mar 12 23:04:58 2010
@@ -19,10 +19,10 @@
*/
package org.apache.vysper.xmpp.delivery.failure;
-import org.apache.vysper.xmpp.delivery.failure.DeliveryFailureStrategy;
-import org.apache.vysper.xmpp.delivery.failure.DeliveryException;
import org.apache.vysper.xmpp.stanza.Stanza;
+import java.util.List;
+
/**
*
* @author The Apache MINA Project (dev@mina.apache.org)
@@ -31,7 +31,7 @@ public class IgnoreFailureStrategy imple
public final static IgnoreFailureStrategy IGNORE_FAILURE_STRATEGY = new IgnoreFailureStrategy();
- public void process(Stanza failedToDeliverStanza, DeliveryException deliveryException) {
+ public void process(Stanza failedToDeliverStanza, List<DeliveryException> deliveryException) throws DeliveryException {
// do nothing
}
}
Modified: mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategy.java
URL: http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategy.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
--- mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategy.java (original)
+++ mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/failure/ReturnErrorToSenderFailureStrategy.java Fri Mar 12 23:04:58 2010
@@ -26,6 +26,9 @@ import static org.apache.vysper.xmpp.sta
import org.apache.vysper.xmpp.addressing.Entity;
import org.apache.vysper.compliance.SpecCompliance;
import org.apache.vysper.compliance.SpecCompliant;
+
+import java.util.List;
+
import static org.apache.vysper.compliance.SpecCompliant.ComplianceStatus.*;
import static org.apache.vysper.compliance.SpecCompliant.ComplianceCoverage.*;
@@ -46,7 +49,7 @@ public class ReturnErrorToSenderFailureS
@SpecCompliant(spec="rfc3921bis-08", section = "8.3.2", status = NOT_STARTED, coverage = UNKNOWN),
@SpecCompliant(spec="rfc3921bis-08", section = "4.3", status = NOT_STARTED, coverage = UNKNOWN)
})
- public void process(Stanza failedToDeliverStanza, DeliveryException deliveryException) throws DeliveryException {
+ public void process(Stanza failedToDeliverStanza, List<DeliveryException> deliveryExceptions) throws DeliveryException {
StanzaErrorCondition stanzaErrorCondition = StanzaErrorCondition.SERVICE_UNAVAILABLE;
StanzaErrorType errorType = StanzaErrorType.CANCEL;
@@ -59,7 +62,11 @@ public class ReturnErrorToSenderFailureS
return; // do not answer these
}
- if (deliveryException != null) {
+ if (deliveryExceptions == null) {
+ XMPPCoreStanza error = XMPPCoreStanza.getWrapper(ServerErrorResponses.getInstance().getStanzaError(stanzaErrorCondition, failedCoreStanza, errorType, "stanza could not be delivered", "en", null));
+ stanzaRelay.relay(error.getTo(), error, IgnoreFailureStrategy.IGNORE_FAILURE_STRATEGY);
+ } else if (deliveryExceptions.size() == 1) {
+ DeliveryException deliveryException = deliveryExceptions.get(0);
if (deliveryException instanceof LocalRecipientOfflineException) {
// TODO implement 8.2.3 here
stanzaErrorCondition = StanzaErrorCondition.RECIPIENT_UNAVAILABLE;
@@ -73,11 +80,11 @@ public class ReturnErrorToSenderFailureS
if (failedCoreStanza instanceof PresenceStanza) {
final PresenceStanzaType presenceStanzaType = ((PresenceStanza) failedCoreStanza).getPresenceType();
if (presenceStanzaType == null ||
- presenceStanzaType == SUBSCRIBED ||
- presenceStanzaType == UNSUBSCRIBE ||
- presenceStanzaType == UNSUBSCRIBED ||
- presenceStanzaType == UNAVAILABLE ||
- presenceStanzaType == ERROR) {
+ presenceStanzaType == SUBSCRIBED ||
+ presenceStanzaType == UNSUBSCRIBE ||
+ presenceStanzaType == UNSUBSCRIBED ||
+ presenceStanzaType == UNAVAILABLE ||
+ presenceStanzaType == ERROR) {
return; // silently ignore
}
// TODO what happens with PROBE? 8.1 is silent here, but see 4.3
@@ -92,9 +99,8 @@ public class ReturnErrorToSenderFailureS
}
}
}
+ } else if (deliveryExceptions.size() > 1) {
+ throw new RuntimeException("cannot return to sender for multiple failed deliveries");
}
-
- XMPPCoreStanza error = XMPPCoreStanza.getWrapper(ServerErrorResponses.getInstance().getStanzaError(stanzaErrorCondition, failedCoreStanza, errorType, "stanza could not be delivered", "en", null));
- stanzaRelay.relay(error.getTo(), error, IgnoreFailureStrategy.IGNORE_FAILURE_STRATEGY);
}
}
Modified: mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInboundStanzaRelay.java
URL: http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInboundStanzaRelay.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
--- mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInboundStanzaRelay.java (original)
+++ mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringInboundStanzaRelay.java Fri Mar 12 23:04:58 2010
@@ -20,9 +20,7 @@
package org.apache.vysper.xmpp.delivery.inbound;
import org.apache.vysper.storage.StorageProviderRegistry;
-import org.apache.vysper.storage.jcr.JcrStorageProviderRegistry;
import org.apache.vysper.xmpp.addressing.Entity;
-import org.apache.vysper.xmpp.addressing.EntityImpl;
import org.apache.vysper.xmpp.authorization.AccountManagement;
import org.apache.vysper.xmpp.protocol.SessionStateHolder;
import org.apache.vysper.xmpp.protocol.StanzaHandler;
@@ -50,6 +48,7 @@ import org.apache.vysper.compliance.Spec
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -58,6 +57,7 @@ import java.util.concurrent.ThreadPoolEx
import java.util.concurrent.TimeUnit;
import java.util.List;
import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* relays all 'incoming' stanzas to internal sessions, acts as a 'stage' by using a ThreadPoolExecutor
@@ -130,14 +130,14 @@ public class DeliveringInboundStanzaRela
public RelayResult call() {
RelayResult relayResult = deliver();
- if (relayResult == null || relayResult.isRelayed()) return relayResult;
+ if (relayResult == null || !relayResult.hasProcessingErrors()) return relayResult;
return runFailureStrategy(relayResult);
}
private RelayResult runFailureStrategy(RelayResult relayResult) {
if (deliveryFailureStrategy != null) {
try {
- deliveryFailureStrategy.process(stanza, relayResult.getProcessingError());
+ deliveryFailureStrategy.process(stanza, relayResult.getProcessingErrors());
} catch (DeliveryException e) {
return new RelayResult(e);
} catch (RuntimeException e) {
@@ -196,7 +196,8 @@ public class DeliveringInboundStanzaRela
switch (messageStanzaType) {
case CHAT:
case NORMAL:
- return relayToBestSession(false);
+ return serverRuntimeContext.getServerFeatures().isDeliveringMessageToHighestPriorityResourcesOnly() ?
+ relayToBestSessions(false) : relayToAllSessions(0);
case ERROR:
// silently ignore
@@ -229,7 +230,7 @@ public class DeliveringInboundStanzaRela
// TODO cannot deliver presence with type AVAIL or UNAVAIL: silently ignore
// TODO cannot deliver presence with type SUBSCRIBE: see 3921bis section 3.1.3
// TODO cannot deliver presence with type (UN)SUBSCRIBED, UNSUBSCRIBE: silently ignore
- return relayToBestSession(false);
+ return relayToBestSessions(false);
} else if (MessageStanza.isOfType(stanza)) {
MessageStanza messageStanza = (MessageStanza)xmppStanza;
MessageStanzaType messageStanzaType = messageStanza.getMessageType();
@@ -238,11 +239,11 @@ public class DeliveringInboundStanzaRela
messageStanzaType == MessageStanzaType.NORMAL;
// TODO cannot deliver ERROR: silently ignore
// TODO cannot deliver GROUPCHAT: service n/a
- return relayToBestSession(fallbackToBareJIDAllowed);
+ return relayToBestSessions(fallbackToBareJIDAllowed);
} else if (IQStanza.isOfType(stanza)) {
// TODO no resource matches: service n/a
- return relayToBestSession(false);
+ return relayToBestSessions(false);
}
// for any other type of stanza
@@ -262,54 +263,67 @@ public class DeliveringInboundStanzaRela
}
}
- protected RelayResult relayToBestSession(final boolean fallbackToBareJIDAllowed) {
- SessionContext receivingSession = resourceRegistry.getHighestPrioSession(receiver, PRIO_THRESHOLD);
+ protected RelayResult relayToBestSessions(final boolean fallbackToBareJIDAllowed) { // relays the stanza to every highest-priority session of the receiver
+ List<SessionContext> receivingSessions = resourceRegistry.getHighestPrioSessions(receiver, PRIO_THRESHOLD);
- if (receivingSession == null && receiver.isResourceSet() && fallbackToBareJIDAllowed) {
+ if (receivingSessions.size() == 0 && receiver.isResourceSet() && fallbackToBareJIDAllowed) {
// no concrete session for this resource has been found
// fall back to bare JID
- receivingSession = resourceRegistry.getHighestPrioSession(receiver.getBareJID(), PRIO_THRESHOLD);
+ receivingSessions = resourceRegistry.getHighestPrioSessions(receiver.getBareJID(), PRIO_THRESHOLD);
}
- if (receivingSession == null) {
+ if (receivingSessions.size() == 0) {
return relayNotPossible();
}
-
- if (receivingSession.getState() != SessionState.AUTHENTICATED) {
- return new RelayResult(new DeliveryException("no relay to non-authenticated sessions"));
- }
- try {
- StanzaHandler stanzaHandler = receivingSession.getServerRuntimeContext().getHandler(stanza);
- INBOUND_STANZA_PROTOCOL_WORKER.processStanza(receivingSession, sessionStateHolder, stanza, stanzaHandler);
- } catch (Exception e) {
- return new RelayResult(new DeliveryException(e));
+
+ RelayResult relayResult = new RelayResult(); // collects one error per session that could not be served
+ for (SessionContext receivingSession : receivingSessions) {
+ if (receivingSession.getState() != SessionState.AUTHENTICATED) {
+ relayResult.addProcessingError(new DeliveryException("no relay to non-authenticated sessions"));
+ continue;
+ }
+ try {
+ StanzaHandler stanzaHandler = receivingSession.getServerRuntimeContext().getHandler(stanza);
+ INBOUND_STANZA_PROTOCOL_WORKER.processStanza(receivingSession, sessionStateHolder, stanza, stanzaHandler);
+ } catch (Exception e) {
+ relayResult.addProcessingError(new DeliveryException(e)); // wrap the real cause instead of a misleading "non-authenticated" message
+ continue;
+ }
+
}
- return new RelayResult(); // return success result
+ return relayResult; // no recorded errors means every targeted session was served
}
protected RelayResult relayToAllSessions() {
- // the individual results are currently only recorded pro forma
- List<RelayResult> relayResults = new ArrayList<RelayResult>();
-
- List<SessionContext> receivingSessions = resourceRegistry.getSessions(receiver);
+ return relayToAllSessions(null); // null = no priority filtering
+ }
+
+ protected RelayResult relayToAllSessions(Integer prioThreshold) { // relays to all sessions, optionally only those at or above prioThreshold
+
+ List<SessionContext> receivingSessions = prioThreshold == null ?
+ resourceRegistry.getSessions(receiver) :
+ resourceRegistry.getSessions(receiver, prioThreshold); // honour the caller's threshold instead of a hard-coded 0
if (receivingSessions.size() > 1) {
logger.warn("multiplexing: {} sessions will be processing {} ", receivingSessions.size(), stanza);
}
+
+ RelayResult relayResult = new RelayResult(); // collects one error per session that could not be served
+
for (SessionContext sessionContext : receivingSessions) {
if (sessionContext.getState() != SessionState.AUTHENTICATED) {
- relayResults.add(new RelayResult(new DeliveryException("no relay to non-authenticated sessions")));
+ relayResult.addProcessingError(new DeliveryException("no relay to non-authenticated sessions"));
continue;
}
try {
StanzaHandler stanzaHandler = sessionContext.getServerRuntimeContext().getHandler(stanza);
INBOUND_STANZA_PROTOCOL_WORKER.processStanza(sessionContext, sessionStateHolder, stanza, stanzaHandler);
} catch (Exception e) {
- relayResults.add(new RelayResult(new DeliveryException(e)));
+ relayResult.addProcessingError(new DeliveryException(e));
}
}
-
- return new RelayResult(); // return success result
+
+ return relayResult; // may carry per-session errors; none recorded means success
}
}
@@ -327,24 +341,36 @@ public class DeliveringInboundStanzaRela
}
private static class RelayResult {
- private DeliveryException processingError;
- private boolean relayed;
+ private List<DeliveryException> processingErrors = null; // created lazily by addProcessingError()
+ private AtomicInteger relayed = new AtomicInteger(0); // NOTE(review): no increment is visible in this change, so isRelayed() would always be false - confirm
- public RelayResult(DeliveryException processingError) {
- this.processingError = processingError;
- this.relayed = false;
+ public RelayResult() {
+ // empty
}
- public RelayResult() {
- this.relayed = true;
+ public RelayResult(DeliveryException processingError) {
+ addProcessingError(processingError);
}
- public DeliveryException getProcessingError() {
- return processingError;
+ /*package*/ void addProcessingError(DeliveryException processingError) {
+ if (processingErrors == null) processingErrors = new ArrayList<DeliveryException>(); // test the list, not the argument, or the first add() throws a NullPointerException
+ processingErrors.add(processingError);
}
public boolean isRelayed() {
- return relayed;
+ return relayed.get() > 0;
+ }
+
+ public List<DeliveryException> getProcessingErrors() {
+ if (processingErrors == null) {
+ return Collections.<DeliveryException>emptyList();
+ } else {
+ return Collections.unmodifiableList(processingErrors);
+ }
+ }
+
+ public boolean hasProcessingErrors() {
+ return processingErrors != null && processingErrors.size() > 0;
}
}
Modified: mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java
URL: http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
--- mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java (original)
+++ mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/server/ServerFeatures.java Fri Mar 12 23:04:58 2010
@@ -56,6 +56,16 @@ public class ServerFeatures {
*/
private int authenticationRetries = 3;
+ /**
+ * decide to which client's resources a message will be delivered:
+ * (a) to the highest-priority available resource(s)
+ * (b) to all available resources with non-negative presence priority
+ *
+ * see http://xmpp.org/internet-drafts/draft-ietf-xmpp-3921bis-05.html#rules-barejid-resource-message
+ *
+ */
+ private boolean deliverMessageToHighestPriorityResourcesOnly = false;
+
public ServerFeatures() {
// default constructor
}
@@ -103,4 +113,12 @@ public class ServerFeatures {
public void setRelayingToFederationServers(boolean relayToFederationServers) {
this.relayToFederationServers = relayToFederationServers;
}
+
+ public boolean isDeliveringMessageToHighestPriorityResourcesOnly() {
+ return deliverMessageToHighestPriorityResourcesOnly;
+ }
+
+ public void setDeliverMessageToHighestPriorityResourcesOnly(boolean deliverMessageToHighestPriorityResourcesOnly) {
+ this.deliverMessageToHighestPriorityResourcesOnly = deliverMessageToHighestPriorityResourcesOnly;
+ }
}
Modified: mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistry.java
URL: http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistry.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
--- mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistry.java (original)
+++ mina/sandbox/vysper/trunk/server/core/src/main/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistry.java Fri Mar 12 23:04:58 2010
@@ -278,21 +278,44 @@ public class ResourceRegistry {
return sessionContexts;
}
+ /**
+ * retrieves all sessions whose priority is at or above the given threshold
+ *
+ * @param entity all sessions for the bare jid will be considered.
+ * @param prioThreshold only resources will be returned having same or higher priority. a common value
+ * for the threshold is 0 (zero), which is also the default when param is NULL.
+ * @return returns the sessions matching the given JID (bare) with same or higher priority
+ */
+ public List<SessionContext> getSessions(Entity entity, Integer prioThreshold) {
+ if (prioThreshold == null) prioThreshold = 0;
+ List<SessionContext> results = new ArrayList<SessionContext>();
+
+ List<String> boundResourceIds = getBoundResources(entity, true);
+ for (String resourceId : boundResourceIds) {
+ SessionData sessionData = boundResources.get(resourceId);
+ if (sessionData == null) continue;
+
+ if (sessionData.priority >= prioThreshold) {
+ results.add(sessionData.context);
+ }
+ }
+ return results;
+ }
/**
- * retrieves the highest priorizes session for this entity.
+ * retrieves the highest prioritized session(s) for this entity.
*
* @param entity if this is not a bare JID, only the session for the JID's resource part will be returned, without
- * looking at other sessions for the resource's bare JID. otherwise, in case of a full JID, it will return the
- * highest priorized session.
- * @param prioThreshold if not NULL, only resources will be returned having same or higher priority. a common value
- * for the threshold is 0 (zero).
- * @return for a bare JID, it will return the hightest priorized session. for a full JID, it will return the
+ * looking at other sessions for the resource's bare JID. otherwise, in case of a bare JID, it will return the
+ * highest prioritized sessions.
+ * @param prioThreshold if not NULL, only resources will be returned having same or higher priority. a common value
+ * for the threshold is 0 (zero).
+ * @return for a bare JID, it will return the highest prioritized sessions. for a full JID, it will return the
* related session.
*/
- public SessionContext getHighestPrioSession(Entity entity, Integer prioThreshold) {
- Integer currentPrio = Integer.MIN_VALUE;
- SessionData result = null;
+ public List<SessionContext> getHighestPrioSessions(Entity entity, Integer prioThreshold) {
+ Integer currentPrio = prioThreshold == null ? Integer.MIN_VALUE : prioThreshold;
+ List<SessionContext> results = new ArrayList<SessionContext>();
boolean isResourceSet = entity.isResourceSet();
@@ -301,20 +324,24 @@ public class ResourceRegistry {
SessionData sessionData = boundResources.get(resourceId);
if (sessionData == null) continue;
- if (isResourceSet) return sessionData.context; // no prio checks, take the first proper one
+ if (isResourceSet) {
+ // if resource id matches, there can only be one result
+ // this overrides even parameter prio threshold
+ results.clear();
+ results.add(sessionData.context);
+ return results;
+ }
if (sessionData.priority > currentPrio) {
+ results.clear(); // discard all accumulated lower prio sessions
currentPrio = sessionData.priority;
- result = sessionData;
+ results.add(sessionData.context);
+ } else if(sessionData.priority.intValue() == currentPrio.intValue()) {
+ results.add(sessionData.context);
}
}
- if (prioThreshold != null && prioThreshold > currentPrio) {
- // no session over threshold
- return null;
- }
-
- return result == null ? null : result.context;
+ return results;
}
/**
Modified: mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringStanzaRelayTestCase.java
URL: http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringStanzaRelayTestCase.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
--- mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringStanzaRelayTestCase.java (original)
+++ mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/delivery/inbound/DeliveringStanzaRelayTestCase.java Fri Mar 12 23:04:58 2010
@@ -29,10 +29,12 @@ import org.apache.vysper.xmpp.authorizat
import org.apache.vysper.xmpp.authorization.AccountManagement;
import org.apache.vysper.xmpp.delivery.failure.DeliveryException;
import org.apache.vysper.xmpp.delivery.failure.IgnoreFailureStrategy;
+import org.apache.vysper.xmpp.server.DefaultServerRuntimeContext;
import org.apache.vysper.xmpp.server.SessionState;
import org.apache.vysper.xmpp.server.TestSessionContext;
import org.apache.vysper.xmpp.stanza.Stanza;
import org.apache.vysper.xmpp.stanza.StanzaBuilder;
+import org.apache.vysper.xmpp.state.resourcebinding.BindException;
import org.apache.vysper.xmpp.state.resourcebinding.ResourceRegistry;
/**
@@ -98,4 +100,86 @@ public class DeliveringStanzaRelayTestCa
throw e;
}
}
+
+ public void testRelayToTwoRecepients_DeliverToALL() throws EntityFormatException, XMLSemanticError, DeliveryException, BindException {
+ DefaultServerRuntimeContext serverRuntimeContext = new DefaultServerRuntimeContext(null, null);
+
+ // !! DeliverMessageToHighestPriorityResourcesOnly = FALSE
+ serverRuntimeContext.getServerFeatures().setDeliverMessageToHighestPriorityResourcesOnly(false);
+
+ stanzaRelay.setServerRuntimeContext(serverRuntimeContext);
+
+ EntityImpl fromEntity = EntityImpl.parse("userFrom@vysper.org");
+
+ EntityImpl toEntity = EntityImpl.parse("userTo@vysper.org");
+
+
+ TestSessionContext sessionContextToEntity_1_prio3 = createSessionForTo(toEntity, 3); // NON-NEGATIVE
+ TestSessionContext sessionContextToEntity_2_prio0 = createSessionForTo(toEntity, 0); // NON-NEGATIVE
+ TestSessionContext sessionContextToEntity_3_prio3 = createSessionForTo(toEntity, 3); // NON-NEGATIVE
+ TestSessionContext sessionContextToEntity_4_prioMinus = createSessionForTo(toEntity, -1); // not receiving, negative
+
+ Stanza stanza = StanzaBuilder.createMessageStanza(fromEntity, toEntity, "en", "Hello").build();
+
+ try {
+ stanzaRelay.relay(toEntity, stanza, new IgnoreFailureStrategy());
+ Stanza recordedStanza_1 = sessionContextToEntity_1_prio3.getNextRecordedResponse(100);
+ assertNotNull("stanza 1 delivered", recordedStanza_1);
+ Stanza recordedStanza_2 = sessionContextToEntity_2_prio0.getNextRecordedResponse(100);
+ assertNotNull("stanza 2 delivered", recordedStanza_2);
+ Stanza recordedStanza_3 = sessionContextToEntity_3_prio3.getNextRecordedResponse(100);
+ assertNotNull("stanza 3 delivered", recordedStanza_3);
+ Stanza recordedStanza_4 = sessionContextToEntity_4_prioMinus.getNextRecordedResponse(100);
+ assertNull("stanza 4 delivered", recordedStanza_4);
+ } catch (DeliveryException e) {
+ throw e;
+ }
+
+ }
+
+ public void testRelayToTwoRecepients_DeliverToHIGHEST() throws EntityFormatException, XMLSemanticError, DeliveryException, BindException {
+ DefaultServerRuntimeContext serverRuntimeContext = new DefaultServerRuntimeContext(null, null);
+
+ // !! DeliverMessageToHighestPriorityResourcesOnly = TRUE
+ serverRuntimeContext.getServerFeatures().setDeliverMessageToHighestPriorityResourcesOnly(true);
+
+ stanzaRelay.setServerRuntimeContext(serverRuntimeContext);
+
+ EntityImpl fromEntity = EntityImpl.parse("userFrom@vysper.org");
+
+ EntityImpl toEntity = EntityImpl.parse("userTo@vysper.org");
+
+
+ TestSessionContext sessionContextToEntity_1_prio3 = createSessionForTo(toEntity, 3); // HIGHEST PRIO
+ TestSessionContext sessionContextToEntity_2_prio0 = createSessionForTo(toEntity, 1); // not receiving
+ TestSessionContext sessionContextToEntity_3_prio3 = createSessionForTo(toEntity, 3); // HIGHEST PRIO
+ TestSessionContext sessionContextToEntity_4_prioMinus = createSessionForTo(toEntity, -1); // not receiving
+
+ Stanza stanza = StanzaBuilder.createMessageStanza(fromEntity, toEntity, "en", "Hello").build();
+
+ try {
+ stanzaRelay.relay(toEntity, stanza, new IgnoreFailureStrategy());
+ Stanza recordedStanza_1 = sessionContextToEntity_1_prio3.getNextRecordedResponse(100);
+ assertNotNull("stanza 1 delivered", recordedStanza_1);
+ Stanza recordedStanza_2 = sessionContextToEntity_2_prio0.getNextRecordedResponse(100);
+ assertNull("stanza 2 not delivered", recordedStanza_2);
+ Stanza recordedStanza_3 = sessionContextToEntity_3_prio3.getNextRecordedResponse(100);
+ assertNotNull("stanza 3 delivered", recordedStanza_3);
+ Stanza recordedStanza_4 = sessionContextToEntity_4_prioMinus.getNextRecordedResponse(100);
+ assertNull("stanza 4 not delivered", recordedStanza_4);
+ } catch (DeliveryException e) {
+ throw e;
+ }
+
+ }
+
+ private TestSessionContext createSessionForTo(EntityImpl toEntity, final int priority) {
+ TestSessionContext sessionContextToEntity = TestSessionContext.createSessionContext(toEntity);
+ sessionContextToEntity.setSessionState(SessionState.AUTHENTICATED);
+ String toEntityRes = resourceRegistry.bindSession(sessionContextToEntity);
+ resourceRegistry.setResourcePriority(toEntityRes, priority);
+ return sessionContextToEntity;
+ }
+
+
}
Modified: mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistryTestCase.java
URL: http://svn.apache.org/viewvc/mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistryTestCase.java?rev=922450&r1=922449&r2=922450&view=diff
==============================================================================
--- mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistryTestCase.java (original)
+++ mina/sandbox/vysper/trunk/server/core/src/test/java/org/apache/vysper/xmpp/state/resourcebinding/ResourceRegistryTestCase.java Fri Mar 12 23:04:58 2010
@@ -93,20 +93,37 @@ public class ResourceRegistryTestCase ex
assertEquals(2, resourceList.size());
assertTrue(sessionList.contains(sessionContext1));
assertTrue(sessionList.contains(sessionContext2));
-
- SessionContext hightestPrioSession = resourceRegistry.getHighestPrioSession(entity, null);
- assertSame(resourceRegistry.getSessionContext(resourceId2), hightestPrioSession);
-
+
+ List<SessionContext> highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity, null);
+ assertEquals(1, highestPrioSessions.size());
+ SessionContext highestPrioSession = highestPrioSessions.get(0);
+ assertSame(resourceRegistry.getSessionContext(resourceId2), highestPrioSession);
+
resourceRegistry.setResourcePriority(resourceId1, 2); // make this highes prio
- hightestPrioSession = resourceRegistry.getHighestPrioSession(entity, null);
- assertSame(resourceRegistry.getSessionContext(resourceId1), hightestPrioSession);
+ highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity, null);
+ assertEquals(1, highestPrioSessions.size());
+ highestPrioSession = highestPrioSessions.get(0);
+ assertSame(resourceRegistry.getSessionContext(resourceId1), highestPrioSession);
+
+ highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity, 2); // still highest prio
+ assertEquals(1, highestPrioSessions.size());
+ highestPrioSession = highestPrioSessions.get(0);
+ assertSame(resourceRegistry.getSessionContext(resourceId1), highestPrioSession);
+
+ highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity, 3); // now, all prios are below threshold
+ assertEquals(0, highestPrioSessions.size());
+
+ resourceRegistry.setResourcePriority(resourceId1, 4); // both are same
+ resourceRegistry.setResourcePriority(resourceId2, 4); // both are same
+ highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity, 3);
+ assertEquals(2, highestPrioSessions.size());
+ highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity, 4);
+ assertEquals(2, highestPrioSessions.size());
+ highestPrioSessions = resourceRegistry.getHighestPrioSessions(entity, 5);
+ assertEquals(0, highestPrioSessions.size());
+
+ sessionContext1.getServerRuntimeContext().getServerFeatures().setDeliverMessageToHighestPriorityResourcesOnly(false);
- hightestPrioSession = resourceRegistry.getHighestPrioSession(entity, 2); // still highest prio
- assertSame(resourceRegistry.getSessionContext(resourceId1), hightestPrioSession);
-
- hightestPrioSession = resourceRegistry.getHighestPrioSession(entity, 3); // now, all prios are below threshold
- assertNull(hightestPrioSession);
-
}
public void testAddOneEntityMultipleResources_TolerateResourceIds() throws EntityFormatException {