You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by an...@apache.org on 2007/03/15 13:22:46 UTC
svn commit: r518603 - in /incubator/cxf/trunk/rt/ws/rm/src:
main/java/org/apache/cxf/ws/rm/ main/java/org/apache/cxf/ws/rm/policy/
main/java/org/apache/cxf/ws/rm/soap/ test/java/org/apache/cxf/ws/rm/
test/java/org/apache/cxf/ws/rm/policy/ test/java/org...
Author: andreasmyth
Date: Thu Mar 15 05:22:45 2007
New Revision: 518603
URL: http://svn.apache.org/viewvc?view=rev&rev=518603
Log:
[JIRA CXF-280] Use of message specific acknowledgement interval.
[JIRA CXF-459] Use of daemon thread in RM's timer.
Added:
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/PolicyUtils.java (with props)
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/policy/
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/policy/PolicyUtilsTest.java (with props)
Modified:
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java?view=diff&rev=518603&r1=518602&r2=518603
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java Thu Mar 15 05:22:45 2007
@@ -26,9 +26,10 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.cxf.common.i18n.Message;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
import org.apache.cxf.ws.rm.persistence.RMStore;
@@ -84,11 +85,15 @@
* @param replyToAddress the replyTo address of the message that carried this sequence information
* @throws SequenceFault if the sequence specified in <code>sequenceType</code> does not exist
*/
- public void acknowledge(SequenceType sequenceType, String replyToAddress)
+ public void acknowledge(Message message)
throws SequenceFault {
+ SequenceType sequenceType = RMContextUtils.retrieveRMProperties(message, false).getSequence();
+ if (null == sequenceType) {
+ return;
+ }
DestinationSequence seq = getSequence(sequenceType.getIdentifier());
if (null != seq) {
- seq.acknowledge(sequenceType.getMessageNumber());
+ seq.acknowledge(message);
if (null != sequenceType.getLastMessage()) {
@@ -98,14 +103,18 @@
// if we cannot expect an outgoing message to which the acknowledgement
// can be added we need to send an out-of-band SequenceAcknowledgement message
-
+
+ AddressingPropertiesImpl maps = RMContextUtils.retrieveMAPs(message, false, false);
+ String replyToAddress = null;
+ if (null != maps.getReplyTo()) {
+ replyToAddress = maps.getReplyTo().getAddress().getValue();
+ }
if (!(seq.getAcksTo().getAddress().getValue().equals(replyToAddress)
|| seq.canPiggybackAckOnPartialResponse())) {
try {
getReliableEndpoint().getProxy().acknowledge(seq);
} catch (IOException ex) {
- Message msg = new Message("SEQ_ACK_SEND_EXC", LOG, seq);
- LOG.log(Level.SEVERE, msg.toString(), ex);
+ LogUtils.log(LOG, Level.SEVERE, "SEQ_ACK_SEND_EXC", ex, seq);
}
}
}
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java?view=diff&rev=518603&r1=518602&r2=518603
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java Thu Mar 15 05:22:45 2007
@@ -29,13 +29,15 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.cxf.common.i18n.Message;
+// import org.apache.cxf.common.i18n.Message;
import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.message.Message;
import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
import org.apache.cxf.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
import org.apache.cxf.ws.rm.manager.AcksPolicyType;
import org.apache.cxf.ws.rm.manager.DeliveryAssuranceType;
import org.apache.cxf.ws.rm.persistence.RMStore;
+import org.apache.cxf.ws.rm.policy.PolicyUtils;
import org.apache.cxf.ws.rm.policy.RMAssertion;
public class DestinationSequence extends AbstractSequence {
@@ -104,12 +106,15 @@
return destination.getName().toString();
}
- public void acknowledge(BigInteger messageNumber) throws SequenceFault {
+ public void acknowledge(Message message) throws SequenceFault {
+ SequenceType st = RMContextUtils.retrieveRMProperties(message, false).getSequence();
+ BigInteger messageNumber = st.getMessageNumber();
LOG.fine("Acknowledging message: " + messageNumber);
if (null != lastMessageNumber && messageNumber.compareTo(lastMessageNumber) > 0) {
SequenceFaultType sf = RMUtils.getWSRMFactory().createSequenceFaultType();
sf.setFaultCode(RMConstants.getLastMessageNumberExceededFaultCode());
- Message msg = new Message("LAST_MESSAGE_NUMBER_EXCEEDED_EXC", LOG, this);
+ org.apache.cxf.common.i18n.Message msg =
+ new org.apache.cxf.common.i18n.Message("LAST_MESSAGE_NUMBER_EXCEEDED_EXC", LOG, this);
throw new SequenceFault(msg.toString(), sf);
}
@@ -153,7 +158,7 @@
purgeAcknowledged(messageNumber);
- scheduleAcknowledgement();
+ scheduleAcknowledgement(message);
}
@@ -212,7 +217,8 @@
boolean applyDeliveryAssurance(BigInteger mn) {
DeliveryAssuranceType da = destination.getManager().getDeliveryAssurance();
if (da.isSetAtMostOnce() && isAcknowledged(mn)) {
- Message msg = new Message("MESSAGE_ALREADY_DELIVERED", LOG, mn, getIdentifier().getValue());
+ org.apache.cxf.common.i18n.Message msg = new org.apache.cxf.common.i18n.Message(
+ "MESSAGE_ALREADY_DELIVERED", LOG, mn, getIdentifier().getValue());
LOG.log(Level.SEVERE, msg.toString());
return false;
}
@@ -272,11 +278,18 @@
return correlationID;
}
- void scheduleAcknowledgement() {
- RMAssertion rma = destination.getManager().getRMAssertion();
- int delay = 0;
- if (null != rma.getAcknowledgementInterval()) {
- delay = rma.getAcknowledgementInterval().getMilliseconds().intValue();
+ void scheduleAcknowledgement(Message message) {
+ BigInteger interval = PolicyUtils.getAcknowledgmentInterval(message);
+ if (null == interval) {
+ RMAssertion rma = destination.getManager().getRMAssertion();
+ if (null != rma.getAcknowledgementInterval()) {
+ interval = rma.getAcknowledgementInterval().getMilliseconds();
+ }
+ }
+
+ long delay = 0;
+ if (null != interval) {
+ delay = interval.longValue();
}
AcksPolicyType ap = destination.getManager().getDestinationPolicy().getAcksPolicy();
@@ -294,7 +307,7 @@
acknowledgeOnNextOccasion = true;
}
- synchronized void scheduleDeferredAcknowledgement(int delay) {
+ synchronized void scheduleDeferredAcknowledgement(long delay) {
if (null == deferredAcknowledgments) {
deferredAcknowledgments = new ArrayList<DeferredAcknowledgment>();
@@ -333,8 +346,7 @@
Proxy proxy = rme.getProxy();
proxy.acknowledge(DestinationSequence.this);
} catch (IOException ex) {
- Message msg = new Message("SEQ_ACK_SEND_EXC", LOG, DestinationSequence.this);
- LOG.log(Level.SEVERE, msg.toString(), ex);
+ LogUtils.log(LOG, Level.SEVERE, "SEQ_ACK_SEND_EXC", ex, DestinationSequence.this);
} finally {
synchronized (DestinationSequence.this) {
DestinationSequence.this.deferredAcknowledgments.remove(this);
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java?view=diff&rev=518603&r1=518602&r2=518603
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java Thu Mar 15 05:22:45 2007
@@ -29,7 +29,6 @@
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.helpers.CastUtils;
import org.apache.cxf.message.Message;
-import org.apache.cxf.ws.addressing.AddressingProperties;
import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
import org.apache.cxf.ws.addressing.MAPAggregator;
@@ -115,7 +114,7 @@
processAcknowledgmentRequests(rmps);
- processSequence(destination, rmps, maps);
+ processSequence(destination, message);
processDeliveryAssurance(rmps);
}
@@ -138,18 +137,13 @@
}
void processAcknowledgmentRequests(RMProperties rmps) {
-
+ // TODO
}
- void processSequence(Destination destination, RMProperties rmps, AddressingProperties maps)
+ void processSequence(Destination destination, Message message)
throws SequenceFault {
- SequenceType s = rmps.getSequence();
- if (null == s) {
- return;
- }
- destination.acknowledge(s,
- null == maps.getReplyTo() ? null : maps.getReplyTo().getAddress().getValue());
+ destination.acknowledge(message);
}
void processDeliveryAssurance(RMProperties rmps) {
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?view=diff&rev=518603&r1=518602&r2=518603
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Thu Mar 15 05:22:45 2007
@@ -62,7 +62,7 @@
private RetransmissionQueue retransmissionQueue;
private Map<Endpoint, RMEndpoint> reliableEndpoints = new HashMap<Endpoint, RMEndpoint>();
private Map<String, SourceSequence> sourceSequences;
- private Timer timer = new Timer();
+ private Timer timer = new Timer(true);
public Bus getBus() {
return bus;
Added: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/PolicyUtils.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/PolicyUtils.java?view=auto&rev=518603
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/PolicyUtils.java (added)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/PolicyUtils.java Thu Mar 15 05:22:45 2007
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.policy;
+
+import java.math.BigInteger;
+import java.util.Collection;
+
+import org.apache.cxf.message.Message;
+import org.apache.cxf.ws.policy.AssertionInfo;
+import org.apache.cxf.ws.policy.AssertionInfoMap;
+import org.apache.cxf.ws.policy.builder.jaxb.JaxbAssertion;
+import org.apache.cxf.ws.rm.RMConstants;
+
+/**
+ *
+ */
+public final class PolicyUtils {
+
+ /**
+ * Prevents instantiation.
+ *
+ */
+ private PolicyUtils() {
+ }
+
+
+ /**
+ * Returns the base retransmission interval for the specified message.
+ * This is obtained as the minimum base retransmission interval in all RMAssertions pertaining
+ * to the message, or null if there are no such policy assertions.
+ * @param message the message
+ * @return the base retransmission interval for the message
+ */
+ public static BigInteger getBaseRetransmissionInterval(Message message) {
+ AssertionInfoMap amap = message.get(AssertionInfoMap.class);
+ BigInteger result = null;
+ if (null != amap) {
+ Collection<AssertionInfo> ais = amap.get(RMConstants.getRMAssertionQName());
+ if (null != ais) {
+ for (AssertionInfo ai : ais) {
+ JaxbAssertion<RMAssertion> ja = getAssertion(ai);
+ RMAssertion rma = ja.getData();
+ RMAssertion.BaseRetransmissionInterval bri = rma.getBaseRetransmissionInterval();
+ if (null == bri) {
+ continue;
+ }
+ BigInteger val = bri.getMilliseconds();
+ if (null == result || val.compareTo(result) < 0) {
+ result = val;
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Determines if exponential backoff should be used in repeated attempts to
+ * resend the specified message. Returns false if there is at least one
+ * RMAssertion for this message indicating that no exponential backoff
+ * algorithm should be used, or true otherwise.
+ *
+ * @param message the message
+ * @return true iff the exponential backoff algorithm should be used for the
+ * message
+ */
+ public static boolean useExponentialBackoff(Message message) {
+ AssertionInfoMap amap = message.get(AssertionInfoMap.class);
+ if (null != amap) {
+ Collection<AssertionInfo> ais = amap.get(RMConstants.getRMAssertionQName());
+ if (null != ais) {
+ for (AssertionInfo ai : ais) {
+ JaxbAssertion<RMAssertion> ja = getAssertion(ai);
+ RMAssertion rma = ja.getData();
+ if (null == rma.getExponentialBackoff()) {
+ return false;
+ }
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Returns the acknowledgment interval for the specified message.
+ * This is obtained as the minimum acknowledgment interval in all RMAssertions pertaining
+ * to the message, or null if there are no such policy assertions.
+ * @param message the message
+ * @return the acknowledgment interval for the message
+ */
+ public static BigInteger getAcknowledgmentInterval(Message message) {
+ AssertionInfoMap amap = message.get(AssertionInfoMap.class);
+ BigInteger result = null;
+ if (null != amap) {
+ Collection<AssertionInfo> ais = amap.get(RMConstants.getRMAssertionQName());
+ if (null != ais) {
+ for (AssertionInfo ai : ais) {
+ JaxbAssertion<RMAssertion> ja = getAssertion(ai);
+ RMAssertion rma = ja.getData();
+ RMAssertion.AcknowledgementInterval interval = rma.getAcknowledgementInterval();
+ if (null == interval) {
+ continue;
+ }
+ BigInteger val = interval.getMilliseconds();
+ if (null == result || val.compareTo(result) < 0) {
+ result = val;
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+
+ @SuppressWarnings("unchecked")
+ private static JaxbAssertion<RMAssertion> getAssertion(AssertionInfo ai) {
+ return (JaxbAssertion<RMAssertion>)ai.getAssertion();
+ }
+}
Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/PolicyUtils.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/policy/PolicyUtils.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?view=diff&rev=518603&r1=518602&r2=518603
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Thu Mar 15 05:22:45 2007
@@ -43,10 +43,8 @@
import org.apache.cxf.message.Message;
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.ws.policy.AssertionInfo;
-import org.apache.cxf.ws.policy.AssertionInfoMap;
import org.apache.cxf.ws.policy.builder.jaxb.JaxbAssertion;
import org.apache.cxf.ws.rm.Identifier;
-import org.apache.cxf.ws.rm.RMConstants;
import org.apache.cxf.ws.rm.RMContextUtils;
import org.apache.cxf.ws.rm.RMManager;
import org.apache.cxf.ws.rm.RMMessageConstants;
@@ -56,6 +54,7 @@
import org.apache.cxf.ws.rm.SequenceType;
import org.apache.cxf.ws.rm.SourceSequence;
import org.apache.cxf.ws.rm.persistence.RMStore;
+import org.apache.cxf.ws.rm.policy.PolicyUtils;
import org.apache.cxf.ws.rm.policy.RMAssertion;
/**
@@ -64,15 +63,15 @@
public class RetransmissionQueueImpl implements RetransmissionQueue {
private static final Logger LOG = LogUtils.getL7dLogger(RetransmissionQueueImpl.class);
-
+
private Map<String, List<ResendCandidate>> candidates = new HashMap<String, List<ResendCandidate>>();
private Resender resender;
private RMManager manager;
-
+
public RetransmissionQueueImpl(RMManager m) {
manager = m;
}
-
+
public RMManager getManager() {
return manager;
}
@@ -80,78 +79,47 @@
public void setManager(RMManager m) {
manager = m;
}
-
+
/**
- * Returns the base retransmission interval for the specified message.
- * This is obtained as the minimum base retransmission interval in all RMAssertions pertaining
- * to the message, or the default configured for the RMManager if there are no such policy
- * assertions.
+ * Returns the base retransmission interval for the specified message. This
+ * is obtained as the minimum base retransmission interval in all
+ * RMAssertions pertaining to the message, or the default configured for the
+ * RMManager if there are no such policy assertions.
+ *
* @param message the message
* @return the base retransmission interval for the message
*/
public long getBaseRetransmissionInterval(Message message) {
- AssertionInfoMap amap = message.get(AssertionInfoMap.class);
- boolean initialised = false;
- long baseRetransmissionInterval = 0;
- if (null != amap) {
- Collection<AssertionInfo> ais = amap.get(RMConstants.getRMAssertionQName());
- if (null != ais) {
- for (AssertionInfo ai : ais) {
- JaxbAssertion<RMAssertion> ja = getAssertion(ai);
- RMAssertion rma = ja.getData();
- RMAssertion.BaseRetransmissionInterval bri = rma.getBaseRetransmissionInterval();
- if (null == bri) {
- continue;
- }
- BigInteger bival = bri.getMilliseconds();
- if (null == bival) {
- continue;
- }
- long lval = bival.longValue();
- if (initialised && lval < baseRetransmissionInterval) {
- baseRetransmissionInterval = lval;
- } else {
- baseRetransmissionInterval = lval;
- }
- initialised = true;
+ BigInteger val = PolicyUtils.getBaseRetransmissionInterval(message);
+ if (null != val) {
+ return val.longValue();
+ } else {
- }
- }
- }
- if (!initialised) {
RMAssertion rma = manager.getRMAssertion();
RMAssertion.BaseRetransmissionInterval bri = rma.getBaseRetransmissionInterval();
if (null != bri) {
- BigInteger bival = bri.getMilliseconds();
- if (null != bival) {
- baseRetransmissionInterval = bival.longValue();
- }
+ val = bri.getMilliseconds();
}
}
- return baseRetransmissionInterval;
+ if (null != val) {
+ return val.longValue();
+ }
+ return 0;
}
-
+
/**
- * Determines if exponential backoff should be used in repeated attemprs to resend
- * the specified message.
- * Returns false if there is at least one RMAssertion for this message indicating that no
- * exponential backoff algorithm should be used, or true otherwise.
+ * Determines if exponential backoff should be used in repeated attempts to
+ * resend the specified message. Returns false if there is at least one
+ * RMAssertion for this message indicating that no exponential backoff
+ * algorithm should be used, or true otherwise.
+ *
* @param message the message
- * @return true iff the exponential backoff algorithm should be used for the message
+ * @return true iff the exponential backoff algorithm should be used for the
+ * message
*/
public boolean useExponentialBackoff(Message message) {
- AssertionInfoMap amap = message.get(AssertionInfoMap.class);
- if (null != amap) {
- Collection<AssertionInfo> ais = amap.get(RMConstants.getRMAssertionQName());
- if (null != ais) {
- for (AssertionInfo ai : ais) {
- JaxbAssertion<RMAssertion> ja = getAssertion(ai);
- RMAssertion rma = ja.getData();
- if (null == rma.getExponentialBackoff()) {
- return false;
- }
- }
- }
+ if (!PolicyUtils.useExponentialBackoff(message)) {
+ return false;
}
RMAssertion rma = manager.getRMAssertion();
if (null == rma.getExponentialBackoff()) {
@@ -161,7 +129,7 @@
}
public void addUnacknowledged(Message message) {
- cacheUnacknowledged(message);
+ cacheUnacknowledged(message);
}
/**
@@ -176,7 +144,7 @@
/**
* @return true if there are no unacknowledged messages in the queue
*/
- public boolean isEmpty() {
+ public boolean isEmpty() {
return 0 == getUnacknowledged().size();
}
@@ -226,26 +194,26 @@
return;
}
LOG.fine("Starting retransmission queue");
-
+
// setup resender
-
+
resender = getDefaultResender();
}
/**
* Stops retransmission queue.
- */
+ */
public void stop() {
// no-op
}
-
+
/**
* @return the exponential backoff
*/
protected int getExponentialBackoff() {
return DEFAULT_EXPONENTIAL_BACKOFF;
}
-
+
/**
* @param message the message context
* @return a ResendCandidate
@@ -253,7 +221,7 @@
protected ResendCandidate createResendCandidate(Message message) {
return new ResendCandidate(message);
}
-
+
/**
* Accepts a new resend candidate.
*
@@ -262,7 +230,7 @@
*/
protected ResendCandidate cacheUnacknowledged(Message message) {
ResendCandidate candidate = null;
- RMProperties rmps = RMContextUtils.retrieveRMProperties(message, true);
+ RMProperties rmps = RMContextUtils.retrieveRMProperties(message, true);
SequenceType st = rmps.getSequence();
Identifier sid = st.getIdentifier();
synchronized (this) {
@@ -278,7 +246,7 @@
LOG.fine("Cached unacknowledged message.");
return candidate;
}
-
+
/**
* @return a map relating sequence ID to a lists of un-acknowledged messages
* for that sequence
@@ -295,7 +263,7 @@
protected List<ResendCandidate> getSequenceCandidates(SourceSequence seq) {
return getSequenceCandidates(seq.getIdentifier().getValue());
}
-
+
/**
* @param key the sequence identifier under consideration
* @return the list of resend candidates for that sequence
@@ -304,22 +272,23 @@
protected List<ResendCandidate> getSequenceCandidates(String key) {
return candidates.get(key);
}
-
- private void clientResend(Message message) {
+
+ private void clientResend(Message message) {
Conduit c = message.getExchange().getConduit();
try {
-
- // get registered callbacks, create new output stream and re-register
+
+ // get registered callbacks, create new output stream and
+ // re-register
// all callbacks except the retransmission callback
-
+
OutputStream os = message.getContent(OutputStream.class);
- List<CachedOutputStreamCallback> callbacks = null;
+ List<CachedOutputStreamCallback> callbacks = null;
if (os instanceof AbstractCachedOutputStream) {
callbacks = ((AbstractCachedOutputStream)os).getCallbacks();
}
-
+
c.send(message);
-
+
os = message.getContent(OutputStream.class);
if (os instanceof AbstractCachedOutputStream && callbacks.size() > 1) {
for (CachedOutputStreamCallback cb : callbacks) {
@@ -328,23 +297,23 @@
}
}
}
- ByteArrayOutputStream savedOutputStream =
- (ByteArrayOutputStream)message.get(RMMessageConstants.SAVED_OUTPUT_STREAM);
+ ByteArrayOutputStream savedOutputStream = (ByteArrayOutputStream)message
+ .get(RMMessageConstants.SAVED_OUTPUT_STREAM);
ByteArrayInputStream bis = new ByteArrayInputStream(savedOutputStream.toByteArray());
-
+
// copy saved output stream to new output stream in chunks of 1024
AbstractCachedOutputStream.copyStream(bis, os, 1024);
os.flush();
os.close();
} catch (IOException ex) {
- LOG.log(Level.SEVERE, "RESEND_FAILED_MSG", ex);
+ LOG.log(Level.SEVERE, "RESEND_FAILED_MSG", ex);
}
}
-
+
private void serverResend(Message message) {
// TODO
}
-
+
/**
* Represents a candidate for resend, i.e. an unacked outgoing message.
*/
@@ -366,14 +335,13 @@
resends = 0;
long baseRetransmissionInterval = getBaseRetransmissionInterval(m);
backoff = useExponentialBackoff(m) ? RetransmissionQueue.DEFAULT_EXPONENTIAL_BACKOFF : 1;
- next = new Date(System.currentTimeMillis() + baseRetransmissionInterval);
- nextInterval = baseRetransmissionInterval * backoff;
+ next = new Date(System.currentTimeMillis() + baseRetransmissionInterval);
+ nextInterval = baseRetransmissionInterval * backoff;
if (null != manager.getTimer()) {
schedule();
}
}
-
/**
* Initiate resend asynchronsly.
*
@@ -395,7 +363,7 @@
LOG.log(Level.SEVERE, "RESEND_INITIATION_FAILED_MSG", ex);
}
}
-
+
public void run() {
try {
// ensure ACK wasn't received while this task was enqueued
@@ -409,14 +377,13 @@
}
}
-
/**
* @return number of resend attempts
*/
protected int getResends() {
return resends;
}
-
+
/**
* @return date of next resend
*/
@@ -461,20 +428,22 @@
schedule();
}
}
-
+
protected final synchronized void schedule() {
if (null == manager.getTimer()) {
return;
}
class ResendTask extends TimerTask {
ResendCandidate candidate;
+
ResendTask(ResendCandidate c) {
candidate = c;
- }
+ }
+
@Override
public void run() {
if (!candidate.isPending()) {
- candidate.initiate(includeAckRequested);
+ candidate.initiate(includeAckRequested);
}
}
}
@@ -482,12 +451,11 @@
try {
manager.getTimer().schedule(nextTask, next);
} catch (IllegalStateException ex) {
- LOG.log(Level.WARNING, "SCHEDULE_RESEND_FAILED_MSG", ex);
+ LOG.log(Level.WARNING, "SCHEDULE_RESEND_FAILED_MSG", ex);
}
}
}
-
-
+
/**
* Encapsulates actual resend logic (pluggable to facilitate unit testing)
*/
@@ -500,7 +468,7 @@
*/
void resend(Message message, boolean requestAcknowledge);
}
-
+
/**
* Create default Resender logic.
*
@@ -508,7 +476,7 @@
*/
protected final Resender getDefaultResender() {
return new Resender() {
- public void resend(Message message, boolean requestAcknowledge) {
+ public void resend(Message message, boolean requestAcknowledge) {
RMProperties properties = RMContextUtils.retrieveRMProperties(message, true);
SequenceType st = properties.getSequence();
if (st != null) {
@@ -517,7 +485,7 @@
try {
// TODO: remove previously added acknowledgments and update
// message id (to avoid duplicates)
-
+
if (RMContextUtils.isRequestor(message)) {
clientResend(message);
} else {
@@ -529,7 +497,7 @@
}
};
}
-
+
/**
* Plug in replacement resend logic (facilitates unit testing).
*
@@ -538,7 +506,7 @@
protected void replaceResender(Resender replacement) {
resender = replacement;
}
-
+
@SuppressWarnings("unchecked")
protected JaxbAssertion<RMAssertion> getAssertion(AssertionInfo ai) {
return (JaxbAssertion<RMAssertion>)ai.getAssertion();
Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java?view=diff&rev=518603&r1=518602&r2=518603
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/DestinationSequenceTest.java Thu Mar 15 05:22:45 2007
@@ -26,9 +26,9 @@
import javax.xml.namespace.QName;
-
import junit.framework.TestCase;
-
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
import org.apache.cxf.ws.rm.SequenceAcknowledgement.AcknowledgementRange;
import org.apache.cxf.ws.rm.manager.AcksPolicyType;
@@ -55,8 +55,7 @@
public void setUp() {
control = EasyMock.createNiceControl();
factory = new ObjectFactory();
-
- ref = control.createMock(EndpointReferenceType.class);
+ ref = control.createMock(EndpointReferenceType.class);
id = factory.createIdentifier();
id.setValue("seq");
}
@@ -129,19 +128,21 @@
public void testAcknowledgeBasic() throws SequenceFault {
setUpDestination();
+ Message message1 = setUpMessage("1");
+ Message message2 = setUpMessage("2");
control.replay();
DestinationSequence seq = new DestinationSequence(id, ref, destination);
List<AcknowledgementRange> ranges = seq.getAcknowledgment().getAcknowledgementRange();
assertEquals(0, ranges.size());
-
- seq.acknowledge(new BigInteger("1"));
+
+ seq.acknowledge(message1);
assertEquals(1, ranges.size());
AcknowledgementRange r1 = ranges.get(0);
assertEquals(1, r1.getLower().intValue());
assertEquals(1, r1.getUpper().intValue());
- seq.acknowledge(new BigInteger("2"));
+ seq.acknowledge(message2);
assertEquals(1, ranges.size());
r1 = ranges.get(0);
assertEquals(1, r1.getLower().intValue());
@@ -152,14 +153,16 @@
public void testAcknowledgeLastMessageNumberExceeded() throws SequenceFault {
setUpDestination();
+ Message message1 = setUpMessage("1");
+ Message message2 = setUpMessage("2");
control.replay();
DestinationSequence seq = new DestinationSequence(id, ref, destination);
- seq.acknowledge(BigInteger.ONE);
+ seq.acknowledge(message1);
seq.setLastMessageNumber(BigInteger.ONE);
try {
- seq.acknowledge(new BigInteger("2"));
+ seq.acknowledge(message2);
fail("Expected SequenceFault not thrown.");
} catch (SequenceFault sf) {
assertEquals("LastMessageNumberExceeded", sf.getFaultInfo().getFaultCode().getLocalPart());
@@ -170,15 +173,21 @@
public void testAcknowledgeAppendRange() throws SequenceFault {
setUpDestination();
+ Message[] messages = new Message [] {
+ setUpMessage("1"),
+ setUpMessage("2"),
+ setUpMessage("5"),
+ setUpMessage("4"),
+ setUpMessage("6")
+ };
+
control.replay();
DestinationSequence seq = new DestinationSequence(id, ref, destination);
- List<AcknowledgementRange> ranges = seq.getAcknowledgment().getAcknowledgementRange();
- seq.acknowledge(new BigInteger("1"));
- seq.acknowledge(new BigInteger("2"));
- seq.acknowledge(new BigInteger("5"));
- seq.acknowledge(new BigInteger("4"));
- seq.acknowledge(new BigInteger("6"));
+ List<AcknowledgementRange> ranges = seq.getAcknowledgment().getAcknowledgementRange();
+ for (int i = 0; i < messages.length; i++) {
+ seq.acknowledge(messages[i]);
+ }
assertEquals(2, ranges.size());
AcknowledgementRange r = ranges.get(0);
assertEquals(1, r.getLower().intValue());
@@ -192,17 +201,22 @@
public void testAcknowledgeInsertRange() throws SequenceFault {
setUpDestination();
+ Message[] messages = new Message [] {
+ setUpMessage("1"),
+ setUpMessage("2"),
+ setUpMessage("9"),
+ setUpMessage("10"),
+ setUpMessage("4"),
+ setUpMessage("9"),
+ setUpMessage("2")
+ };
control.replay();
DestinationSequence seq = new DestinationSequence(id, ref, destination);
- List<AcknowledgementRange> ranges = seq.getAcknowledgment().getAcknowledgementRange();
- seq.acknowledge(new BigInteger("1"));
- seq.acknowledge(new BigInteger("2"));
- seq.acknowledge(new BigInteger("9"));
- seq.acknowledge(new BigInteger("10"));
- seq.acknowledge(new BigInteger("4"));
- seq.acknowledge(new BigInteger("9"));
- seq.acknowledge(new BigInteger("2"));
+ List<AcknowledgementRange> ranges = seq.getAcknowledgment().getAcknowledgementRange();
+ for (int i = 0; i < messages.length; i++) {
+ seq.acknowledge(messages[i]);
+ }
assertEquals(3, ranges.size());
AcknowledgementRange r = ranges.get(0);
@@ -220,16 +234,21 @@
public void testAcknowledgePrependRange() throws SequenceFault {
setUpDestination();
+ Message[] messages = new Message [] {
+ setUpMessage("4"),
+ setUpMessage("5"),
+ setUpMessage("6"),
+ setUpMessage("4"),
+ setUpMessage("2"),
+ setUpMessage("2")
+ };
control.replay();
DestinationSequence seq = new DestinationSequence(id, ref, destination);
List<AcknowledgementRange> ranges = seq.getAcknowledgment().getAcknowledgementRange();
- seq.acknowledge(new BigInteger("4"));
- seq.acknowledge(new BigInteger("5"));
- seq.acknowledge(new BigInteger("6"));
- seq.acknowledge(new BigInteger("4"));
- seq.acknowledge(new BigInteger("2"));
- seq.acknowledge(new BigInteger("2"));
+ for (int i = 0; i < messages.length; i++) {
+ seq.acknowledge(messages[i]);
+ }
assertEquals(2, ranges.size());
AcknowledgementRange r = ranges.get(0);
assertEquals(2, r.getLower().intValue());
@@ -286,6 +305,10 @@
public void testMonitor() throws SequenceFault {
setUpDestination();
+ Message[] messages = new Message[15];
+ for (int i = 0; i < messages.length; i++) {
+ messages[i] = setUpMessage(Integer.toString(i + 1));
+ }
control.replay();
DestinationSequence seq = new DestinationSequence(id, ref, destination);
@@ -295,11 +318,8 @@
assertEquals(0, monitor.getMPM());
- BigInteger mn = BigInteger.ONE;
-
for (int i = 0; i < 10; i++) {
- seq.acknowledge(mn);
- mn = mn.add(BigInteger.ONE);
+ seq.acknowledge(messages[i]);
try {
Thread.sleep(50);
} catch (InterruptedException ex) {
@@ -309,9 +329,8 @@
int mpm1 = monitor.getMPM();
assertTrue(mpm1 > 0);
- for (int i = 0; i < 5; i++) {
- seq.acknowledge(mn);
- mn = mn.add(BigInteger.ONE);
+ for (int i = 10; i < messages.length; i++) {
+ seq.acknowledge(messages[i]);
try {
Thread.sleep(100);
} catch (InterruptedException ex) {
@@ -325,14 +344,16 @@
control.verify();
}
+
public void testAcknowledgeImmediate() throws SequenceFault {
setUpDestination();
+ Message message = setUpMessage("1");
control.replay();
DestinationSequence seq = new DestinationSequence(id, ref, destination);
assertTrue(!seq.sendAcknowledgement());
- seq.acknowledge(new BigInteger("1"));
+ seq.acknowledge(message);
assertTrue(seq.sendAcknowledgement());
seq.acknowledgmentSent();
@@ -353,6 +374,11 @@
proxy.acknowledge(seq);
EasyMock.expectLastCall();
+ Message[] messages = new Message[] {
+ setUpMessage("1"),
+ setUpMessage("2"),
+ setUpMessage("3")
+ };
control.replay();
ap.setIntraMessageThreshold(0);
@@ -363,9 +389,9 @@
assertTrue(!seq.sendAcknowledgement());
- seq.acknowledge(new BigInteger("1"));
- seq.acknowledge(new BigInteger("2"));
- seq.acknowledge(new BigInteger("3"));
+ for (int i = 0; i < messages.length; i++) {
+ seq.acknowledge(messages[i]);
+ }
assertFalse(seq.sendAcknowledgement());
@@ -453,6 +479,10 @@
public void testInOrderWait() {
setUpDestination();
+ Message[] messages = new Message[5];
+ for (int i = 0; i < messages.length; i++) {
+ messages[i] = setUpMessage(Integer.toString(i + 1));
+ }
DeliveryAssuranceType da = control.createMock(DeliveryAssuranceType.class);
EasyMock.expect(manager.getDeliveryAssurance()).andReturn(da).anyTimes();
@@ -462,25 +492,27 @@
SequenceAcknowledgement ack = factory.createSequenceAcknowledgement();
List<AcknowledgementRange> ranges = new ArrayList<AcknowledgementRange>();
- final int n = 5;
+
final AcknowledgementRange r =
factory.createSequenceAcknowledgementAcknowledgementRange();
- r.setUpper(new BigInteger(Integer.toString(n)));
+ r.setUpper(new BigInteger(Integer.toString(messages.length)));
ranges.add(r);
final DestinationSequence ds = new DestinationSequence(id, ref, null, ack);
ds.setDestination(destination);
class Acknowledger extends Thread {
- BigInteger mn;
+ Message message;
+ BigInteger messageNr;
- Acknowledger(String mnStr) {
- mn = new BigInteger(mnStr);
+ Acknowledger(Message m, BigInteger mn) {
+ message = m;
+ messageNr = mn;
}
public void run() {
try {
- ds.acknowledge(mn);
- ds.applyDeliveryAssurance(mn);
+ ds.acknowledge(message);
+ ds.applyDeliveryAssurance(messageNr);
} catch (SequenceFault ex) {
// ignore
}
@@ -489,9 +521,9 @@
control.replay();
- Thread[] threads = new Thread[n];
- for (int i = n - 1; i >= 0; i--) {
- threads[i] = new Acknowledger(Integer.toString(i + 1));
+ Thread[] threads = new Thread[messages.length];
+ for (int i = messages.length - 1; i >= 0; i--) {
+ threads[i] = new Acknowledger(messages[i], new BigInteger(Integer.toString(i + 1)));
threads[i].start();
try {
Thread.sleep(100);
@@ -501,7 +533,7 @@
}
boolean timedOut = false;
- for (int i = 0; i < n; i++) {
+ for (int i = 0; i < messages.length; i++) {
try {
threads[i].join(1000);
} catch (InterruptedException ex) {
@@ -590,6 +622,21 @@
EasyMock.expect(manager.getTimer()).andReturn(timer).anyTimes();
}
+ }
+
+ private Message setUpMessage(String messageNr) { // builds a mock Message whose inbound RM properties carry message number messageNr
+ Message message = control.createMock(Message.class);
+ Exchange exchange = control.createMock(Exchange.class);
+ EasyMock.expect(message.getExchange()).andReturn(exchange);
+ EasyMock.expect(exchange.getOutMessage()).andReturn(null); // exchange has no out message ...
+ EasyMock.expect(exchange.getOutFaultMessage()).andReturn(null); // ... and no out fault message
+ RMProperties rmps = control.createMock(RMProperties.class);
+ EasyMock.expect(message.get(RMMessageConstants.RM_PROPERTIES_INBOUND)).andReturn(rmps);
+ SequenceType st = control.createMock(SequenceType.class);
+ EasyMock.expect(rmps.getSequence()).andReturn(st);
+ BigInteger val = new BigInteger(messageNr); // messageNr must be a decimal integer string
+ EasyMock.expect(st.getMessageNumber()).andReturn(val);
+ return message; // expectations are only recorded here; the caller must call control.replay()
}
Added: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/policy/PolicyUtilsTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/policy/PolicyUtilsTest.java?view=auto&rev=518603
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/policy/PolicyUtilsTest.java (added)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/policy/PolicyUtilsTest.java Thu Mar 15 05:22:45 2007
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.policy;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import junit.framework.TestCase;
+
+import org.apache.cxf.message.Message;
+import org.apache.cxf.ws.policy.AssertionInfo;
+import org.apache.cxf.ws.policy.AssertionInfoMap;
+import org.apache.cxf.ws.policy.builder.jaxb.JaxbAssertion;
+import org.apache.cxf.ws.rm.RMConstants;
+import org.easymock.IMocksControl;
+import org.easymock.classextension.EasyMock;
+
+/**
+ * Unit tests for the {@link PolicyUtils} lookups, driven by EasyMock mocks of the message and its RM assertions.
+ */
+public class PolicyUtilsTest extends TestCase {
+
+ private IMocksControl control; // shared control; every mock in these tests is created from it
+
+ public void setUp() {
+ control = EasyMock.createNiceControl();
+ }
+
+ public void tearDown() {
+ control.verify(); // every test ends with the control in replay state
+ }
+
+ public void testGetBaseRetranmissionInterval() { // NOTE(review): name misspells "Retransmission"
+ Message message = control.createMock(Message.class);
+ AssertionInfoMap aim = control.createMock(AssertionInfoMap.class);
+ EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(aim);
+ AssertionInfo ai1 = control.createMock(AssertionInfo.class);
+ AssertionInfo ai2 = control.createMock(AssertionInfo.class);
+ AssertionInfo ai3 = control.createMock(AssertionInfo.class);
+ AssertionInfo ai4 = control.createMock(AssertionInfo.class);
+ Collection<AssertionInfo> ais = new ArrayList<AssertionInfo>();
+ ais.add(ai1);
+ ais.add(ai2);
+ ais.add(ai3);
+ ais.add(ai4);
+ EasyMock.expect(aim.get(RMConstants.getRMAssertionQName())).andReturn(ais);
+ JaxbAssertion ja1 = control.createMock(JaxbAssertion.class);
+ EasyMock.expect(ai1.getAssertion()).andReturn(ja1);
+ RMAssertion rma1 = control.createMock(RMAssertion.class);
+ EasyMock.expect(ja1.getData()).andReturn(rma1);
+ EasyMock.expect(rma1.getBaseRetransmissionInterval()).andReturn(null); // assertion 1: no interval at all
+ JaxbAssertion ja2 = control.createMock(JaxbAssertion.class);
+ EasyMock.expect(ai2.getAssertion()).andReturn(ja2);
+ RMAssertion rma2 = control.createMock(RMAssertion.class);
+ EasyMock.expect(ja2.getData()).andReturn(rma2);
+ RMAssertion.BaseRetransmissionInterval bri2 =
+ control.createMock(RMAssertion.BaseRetransmissionInterval.class);
+ EasyMock.expect(rma2.getBaseRetransmissionInterval()).andReturn(bri2);
+ EasyMock.expect(bri2.getMilliseconds()).andReturn(null); // assertion 2: interval present, milliseconds null
+ JaxbAssertion ja3 = control.createMock(JaxbAssertion.class);
+ EasyMock.expect(ai3.getAssertion()).andReturn(ja3);
+ RMAssertion rma3 = control.createMock(RMAssertion.class);
+ EasyMock.expect(ja3.getData()).andReturn(rma3);
+ RMAssertion.BaseRetransmissionInterval bri3 =
+ control.createMock(RMAssertion.BaseRetransmissionInterval.class);
+ EasyMock.expect(rma3.getBaseRetransmissionInterval()).andReturn(bri3);
+ EasyMock.expect(bri3.getMilliseconds()).andReturn(new BigInteger("10000")); // assertion 3: 10000 ms
+ JaxbAssertion ja4 = control.createMock(JaxbAssertion.class);
+ EasyMock.expect(ai4.getAssertion()).andReturn(ja4);
+ RMAssertion rma4 = control.createMock(RMAssertion.class);
+ EasyMock.expect(ja4.getData()).andReturn(rma4);
+ RMAssertion.BaseRetransmissionInterval bri4 =
+ control.createMock(RMAssertion.BaseRetransmissionInterval.class);
+ EasyMock.expect(rma4.getBaseRetransmissionInterval()).andReturn(bri4);
+ EasyMock.expect(bri4.getMilliseconds()).andReturn(new BigInteger("5000")); // assertion 4: 5000 ms
+
+ control.replay();
+ assertEquals("Unexpected value for base retransmission interval",
+ 5000, PolicyUtils.getBaseRetransmissionInterval(message).intValue()); // 5000 is the smaller of the two non-null values
+ }
+
+ public void testUseExponentialBackoff() { // two phases: assertion without backoff element, then no AssertionInfoMap
+ Message message = control.createMock(Message.class);
+ AssertionInfoMap aim = control.createMock(AssertionInfoMap.class);
+ EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(aim);
+ AssertionInfo ai = control.createMock(AssertionInfo.class);
+ Collection<AssertionInfo> ais = new ArrayList<AssertionInfo>();
+ EasyMock.expect(aim.get(RMConstants.getRMAssertionQName())).andReturn(ais);
+ ais.add(ai);
+ JaxbAssertion ja = control.createMock(JaxbAssertion.class);
+ EasyMock.expect(ai.getAssertion()).andReturn(ja);
+ RMAssertion rma = control.createMock(RMAssertion.class);
+ EasyMock.expect(ja.getData()).andReturn(rma);
+ EasyMock.expect(rma.getExponentialBackoff()).andReturn(null); // the single assertion has no ExponentialBackoff
+ control.replay();
+ assertTrue("Should not use exponential backoff", !PolicyUtils.useExponentialBackoff(message));
+ control.verify();
+ control.reset(); // drop phase-one expectations before recording phase two
+ EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null); // phase two: message has no AssertionInfoMap
+ control.replay();
+ assertTrue("Should use exponential backoff", PolicyUtils.useExponentialBackoff(message));
+ }
+
+ public void testGetAcknowledgmentInterval() { // same four-assertion layout as the base retransmission interval test
+ Message message = control.createMock(Message.class);
+ AssertionInfoMap aim = control.createMock(AssertionInfoMap.class);
+ EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(aim);
+ AssertionInfo ai1 = control.createMock(AssertionInfo.class);
+ AssertionInfo ai2 = control.createMock(AssertionInfo.class);
+ AssertionInfo ai3 = control.createMock(AssertionInfo.class);
+ AssertionInfo ai4 = control.createMock(AssertionInfo.class);
+ Collection<AssertionInfo> ais = new ArrayList<AssertionInfo>();
+ ais.add(ai1);
+ ais.add(ai2);
+ ais.add(ai3);
+ ais.add(ai4);
+ EasyMock.expect(aim.get(RMConstants.getRMAssertionQName())).andReturn(ais);
+ JaxbAssertion ja1 = control.createMock(JaxbAssertion.class);
+ EasyMock.expect(ai1.getAssertion()).andReturn(ja1);
+ RMAssertion rma1 = control.createMock(RMAssertion.class);
+ EasyMock.expect(ja1.getData()).andReturn(rma1);
+ EasyMock.expect(rma1.getAcknowledgementInterval()).andReturn(null); // assertion 1: no interval at all
+ JaxbAssertion ja2 = control.createMock(JaxbAssertion.class);
+ EasyMock.expect(ai2.getAssertion()).andReturn(ja2);
+ RMAssertion rma2 = control.createMock(RMAssertion.class);
+ EasyMock.expect(ja2.getData()).andReturn(rma2);
+ RMAssertion.AcknowledgementInterval aint2 =
+ control.createMock(RMAssertion.AcknowledgementInterval.class);
+ EasyMock.expect(rma2.getAcknowledgementInterval()).andReturn(aint2);
+ EasyMock.expect(aint2.getMilliseconds()).andReturn(null); // assertion 2: interval present, milliseconds null
+ JaxbAssertion ja3 = control.createMock(JaxbAssertion.class);
+ EasyMock.expect(ai3.getAssertion()).andReturn(ja3);
+ RMAssertion rma3 = control.createMock(RMAssertion.class);
+ EasyMock.expect(ja3.getData()).andReturn(rma3);
+ RMAssertion.AcknowledgementInterval aint3 =
+ control.createMock(RMAssertion.AcknowledgementInterval.class);
+ EasyMock.expect(rma3.getAcknowledgementInterval()).andReturn(aint3);
+ EasyMock.expect(aint3.getMilliseconds()).andReturn(new BigInteger("10000")); // assertion 3: 10000 ms
+ JaxbAssertion ja4 = control.createMock(JaxbAssertion.class);
+ EasyMock.expect(ai4.getAssertion()).andReturn(ja4);
+ RMAssertion rma4 = control.createMock(RMAssertion.class);
+ EasyMock.expect(ja4.getData()).andReturn(rma4);
+ RMAssertion.AcknowledgementInterval aint4 =
+ control.createMock(RMAssertion.AcknowledgementInterval.class);
+ EasyMock.expect(rma4.getAcknowledgementInterval()).andReturn(aint4);
+ EasyMock.expect(aint4.getMilliseconds()).andReturn(new BigInteger("5000")); // assertion 4: 5000 ms
+
+ control.replay();
+ assertEquals("Unexpected value for acknowledgment interval",
+ 5000, PolicyUtils.getAcknowledgmentInterval(message).intValue()); // 5000 is the smaller of the two non-null values
+ }
+
+
+}
Propchange: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/policy/PolicyUtilsTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/policy/PolicyUtilsTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java?view=diff&rev=518603&r1=518602&r2=518603
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java Thu Mar 15 05:22:45 2007
@@ -103,55 +103,6 @@
assertSame("Unexpected RMManager", manager, queue.getManager());
}
- public void testGetBaseRetranmissionIntervalFromPolicies() {
- Message message = createMock(Message.class);
- AssertionInfoMap aim = createMock(AssertionInfoMap.class);
- EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(aim);
- AssertionInfo ai1 = createMock(AssertionInfo.class);
- AssertionInfo ai2 = createMock(AssertionInfo.class);
- AssertionInfo ai3 = createMock(AssertionInfo.class);
- AssertionInfo ai4 = createMock(AssertionInfo.class);
- Collection<AssertionInfo> ais = new ArrayList<AssertionInfo>();
- ais.add(ai1);
- ais.add(ai2);
- ais.add(ai3);
- ais.add(ai4);
- EasyMock.expect(aim.get(RMConstants.getRMAssertionQName())).andReturn(ais);
- JaxbAssertion ja1 = createMock(JaxbAssertion.class);
- EasyMock.expect(ai1.getAssertion()).andReturn(ja1);
- RMAssertion rma1 = createMock(RMAssertion.class);
- EasyMock.expect(ja1.getData()).andReturn(rma1);
- EasyMock.expect(rma1.getBaseRetransmissionInterval()).andReturn(null);
- JaxbAssertion ja2 = createMock(JaxbAssertion.class);
- EasyMock.expect(ai2.getAssertion()).andReturn(ja2);
- RMAssertion rma2 = createMock(RMAssertion.class);
- EasyMock.expect(ja2.getData()).andReturn(rma2);
- RMAssertion.BaseRetransmissionInterval bri2 =
- createMock(RMAssertion.BaseRetransmissionInterval.class);
- EasyMock.expect(rma2.getBaseRetransmissionInterval()).andReturn(bri2);
- EasyMock.expect(bri2.getMilliseconds()).andReturn(null);
- JaxbAssertion ja3 = createMock(JaxbAssertion.class);
- EasyMock.expect(ai3.getAssertion()).andReturn(ja3);
- RMAssertion rma3 = createMock(RMAssertion.class);
- EasyMock.expect(ja3.getData()).andReturn(rma3);
- RMAssertion.BaseRetransmissionInterval bri3 =
- createMock(RMAssertion.BaseRetransmissionInterval.class);
- EasyMock.expect(rma3.getBaseRetransmissionInterval()).andReturn(bri3);
- EasyMock.expect(bri3.getMilliseconds()).andReturn(new BigInteger("10000"));
- JaxbAssertion ja4 = createMock(JaxbAssertion.class);
- EasyMock.expect(ai4.getAssertion()).andReturn(ja4);
- RMAssertion rma4 = createMock(RMAssertion.class);
- EasyMock.expect(ja4.getData()).andReturn(rma4);
- RMAssertion.BaseRetransmissionInterval bri4 =
- createMock(RMAssertion.BaseRetransmissionInterval.class);
- EasyMock.expect(rma4.getBaseRetransmissionInterval()).andReturn(bri4);
- EasyMock.expect(bri4.getMilliseconds()).andReturn(new BigInteger("5000"));
-
- control.replay();
- assertEquals("Unexpected value for base retransmission interval",
- 5000, queue.getBaseRetransmissionInterval(message));
- }
-
public void testGetBaseRetransmissionIntervalFromManager() {
Message message = createMock(Message.class);
EasyMock.expect(message.get(AssertionInfoMap.class)).andReturn(null);