You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2016/07/03 22:09:59 UTC
[02/20] cxf git commit: [CXF-4209] Server side message redelivery
support for WS-RM
[CXF-4209] Server side message redelivery support for WS-RM
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/6b8a340c
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/6b8a340c
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/6b8a340c
Branch: refs/heads/master-jaxrs-2.1
Commit: 6b8a340c7c5d5c6cd24f421d0caeaa368e94891d
Parents: 3a1fa0b
Author: Kai Rommel <ka...@sap.com>
Authored: Tue Jun 7 17:47:58 2016 +0200
Committer: Akitoshi Yoshida <ay...@apache.org>
Committed: Thu Jun 30 11:16:59 2016 +0200
----------------------------------------------------------------------
.../java/org/apache/cxf/ws/rm/Destination.java | 23 +
.../apache/cxf/ws/rm/DestinationSequence.java | 100 ++-
.../org/apache/cxf/ws/rm/ManagedRMEndpoint.java | 192 ++---
.../cxf/ws/rm/RMCaptureInInterceptor.java | 227 +++++-
.../cxf/ws/rm/RMCaptureOutInterceptor.java | 1 +
.../apache/cxf/ws/rm/RMDeliveryInterceptor.java | 12 +
.../java/org/apache/cxf/ws/rm/RMEndpoint.java | 3 +
.../org/apache/cxf/ws/rm/RMInInterceptor.java | 74 +-
.../java/org/apache/cxf/ws/rm/RMManager.java | 76 +-
.../java/org/apache/cxf/ws/rm/RMProperties.java | 18 +
.../org/apache/cxf/ws/rm/RedeliveryQueue.java | 106 +++
.../main/java/org/apache/cxf/ws/rm/Servant.java | 2 +-
.../org/apache/cxf/ws/rm/feature/RMFeature.java | 4 +-
.../apache/cxf/ws/rm/persistence/RMMessage.java | 9 +
.../cxf/ws/rm/persistence/jdbc/RMTxStore.java | 74 +-
.../cxf/ws/rm/soap/RMSoapInInterceptor.java | 5 +-
.../cxf/ws/rm/soap/RedeliveryQueueImpl.java | 699 +++++++++++++++++++
.../configuration/wsrm-manager-types.xsd | 15 +
.../schemas/configuration/wsrm-policy.xjb | 9 +
.../cxf/ws/rm/DestinationSequenceTest.java | 4 +-
.../org/apache/cxf/ws/rm/RMEndpointTest.java | 4 +
.../apache/cxf/ws/rm/RMInInterceptorTest.java | 25 +
.../org/apache/cxf/ws/rm/RMManagerTest.java | 43 +-
.../rm/persistence/jdbc/RMTxStoreTestBase.java | 4 +
.../cxf/systest/ws/rm/RedeliveryTest.java | 186 +++++
25 files changed, 1703 insertions(+), 212 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
index 178a63c..3d3489b 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
@@ -29,6 +29,7 @@ import java.util.logging.Logger;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
@@ -75,6 +76,14 @@ public class Destination extends AbstractEndpoint {
processingSequenceCount.incrementAndGet();
}
+ // this method ensures to keep the sequence until all the messages are delivered
+ public void terminateSequence(DestinationSequence seq) {
+ seq.terminate();
+ if (seq.allAcknowledgedMessagesDelivered()) {
+ removeSequence(seq);
+ }
+ }
+
public void removeSequence(DestinationSequence seq) {
DestinationSequence o;
o = map.remove(seq.getIdentifier().getValue());
@@ -208,6 +217,20 @@ public class Destination extends AbstractEndpoint {
long mn = sequenceType.getMessageNumber().longValue();
seq.processingComplete(mn);
seq.purgeAcknowledged(mn);
+ // remove acknowledged undelivered message
+ seq.removeDeliveringMessageNumber(mn);
+ if (seq.isTerminated() && seq.allAcknowledgedMessagesDelivered()) {
+ removeSequence(seq);
+ }
+ }
+ CachedOutputStream saved = (CachedOutputStream)message.remove(RMMessageConstants.SAVED_CONTENT);
+ if (saved != null) {
+ saved.releaseTempFileHold();
+ try {
+ saved.close();
+ } catch (IOException e) {
+ // ignore
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
index 58b7906..3442fc5 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
@@ -19,6 +19,8 @@
package org.apache.cxf.ws.rm;
+import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -33,12 +35,15 @@ import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.continuations.Continuation;
import org.apache.cxf.continuations.ContinuationProvider;
import org.apache.cxf.continuations.SuspendedInvocationException;
+import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageUtils;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.cxf.ws.policy.PolicyVerificationInInterceptor;
import org.apache.cxf.ws.rm.RMConfiguration.DeliveryAssurance;
import org.apache.cxf.ws.rm.manager.AcksPolicyType;
+import org.apache.cxf.ws.rm.persistence.PersistenceUtils;
import org.apache.cxf.ws.rm.persistence.RMMessage;
import org.apache.cxf.ws.rm.persistence.RMStore;
import org.apache.cxf.ws.rm.v200702.Identifier;
@@ -55,6 +60,7 @@ public class DestinationSequence extends AbstractSequence {
private long lastMessageNumber;
private SequenceMonitor monitor;
private boolean acknowledgeOnNextOccasion;
+ private boolean terminated;
private List<DeferredAcknowledgment> deferredAcknowledgments;
private SequenceTermination scheduledTermination;
private String correlationID;
@@ -62,18 +68,25 @@ public class DestinationSequence extends AbstractSequence {
private volatile long highNumberCompleted;
private long nextInOrder;
private List<Continuation> continuations = new LinkedList<Continuation>();
+ // this map is used for robust and redelivery tracking. for redelivery it holds the beingDeliverd messages
private Set<Long> deliveringMessageNumbers = new HashSet<Long>();
public DestinationSequence(Identifier i, EndpointReferenceType a, Destination d, ProtocolVariation pv) {
- this(i, a, 0, null, pv);
+ this(i, a, 0, false, null, pv);
destination = d;
}
public DestinationSequence(Identifier i, EndpointReferenceType a,
long lmn, SequenceAcknowledgement ac, ProtocolVariation pv) {
+ this(i, a, lmn, false, ac, pv);
+ }
+
+ public DestinationSequence(Identifier i, EndpointReferenceType a,
+ long lmn, boolean t, SequenceAcknowledgement ac, ProtocolVariation pv) {
super(i, pv);
acksTo = a;
lastMessageNumber = lmn;
+ terminated = t;
acknowledgement = ac;
if (null == acknowledgement) {
acknowledgement = new SequenceAcknowledgement();
@@ -122,6 +135,7 @@ public class DestinationSequence extends AbstractSequence {
}
monitor.acknowledgeMessage();
+ boolean updated = false;
synchronized (this) {
boolean done = false;
@@ -136,11 +150,13 @@ public class DestinationSequence extends AbstractSequence {
long diff = r.getLower() - messageNumber;
if (diff == 1) {
r.setLower(messageNumber);
+ updated = true;
done = true;
} else if (diff > 0) {
break;
} else if (messageNumber - r.getUpper().longValue() == 1) {
r.setUpper(messageNumber);
+ updated = true;
done = true;
break;
}
@@ -152,6 +168,7 @@ public class DestinationSequence extends AbstractSequence {
AcknowledgementRange range = new AcknowledgementRange();
range.setLower(messageNumber);
range.setUpper(messageNumber);
+ updated = true;
acknowledgement.getAcknowledgementRange().add(i, range);
if (acknowledgement.getAcknowledgementRange().size() > 1) {
@@ -163,18 +180,45 @@ public class DestinationSequence extends AbstractSequence {
mergeRanges();
}
- RMStore store = destination.getManager().getStore();
- if (null != store) {
- RMMessage msg = null;
- if (!MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY))) {
- msg = new RMMessage();
- CachedOutputStream cos = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT);
- msg.setContent(cos);
- msg.setContentType((String) message.get(Message.CONTENT_TYPE));
- msg.setMessageNumber(st.getMessageNumber());
+ if (updated) {
+ RMStore store = destination.getManager().getStore();
+ if (null != store) {
+ // only save message, when policy verification is successful
+ // otherwise msgs will be stored and redelivered which do not pass initial verification
+ // as interceptor is called in a later phase than the capturing
+ PolicyVerificationInInterceptor intercep = new PolicyVerificationInInterceptor();
+ boolean policiesVerified = false;
+ try {
+ intercep.handleMessage(message);
+ policiesVerified = true;
+ } catch (Fault e) {
+ // Ignore
+ }
+ RMMessage msg = null;
+ if (policiesVerified
+ && !MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY))) {
+ try {
+ msg = new RMMessage();
+ CachedOutputStream cos = (CachedOutputStream)message
+ .get(RMMessageConstants.SAVED_CONTENT);
+ msg.setMessageNumber(st.getMessageNumber());
+ msg.setCreatedTime(rmps.getCreatedTime());
+ // in case no attachments are available, cos can be saved directly
+ if (message.getAttachments() == null) {
+ msg.setContent(cos);
+ msg.setContentType((String)message.get(Message.CONTENT_TYPE));
+ } else {
+ InputStream is = cos.getInputStream();
+ PersistenceUtils.encodeRMContent(msg, message, is);
+ }
+ store.persistIncoming(this, msg);
+ } catch (IOException e) {
+ throw new Fault(e);
+ }
+ }
}
- store.persistIncoming(this, msg);
}
+ deliveringMessageNumbers.add(messageNumber);
RMEndpoint reliableEndpoint = destination.getReliableEndpoint();
RMConfiguration cfg = reliableEndpoint.getConfiguration();
@@ -277,7 +321,7 @@ public class DestinationSequence extends AbstractSequence {
return false;
}
if (robustDelivering) {
- deliveringMessageNumbers.add(mn);
+ addDeliveringMessageNumber(mn);
}
if (config.isInOrder()) {
return waitInQueue(mn, canSkip, message, cont);
@@ -286,9 +330,21 @@ public class DestinationSequence extends AbstractSequence {
}
void removeDeliveringMessageNumber(long mn) {
- deliveringMessageNumbers.remove(mn);
+ synchronized (deliveringMessageNumbers) {
+ deliveringMessageNumbers.remove(mn);
+ }
+ }
+ void addDeliveringMessageNumber(long mn) {
+ synchronized (deliveringMessageNumbers) {
+ deliveringMessageNumbers.add(mn);
+ }
}
+ // this method is only used for redelivery
+ boolean allAcknowledgedMessagesDelivered() {
+ return deliveringMessageNumbers.isEmpty();
+ }
+
private Continuation getContinuation(Message message) {
if (message == null) {
return null;
@@ -496,6 +552,22 @@ public class DestinationSequence extends AbstractSequence {
}
}
}
+
+ void terminate() {
+ if (!terminated) {
+ terminated = true;
+ RMStore store = destination.getManager().getStore();
+ if (null == store) {
+ return;
+ }
+ // only updating the sequence
+ store.persistIncoming(this, null);
+ }
+ }
+
+ public boolean isTerminated() {
+ return terminated;
+ }
final class SequenceTermination extends TimerTask {
@@ -521,7 +593,7 @@ public class DestinationSequence extends AbstractSequence {
LogUtils.log(LOG, Level.WARNING, "TERMINATING_INACTIVE_SEQ_MSG",
DestinationSequence.this.getIdentifier().getValue());
- DestinationSequence.this.destination.removeSequence(DestinationSequence.this);
+ DestinationSequence.this.destination.terminateSequence(DestinationSequence.this);
} else {
// reschedule
http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
index b96361d..c14bd84 100755
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
@@ -136,8 +136,7 @@ public class ManagedRMEndpoint implements ManagedComponent {
if (outbound) {
return endpoint.getManager().getRetransmissionQueue().countUnacknowledged();
} else {
-// return endpoint.getManager().getRedeliveryQueue().countUndelivered();
- return 0;
+ return endpoint.getManager().getRedeliveryQueue().countUndelivered();
}
}
@@ -155,12 +154,11 @@ public class ManagedRMEndpoint implements ManagedComponent {
}
return manager.getRetransmissionQueue().countUnacknowledged(ss);
} else {
-// DestinationSequence ds = getDestinationSeq(sid);
-// if (null == ds) {
-// throw new IllegalArgumentException("no sequence");
-// }
-// return manager.getRedeliveryQueue().countUndelivered(ds);
- return 0;
+ DestinationSequence ds = getDestinationSeq(sid);
+ if (null == ds) {
+ throw new IllegalArgumentException("no sequence");
+ }
+ return manager.getRedeliveryQueue().countUndelivered(ds);
}
}
@@ -280,55 +278,55 @@ public class ManagedRMEndpoint implements ManagedComponent {
return rsps;
}
-// @ManagedOperation(description = "Redelivery Status")
-// @ManagedOperationParameters({
-// @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier"),
-// @ManagedOperationParameter(name = "messageNumber", description = "The message number")
-// })
-// public CompositeData getRedeliveryStatus(String sid, long num) throws JMException {
-// DestinationSequence ds = getDestinationSeq(sid);
-// if (null == ds) {
-// throw new IllegalArgumentException("no sequence");
-// }
-// RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
-// RetryStatus rs = rq.getRedeliveryStatus(ds, num);
-// return getRetryStatusProperties(num, rs);
-// }
-
-// @ManagedOperation(description = "Redelivery Statuses")
-// @ManagedOperationParameters({
-// @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
-// })
-// public CompositeData[] getRedeliveryStatuses(String sid) throws JMException {
-// DestinationSequence ds = getDestinationSeq(sid);
-// if (null == ds) {
-// throw new IllegalArgumentException("no sequence");
-// }
-// RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
-// Map<Long, RetryStatus> rsmap = rq.getRedeliveryStatuses(ds);
-//
-// CompositeData[] rsps = new CompositeData[rsmap.size()];
-// int i = 0;
-// for (Map.Entry<Long, RetryStatus> rs : rsmap.entrySet()) {
-// rsps[i++] = getRetryStatusProperties(rs.getKey(), rs.getValue());
-// }
-// return rsps;
-// }
-
-// @ManagedOperation(description = "List of UnDelivered Message Numbers")
-// @ManagedOperationParameters({
-// @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
-// })
-// public Long[] getUnDeliveredMessageIdentifiers(String sid) {
-// RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
-// DestinationSequence ds = getDestinationSeq(sid);
-// if (null == ds) {
-// throw new IllegalArgumentException("no sequence");
-// }
-//
-// List<Long> numbers = rq.getUndeliveredMessageNumbers(ds);
-// return numbers.toArray(new Long[numbers.size()]);
-// }
+ @ManagedOperation(description = "Redelivery Status")
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier"),
+ @ManagedOperationParameter(name = "messageNumber", description = "The message number")
+ })
+ public CompositeData getRedeliveryStatus(String sid, long num) throws JMException {
+ DestinationSequence ds = getDestinationSeq(sid);
+ if (null == ds) {
+ throw new IllegalArgumentException("no sequence");
+ }
+ RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+ RetryStatus rs = rq.getRedeliveryStatus(ds, num);
+ return getRetryStatusProperties(num, rs);
+ }
+
+ @ManagedOperation(description = "Redelivery Statuses")
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+ })
+ public CompositeData[] getRedeliveryStatuses(String sid) throws JMException {
+ DestinationSequence ds = getDestinationSeq(sid);
+ if (null == ds) {
+ throw new IllegalArgumentException("no sequence");
+ }
+ RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+ Map<Long, RetryStatus> rsmap = rq.getRedeliveryStatuses(ds);
+
+ CompositeData[] rsps = new CompositeData[rsmap.size()];
+ int i = 0;
+ for (Map.Entry<Long, RetryStatus> rs : rsmap.entrySet()) {
+ rsps[i++] = getRetryStatusProperties(rs.getKey(), rs.getValue());
+ }
+ return rsps;
+ }
+
+ @ManagedOperation(description = "List of UnDelivered Message Numbers")
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+ })
+ public Long[] getUnDeliveredMessageIdentifiers(String sid) {
+ RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+ DestinationSequence ds = getDestinationSeq(sid);
+ if (null == ds) {
+ throw new IllegalArgumentException("no sequence");
+ }
+
+ List<Long> numbers = rq.getUndeliveredMessageNumbers(ds);
+ return numbers.toArray(new Long[numbers.size()]);
+ }
@ManagedOperation(description = "List of Source Sequence IDs")
@ManagedOperationParameters({
@@ -383,31 +381,31 @@ public class ManagedRMEndpoint implements ManagedComponent {
rq.resume(ss);
}
-// @ManagedOperation(description = "Suspend Redelivery Queue")
-// @ManagedOperationParameters({
-// @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
-// })
-// public void suspendDestinationQueue(String sid) throws JMException {
-// DestinationSequence ds = getDestinationSeq(sid);
-// if (null == ds) {
-// throw new IllegalArgumentException("no sequence");
-// }
-// RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
-// rq.suspend(ds);
-// }
-
-// @ManagedOperation(description = "Resume Redelivery Queue")
-// @ManagedOperationParameters({
-// @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
-// })
-// public void resumeDestinationQueue(String sid) throws JMException {
-// DestinationSequence ds = getDestinationSeq(sid);
-// if (null == ds) {
-// throw new JMException("no source sequence");
-// }
-// RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
-// rq.resume(ds);
-// }
+ @ManagedOperation(description = "Suspend Redelivery Queue")
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+ })
+ public void suspendDestinationQueue(String sid) throws JMException {
+ DestinationSequence ds = getDestinationSeq(sid);
+ if (null == ds) {
+ throw new IllegalArgumentException("no sequence");
+ }
+ RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+ rq.suspend(ds);
+ }
+
+ @ManagedOperation(description = "Resume Redelivery Queue")
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+ })
+ public void resumeDestinationQueue(String sid) throws JMException {
+ DestinationSequence ds = getDestinationSeq(sid);
+ if (null == ds) {
+ throw new JMException("no source sequence");
+ }
+ RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+ rq.resume(ds);
+ }
@ManagedOperation(description = "Current Source Sequence Properties")
public CompositeData getCurrentSourceSequence() throws JMException {
@@ -572,10 +570,13 @@ public class ManagedRMEndpoint implements ManagedComponent {
public void removeDestinationSequence(String sid) throws JMException {
DestinationSequence ds = getDestinationSeq(sid);
if (null == ds) {
- throw new JMException("no source sequence");
+ throw new JMException("no destination sequence");
}
-// RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
-// rq.suspend(ds);
+ RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+ if (rq.countUndelivered(ds) > 0) {
+ throw new JMException("sequence not empty");
+ }
+ rq.stop(ds);
ds.getDestination().removeSequence(ds);
}
@@ -591,6 +592,19 @@ public class ManagedRMEndpoint implements ManagedComponent {
RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
rq.purgeAll(ss);
}
+
+ @ManagedOperation(description = "Purge UnDelivered Messages")
+ @ManagedOperationParameters({
+ @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+ })
+ public void purgeUnDeliverededMessages(String sid) {
+ DestinationSequence ds = getDestinationSeq(sid);
+ if (null == ds) {
+ throw new IllegalArgumentException("no sequence");
+ }
+ RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+ rq.purgeAll(ds);
+ }
private static String getAddressValue(EndpointReferenceType epr) {
if (null != epr && null != epr.getAddress()) {
@@ -675,10 +689,10 @@ public class ManagedRMEndpoint implements ManagedComponent {
// return endpoint.getManager().countCompleted();
// }
-// @ManagedAttribute(description = "Number of Inbound Queued Messages", currencyTimeLimit = 10)
-// public int getQueuedMessagesInboundCount() {
-// return endpoint.getManager().getRedeliveryQueue().countUndelivered();
-// }
+ @ManagedAttribute(description = "Number of Inbound Queued Messages", currencyTimeLimit = 10)
+ public int getQueuedMessagesInboundCount() {
+ return endpoint.getManager().getRedeliveryQueue().countUndelivered();
+ }
// @ManagedAttribute(description = "Number of Inbound Completed Messages", currencyTimeLimit = 10)
// public int getCompletedMessagesInboundCount() {
http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java
index 9d48cbc..87ae210 100755
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java
@@ -19,51 +19,242 @@
package org.apache.cxf.ws.rm;
+import java.io.Closeable;
+import java.io.IOException;
import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.logging.Logger;
+import javax.xml.soap.SOAPException;
+import javax.xml.soap.SOAPMessage;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+import javax.xml.transform.stream.StreamSource;
+
import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.helpers.IOUtils;
import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.interceptor.StaxInInterceptor;
import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageUtils;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
import org.apache.cxf.phase.Phase;
+import org.apache.cxf.staxutils.StaxUtils;
+import org.apache.cxf.staxutils.transform.OutTransformWriter;
+import org.apache.cxf.ws.addressing.AddressingProperties;
/**
*
*/
public class RMCaptureInInterceptor extends AbstractRMInterceptor<Message> {
+
private static final Logger LOG = LogUtils.getLogger(RMCaptureInInterceptor.class);
-
+
public RMCaptureInInterceptor() {
- super(Phase.PRE_STREAM);
+ super(Phase.POST_STREAM);
+ addAfter(StaxInInterceptor.class.getName());
}
+ @Override
protected void handle(Message message) throws SequenceFault, RMException {
- LOG.entering(getClass().getName(), "handleMessage");
- // This message capturing mechanism will need to be changed at some point.
- // Until then, we keep this interceptor here and utilize the robust
- // option to avoid the unnecessary message capturing/caching.
- if (!MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY))) {
- InputStream is = message.getContent(InputStream.class);
- if (is != null) {
+
+ // all messages are initially captured as they cannot be distinguished at this phase
+ // Non application messages temp files are released (cos.releaseTempFileHold()) in RMInInterceptor
+ if (!MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY))
+ && (getManager().getStore() != null || (getManager().getDestinationPolicy() != null && getManager()
+ .getDestinationPolicy().getRetryPolicy() != null))) {
+
+ message.getInterceptorChain().add(new RMCaptureInEnd());
+ XMLStreamReader reader = message.getContent(XMLStreamReader.class);
+
+ if (null != reader) {
CachedOutputStream saved = new CachedOutputStream();
+ // REVISIT check factory for READER
try {
- IOUtils.copy(is, saved);
-
+ StaxUtils.copy(reader, saved);
saved.flush();
- is.close();
- saved.lockOutputStream();
-
+ saved.holdTempFile();
+ reader.close();
+ LOG.fine("Create new XMLStreamReader");
+ InputStream is = saved.getInputStream();
+ // keep References to clean-up tmp files in RMDeliveryInterceptor
+ setCloseable(message, saved, is);
+ XMLStreamReader newReader = StaxUtils.createXMLStreamReader(is);
+ StaxUtils.configureReader(reader, message);
+ message.setContent(XMLStreamReader.class, newReader);
LOG.fine("Capturing the original RM message");
- //RewindableInputStream ris = RewindableInputStream.makeRewindable(saved.getInputStream());
- message.setContent(InputStream.class, saved.getInputStream());
message.put(RMMessageConstants.SAVED_CONTENT, saved);
- } catch (Exception e) {
+ } catch (XMLStreamException | IOException e) {
throw new Fault(e);
}
+ } else {
+ org.apache.cxf.common.i18n.Message msg = new org.apache.cxf.common.i18n.Message(
+ "No message found for redeliver", LOG, Collections.<String> emptyList());
+ RMException ex = new RMException(msg);
+ throw new Fault(ex);
+ }
+ }
+ }
+
+ private boolean isApplicationMessage(Message message) {
+ final AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, false);
+ if (null != maps && null != maps.getAction()) {
+ return !RMContextUtils.isRMProtocolMessage(maps.getAction().getValue());
+ }
+ return false;
+ }
+
+ private void setCloseable(Message message, CachedOutputStream cos, InputStream is) {
+ message.put("org.apache.cxf.ws.rm.content.closeable", new Closeable() {
+ @Override
+ public void close() throws IOException {
+ try {
+ is.close();
+ } catch (IOException e) {
+ // Ignore
+ }
+ try {
+ cos.close();
+ } catch (IOException e) {
+ // Ignore
+ }
+ }
+ });
+ }
+
+ /**
+ * RMCaptureInEnd interceptor is used to switch saved_content, in case WSS is activated.
+ */
+ private class RMCaptureInEnd extends AbstractPhaseInterceptor<Message> {
+ RMCaptureInEnd() {
+ super(Phase.PRE_LOGICAL);
+ addBefore(RMInInterceptor.class.getName());
+ }
+
+ @Override
+ public void handleFault(Message message) {
+ // in case of a SequenceFault SAVED_CONTENT must be released
+ Exception ex = message.getContent(Exception.class);
+ if (ex instanceof SequenceFault) {
+ Closeable closable = (Closeable)message.get("org.apache.cxf.ws.rm.content.closeable");
+ if (null != closable) {
+ try {
+ closable.close();
+ } catch (IOException e) {
+ // Ignore
+ }
+ }
+ CachedOutputStream saved = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT);
+ if (saved != null) {
+ saved.releaseTempFileHold();
+ try {
+ saved.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ public void handleMessage(Message message) {
+ LOG.entering(getClass().getName(), "handleMessage");
+ // Capturing the soap envelope. In case of WSS was activated, decrypted envelope is captured.
+ if (!MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY))
+ && isApplicationMessage(message)
+ && (getManager().getStore() != null || (getManager().getDestinationPolicy() != null && getManager()
+ .getDestinationPolicy().getRetryPolicy() != null))) {
+
+ CachedOutputStream saved = new CachedOutputStream();
+ SOAPMessage soapMessage = message.getContent(SOAPMessage.class);
+
+ if (soapMessage != null) {
+ try {
+ javax.xml.transform.Source envelope = soapMessage.getSOAPPart().getContent();
+ StaxUtils.copy(envelope, saved);
+ saved.flush();
+ // create a new source part from cos
+ InputStream is = saved.getInputStream();
+ // close old saved content
+ closeOldSavedContent(message);
+ // keep References to clean-up tmp files in RMDeliveryInterceptor
+ setCloseable(message, saved, is);
+ StreamSource source = new StreamSource(is);
+ soapMessage.getSOAPPart().setContent(source);
+ // when WSS was activated, saved content still contains soap headers to be removed
+ message.put(RMMessageConstants.SAVED_CONTENT, removeUnnecessarySoapHeaders(saved));
+ } catch (SOAPException | IOException | XMLStreamException e) {
+ throw new Fault(e);
+ }
+ }
+ }
+ }
+
+ private void closeOldSavedContent(Message message) {
+ CachedOutputStream saved = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT);
+ if (saved != null) {
+ saved.releaseTempFileHold();
+ try {
+ saved.close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ Closeable closable = (Closeable)message.get("org.apache.cxf.ws.rm.content.closeable");
+ if (null != closable) {
+ try {
+ closable.close();
+ } catch (IOException e) {
+ // Ignore
+ }
+ }
+ }
+
+ private CachedOutputStream removeUnnecessarySoapHeaders(CachedOutputStream saved) {
+ CachedOutputStream newSaved = new CachedOutputStream();
+
+ InputStream is = null;
+ try {
+ is = saved.getInputStream();
+ XMLStreamWriter capture = StaxUtils.createXMLStreamWriter(newSaved,
+ StandardCharsets.UTF_8.name());
+ Map<String, String> map = new HashMap<String, String>();
+ map.put("{http://schemas.xmlsoap.org/ws/2005/02/rm}Sequence", "");
+ map.put("{http://schemas.xmlsoap.org/ws/2005/02/rm}SequenceAcknowledgement", "");
+ map.put("{http://docs.oasis-open.org/ws-rx/wsrm/200702}Sequence", "");
+ map.put("{http://docs.oasis-open.org/ws-rx/wsrm/200702}SequenceAcknowledgement", "");
+ map.put("{http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd}Security",
+ "");
+ // attributes to be removed
+ Map<String, String> amap = new HashMap<String, String>();
+ amap.put("{http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd}Id",
+ "");
+
+ capture = new OutTransformWriter(capture, map, Collections.<String, String> emptyMap(),
+ Collections.<String> emptyList(), amap, false, null);
+ StaxUtils.copy(new StreamSource(is), capture);
+ capture.flush();
+ capture.close();
+ newSaved.flush();
+ // hold temp file, otherwise it will be deleted in case msg was written to RMTxStore
+ // or resend was executed
+ newSaved.holdTempFile();
+ is.close();
+ } catch (IOException | XMLStreamException e) {
+ throw new Fault(e);
+ } finally {
+ if (null != is) {
+ try {
+ is.close();
+ } catch (IOException e) {
+ // Ignore
+ }
+ }
}
+ return newSaved;
}
}
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
index b3c412d..af682c3 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
@@ -291,6 +291,7 @@ public class RMCaptureOutInterceptor extends AbstractRMInterceptor<Message> {
}
// serializes the message content and the attachments into
// the RMMessage content
+ msg.setCreatedTime(rmps.getCreatedTime());
PersistenceUtils.encodeRMContent(msg, message, is);
store.persistOutgoing(ss, msg);
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
index 262e066..ff34c95 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
@@ -19,6 +19,8 @@
package org.apache.cxf.ws.rm;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.logging.Logger;
import org.apache.cxf.common.logging.LogUtils;
@@ -60,5 +62,15 @@ public class RMDeliveryInterceptor extends AbstractRMInterceptor<Message> {
dest.acknowledge(message);
}
dest.processingComplete(message);
+
+ // close InputStream of RMCaptureInInterceptor, to delete tmp files in filesystem
+ Closeable closable = (Closeable)message.get("org.apache.cxf.ws.rm.content.closeable");
+ if (null != closable) {
+ try {
+ closable.close();
+ } catch (IOException e) {
+ // Ignore
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
index e393124..03f6d13 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
@@ -830,6 +830,9 @@ public class RMEndpoint {
for (SourceSequence ss : getSource().getAllSequences()) {
manager.getRetransmissionQueue().stop(ss);
}
+ for (DestinationSequence ds : getDestination().getAllSequences()) {
+ manager.getRedeliveryQueue().stop(ds);
+ }
// unregistering of this managed bean from the server is done by the bus itself
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
index 5b516e4..c82412b 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
@@ -24,6 +24,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageUtils;
@@ -32,6 +33,7 @@ import org.apache.cxf.ws.addressing.ContextUtils;
import org.apache.cxf.ws.addressing.MAPAggregator;
import org.apache.cxf.ws.rm.v200702.Identifier;
import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement;
+import org.apache.cxf.ws.rm.v200702.SequenceType;
import org.apache.cxf.ws.security.trust.STSUtils;
/**
@@ -49,17 +51,21 @@ public class RMInInterceptor extends AbstractRMInterceptor<Message> {
@Override
public void handleFault(Message message) {
message.put(MAPAggregator.class.getName(), true);
- if (RMContextUtils.getProtocolVariation(message) != null
- && MessageUtils.isTrue(message.get(RMMessageConstants.DELIVERING_ROBUST_ONEWAY))) {
- // revert the delivering entry from the destination sequence
- try {
- Destination destination = getManager().getDestination(message);
- if (destination != null) {
- destination.releaseDeliveringStatus(message);
+ if (RMContextUtils.getProtocolVariation(message) != null) {
+ if (MessageUtils.isTrue(message.get(RMMessageConstants.DELIVERING_ROBUST_ONEWAY))) {
+ // revert the delivering entry from the destination sequence
+ try {
+ Destination destination = getManager().getDestination(message);
+ if (destination != null) {
+ destination.releaseDeliveringStatus(message);
+ }
+ } catch (RMException e) {
+ LOG.log(Level.WARNING, "Failed to revert the delivering status");
}
- } catch (RMException e) {
- LOG.log(Level.WARNING, "Failed to revert the delivering status");
- }
+ } else if (isRedeliveryEnabled(message) && RMContextUtils.isServerSide(message)
+ && isApplicationMessage(message) && hasValidSequence(message)) {
+ getManager().getRedeliveryQueue().addUndelivered(message);
+ }
}
// make sure the fault is returned for an ws-rm related fault or an invalid ws-rm message
// note that OneWayProcessingInterceptor handles the robust case, hence not handled here.
@@ -82,6 +88,42 @@ public class RMInInterceptor extends AbstractRMInterceptor<Message> {
|| message.getContent(Exception.class) instanceof SequenceFault);
}
+ private boolean hasValidSequence(Message message) {
+ final RMProperties rmps = RMContextUtils.retrieveRMProperties(message, false);
+ if (rmps != null) {
+ SequenceType st = rmps.getSequence();
+ if (st != null && st.getIdentifier() != null) {
+ try {
+ Destination destination = getManager().getDestination(message);
+ if (destination != null && destination.getSequence(st.getIdentifier()) != null) {
+ return true;
+ }
+ } catch (RMException e) {
+ // fall through
+ }
+ }
+ }
+ return false;
+ }
+
+ private static boolean isApplicationMessage(Message message) {
+ final AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, false);
+ if (null != maps && null != maps.getAction()) {
+ return !RMContextUtils.isRMProtocolMessage(maps.getAction().getValue());
+ }
+ return false;
+ }
+
+ private boolean isRedeliveryEnabled(Message message) {
+ // deprecated redelivery mode check
+ if (MessageUtils.isTrue(message.getContextualProperty("org.apache.cxf.ws.rm.destination.redeliver"))) {
+ LOG.warning("Use RetryPolicy to enable the redelivery mode");
+ return true;
+ }
+ return getManager().getDestinationPolicy() != null
+ && getManager().getDestinationPolicy().getRetryPolicy() != null;
+ }
+
protected void handle(Message message) throws SequenceFault, RMException {
LOG.entering(getClass().getName(), "handleMessage");
@@ -138,10 +180,6 @@ public class RMInInterceptor extends AbstractRMInterceptor<Message> {
}
RMContextUtils.setProtocolVariation(message, protocol);
- // Destination destination = getManager().getDestination(message);
- // RMEndpoint rme = getManager().getReliableEndpoint(message);
- // Servant servant = new Servant(rme);
-
boolean isApplicationMessage = !RMContextUtils.isRMProtocolMessage(action);
LOG.fine("isApplicationMessage: " + isApplicationMessage);
@@ -164,6 +202,12 @@ public class RMInInterceptor extends AbstractRMInterceptor<Message> {
rme.receivedApplicationMessage();
}
} else {
+ // in case message is not an application message, release SAVED_CONTENT
+ // otherwise tmp files will not be closed
+ CachedOutputStream cos = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT);
+ if (null != cos) {
+ cos.releaseTempFileHold();
+ }
rme.receivedControlMessage();
if (RM10Constants.SEQUENCE_ACKNOWLEDGMENT_ACTION.equals(action)
|| RM11Constants.SEQUENCE_ACKNOWLEDGMENT_ACTION.equals(action)) {
@@ -215,7 +259,7 @@ public class RMInInterceptor extends AbstractRMInterceptor<Message> {
final boolean robust =
MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
if (robust) {
- // set this property to change the acknlowledging behavior
+ // set this property to change the acknowledging behavior
message.put(RMMessageConstants.DELIVERING_ROBUST_ONEWAY, Boolean.TRUE);
}
destination.acknowledge(message);
http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
index 9639cfe..6a0839e 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
@@ -53,6 +53,9 @@ import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.phase.PhaseInterceptorChain;
import org.apache.cxf.service.Service;
+import org.apache.cxf.service.model.BindingInfo;
+import org.apache.cxf.service.model.InterfaceInfo;
+import org.apache.cxf.service.model.ServiceInfo;
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.ws.addressing.AddressingProperties;
import org.apache.cxf.ws.addressing.ContextUtils;
@@ -70,6 +73,7 @@ import org.apache.cxf.ws.rm.persistence.PersistenceUtils;
import org.apache.cxf.ws.rm.persistence.RMMessage;
import org.apache.cxf.ws.rm.persistence.RMStore;
import org.apache.cxf.ws.rm.policy.RMPolicyUtilities;
+import org.apache.cxf.ws.rm.soap.RedeliveryQueueImpl;
import org.apache.cxf.ws.rm.soap.RetransmissionQueueImpl;
import org.apache.cxf.ws.rm.soap.SoapFaultFactory;
import org.apache.cxf.ws.rm.v200702.CloseSequenceType;
@@ -112,6 +116,7 @@ public class RMManager {
private RMStore store;
private SequenceIdentifierGenerator idGenerator;
private RetransmissionQueue retransmissionQueue;
+ private RedeliveryQueue redeliveryQueue;
private Map<Endpoint, RMEndpoint> reliableEndpoints = new ConcurrentHashMap<Endpoint, RMEndpoint>();
private AtomicReference<Timer> timer = new AtomicReference<Timer>();
private RMConfiguration configuration;
@@ -185,6 +190,14 @@ public class RMManager {
retransmissionQueue = rq;
}
+ public RedeliveryQueue getRedeliveryQueue() {
+ return redeliveryQueue;
+ }
+
+ public void setRedeliveryQueue(RedeliveryQueue redeliveryQueue) {
+ this.redeliveryQueue = redeliveryQueue;
+ }
+
public SequenceIdentifierGenerator getIdGenerator() {
return idGenerator;
}
@@ -556,10 +569,10 @@ public class RMManager {
}
for (DestinationSequence ds : dss) {
- reconverDestinationSequence(endpoint, conduit, rme.getDestination(), ds);
+ recoverDestinationSequence(endpoint, conduit, rme.getDestination(), ds);
}
retransmissionQueue.start();
-
+ redeliveryQueue.start();
}
private void recoverSourceSequence(Endpoint endpoint, Conduit conduit, Source s,
@@ -570,7 +583,7 @@ public class RMManager {
return;
}
LOG.log(Level.FINE, "Number of messages in sequence: {0}", ms.size());
-
+ // only recover the sequence if there are pending messages
s.addSequence(ss, false);
// choosing an arbitrary valid source sequence as the current source sequence
if (s.getAssociatedSequence(null) == null && !ss.isExpired() && !ss.isLastMessage()) {
@@ -596,6 +609,7 @@ public class RMManager {
st.setMessageNumber(m.getMessageNumber());
RMProperties rmps = new RMProperties();
rmps.setSequence(st);
+ rmps.setCreatedTime(m.getCreatedTime());
rmps.exposeAs(ss.getProtocol().getWSRMNamespace());
if (ss.isLastMessage() && ss.getCurrentMessageNr() == m.getMessageNumber()) {
CloseSequenceType close = new CloseSequenceType();
@@ -623,10 +637,59 @@ public class RMManager {
}
}
- private void reconverDestinationSequence(Endpoint endpoint, Conduit conduit, Destination d,
+ private void recoverDestinationSequence(Endpoint endpoint, Conduit conduit, Destination d,
DestinationSequence ds) {
+ // always recover the sequence
d.addSequence(ds, false);
- //TODO add the redelivery code
+
+ Collection<RMMessage> ms = store.getMessages(ds.getIdentifier(), false);
+ if (null == ms || 0 == ms.size()) {
+ return;
+ }
+ LOG.log(Level.FINE, "Number of messages in sequence: {0}", ms.size());
+
+ for (RMMessage m : ms) {
+ Message message = new MessageImpl();
+ Exchange exchange = new ExchangeImpl();
+ message.setExchange(exchange);
+ if (null != conduit) {
+ exchange.setConduit(conduit);
+ }
+ exchange.put(Endpoint.class, endpoint);
+ exchange.put(Service.class, endpoint.getService());
+ if (endpoint.getEndpointInfo().getService() != null) {
+ exchange.put(ServiceInfo.class, endpoint.getEndpointInfo().getService());
+ exchange.put(InterfaceInfo.class, endpoint.getEndpointInfo().getService().getInterface());
+ }
+ exchange.put(Binding.class, endpoint.getBinding());
+ exchange.put(BindingInfo.class, endpoint.getEndpointInfo().getBinding());
+ exchange.put(Bus.class, bus);
+
+ SequenceType st = new SequenceType();
+ st.setIdentifier(ds.getIdentifier());
+ st.setMessageNumber(m.getMessageNumber());
+ RMProperties rmps = new RMProperties();
+ rmps.setSequence(st);
+ rmps.setCreatedTime(m.getCreatedTime());
+ RMContextUtils.storeRMProperties(message, rmps, false);
+ try {
+ // RMMessage is stored in a serialized way, therefore
+ // RMMessage content must be splitted into soap root message
+ // and attachments
+ PersistenceUtils.decodeRMContent(m, message);
+ redeliveryQueue.addUndelivered(message);
+ // add acknowledged undelivered message
+ ds.addDeliveringMessageNumber(m.getMessageNumber());
+ } catch (IOException e) {
+ LOG.log(Level.SEVERE, "Error reading persisted message data", e);
+ }
+ }
+
+ // if no messages are recovered and the sequence has been already terminated, remove the sequence
+ if (ds.isTerminated() && ds.allAcknowledgedMessagesDelivered()) {
+ d.removeSequence(ds);
+ store.removeDestinationSequence(ds.getIdentifier());
+ }
}
RMEndpoint createReliableEndpoint(final Endpoint endpoint) {
@@ -664,6 +727,9 @@ public class RMManager {
if (null == retransmissionQueue) {
retransmissionQueue = new RetransmissionQueueImpl(this);
}
+ if (null == redeliveryQueue) {
+ redeliveryQueue = new RedeliveryQueueImpl(this);
+ }
if (null == idGenerator) {
idGenerator = new DefaultSequenceIdentifierGenerator();
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java
index 448bfc6..f137803 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java
@@ -37,6 +37,7 @@ public class RMProperties {
private CloseSequenceType closeSequence;
private String namespaceURI;
private boolean lastMessage;
+ private long createdTime = System.currentTimeMillis();
public Collection<SequenceAcknowledgement> getAcks() {
return acks;
@@ -120,4 +121,21 @@ public class RMProperties {
public void exposeAs(String uri) {
namespaceURI = uri;
}
+
+ /**
+ * Get the initial creation time of this RM properties instance.
+ * @return Returns the createdTime.
+ */
+ public long getCreatedTime() {
+ return createdTime;
+ }
+
+ /**
+ * Set the initial creation time of this RM properties instance.
+ *
+ * @param createdTime The createdTime to set.
+ */
+ public void setCreatedTime(long createdTime) {
+ this.createdTime = createdTime;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RedeliveryQueue.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RedeliveryQueue.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RedeliveryQueue.java
new file mode 100644
index 0000000..4737859
--- /dev/null
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RedeliveryQueue.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cxf.message.Message;
+
+public interface RedeliveryQueue {
+
+
+ String DEFAULT_BASE_REDELIVERY_INTERVAL = "3000";
+ int DEFAULT_EXPONENTIAL_BACKOFF = 2;
+
+ /**
+ * @param seq the sequence under consideration
+ * @return the number of unacknowledged messages for that sequence
+ */
+ int countUndelivered(DestinationSequence seq);
+
+ /**
+ * @return the total number of undelivered messages in this queue
+ */
+ int countUndelivered();
+
+ /**
+ * @return true if there are no unacknowledged messages in the queue
+ */
+ boolean isEmpty();
+
+ /**
+ * Accepts a failed message for possible future redelivery.
+ * @param message the message context.
+ */
+ void addUndelivered(Message message);
+
+ /**
+ * Purge all candiates for the given sequence.
+ *
+ * @param seq the sequence object
+ */
+ void purgeAll(DestinationSequence seq);
+
+ /**
+ *
+ * @param seq
+ * @return
+ */
+ List<Long> getUndeliveredMessageNumbers(DestinationSequence seq);
+
+ /**
+ * Returns the retransmission status for the specified message.
+ * @param seq
+ * @param num
+ * @return
+ */
+ RetryStatus getRedeliveryStatus(DestinationSequence seq, long num);
+
+ /**
+ * Return the retransmission status of all the messages assigned to the sequence.
+ * @param seq
+ * @return
+ */
+ Map<Long, RetryStatus> getRedeliveryStatuses(DestinationSequence seq);
+
+ /**
+ * Initiate resends.
+ */
+ void start();
+
+ /**
+ * Stops redelivery queue.
+ * @param seq
+ */
+ void stop(DestinationSequence seq);
+
+ /**
+ * Suspends the redelivery attempts for the specified sequence
+ * @param seq
+ */
+ void suspend(DestinationSequence seq);
+
+ /**
+ * Resumes the redelivery attempts for the specified sequence
+ * @param seq
+ */
+ void resume(DestinationSequence seq);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
index 7cfeade..80a64bc 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
@@ -245,7 +245,7 @@ public class Servant implements Invoker {
return null;
}
- destination.removeSequence(terminatedSeq);
+ destination.terminateSequence(terminatedSeq);
// the following may be necessary if the last message for this sequence was a oneway
// request and hence there was no response to which a last message could have been added
http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java
index 08cbf2a..90db17b 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java
@@ -123,9 +123,7 @@ public class RMFeature extends AbstractFeature {
provider.getInInterceptors().add(rmLogicalIn);
provider.getInInterceptors().add(rmInCodec);
provider.getInInterceptors().add(rmDelivery);
- if (null != store) {
- provider.getInInterceptors().add(rmCaptureIn);
- }
+ provider.getInInterceptors().add(rmCaptureIn);
provider.getOutInterceptors().add(rmLogicalOut);
provider.getOutInterceptors().add(rmOutCodec);
http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
index 348117c..ad61b04 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
@@ -32,6 +32,7 @@ public class RMMessage {
private String contentType;
private long messageNumber;
private String to;
+ private long createdTime;
/**
* Returns the message number of the message within its sequence.
@@ -120,4 +121,12 @@ public class RMMessage {
this.contentType = contentType;
}
+ public long getCreatedTime() {
+ return createdTime;
+ }
+
+ public void setCreatedTime(long createdTime) {
+ this.createdTime = createdTime;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
index 641df46..e3cfa0b 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
@@ -75,6 +75,7 @@ public class RMTxStore implements RMStore {
{"LAST_MSG_NO", "DECIMAL(19, 0)"},
{"ENDPOINT_ID", "VARCHAR(1024)"},
{"ACKNOWLEDGED", "BLOB"},
+ {"TERMINATED", "CHAR(1)"},
{"PROTOCOL_VERSION", "VARCHAR(256)"}};
private static final String[] DEST_SEQUENCES_TABLE_KEYS = {"SEQ_ID"};
private static final String[][] SRC_SEQUENCES_TABLE_COLS
@@ -90,6 +91,7 @@ public class RMTxStore implements RMStore {
= {{"SEQ_ID", "VARCHAR(256) NOT NULL"},
{"MSG_NO", "DECIMAL(19, 0) NOT NULL"},
{"SEND_TO", "VARCHAR(256)"},
+ {"CREATED_TIME", "DECIMAL(19, 0)"},
{"CONTENT", "BLOB"},
{"CONTENT_TYPE", "VARCHAR(1024)"}};
private static final String[] MESSAGES_TABLE_KEYS = {"SEQ_ID", "MSG_NO"};
@@ -119,27 +121,27 @@ public class RMTxStore implements RMStore {
private static final String DELETE_SRC_SEQUENCE_STMT_STR =
"DELETE FROM CXF_RM_SRC_SEQUENCES WHERE SEQ_ID = ?";
private static final String UPDATE_DEST_SEQUENCE_STMT_STR =
- "UPDATE CXF_RM_DEST_SEQUENCES SET LAST_MSG_NO = ?, ACKNOWLEDGED = ? WHERE SEQ_ID = ?";
+ "UPDATE CXF_RM_DEST_SEQUENCES SET LAST_MSG_NO = ?, TERMINATED = ?, ACKNOWLEDGED = ? WHERE SEQ_ID = ?";
private static final String UPDATE_SRC_SEQUENCE_STMT_STR =
"UPDATE CXF_RM_SRC_SEQUENCES SET CUR_MSG_NO = ?, LAST_MSG = ? WHERE SEQ_ID = ?";
private static final String CREATE_MESSAGE_STMT_STR
- = "INSERT INTO {0} (SEQ_ID, MSG_NO, SEND_TO, CONTENT, CONTENT_TYPE) VALUES(?, ?, ?, ?, ?)";
+ = "INSERT INTO {0} (SEQ_ID, MSG_NO, SEND_TO, CREATED_TIME, CONTENT, CONTENT_TYPE) VALUES(?, ?, ?, ?, ?, ?)";
private static final String DELETE_MESSAGE_STMT_STR =
"DELETE FROM {0} WHERE SEQ_ID = ? AND MSG_NO = ?";
private static final String SELECT_DEST_SEQUENCE_STMT_STR =
- "SELECT ACKS_TO, LAST_MSG_NO, PROTOCOL_VERSION, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
+ "SELECT ACKS_TO, LAST_MSG_NO, PROTOCOL_VERSION, TERMINATED, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
+ "WHERE SEQ_ID = ?";
private static final String SELECT_SRC_SEQUENCE_STMT_STR =
"SELECT CUR_MSG_NO, LAST_MSG, EXPIRY, OFFERING_SEQ_ID, PROTOCOL_VERSION FROM CXF_RM_SRC_SEQUENCES "
+ "WHERE SEQ_ID = ?";
private static final String SELECT_DEST_SEQUENCES_STMT_STR =
- "SELECT SEQ_ID, ACKS_TO, LAST_MSG_NO, PROTOCOL_VERSION, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
+ "SELECT SEQ_ID, ACKS_TO, LAST_MSG_NO, PROTOCOL_VERSION, TERMINATED, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
+ "WHERE ENDPOINT_ID = ?";
private static final String SELECT_SRC_SEQUENCES_STMT_STR =
"SELECT SEQ_ID, CUR_MSG_NO, LAST_MSG, EXPIRY, OFFERING_SEQ_ID, PROTOCOL_VERSION "
+ "FROM CXF_RM_SRC_SEQUENCES WHERE ENDPOINT_ID = ?";
private static final String SELECT_MESSAGES_STMT_STR =
- "SELECT MSG_NO, SEND_TO, CONTENT, CONTENT_TYPE FROM {0} WHERE SEQ_ID = ?";
+ "SELECT MSG_NO, SEND_TO, CREATED_TIME, CONTENT, CONTENT_TYPE FROM {0} WHERE SEQ_ID = ?";
private static final String ALTER_TABLE_STMT_STR =
"ALTER TABLE {0} ADD {1} {2}";
private static final String CREATE_INBOUND_MESSAGE_STMT_STR =
@@ -395,13 +397,14 @@ public class RMTxStore implements RMStore {
EndpointReferenceType acksTo = RMUtils.createReference(res.getString(1));
long lm = res.getLong(2);
ProtocolVariation pv = decodeProtocolVersion(res.getString(3));
- InputStream is = res.getBinaryStream(4);
+ boolean t = res.getBoolean(4);
+ InputStream is = res.getBinaryStream(5);
SequenceAcknowledgement ack = null;
if (null != is) {
ack = PersistenceUtils.getInstance()
.deserialiseAcknowledgment(is);
}
- return new DestinationSequence(sid, acksTo, lm, ack, pv);
+ return new DestinationSequence(sid, acksTo, lm, t, ack, pv);
}
} catch (SQLException ex) {
conex = ex;
@@ -521,13 +524,14 @@ public class RMTxStore implements RMStore {
EndpointReferenceType acksTo = RMUtils.createReference(res.getString(2));
long lm = res.getLong(3);
ProtocolVariation pv = decodeProtocolVersion(res.getString(4));
- InputStream is = res.getBinaryStream(5);
+ boolean t = res.getBoolean(5);
+ InputStream is = res.getBinaryStream(6);
SequenceAcknowledgement ack = null;
if (null != is) {
ack = PersistenceUtils.getInstance()
.deserialiseAcknowledgment(is);
}
- DestinationSequence seq = new DestinationSequence(sid, acksTo, lm, ack, pv);
+ DestinationSequence seq = new DestinationSequence(sid, acksTo, lm, t, ack, pv);
seqs.add(seq);
}
} catch (SQLException ex) {
@@ -596,11 +600,13 @@ public class RMTxStore implements RMStore {
while (res.next()) {
long mn = res.getLong(1);
String to = res.getString(2);
- Blob blob = res.getBlob(3);
- String contentType = res.getString(4);
+ long ct = res.getLong(3);
+ Blob blob = res.getBlob(4);
+ String contentType = res.getString(5);
RMMessage msg = new RMMessage();
msg.setMessageNumber(mn);
msg.setTo(to);
+ msg.setCreatedTime(ct);
CachedOutputStream cos = new CachedOutputStream();
IOUtils.copyAndCloseInput(blob.getBinaryStream(), cos);
cos.flush();
@@ -752,8 +758,9 @@ public class RMTxStore implements RMStore {
stmt.setString(1, id);
stmt.setLong(2, nr);
stmt.setString(3, to);
- stmt.setBinaryStream(4, msgin);
- stmt.setString(5, contentType);
+ stmt.setLong(4, msg.getCreatedTime());
+ stmt.setBinaryStream(5, msgin);
+ stmt.setString(6, contentType);
stmt.execute();
if (LOG.isLoggable(Level.FINE)) {
LOG.log(Level.FINE, "Successfully stored {0} message number {1} for sequence {2}",
@@ -805,10 +812,11 @@ public class RMTxStore implements RMStore {
stmt = getStatement(con, UPDATE_DEST_SEQUENCE_STMT_STR);
long lastMessageNr = seq.getLastMessageNumber();
- stmt.setLong(1, lastMessageNr);
+ stmt.setLong(1, lastMessageNr);
+ stmt.setString(2, seq.isTerminated() ? "1" : "0");
InputStream is = PersistenceUtils.getInstance().serialiseAcknowledgment(seq.getAcknowledgment());
- stmt.setBinaryStream(2, is, is.available());
- stmt.setString(3, seq.getIdentifier().getValue());
+ stmt.setBinaryStream(3, is, is.available());
+ stmt.setString(4, seq.getIdentifier().getValue());
stmt.execute();
} finally {
releaseResources(stmt, null);
@@ -884,12 +892,11 @@ public class RMTxStore implements RMStore {
}
protected void verifyTable(Connection con, String tableName, String[][] tableCols) {
- List<String[]> newCols = new ArrayList<String[]>();
- ResultSet rs = null;
try {
DatabaseMetaData metadata = con.getMetaData();
- rs = metadata.getColumns(null, null, tableName, "%");
+ ResultSet rs = metadata.getColumns(null, null, tableName, "%");
Set<String> dbCols = new HashSet<String>();
+ List<String[]> newCols = new ArrayList<String[]>();
while (rs.next()) {
dbCols.add(rs.getString(4));
}
@@ -898,26 +905,12 @@ public class RMTxStore implements RMStore {
newCols.add(col);
}
}
- } catch (SQLException ex) {
- if (LOG.isLoggable(Level.FINE)) {
- LOG.fine("Table " + tableName + " cannot be verified.");
- }
- } finally {
- if (rs != null) {
- try {
- rs.close();
- } catch (SQLException e) {
- // ignore
+ if (newCols.size() > 0) {
+ // need to add the new columns
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.log(Level.FINE, "Table " + tableName + " needs additional columns");
}
- }
- }
- if (newCols.size() > 0) {
- // need to add the new columns
- if (LOG.isLoggable(Level.FINE)) {
- LOG.log(Level.FINE, "Table " + tableName + " needs additional columns");
- }
-
- try {
+
for (String[] newCol : newCols) {
Statement st = con.createStatement();
try {
@@ -931,9 +924,9 @@ public class RMTxStore implements RMStore {
st.close();
}
}
- } catch (SQLException ex) {
- LOG.log(Level.WARNING, "Table " + tableName + " cannot be altered.", ex);
}
+ } catch (SQLException ex) {
+ LOG.log(Level.WARNING, "Table " + tableName + " cannot be altered.", ex);
}
}
@@ -965,7 +958,6 @@ public class RMTxStore implements RMStore {
for (int i = 0; i < SET_SCHEMA_STMT_STRS.length; i++) {
try {
stmt.executeUpdate(MessageFormat.format(SET_SCHEMA_STMT_STRS[i], schemaName));
- ex0 = null;
break;
} catch (SQLException ex) {
ex.setNextException(ex0);
http://git-wip-us.apache.org/repos/asf/cxf/blob/6b8a340c/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInInterceptor.java
index 8e1b25e..9215031 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInInterceptor.java
@@ -139,7 +139,10 @@ public class RMSoapInInterceptor extends AbstractSoapInterceptor {
* @return the RM properties
*/
public RMProperties unmarshalRMProperties(SoapMessage message) {
- RMProperties rmps = new RMProperties();
+ RMProperties rmps = (RMProperties)message.get(RMContextUtils.getRMPropertiesKey(false));
+ if (rmps == null) {
+ rmps = new RMProperties();
+ }
List<Header> headers = message.getHeaders();
if (headers != null) {
decodeHeaders(message, headers, rmps);