You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2012/03/27 15:42:48 UTC
svn commit: r1305836 - in /cxf/trunk:
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/
rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/
rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/ systests/ws-specs/src/te...
Author: ay
Date: Tue Mar 27 13:42:47 2012
New Revision: 1305836
URL: http://svn.apache.org/viewvc?rev=1305836&view=rev
Log:
unifying the retry interface for CXF-4209
Added:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetryStatus.java
Removed:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionStatus.java
Modified:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java?rev=1305836&r1=1305835&r2=1305836&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java Tue Mar 27 13:42:47 2012
@@ -69,18 +69,18 @@ public class ManagedRMEndpoint implement
{SimpleType.STRING, SimpleType.LONG, SimpleType.STRING,
SimpleType.STRING};
- private static final String[] RETRANSMISSION_STATUS_NAMES =
- {"messageNumber", "resends", "previous", "next", "nextInterval", "backOff", "pending", "suspended"};
- private static final String[] RETRANSMISSION_STATUS_DESCRIPTIONS = RETRANSMISSION_STATUS_NAMES;
+ private static final String[] RETRY_STATUS_NAMES =
+ {"messageNumber", "retries", "previous", "next", "nextInterval", "backOff", "pending", "suspended"};
+ private static final String[] RETRY_STATUS_DESCRIPTIONS = RETRY_STATUS_NAMES;
@SuppressWarnings("rawtypes") // needed as OpenType isn't generic on Java5
- private static final OpenType[] RETRANSMISSION_STATUS_TYPES =
+ private static final OpenType[] RETRY_STATUS_TYPES =
{SimpleType.LONG, SimpleType.INTEGER, SimpleType.DATE, SimpleType.DATE, SimpleType.LONG, SimpleType.LONG,
SimpleType.BOOLEAN, SimpleType.BOOLEAN};
private static CompositeType sourceSequenceType;
private static CompositeType destinationSequenceType;
- private static CompositeType retransmissionStatusType;
+ private static CompositeType retryStatusType;
private RMEndpoint endpoint;
@@ -99,11 +99,11 @@ public class ManagedRMEndpoint implement
DESTINATION_SEQUENCE_DESCRIPTIONS,
DESTINATION_SEQUENCE_TYPES);
- retransmissionStatusType = new CompositeType("retransmissionStatus",
- "retransmissionStatus",
- RETRANSMISSION_STATUS_NAMES,
- RETRANSMISSION_STATUS_DESCRIPTIONS,
- RETRANSMISSION_STATUS_TYPES);
+ retryStatusType = new CompositeType("retryStatus",
+ "retryStatus",
+ RETRY_STATUS_NAMES,
+ RETRY_STATUS_DESCRIPTIONS,
+ RETRY_STATUS_TYPES);
} catch (OpenDataException e) {
// ignore and handle it later
@@ -122,12 +122,23 @@ public class ManagedRMEndpoint implement
}
@ManagedOperation(description = "Total Number of Queued Messages")
- public int getQueuedMessageTotalCount() {
- Source source = endpoint.getSource();
- RetransmissionQueue queue = endpoint.getManager().getRetransmissionQueue();
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "outbound", description = "The outbound direction")
+ })
+ public int getQueuedMessageTotalCount(boolean outbound) {
int count = 0;
- for (SourceSequence ss : source.getAllSequences()) {
- count += queue.countUnacknowledged(ss);
+ if (outbound) {
+ Source source = endpoint.getSource();
+ RetransmissionQueue queue = endpoint.getManager().getRetransmissionQueue();
+ for (SourceSequence ss : source.getAllSequences()) {
+ count += queue.countUnacknowledged(ss);
+ }
+ } else {
+// Destination destination = endpoint.getDestination();
+// RedeliveryQueue queue = endpoint.getManager().getRedeliveryQueue();
+// for (DestinationSequence ds : destination.getAllSequences()) {
+// count += queue.countUndelivered(ds);
+// }
}
return count;
@@ -135,16 +146,26 @@ public class ManagedRMEndpoint implement
@ManagedOperation(description = "Number of Queued Messages")
@ManagedOperationParameters({
- @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+ @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier"),
+ @ManagedOperationParameter(name = "outbound", description = "The outbound direction")
})
- public int getQueuedMessageCount(String sid) {
+ public int getQueuedMessageCount(String sid, boolean outbound) {
RMManager manager = endpoint.getManager();
- SourceSequence ss = getSourceSeq(sid);
- if (null == ss) {
- throw new IllegalArgumentException("no sequence");
+ int count = 0;
+ if (outbound) {
+ SourceSequence ss = getSourceSeq(sid);
+ if (null == ss) {
+ throw new IllegalArgumentException("no sequence");
+ }
+ count = manager.getRetransmissionQueue().countUnacknowledged(ss);
+ } else {
+// DestinationSequence ds = getDestinationSeq(sid);
+// if (null == ds) {
+// throw new IllegalArgumentException("no sequence");
+// }
+// count = manager.getRedeliveryQueue().countUndelivered(ds);
}
-
- return manager.getRetransmissionQueue().countUnacknowledged(ss);
+ return count;
}
@ManagedOperation(description = "List of UnAcknowledged Message Numbers")
@@ -239,8 +260,8 @@ public class ManagedRMEndpoint implement
throw new IllegalArgumentException("no sequence");
}
RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
- RetransmissionStatus rs = rq.getRetransmissionStatus(ss, num);
- return getRetransmissionStatusProperties(num, rs);
+ RetryStatus rs = rq.getRetransmissionStatus(ss, num);
+ return getRetryStatusProperties(num, rs);
}
@ManagedOperation(description = "Retransmission Statuses")
@@ -253,16 +274,66 @@ public class ManagedRMEndpoint implement
throw new IllegalArgumentException("no sequence");
}
RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
- Map<Long, RetransmissionStatus> rsmap = rq.getRetransmissionStatuses(ss);
+ Map<Long, RetryStatus> rsmap = rq.getRetransmissionStatuses(ss);
CompositeData[] rsps = new CompositeData[rsmap.size()];
int i = 0;
- for (Map.Entry<Long, RetransmissionStatus> rs : rsmap.entrySet()) {
- rsps[i++] = getRetransmissionStatusProperties(rs.getKey(), rs.getValue());
+ for (Map.Entry<Long, RetryStatus> rs : rsmap.entrySet()) {
+ rsps[i++] = getRetryStatusProperties(rs.getKey(), rs.getValue());
}
return rsps;
}
+// @ManagedOperation(description = "Redelivery Status")
+// @ManagedOperationParameters({
+// @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier"),
+// @ManagedOperationParameter(name = "messageNumber", description = "The message number")
+// })
+// public CompositeData getRedeliveryStatus(String sid, long num) throws JMException {
+// DestinationSequence ds = getDestinationSeq(sid);
+// if (null == ds) {
+// throw new IllegalArgumentException("no sequence");
+// }
+// RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+// RetryStatus rs = rq.getRedeliveryStatus(ds, num);
+// return getRetryStatusProperties(num, rs);
+// }
+
+// @ManagedOperation(description = "Redelivery Statuses")
+// @ManagedOperationParameters({
+// @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+// })
+// public CompositeData[] getRedeliveryStatuses(String sid) throws JMException {
+// DestinationSequence ds = getDestinationSeq(sid);
+// if (null == ds) {
+// throw new IllegalArgumentException("no sequence");
+// }
+// RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+// Map<Long, RetryStatus> rsmap = rq.getRedeliveryStatuses(ds);
+//
+// CompositeData[] rsps = new CompositeData[rsmap.size()];
+// int i = 0;
+// for (Map.Entry<Long, RetryStatus> rs : rsmap.entrySet()) {
+// rsps[i++] = getRetryStatusProperties(rs.getKey(), rs.getValue());
+// }
+// return rsps;
+// }
+
+// @ManagedOperation(description = "List of UnDelivered Message Numbers")
+// @ManagedOperationParameters({
+// @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+// })
+// public Long[] getUnDeliveredMessageIdentifiers(String sid) {
+// RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+// DestinationSequence ds = getDestinationSeq(sid);
+// if (null == ds) {
+// throw new IllegalArgumentException("no sequence");
+// }
+//
+// List<Long> numbers = rq.getUndeliveredMessageNumbers(ds);
+// return numbers.toArray(new Long[numbers.size()]);
+// }
+
@ManagedOperation(description = "List of Source Sequence IDs")
@ManagedOperationParameters({
@ManagedOperationParameter(name = "expired", description = "The expired sequences included")
@@ -316,6 +387,32 @@ public class ManagedRMEndpoint implement
rq.resume(ss);
}
+// @ManagedOperation(description = "Suspend Redelivery Queue")
+// @ManagedOperationParameters({
+// @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+// })
+// public void suspendDestinationQueue(String sid) throws JMException {
+// DestinationSequence ds = getDestinationSeq(sid);
+// if (null == ds) {
+// throw new IllegalArgumentException("no sequence");
+// }
+// RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+// rq.suspend(ds);
+// }
+
+// @ManagedOperation(description = "Resume Redelivery Queue")
+// @ManagedOperationParameters({
+// @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+// })
+// public void resumeDestinationQueue(String sid) throws JMException {
+// DestinationSequence ds = getDestinationSeq(sid);
+// if (null == ds) {
+// throw new JMException("no destination sequence");
+// }
+// RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+// rq.resume(ds);
+// }
+
@ManagedOperation(description = "Current Source Sequence Properties")
public CompositeData getCurrentSourceSequence() throws JMException {
Source source = endpoint.getSource();
@@ -363,7 +460,6 @@ public class ManagedRMEndpoint implement
return sps.toArray(new CompositeData[sps.size()]);
}
-
@ManagedOperation(description = "Destination Sequence Properties")
@ManagedOperationParameters({
@ManagedOperationParameter(name = "sequenceId", description = "The destination identifier")
@@ -400,6 +496,36 @@ public class ManagedRMEndpoint implement
return destination.getSequence(identifier);
}
+ @ManagedOperation(description = "Remove Source Sequence")
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+ })
+ public void removeSourceSequence(String sid) throws JMException {
+ SourceSequence ss = getSourceSeq(sid);
+ if (null == ss) {
+ throw new JMException("no source sequence");
+ }
+ //TODO use cancel instead of suspend
+ RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
+ rq.suspend(ss);
+ ss.getSource().removeSequence(ss);
+ }
+
+ @ManagedOperation(description = "Remove Destination Sequence")
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "sequenceId", description = "The destination identifier")
+ })
+ public void removeDestinationSequence(String sid) throws JMException {
+ DestinationSequence ds = getDestinationSeq(sid);
+ if (null == ds) {
+ throw new JMException("no destination sequence");
+ }
+ //TODO use cancel instead of suspend
+// RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+// rq.suspend(ds);
+ ds.getDestination().removeSequence(ds);
+ }
+
private static String getAddressValue(EndpointReferenceType epr) {
if (null != epr && null != epr.getAddress()) {
return epr.getAddress().getValue();
@@ -435,15 +561,13 @@ public class ManagedRMEndpoint implement
return dsps;
}
- private CompositeData getRetransmissionStatusProperties(long num,
- RetransmissionStatus rs) throws JMException {
+ private CompositeData getRetryStatusProperties(long num, RetryStatus rs) throws JMException {
CompositeData rsps = null;
if (null != rs) {
- Object[] rsv = new Object[] {num, rs.getResends(), rs.getPrevious(),
+ Object[] rsv = new Object[] {num, rs.getRetries(), rs.getPrevious(),
rs.getNext(), rs.getNextInterval(),
rs.getBackoff(), rs.isPending(), rs.isSuspended()};
- rsps = new CompositeDataSupport(retransmissionStatusType,
- RETRANSMISSION_STATUS_NAMES, rsv);
+ rsps = new CompositeDataSupport(retryStatusType, RETRY_STATUS_NAMES, rsv);
}
return rsps;
}
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java?rev=1305836&r1=1305835&r2=1305836&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java Tue Mar 27 13:42:47 2012
@@ -67,14 +67,14 @@ public interface RetransmissionQueue {
* @param num
* @return
*/
- RetransmissionStatus getRetransmissionStatus(SourceSequence seq, long num);
+ RetryStatus getRetransmissionStatus(SourceSequence seq, long num);
/**
* Return the retransmission status of all the messages assigned to the sequence.
* @param seq
* @return
*/
- Map<Long, RetransmissionStatus> getRetransmissionStatuses(SourceSequence seq);
+ Map<Long, RetryStatus> getRetransmissionStatuses(SourceSequence seq);
/**
* Initiate resends.
Added: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetryStatus.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetryStatus.java?rev=1305836&view=auto
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetryStatus.java (added)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetryStatus.java Tue Mar 27 13:42:47 2012
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.util.Date;
+
+/**
+ * A generic interface describing the retry status of a repeating activity
+ * (for example, message retransmission) at some WS-RM component.
+ */
+public interface RetryStatus {
+ /**
+ * @return the time at which the next retry is scheduled
+ */
+ Date getNext();
+
+ /**
+ * @return the time of the previous retry; implementations may return null if none was attempted yet
+ */
+ Date getPrevious();
+
+ /**
+ * @return the number of retries attempted so far
+ */
+ int getRetries();
+
+ /**
+ * @return the maximum number of retries permitted
+ */
+ int getMaxRetries();
+
+ /**
+ * @return the interval applied after the next retry (presumably in milliseconds -- TODO confirm)
+ */
+ long getNextInterval();
+
+ /**
+ * @return the backoff factor by which the retry interval is multiplied after each attempt
+ */
+ long getBackoff();
+
+ /**
+ * @return true if a retry is pending
+ */
+ boolean isPending();
+
+ /**
+ * @return true if retrying is suspended
+ */
+ boolean isSuspended();
+}
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?rev=1305836&r1=1305835&r2=1305836&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Tue Mar 27 13:42:47 2012
@@ -62,7 +62,7 @@ import org.apache.cxf.ws.rm.RMProperties
import org.apache.cxf.ws.rm.RMUtils;
import org.apache.cxf.ws.rm.RetransmissionCallback;
import org.apache.cxf.ws.rm.RetransmissionQueue;
-import org.apache.cxf.ws.rm.RetransmissionStatus;
+import org.apache.cxf.ws.rm.RetryStatus;
import org.apache.cxf.ws.rm.SourceSequence;
import org.apache.cxf.ws.rm.persistence.RMStore;
import org.apache.cxf.ws.rm.policy.RM10PolicyUtils;
@@ -168,7 +168,7 @@ public class RetransmissionQueueImpl imp
return unacknowledged;
}
- public RetransmissionStatus getRetransmissionStatus(SourceSequence seq, long num) {
+ public RetryStatus getRetransmissionStatus(SourceSequence seq, long num) {
List<ResendCandidate> sequenceCandidates = getSequenceCandidates(seq);
if (null != sequenceCandidates) {
for (int i = 0; i < sequenceCandidates.size(); i++) {
@@ -184,8 +184,8 @@ public class RetransmissionQueueImpl imp
return null;
}
- public Map<Long, RetransmissionStatus> getRetransmissionStatuses(SourceSequence seq) {
- Map<Long, RetransmissionStatus> cp = new HashMap<Long, RetransmissionStatus>();
+ public Map<Long, RetryStatus> getRetransmissionStatuses(SourceSequence seq) {
+ Map<Long, RetryStatus> cp = new HashMap<Long, RetryStatus>();
List<ResendCandidate> sequenceCandidates = getSequenceCandidates(seq);
if (null != sequenceCandidates) {
for (int i = 0; i < sequenceCandidates.size(); i++) {
@@ -469,12 +469,12 @@ public class RetransmissionQueueImpl imp
/**
* Represents a candidate for resend, i.e. an unacked outgoing message.
*/
- protected class ResendCandidate implements Runnable, RetransmissionStatus {
+ protected class ResendCandidate implements Runnable, RetryStatus {
private Message message;
private OutputStream out;
private Date next;
private TimerTask nextTask;
- private int resends;
+ private int retries;
private long nextInterval;
private long backoff;
private boolean pending;
@@ -486,7 +486,7 @@ public class RetransmissionQueueImpl imp
*/
protected ResendCandidate(Message m) {
message = m;
- resends = 0;
+ retries = 0;
out = m.getContent(OutputStream.class);
org.apache.cxf.ws.rmp.v200502.RMAssertion rma =
RM10PolicyUtils.getRMAssertion(manager.getRMAssertion(), message);
@@ -555,11 +555,18 @@ public class RetransmissionQueueImpl imp
/**
* @return number of resend attempts
*/
- public int getResends() {
- return resends;
+ public int getRetries() {
+ return retries;
}
/**
+ * @return the maximum number of resend attempts (this implementation always returns 0)
+ */
+ public int getMaxRetries() {
+ return 0;
+ }
+
+ /**
* @return date of next resend
*/
public Date getNext() {
@@ -570,7 +577,7 @@ public class RetransmissionQueueImpl imp
* @return date of previous resend or null if no attempt is yet taken
*/
public Date getPrevious() {
- if (resends > 0) {
+ if (retries > 0) {
return new Date(next.getTime() - nextInterval / backoff);
}
return null;
@@ -652,7 +659,7 @@ public class RetransmissionQueueImpl imp
*/
protected synchronized void attempted() {
pending = false;
- resends++;
+ retries++;
if (null != next) {
next = new Date(next.getTime() + nextInterval);
nextInterval *= backoff;
Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java?rev=1305836&r1=1305835&r2=1305836&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java Tue Mar 27 13:42:47 2012
@@ -147,7 +147,8 @@ public class ManagedRMManagerTest extend
assertTrue(o instanceof Integer);
assertEquals("No deferred acks", 0, o);
- o = mbs.invoke(endpointName, "getQueuedMessageTotalCount", null, null);
+ o = mbs.invoke(endpointName, "getQueuedMessageTotalCount",
+ new Object[]{true}, new String[]{"boolean"});
assertTrue(o instanceof Integer);
assertEquals("No queued messages", 0, o);
}
@@ -156,10 +157,10 @@ public class ManagedRMManagerTest extend
public void testManagedRMEndpointGetQueuedCount() throws Exception {
ManagedRMEndpoint managedEndpoint = createTestManagedRMEndpoint();
- int n = managedEndpoint.getQueuedMessageTotalCount();
+ int n = managedEndpoint.getQueuedMessageTotalCount(true);
assertEquals(3, n);
- n = managedEndpoint.getQueuedMessageCount("seq1");
+ n = managedEndpoint.getQueuedMessageCount("seq1", true);
assertEquals(2, n);
}
@@ -263,9 +264,9 @@ public class ManagedRMManagerTest extend
assertTrue("seq3".equals(key) || "seq4".equals(key));
}
- private void verifyRetransmissionStatus(CompositeData cd, long num, RetransmissionStatus status) {
+ private void verifyRetransmissionStatus(CompositeData cd, long num, RetryStatus status) {
assertEquals(num, cd.get("messageNumber"));
- assertEquals(status.getResends(), cd.get("resends"));
+ assertEquals(status.getRetries(), cd.get("retries"));
assertEquals(status.getNext(), cd.get("next"));
assertEquals(status.getPrevious(), cd.get("previous"));
assertEquals(status.getNextInterval(), cd.get("nextInterval"));
@@ -394,7 +395,7 @@ public class ManagedRMManagerTest extend
private class TestRetransmissionQueue implements RetransmissionQueue {
private Set<String> suspended = new HashSet<String>();
- private RetransmissionStatus status = new TestRetransmissionStatus();
+ private RetryStatus status = new TestRetransmissionStatus();
public int countUnacknowledged(SourceSequence seq) {
final String key = seq.getIdentifier().getValue();
@@ -430,7 +431,7 @@ public class ManagedRMManagerTest extend
return list;
}
- public RetransmissionStatus getRetransmissionStatus(SourceSequence seq, long num) {
+ public RetryStatus getRetransmissionStatus(SourceSequence seq, long num) {
final String key = seq.getIdentifier().getValue();
if (("seq1".equals(key) && (2L == num || 4L == num))
|| ("seq2".equals(key) && (2L == num))) {
@@ -439,7 +440,7 @@ public class ManagedRMManagerTest extend
return null;
}
- public Map<Long, RetransmissionStatus> getRetransmissionStatuses(SourceSequence seq) {
+ public Map<Long, RetryStatus> getRetransmissionStatuses(SourceSequence seq) {
// TODO Auto-generated method stub
return null;
}
@@ -464,12 +465,12 @@ public class ManagedRMManagerTest extend
return suspended.contains(sid);
}
- RetransmissionStatus getRetransmissionStatus() {
+ RetryStatus getRetransmissionStatus() {
return status;
}
}
- private static class TestRetransmissionStatus implements RetransmissionStatus {
+ private static class TestRetransmissionStatus implements RetryStatus {
private long interval = 300000L;
private Date next = new Date(System.currentTimeMillis() + interval / 2);
private Date previous = new Date(next.getTime() - interval);
@@ -482,10 +483,14 @@ public class ManagedRMManagerTest extend
return previous;
}
- public int getResends() {
+ public int getRetries() {
return 2;
}
+ public int getMaxRetries() {
+ return -1;
+ }
+
public long getNextInterval() {
return interval;
}
Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java?rev=1305836&r1=1305835&r2=1305836&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java (original)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImplTest.java Tue Mar 27 13:42:47 2012
@@ -106,7 +106,7 @@ public class RetransmissionQueueImplTest
long now = System.currentTimeMillis();
RetransmissionQueueImpl.ResendCandidate candidate = queue.createResendCandidate(message);
assertSame(message, candidate.getMessage());
- assertEquals(0, candidate.getResends());
+ assertEquals(0, candidate.getRetries());
Date refDate = new Date(now + 5000);
assertTrue(!candidate.getNext().before(refDate));
refDate = new Date(now + 7000);
@@ -122,7 +122,7 @@ public class RetransmissionQueueImplTest
long now = System.currentTimeMillis();
RetransmissionQueueImpl.ResendCandidate candidate = queue.createResendCandidate(message);
candidate.attempted();
- assertEquals(1, candidate.getResends());
+ assertEquals(1, candidate.getRetries());
Date refDate = new Date(now + 15000);
assertTrue(!candidate.getNext().before(refDate));
refDate = new Date(now + 17000);
Modified: cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java?rev=1305836&r1=1305835&r2=1305836&view=diff
==============================================================================
--- cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java (original)
+++ cxf/trunk/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ManagedEndpointsTest.java Tue Mar 27 13:42:47 2012
@@ -154,11 +154,12 @@ public class ManagedEndpointsTest extend
new Object[]{true}, new String[]{"boolean"});
verifyArray("Expected sequence identifier", o, new String[]{dseqId});
- o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", null, null);
+ o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount",
+ new Object[]{true}, new String[]{"boolean"});
assertTrue("No queued message", o instanceof Integer && 0 == ((Integer)o).intValue());
o = mbs.invoke(clientEndpointName, "getQueuedMessageCount",
- new Object[]{sseqId}, new String[]{"java.lang.String"});
+ new Object[]{sseqId, true}, new String[]{"java.lang.String", "boolean"});
assertTrue("No queued message", o instanceof Integer && 0 == ((Integer)o).intValue());
o = mbs.invoke(clientEndpointName, "getCurrentSourceSequence", null, null);
@@ -180,7 +181,8 @@ public class ManagedEndpointsTest extend
greeter.greetMeOneWay("two"); // getting lost
greeter.greetMeOneWay("three"); // sent
- o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", null, null);
+ o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount",
+ new Object[]{true}, new String[]{"boolean"});
assertTrue("One queued message", o instanceof Integer && 1 == ((Integer)o).intValue());
o = mbs.invoke(clientEndpointName, "getSourceSequenceAcknowledgedRange",
@@ -203,7 +205,8 @@ public class ManagedEndpointsTest extend
LOG.info("waiting for 12 secs for the retry to complete ...");
Thread.sleep(12000);
- o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", null, null);
+ o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount",
+ new Object[]{true}, new String[]{"boolean"});
assertTrue("No queued message", o instanceof Integer && 0 == ((Integer)o).intValue());
o = mbs.invoke(clientEndpointName, "getSourceSequenceAcknowledgedRange",
@@ -260,7 +263,8 @@ public class ManagedEndpointsTest extend
greeter.greetMeOneWay("two"); // sent but suspended
greeter.greetMeOneWay("three"); // sent but suspended
- o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", null, null);
+ o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount",
+ new Object[]{true}, new String[]{"boolean"});
assertTrue("One queued message", o instanceof Integer && 1 == ((Integer)o).intValue());
mbs.invoke(clientEndpointName, "suspendSourceQueue",
@@ -272,7 +276,8 @@ public class ManagedEndpointsTest extend
LOG.info("waiting for 10 secs for the retry (suspended)...");
Thread.sleep(10000);
- o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", null, null);
+ o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount",
+ new Object[]{true}, new String[]{"boolean"});
assertTrue("One queued message", o instanceof Integer && 1 == ((Integer)o).intValue());
mbs.invoke(clientEndpointName, "resumeSourceQueue",
@@ -282,7 +287,8 @@ public class ManagedEndpointsTest extend
LOG.info("waiting for 15 secs for the retry (resumed)...");
Thread.sleep(10000);
- o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount", null, null);
+ o = mbs.invoke(clientEndpointName, "getQueuedMessageTotalCount",
+ new Object[]{true}, new String[]{"boolean"});
assertTrue("No queued messages", o instanceof Integer && 0 == ((Integer)o).intValue());
}
@@ -323,7 +329,7 @@ public class ManagedEndpointsTest extend
assertTrue(value instanceof CompositeData);
CompositeData cd = (CompositeData)value;
verifyValue(cd, "messageNumber", num);
- verifyValue(cd, "resends", count);
+ verifyValue(cd, "retries", count);
Date now = new Date();
if (count > 0) {
assertTrue(now.after((Date)getValue(cd, "previous")));