You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2016/07/22 17:06:12 UTC

[7/9] cxf git commit: [CXF-4209] Server side message redelivery support for WS-RM

[CXF-4209] Server side message redelivery support for WS-RM


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/0dd29509
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/0dd29509
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/0dd29509

Branch: refs/heads/3.1.x-fixes
Commit: 0dd29509e42fc412ec0cf214e66885d26da9850e
Parents: 661c0bd
Author: Kai Rommel <ka...@sap.com>
Authored: Tue Jun 7 17:47:58 2016 +0200
Committer: Daniel Kulp <dk...@apache.org>
Committed: Fri Jul 22 12:13:14 2016 -0400

----------------------------------------------------------------------
 .../java/org/apache/cxf/ws/rm/Destination.java  |  23 +
 .../apache/cxf/ws/rm/DestinationSequence.java   | 100 ++-
 .../org/apache/cxf/ws/rm/ManagedRMEndpoint.java | 192 ++---
 .../cxf/ws/rm/RMCaptureInInterceptor.java       | 227 +++++-
 .../cxf/ws/rm/RMCaptureOutInterceptor.java      |   1 +
 .../apache/cxf/ws/rm/RMDeliveryInterceptor.java |  12 +
 .../java/org/apache/cxf/ws/rm/RMEndpoint.java   |   3 +
 .../org/apache/cxf/ws/rm/RMInInterceptor.java   |  74 +-
 .../java/org/apache/cxf/ws/rm/RMManager.java    |  76 +-
 .../java/org/apache/cxf/ws/rm/RMProperties.java |  18 +
 .../org/apache/cxf/ws/rm/RedeliveryQueue.java   | 106 +++
 .../main/java/org/apache/cxf/ws/rm/Servant.java |   2 +-
 .../org/apache/cxf/ws/rm/feature/RMFeature.java |   4 +-
 .../apache/cxf/ws/rm/persistence/RMMessage.java |   9 +
 .../cxf/ws/rm/persistence/jdbc/RMTxStore.java   |  74 +-
 .../cxf/ws/rm/soap/RMSoapInInterceptor.java     |   5 +-
 .../cxf/ws/rm/soap/RedeliveryQueueImpl.java     | 699 +++++++++++++++++++
 .../configuration/wsrm-manager-types.xsd        |  15 +
 .../schemas/configuration/wsrm-policy.xjb       |   9 +
 .../cxf/ws/rm/DestinationSequenceTest.java      |   4 +-
 .../org/apache/cxf/ws/rm/RMEndpointTest.java    |   4 +
 .../apache/cxf/ws/rm/RMInInterceptorTest.java   |  25 +
 .../org/apache/cxf/ws/rm/RMManagerTest.java     |  43 +-
 .../rm/persistence/jdbc/RMTxStoreTestBase.java  |   4 +
 .../cxf/systest/ws/rm/RedeliveryTest.java       | 186 +++++
 25 files changed, 1703 insertions(+), 212 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
index 178a63c..3d3489b 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Destination.java
@@ -29,6 +29,7 @@ import java.util.logging.Logger;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
@@ -75,6 +76,14 @@ public class Destination extends AbstractEndpoint {
         processingSequenceCount.incrementAndGet();
     }
 
+    // this method ensures to keep the sequence until all the messages are delivered
+    public void terminateSequence(DestinationSequence seq) {
+        seq.terminate();
+        if (seq.allAcknowledgedMessagesDelivered()) {
+            removeSequence(seq);
+        }
+    }
+
     public void removeSequence(DestinationSequence seq) {
         DestinationSequence o;
         o = map.remove(seq.getIdentifier().getValue());
@@ -208,6 +217,20 @@ public class Destination extends AbstractEndpoint {
             long mn = sequenceType.getMessageNumber().longValue();
             seq.processingComplete(mn);
             seq.purgeAcknowledged(mn);
+            // remove acknowledged undelivered message
+            seq.removeDeliveringMessageNumber(mn);
+            if (seq.isTerminated() && seq.allAcknowledgedMessagesDelivered()) {
+                removeSequence(seq);
+            }
+        }
+        CachedOutputStream saved = (CachedOutputStream)message.remove(RMMessageConstants.SAVED_CONTENT);
+        if (saved != null) {
+            saved.releaseTempFileHold();
+            try {
+                saved.close();
+            } catch (IOException e) {
+                // ignore
+            }
         }
     }
     

http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
index 58b7906..3442fc5 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/DestinationSequence.java
@@ -19,6 +19,8 @@
 
 package org.apache.cxf.ws.rm;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -33,12 +35,15 @@ import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.continuations.Continuation;
 import org.apache.cxf.continuations.ContinuationProvider;
 import org.apache.cxf.continuations.SuspendedInvocationException;
+import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageUtils;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.cxf.ws.policy.PolicyVerificationInInterceptor;
 import org.apache.cxf.ws.rm.RMConfiguration.DeliveryAssurance;
 import org.apache.cxf.ws.rm.manager.AcksPolicyType;
+import org.apache.cxf.ws.rm.persistence.PersistenceUtils;
 import org.apache.cxf.ws.rm.persistence.RMMessage;
 import org.apache.cxf.ws.rm.persistence.RMStore;
 import org.apache.cxf.ws.rm.v200702.Identifier;
@@ -55,6 +60,7 @@ public class DestinationSequence extends AbstractSequence {
     private long lastMessageNumber;
     private SequenceMonitor monitor;
     private boolean acknowledgeOnNextOccasion;
+    private boolean terminated;
     private List<DeferredAcknowledgment> deferredAcknowledgments;
     private SequenceTermination scheduledTermination;
     private String correlationID;
@@ -62,18 +68,25 @@ public class DestinationSequence extends AbstractSequence {
     private volatile long highNumberCompleted;
     private long nextInOrder;
     private List<Continuation> continuations = new LinkedList<Continuation>();
+    // this map is used for robust and redelivery tracking. for redelivery it holds the beingDeliverd messages
     private Set<Long> deliveringMessageNumbers = new HashSet<Long>();
     
     public DestinationSequence(Identifier i, EndpointReferenceType a, Destination d, ProtocolVariation pv) {
-        this(i, a, 0, null, pv);
+        this(i, a, 0, false, null, pv);
         destination = d;
     }
     
     public DestinationSequence(Identifier i, EndpointReferenceType a,
                               long lmn, SequenceAcknowledgement ac, ProtocolVariation pv) {
+        this(i, a, lmn, false, ac, pv);
+    }
+    
+    public DestinationSequence(Identifier i, EndpointReferenceType a,
+                              long lmn, boolean t, SequenceAcknowledgement ac, ProtocolVariation pv) {
         super(i, pv);
         acksTo = a;
         lastMessageNumber = lmn;
+        terminated = t;
         acknowledgement = ac;
         if (null == acknowledgement) {
             acknowledgement = new SequenceAcknowledgement();
@@ -122,6 +135,7 @@ public class DestinationSequence extends AbstractSequence {
         }        
         
         monitor.acknowledgeMessage();
+        boolean updated = false;
         
         synchronized (this) {
             boolean done = false;
@@ -136,11 +150,13 @@ public class DestinationSequence extends AbstractSequence {
                 long diff = r.getLower() - messageNumber;
                 if (diff == 1) {
                     r.setLower(messageNumber);
+                    updated = true;
                     done = true;
                 } else if (diff > 0) {
                     break;
                 } else if (messageNumber - r.getUpper().longValue() == 1) {
                     r.setUpper(messageNumber);
+                    updated = true;
                     done = true;
                     break;
                 }
@@ -152,6 +168,7 @@ public class DestinationSequence extends AbstractSequence {
                 AcknowledgementRange range = new AcknowledgementRange();
                 range.setLower(messageNumber);
                 range.setUpper(messageNumber);
+                updated = true;
                 acknowledgement.getAcknowledgementRange().add(i, range);
                 if (acknowledgement.getAcknowledgementRange().size() > 1) {
                     
@@ -163,18 +180,45 @@ public class DestinationSequence extends AbstractSequence {
             mergeRanges();
         }
 
-        RMStore store = destination.getManager().getStore();
-        if (null != store) {
-            RMMessage msg = null;
-            if (!MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY))) {
-                msg = new RMMessage();
-                CachedOutputStream cos = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT);
-                msg.setContent(cos);
-                msg.setContentType((String) message.get(Message.CONTENT_TYPE));
-                msg.setMessageNumber(st.getMessageNumber());
+        if (updated) {
+            RMStore store = destination.getManager().getStore();
+            if (null != store) {
+                // only save message, when policy verification is successful
+                // otherwise msgs will be stored and redelivered which do not pass initial verification
+                // as interceptor is called in a later phase than the capturing
+                PolicyVerificationInInterceptor intercep = new PolicyVerificationInInterceptor();
+                boolean policiesVerified = false;
+                try {
+                    intercep.handleMessage(message);
+                    policiesVerified = true;
+                } catch (Fault e) {
+                    // Ignore
+                }
+                RMMessage msg = null;
+                if (policiesVerified
+                    && !MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY))) {
+                    try {
+                        msg = new RMMessage();
+                        CachedOutputStream cos = (CachedOutputStream)message
+                            .get(RMMessageConstants.SAVED_CONTENT);
+                        msg.setMessageNumber(st.getMessageNumber());
+                        msg.setCreatedTime(rmps.getCreatedTime());
+                        // in case no attachments are available, cos can be saved directly
+                        if (message.getAttachments() == null) {
+                            msg.setContent(cos);
+                            msg.setContentType((String)message.get(Message.CONTENT_TYPE));
+                        } else {
+                            InputStream is = cos.getInputStream();
+                            PersistenceUtils.encodeRMContent(msg, message, is);
+                        }
+                        store.persistIncoming(this, msg);
+                    } catch (IOException e) {
+                        throw new Fault(e);
+                    }
+                }
             }
-            store.persistIncoming(this, msg);
         }
+        deliveringMessageNumbers.add(messageNumber);
         
         RMEndpoint reliableEndpoint = destination.getReliableEndpoint();
         RMConfiguration cfg = reliableEndpoint.getConfiguration();
@@ -277,7 +321,7 @@ public class DestinationSequence extends AbstractSequence {
             return false;
         } 
         if (robustDelivering) {
-            deliveringMessageNumbers.add(mn);
+            addDeliveringMessageNumber(mn);
         }
         if (config.isInOrder()) {
             return waitInQueue(mn, canSkip, message, cont);
@@ -286,9 +330,21 @@ public class DestinationSequence extends AbstractSequence {
     }
     
     void removeDeliveringMessageNumber(long mn) {
-        deliveringMessageNumbers.remove(mn);
+        synchronized (deliveringMessageNumbers) {
+            deliveringMessageNumbers.remove(mn);
+        }
+    }
+    void addDeliveringMessageNumber(long mn) {
+        synchronized (deliveringMessageNumbers) {
+            deliveringMessageNumbers.add(mn);
+        }
     }
     
+    // this method is only used for redelivery
+    boolean allAcknowledgedMessagesDelivered() {
+        return deliveringMessageNumbers.isEmpty();
+    }
+
     private Continuation getContinuation(Message message) {
         if (message == null) {
             return null;
@@ -496,6 +552,22 @@ public class DestinationSequence extends AbstractSequence {
             } 
         }
     }
+
+    void terminate() {
+        if (!terminated) {
+            terminated = true;
+            RMStore store = destination.getManager().getStore();
+            if (null == store) {
+                return;
+            }
+            // only updating the sequence
+            store.persistIncoming(this, null);
+        }
+    }
+    
+    public boolean isTerminated() {
+        return terminated;
+    }
     
     final class SequenceTermination extends TimerTask {
         
@@ -521,7 +593,7 @@ public class DestinationSequence extends AbstractSequence {
                     
                     LogUtils.log(LOG, Level.WARNING, "TERMINATING_INACTIVE_SEQ_MSG", 
                                  DestinationSequence.this.getIdentifier().getValue());
-                    DestinationSequence.this.destination.removeSequence(DestinationSequence.this);
+                    DestinationSequence.this.destination.terminateSequence(DestinationSequence.this);
 
                 } else {
                    // reschedule 

http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
index b96361d..c14bd84 100755
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java
@@ -136,8 +136,7 @@ public class ManagedRMEndpoint implements ManagedComponent {
         if (outbound) {
             return endpoint.getManager().getRetransmissionQueue().countUnacknowledged(); 
         } else {
-//            return endpoint.getManager().getRedeliveryQueue().countUndelivered();
-            return 0;
+            return endpoint.getManager().getRedeliveryQueue().countUndelivered();
         }
     }
 
@@ -155,12 +154,11 @@ public class ManagedRMEndpoint implements ManagedComponent {
             }
             return manager.getRetransmissionQueue().countUnacknowledged(ss);
         } else {
-//            DestinationSequence ds = getDestinationSeq(sid);
-//            if (null == ds) {
-//                throw new IllegalArgumentException("no sequence");
-//            }
-//            return manager.getRedeliveryQueue().countUndelivered(ds);
-            return 0;
+            DestinationSequence ds = getDestinationSeq(sid);
+            if (null == ds) {
+                throw new IllegalArgumentException("no sequence");
+            }
+            return manager.getRedeliveryQueue().countUndelivered(ds);
         }
     }
 
@@ -280,55 +278,55 @@ public class ManagedRMEndpoint implements ManagedComponent {
         return rsps;
     }
 
-//     @ManagedOperation(description = "Redelivery Status")
-//     @ManagedOperationParameters({
-//         @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier"),
-//         @ManagedOperationParameter(name = "messageNumber", description = "The message number")
-//     })
-//     public CompositeData getRedeliveryStatus(String sid, long num) throws JMException {
-//         DestinationSequence ds = getDestinationSeq(sid);
-//         if (null == ds) {
-//             throw new IllegalArgumentException("no sequence");
-//         }
-//         RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
-//         RetryStatus rs = rq.getRedeliveryStatus(ds, num);
-//         return getRetryStatusProperties(num, rs);
-//     }
-
-//    @ManagedOperation(description = "Redelivery Statuses")
-//     @ManagedOperationParameters({
-//         @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
-//     })
-//     public CompositeData[] getRedeliveryStatuses(String sid) throws JMException {
-//         DestinationSequence ds = getDestinationSeq(sid);
-//         if (null == ds) {
-//             throw new IllegalArgumentException("no sequence");
-//         }
-//         RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
-//         Map<Long, RetryStatus> rsmap = rq.getRedeliveryStatuses(ds);
-//
-//         CompositeData[] rsps = new CompositeData[rsmap.size()];
-//         int i = 0;
-//         for (Map.Entry<Long, RetryStatus> rs : rsmap.entrySet()) {
-//             rsps[i++] = getRetryStatusProperties(rs.getKey(), rs.getValue());
-//         }
-//         return rsps;
-//     }
-
-//     @ManagedOperation(description = "List of UnDelivered Message Numbers")
-//     @ManagedOperationParameters({
-//         @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") 
-//     })
-//     public Long[] getUnDeliveredMessageIdentifiers(String sid) {
-//         RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
-//         DestinationSequence ds = getDestinationSeq(sid);
-//         if (null == ds) {
-//             throw new IllegalArgumentException("no sequence");
-//         }
-//        
-//         List<Long> numbers = rq.getUndeliveredMessageNumbers(ds);
-//         return numbers.toArray(new Long[numbers.size()]);
-//     }
+    @ManagedOperation(description = "Redelivery Status")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier"),
+        @ManagedOperationParameter(name = "messageNumber", description = "The message number")
+    })
+    public CompositeData getRedeliveryStatus(String sid, long num) throws JMException {
+        DestinationSequence ds = getDestinationSeq(sid);
+        if (null == ds) {
+            throw new IllegalArgumentException("no sequence");
+        }
+        RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+        RetryStatus rs = rq.getRedeliveryStatus(ds, num);
+        return getRetryStatusProperties(num, rs);
+    }
+
+    @ManagedOperation(description = "Redelivery Statuses")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier")
+    })
+    public CompositeData[] getRedeliveryStatuses(String sid) throws JMException {
+        DestinationSequence ds = getDestinationSeq(sid);
+        if (null == ds) {
+            throw new IllegalArgumentException("no sequence");
+        }
+        RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+        Map<Long, RetryStatus> rsmap = rq.getRedeliveryStatuses(ds);
+
+        CompositeData[] rsps = new CompositeData[rsmap.size()];
+        int i = 0;
+        for (Map.Entry<Long, RetryStatus> rs : rsmap.entrySet()) {
+            rsps[i++] = getRetryStatusProperties(rs.getKey(), rs.getValue());
+        }
+        return rsps;
+    }
+
+    @ManagedOperation(description = "List of UnDelivered Message Numbers")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") 
+    })
+    public Long[] getUnDeliveredMessageIdentifiers(String sid) {
+        RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+        DestinationSequence ds = getDestinationSeq(sid);
+        if (null == ds) {
+            throw new IllegalArgumentException("no sequence");
+        }
+        
+        List<Long> numbers = rq.getUndeliveredMessageNumbers(ds);
+        return numbers.toArray(new Long[numbers.size()]);
+    }
 
     @ManagedOperation(description = "List of Source Sequence IDs")
     @ManagedOperationParameters({
@@ -383,31 +381,31 @@ public class ManagedRMEndpoint implements ManagedComponent {
         rq.resume(ss);
     }
     
-//     @ManagedOperation(description = "Suspend Redelivery Queue")
-//     @ManagedOperationParameters({
-//         @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") 
-//     })
-//     public void suspendDestinationQueue(String sid) throws JMException {
-//         DestinationSequence ds = getDestinationSeq(sid);
-//         if (null == ds) {
-//             throw new IllegalArgumentException("no sequence");
-//         }
-//         RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
-//         rq.suspend(ds);
-//     }
-    
-//     @ManagedOperation(description = "Resume Redelivery Queue")
-//     @ManagedOperationParameters({
-//         @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") 
-//     })
-//     public void resumeDestinationQueue(String sid) throws JMException {
-//         DestinationSequence ds = getDestinationSeq(sid);
-//         if (null == ds) {
-//             throw new JMException("no source sequence");
-//         }
-//         RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
-//         rq.resume(ds);
-//     }
+    @ManagedOperation(description = "Suspend Redelivery Queue")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") 
+    })
+    public void suspendDestinationQueue(String sid) throws JMException {
+        DestinationSequence ds = getDestinationSeq(sid);
+        if (null == ds) {
+            throw new IllegalArgumentException("no sequence");
+        }
+        RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+        rq.suspend(ds);
+    }
+
+    @ManagedOperation(description = "Resume Redelivery Queue")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") 
+    })
+    public void resumeDestinationQueue(String sid) throws JMException {
+        DestinationSequence ds = getDestinationSeq(sid);
+        if (null == ds) {
+            throw new JMException("no source sequence");
+        }
+        RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+        rq.resume(ds);
+    }
     
     @ManagedOperation(description = "Current Source Sequence Properties")
     public CompositeData getCurrentSourceSequence() throws JMException {
@@ -572,10 +570,13 @@ public class ManagedRMEndpoint implements ManagedComponent {
     public void removeDestinationSequence(String sid) throws JMException {
         DestinationSequence ds = getDestinationSeq(sid);
         if (null == ds) {
-            throw new JMException("no source sequence");
+            throw new JMException("no destination sequence");
         }
-//         RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
-//         rq.suspend(ds);
+        RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+        if (rq.countUndelivered(ds) > 0) {
+            throw new JMException("sequence not empty");
+        }
+        rq.stop(ds);
         ds.getDestination().removeSequence(ds);
     }
     
@@ -591,6 +592,19 @@ public class ManagedRMEndpoint implements ManagedComponent {
         RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue();
         rq.purgeAll(ss);
     }
+
+    @ManagedOperation(description = "Purge UnDelivered Messages")
+    @ManagedOperationParameters({
+        @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") 
+    })
+    public void purgeUnDeliverededMessages(String sid) {
+        DestinationSequence ds = getDestinationSeq(sid);
+        if (null == ds) {
+            throw new IllegalArgumentException("no sequence");
+        }
+        RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue();
+        rq.purgeAll(ds);
+    }
     
     private static String getAddressValue(EndpointReferenceType epr) {
         if (null != epr && null != epr.getAddress()) {
@@ -675,10 +689,10 @@ public class ManagedRMEndpoint implements ManagedComponent {
 //        return endpoint.getManager().countCompleted();
 //    }
 
-//    @ManagedAttribute(description = "Number of Inbound Queued Messages", currencyTimeLimit = 10)
-//    public int getQueuedMessagesInboundCount() {
-//        return endpoint.getManager().getRedeliveryQueue().countUndelivered();
-//    }
+    @ManagedAttribute(description = "Number of Inbound Queued Messages", currencyTimeLimit = 10)
+    public int getQueuedMessagesInboundCount() {
+        return endpoint.getManager().getRedeliveryQueue().countUndelivered();
+    }
 
 //    @ManagedAttribute(description = "Number of Inbound Completed Messages", currencyTimeLimit = 10)
 //    public int getCompletedMessagesInboundCount() {

http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java
index 9d48cbc..87ae210 100755
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureInInterceptor.java
@@ -19,51 +19,242 @@
 
 package org.apache.cxf.ws.rm;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.logging.Logger;
 
+import javax.xml.soap.SOAPException;
+import javax.xml.soap.SOAPMessage;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.stream.XMLStreamWriter;
+import javax.xml.transform.stream.StreamSource;
+
 import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.interceptor.StaxInInterceptor;
 import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageUtils;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
 import org.apache.cxf.phase.Phase;
+import org.apache.cxf.staxutils.StaxUtils;
+import org.apache.cxf.staxutils.transform.OutTransformWriter;
+import org.apache.cxf.ws.addressing.AddressingProperties;
 
 /**
  * 
  */
 public class RMCaptureInInterceptor extends AbstractRMInterceptor<Message> {
+
     private static final Logger LOG = LogUtils.getLogger(RMCaptureInInterceptor.class);
-    
+
     public RMCaptureInInterceptor() {
-        super(Phase.PRE_STREAM);
+        super(Phase.POST_STREAM);
+        addAfter(StaxInInterceptor.class.getName());
     }
 
+    @Override
     protected void handle(Message message) throws SequenceFault, RMException {
-        LOG.entering(getClass().getName(), "handleMessage");
-        // This message capturing mechanism will need to be changed at some point.
-        // Until then, we keep this interceptor here and utilize the robust
-        // option to avoid the unnecessary message capturing/caching.
-        if (!MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY))) {
-            InputStream is = message.getContent(InputStream.class);
-            if (is != null) {
+       
+        // all messages are initially captured as they cannot be distinguished at this phase
+        // Non application messages temp files are released (cos.releaseTempFileHold()) in RMInInterceptor
+        if (!MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY))
+            && (getManager().getStore() != null || (getManager().getDestinationPolicy() != null && getManager()
+                .getDestinationPolicy().getRetryPolicy() != null))) {
+
+            message.getInterceptorChain().add(new RMCaptureInEnd());
+            XMLStreamReader reader = message.getContent(XMLStreamReader.class);
+            
+            if (null != reader) {
                 CachedOutputStream saved = new CachedOutputStream();
+                // REVISIT check factory for READER
                 try {
-                    IOUtils.copy(is, saved);
-
+                    StaxUtils.copy(reader, saved);
                     saved.flush();
-                    is.close();
-                    saved.lockOutputStream();
-
+                    saved.holdTempFile();
+                    reader.close();
+                    LOG.fine("Create new XMLStreamReader");
+                    InputStream is = saved.getInputStream();
+                    // keep References to clean-up tmp files in RMDeliveryInterceptor
+                    setCloseable(message, saved, is);
+                    XMLStreamReader newReader = StaxUtils.createXMLStreamReader(is);
+                    StaxUtils.configureReader(reader, message);
+                    message.setContent(XMLStreamReader.class, newReader);
                     LOG.fine("Capturing the original RM message");
-                    //RewindableInputStream ris = RewindableInputStream.makeRewindable(saved.getInputStream());
-                    message.setContent(InputStream.class, saved.getInputStream());
                     message.put(RMMessageConstants.SAVED_CONTENT, saved);
-                } catch (Exception e) {
+                } catch (XMLStreamException | IOException e) {
                     throw new Fault(e);
                 }
+            } else {
+                org.apache.cxf.common.i18n.Message msg = new org.apache.cxf.common.i18n.Message(
+                                  "No message found for redeliver", LOG, Collections.<String> emptyList());
+                RMException ex = new RMException(msg);
+                throw new Fault(ex);
+            }
+        }
+    }
+
+    private boolean isApplicationMessage(Message message) {
+        final AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, false);
+        if (null != maps && null != maps.getAction()) {
+            return !RMContextUtils.isRMProtocolMessage(maps.getAction().getValue());
+        }
+        return false;
+    }
+
+    private void setCloseable(Message message, CachedOutputStream cos, InputStream is) {
+        message.put("org.apache.cxf.ws.rm.content.closeable", new Closeable() {
+            @Override
+            public void close() throws IOException {
+                try {
+                    is.close();
+                } catch (IOException e) {
+                    // Ignore
+                }
+                try {
+                    cos.close();
+                } catch (IOException e) {
+                    // Ignore
+                }
+            }
+        });
+    }
+
+    /**
+     * RMCaptureInEnd interceptor is used to switch saved_content, in case WSS is activated.
+     */
+    private class RMCaptureInEnd extends AbstractPhaseInterceptor<Message> {
+        RMCaptureInEnd() {
+            super(Phase.PRE_LOGICAL);
+            addBefore(RMInInterceptor.class.getName());
+        }
+
+        @Override
+        public void handleFault(Message message) {
+            // in case of a SequenceFault SAVED_CONTENT must be released
+            Exception ex = message.getContent(Exception.class);
+            if (ex instanceof SequenceFault) {
+                Closeable closable = (Closeable)message.get("org.apache.cxf.ws.rm.content.closeable");
+                if (null != closable) {
+                    try {
+                        closable.close();
+                    } catch (IOException e) {
+                        // Ignore
+                    }
+                }
+                CachedOutputStream saved = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT);
+                if (saved != null) {
+                    saved.releaseTempFileHold();
+                    try {
+                        saved.close();
+                    } catch (IOException e) {
+                        // ignore
+                    }
+                }
+            }
+        }
+
+        public void handleMessage(Message message) {
+            LOG.entering(getClass().getName(), "handleMessage");
+            // Capturing the soap envelope. In case of WSS was activated, decrypted envelope is captured.
+            if (!MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY))
+                && isApplicationMessage(message)
+                && (getManager().getStore() != null || (getManager().getDestinationPolicy() != null && getManager()
+                    .getDestinationPolicy().getRetryPolicy() != null))) {
+
+                CachedOutputStream saved = new CachedOutputStream();
+                SOAPMessage soapMessage = message.getContent(SOAPMessage.class);
+
+                if (soapMessage != null) {
+                    try {
+                        javax.xml.transform.Source envelope = soapMessage.getSOAPPart().getContent();
+                        StaxUtils.copy(envelope, saved);
+                        saved.flush();
+                        // create a new source part from cos
+                        InputStream is = saved.getInputStream();
+                        // close old saved content
+                        closeOldSavedContent(message);
+                        // keep References to clean-up tmp files in RMDeliveryInterceptor
+                        setCloseable(message, saved, is);
+                        StreamSource source = new StreamSource(is);
+                        soapMessage.getSOAPPart().setContent(source);
+                        // when WSS was activated, saved content still contains soap headers to be removed
+                        message.put(RMMessageConstants.SAVED_CONTENT, removeUnnecessarySoapHeaders(saved));
+                    } catch (SOAPException | IOException | XMLStreamException e) {
+                        throw new Fault(e);
+                    }
+                }
+            }
+        }
+
+        private void closeOldSavedContent(Message message) {
+            CachedOutputStream saved = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT);
+            if (saved != null) {
+                saved.releaseTempFileHold();
+                try {
+                    saved.close();
+                } catch (IOException e) {
+                    // ignore
+                }
+            }
+            Closeable closable = (Closeable)message.get("org.apache.cxf.ws.rm.content.closeable");
+            if (null != closable) {
+                try {
+                    closable.close();
+                } catch (IOException e) {
+                    // Ignore
+                }
+            }
+        }
+
+        private CachedOutputStream removeUnnecessarySoapHeaders(CachedOutputStream saved) {
+            CachedOutputStream newSaved = new CachedOutputStream();
+
+            InputStream is = null;
+            try {
+                is = saved.getInputStream();
+                XMLStreamWriter capture = StaxUtils.createXMLStreamWriter(newSaved,
+                                                                          StandardCharsets.UTF_8.name());
+                Map<String, String> map = new HashMap<String, String>();
+                map.put("{http://schemas.xmlsoap.org/ws/2005/02/rm}Sequence", "");
+                map.put("{http://schemas.xmlsoap.org/ws/2005/02/rm}SequenceAcknowledgement", "");
+                map.put("{http://docs.oasis-open.org/ws-rx/wsrm/200702}Sequence", "");
+                map.put("{http://docs.oasis-open.org/ws-rx/wsrm/200702}SequenceAcknowledgement", "");
+                map.put("{http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd}Security",
+                        "");
+                // attributes to be removed
+                Map<String, String> amap = new HashMap<String, String>();
+                amap.put("{http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd}Id",
+                         "");
+
+                capture = new OutTransformWriter(capture, map, Collections.<String, String> emptyMap(),
+                                                 Collections.<String> emptyList(), amap, false, null);
+                StaxUtils.copy(new StreamSource(is), capture);
+                capture.flush();
+                capture.close();
+                newSaved.flush();
+                // hold temp file, otherwise it will be deleted in case msg was written to RMTxStore
+                // or resend was executed
+                newSaved.holdTempFile();
+                is.close();
+            } catch (IOException | XMLStreamException e) {
+                throw new Fault(e);
+            } finally {
+                if (null != is) {
+                    try {
+                        is.close();
+                    } catch (IOException e) {
+                        // Ignore
+                    }
+                }
             }
+            return newSaved;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
index b3c412d..af682c3 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
@@ -291,6 +291,7 @@ public class RMCaptureOutInterceptor extends AbstractRMInterceptor<Message>  {
                     }
                     // serializes the message content and the attachments into
                     // the RMMessage content
+                    msg.setCreatedTime(rmps.getCreatedTime());
                     PersistenceUtils.encodeRMContent(msg, message, is);
                     store.persistOutgoing(ss, msg);
                 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
index 262e066..ff34c95 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMDeliveryInterceptor.java
@@ -19,6 +19,8 @@
 
 package org.apache.cxf.ws.rm;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.logging.Logger;
 
 import org.apache.cxf.common.logging.LogUtils;
@@ -60,5 +62,15 @@ public class RMDeliveryInterceptor extends AbstractRMInterceptor<Message> {
             dest.acknowledge(message);
         }
         dest.processingComplete(message);
+        
+        // close InputStream of RMCaptureInInterceptor, to delete tmp files in filesystem
+        Closeable closable = (Closeable)message.get("org.apache.cxf.ws.rm.content.closeable");
+        if (null != closable) {
+            try {
+                closable.close();
+            } catch (IOException e) {
+                // Ignore
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
index e393124..03f6d13 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
@@ -830,6 +830,9 @@ public class RMEndpoint {
         for (SourceSequence ss : getSource().getAllSequences()) {
             manager.getRetransmissionQueue().stop(ss);
         }
+        for (DestinationSequence ds : getDestination().getAllSequences()) {
+            manager.getRedeliveryQueue().stop(ds);
+        }
 
         // unregistering of this managed bean from the server is done by the bus itself
     }

http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
index 5b516e4..c82412b 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMInInterceptor.java
@@ -24,6 +24,7 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.io.CachedOutputStream;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageUtils;
@@ -32,6 +33,7 @@ import org.apache.cxf.ws.addressing.ContextUtils;
 import org.apache.cxf.ws.addressing.MAPAggregator;
 import org.apache.cxf.ws.rm.v200702.Identifier;
 import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement;
+import org.apache.cxf.ws.rm.v200702.SequenceType;
 import org.apache.cxf.ws.security.trust.STSUtils;
 
 /**
@@ -49,17 +51,21 @@ public class RMInInterceptor extends AbstractRMInterceptor<Message> {
     @Override
     public void handleFault(Message message) {
         message.put(MAPAggregator.class.getName(), true);
-        if (RMContextUtils.getProtocolVariation(message) != null
-            && MessageUtils.isTrue(message.get(RMMessageConstants.DELIVERING_ROBUST_ONEWAY))) {
-            // revert the delivering entry from the destination sequence
-            try {
-                Destination destination = getManager().getDestination(message);
-                if (destination != null) {
-                    destination.releaseDeliveringStatus(message);
+        if (RMContextUtils.getProtocolVariation(message) != null) {
+            if (MessageUtils.isTrue(message.get(RMMessageConstants.DELIVERING_ROBUST_ONEWAY))) {
+                // revert the delivering entry from the destination sequence
+                try {
+                    Destination destination = getManager().getDestination(message);
+                    if (destination != null) {
+                        destination.releaseDeliveringStatus(message);
+                    }
+                } catch (RMException e) {
+                    LOG.log(Level.WARNING, "Failed to revert the delivering status");
                 }
-            } catch (RMException e) {
-                LOG.log(Level.WARNING, "Failed to revert the delivering status");
-            }
+            } else if (isRedeliveryEnabled(message) && RMContextUtils.isServerSide(message)
+                       && isApplicationMessage(message) && hasValidSequence(message)) {
+                getManager().getRedeliveryQueue().addUndelivered(message);
+            } 
         }
         // make sure the fault is returned for an ws-rm related fault or an invalid ws-rm message
         // note that OneWayProcessingInterceptor handles the robust case, hence not handled here.
@@ -82,6 +88,42 @@ public class RMInInterceptor extends AbstractRMInterceptor<Message> {
                 || message.getContent(Exception.class) instanceof SequenceFault);
     }
 
+    private boolean hasValidSequence(Message message) {
+        final RMProperties rmps = RMContextUtils.retrieveRMProperties(message, false);
+        if (rmps != null) {
+            SequenceType st = rmps.getSequence();
+            if (st != null && st.getIdentifier() != null) {
+                try {
+                    Destination destination = getManager().getDestination(message);
+                    if (destination != null && destination.getSequence(st.getIdentifier()) != null) {
+                        return true;
+                    }
+                } catch (RMException e) {
+                    // fall through
+                }
+            }
+        }
+        return false;
+    }
+    
+    private static boolean isApplicationMessage(Message message) {
+        final AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, false);
+        if (null != maps && null != maps.getAction()) {
+            return !RMContextUtils.isRMProtocolMessage(maps.getAction().getValue());
+        }
+        return false;
+    }
+
+    private boolean isRedeliveryEnabled(Message message) {
+        // deprecated redelivery mode check
+        if (MessageUtils.isTrue(message.getContextualProperty("org.apache.cxf.ws.rm.destination.redeliver"))) {
+            LOG.warning("Use RetryPolicy to enable the redelivery mode");
+            return true;
+        }
+        return getManager().getDestinationPolicy() != null 
+            && getManager().getDestinationPolicy().getRetryPolicy() != null; 
+    }
+
     protected void handle(Message message) throws SequenceFault, RMException {
         LOG.entering(getClass().getName(), "handleMessage");
         
@@ -138,10 +180,6 @@ public class RMInInterceptor extends AbstractRMInterceptor<Message> {
         }
         RMContextUtils.setProtocolVariation(message, protocol);
         
-        // Destination destination = getManager().getDestination(message);
-        // RMEndpoint rme = getManager().getReliableEndpoint(message);
-        // Servant servant = new Servant(rme);
-        
         boolean isApplicationMessage = !RMContextUtils.isRMProtocolMessage(action);
         LOG.fine("isApplicationMessage: " + isApplicationMessage);
         
@@ -164,6 +202,12 @@ public class RMInInterceptor extends AbstractRMInterceptor<Message> {
                 rme.receivedApplicationMessage();
             }
         } else {
+            // in case message is not an application message, release SAVED_CONTENT
+            // otherwise tmp files will not be closed
+            CachedOutputStream cos = (CachedOutputStream)message.get(RMMessageConstants.SAVED_CONTENT);
+            if (null != cos) {
+                cos.releaseTempFileHold();
+            }           
             rme.receivedControlMessage();
             if (RM10Constants.SEQUENCE_ACKNOWLEDGMENT_ACTION.equals(action)
                 || RM11Constants.SEQUENCE_ACKNOWLEDGMENT_ACTION.equals(action)) {
@@ -215,7 +259,7 @@ public class RMInInterceptor extends AbstractRMInterceptor<Message> {
         final boolean robust =
             MessageUtils.isTrue(message.getContextualProperty(Message.ROBUST_ONEWAY));
         if (robust) {
-            // set this property to change the acknlowledging behavior
+            // set this property to change the acknowledging behavior
             message.put(RMMessageConstants.DELIVERING_ROBUST_ONEWAY, Boolean.TRUE);
         }
         destination.acknowledge(message);

http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
index 9639cfe..6a0839e 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
@@ -53,6 +53,9 @@ import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.phase.PhaseInterceptorChain;
 import org.apache.cxf.service.Service;
+import org.apache.cxf.service.model.BindingInfo;
+import org.apache.cxf.service.model.InterfaceInfo;
+import org.apache.cxf.service.model.ServiceInfo;
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.ws.addressing.AddressingProperties;
 import org.apache.cxf.ws.addressing.ContextUtils;
@@ -70,6 +73,7 @@ import org.apache.cxf.ws.rm.persistence.PersistenceUtils;
 import org.apache.cxf.ws.rm.persistence.RMMessage;
 import org.apache.cxf.ws.rm.persistence.RMStore;
 import org.apache.cxf.ws.rm.policy.RMPolicyUtilities;
+import org.apache.cxf.ws.rm.soap.RedeliveryQueueImpl;
 import org.apache.cxf.ws.rm.soap.RetransmissionQueueImpl;
 import org.apache.cxf.ws.rm.soap.SoapFaultFactory;
 import org.apache.cxf.ws.rm.v200702.CloseSequenceType;
@@ -112,6 +116,7 @@ public class RMManager {
     private RMStore store;
     private SequenceIdentifierGenerator idGenerator;
     private RetransmissionQueue retransmissionQueue;
+    private RedeliveryQueue redeliveryQueue;
     private Map<Endpoint, RMEndpoint> reliableEndpoints = new ConcurrentHashMap<Endpoint, RMEndpoint>();
     private AtomicReference<Timer> timer = new AtomicReference<Timer>();
     private RMConfiguration configuration;
@@ -185,6 +190,14 @@ public class RMManager {
         retransmissionQueue = rq;
     }
 
+    public RedeliveryQueue getRedeliveryQueue() {
+        return redeliveryQueue;
+    }
+
+    public void setRedeliveryQueue(RedeliveryQueue redeliveryQueue) {
+        this.redeliveryQueue = redeliveryQueue;
+    }
+
     public SequenceIdentifierGenerator getIdGenerator() {
         return idGenerator;
     }
@@ -556,10 +569,10 @@ public class RMManager {
         }
         
         for (DestinationSequence ds : dss) {
-            reconverDestinationSequence(endpoint, conduit, rme.getDestination(), ds);
+            recoverDestinationSequence(endpoint, conduit, rme.getDestination(), ds);
         }
         retransmissionQueue.start();
-        
+        redeliveryQueue.start();
     }
     
     private void recoverSourceSequence(Endpoint endpoint, Conduit conduit, Source s, 
@@ -570,7 +583,7 @@ public class RMManager {
             return;
         }
         LOG.log(Level.FINE, "Number of messages in sequence: {0}", ms.size());
-            
+        // only recover the sequence if there are pending messages
         s.addSequence(ss, false);
         // choosing an arbitrary valid source sequence as the current source sequence
         if (s.getAssociatedSequence(null) == null && !ss.isExpired() && !ss.isLastMessage()) {
@@ -596,6 +609,7 @@ public class RMManager {
             st.setMessageNumber(m.getMessageNumber());
             RMProperties rmps = new RMProperties();
             rmps.setSequence(st);
+            rmps.setCreatedTime(m.getCreatedTime());
             rmps.exposeAs(ss.getProtocol().getWSRMNamespace());
             if (ss.isLastMessage() && ss.getCurrentMessageNr() == m.getMessageNumber()) {
                 CloseSequenceType close = new CloseSequenceType();
@@ -623,10 +637,59 @@ public class RMManager {
         }            
     }
 
-    private void reconverDestinationSequence(Endpoint endpoint, Conduit conduit, Destination d, 
+    private void recoverDestinationSequence(Endpoint endpoint, Conduit conduit, Destination d, 
                                              DestinationSequence ds) {
+        // always recover the sequence 
         d.addSequence(ds, false);
-        //TODO add the redelivery code
+        
+        Collection<RMMessage> ms = store.getMessages(ds.getIdentifier(), false);
+        if (null == ms || 0 == ms.size()) {
+            return;
+        }
+        LOG.log(Level.FINE, "Number of messages in sequence: {0}", ms.size());
+
+        for (RMMessage m : ms) {                
+            Message message = new MessageImpl();
+            Exchange exchange = new ExchangeImpl();
+            message.setExchange(exchange);
+            if (null != conduit) {
+                exchange.setConduit(conduit);
+            }
+            exchange.put(Endpoint.class, endpoint);
+            exchange.put(Service.class, endpoint.getService());
+            if (endpoint.getEndpointInfo().getService() != null) {
+                exchange.put(ServiceInfo.class, endpoint.getEndpointInfo().getService());
+                exchange.put(InterfaceInfo.class, endpoint.getEndpointInfo().getService().getInterface());
+            }
+            exchange.put(Binding.class, endpoint.getBinding());
+            exchange.put(BindingInfo.class, endpoint.getEndpointInfo().getBinding());
+            exchange.put(Bus.class, bus);
+            
+            SequenceType st = new SequenceType();
+            st.setIdentifier(ds.getIdentifier());
+            st.setMessageNumber(m.getMessageNumber());
+            RMProperties rmps = new RMProperties();
+            rmps.setSequence(st);
+            rmps.setCreatedTime(m.getCreatedTime());
+            RMContextUtils.storeRMProperties(message, rmps, false);                
+            try {
+                // RMMessage is stored in a serialized way, therefore
+                // RMMessage content must be splitted into soap root message
+                // and attachments
+                PersistenceUtils.decodeRMContent(m, message);
+                redeliveryQueue.addUndelivered(message);
+                // add  acknowledged undelivered message
+                ds.addDeliveringMessageNumber(m.getMessageNumber());
+            } catch (IOException e) {
+                LOG.log(Level.SEVERE, "Error reading persisted message data", e);
+            }
+        }
+
+        // if no messages are recovered and the sequence has been already terminated, remove the sequence
+        if (ds.isTerminated() && ds.allAcknowledgedMessagesDelivered()) {
+            d.removeSequence(ds);
+            store.removeDestinationSequence(ds.getIdentifier());
+        }
     }
 
     RMEndpoint createReliableEndpoint(final Endpoint endpoint) {
@@ -664,6 +727,9 @@ public class RMManager {
         if (null == retransmissionQueue) {
             retransmissionQueue = new RetransmissionQueueImpl(this);
         }
+        if (null == redeliveryQueue) {
+            redeliveryQueue = new RedeliveryQueueImpl(this);
+        }
         if (null == idGenerator) {
             idGenerator = new DefaultSequenceIdentifierGenerator();
         }

http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java
index 448bfc6..f137803 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMProperties.java
@@ -37,6 +37,7 @@ public class RMProperties {
     private CloseSequenceType closeSequence;
     private String namespaceURI;
     private boolean lastMessage;
+    private long createdTime = System.currentTimeMillis();
     
     public Collection<SequenceAcknowledgement> getAcks() {
         return acks;
@@ -120,4 +121,21 @@ public class RMProperties {
     public void exposeAs(String uri) {
         namespaceURI = uri;
     }
+
+    /**
+     * Get the initial creation time of this RM properties instance.
+     * @return Returns the createdTime.
+     */
+    public long getCreatedTime() {
+        return createdTime;
+    }
+
+    /**
+     * Set the initial creation time of this RM properties instance.
+     * 
+     * @param createdTime The createdTime to set.
+     */
+    public void setCreatedTime(long createdTime) {
+        this.createdTime = createdTime;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RedeliveryQueue.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RedeliveryQueue.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RedeliveryQueue.java
new file mode 100644
index 0000000..4737859
--- /dev/null
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RedeliveryQueue.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cxf.message.Message;
+
+public interface RedeliveryQueue {
+    
+    
+    String DEFAULT_BASE_REDELIVERY_INTERVAL = "3000";
+    int DEFAULT_EXPONENTIAL_BACKOFF = 2;
+    
+    /**
+     * @param seq the sequence under consideration
+     * @return the number of unacknowledged messages for that sequence
+     */
+    int countUndelivered(DestinationSequence seq);
+    
+    /**
+     * @return the total number of undelivered messages in this queue
+     */
+    int countUndelivered();
+
+    /**
+     * @return true if there are no unacknowledged messages in the queue
+     */
+    boolean isEmpty();
+    
+    /**
+     * Accepts a failed message for possible future redelivery.
+     * @param message the message context.
+     */
+    void addUndelivered(Message message);
+    
+    /**
+     * Purge all candiates for the given sequence.
+     * 
+     * @param seq the sequence object
+     */
+    void purgeAll(DestinationSequence seq);
+
+    /**
+     * 
+     * @param seq
+     * @return
+     */
+    List<Long> getUndeliveredMessageNumbers(DestinationSequence seq);
+
+    /**
+     * Returns the retransmission status for the specified message.
+     * @param seq
+     * @param num
+     * @return
+     */
+    RetryStatus getRedeliveryStatus(DestinationSequence seq, long num);
+    
+    /**
+     * Return the retransmission status of all the messages assigned to the sequence.
+     * @param seq
+     * @return
+     */
+    Map<Long, RetryStatus> getRedeliveryStatuses(DestinationSequence seq);
+        
+    /**
+     * Initiate resends.
+     */
+    void start();
+    
+    /**
+     * Stops redelivery queue.
+     * @param seq
+     */
+    void stop(DestinationSequence seq);
+    
+    /**
+     * Suspends the redelivery attempts for the specified sequence
+     * @param seq
+     */
+    void suspend(DestinationSequence seq);
+
+    /**
+     * Resumes the redelivery attempts for the specified sequence
+     * @param seq
+     */
+    void resume(DestinationSequence seq);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
index 7cfeade..80a64bc 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
@@ -245,7 +245,7 @@ public class Servant implements Invoker {
             return null;
         } 
 
-        destination.removeSequence(terminatedSeq);
+        destination.terminateSequence(terminatedSeq);
         
         // the following may be necessary if the last message for this sequence was a oneway
         // request and hence there was no response to which a last message could have been added

http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java
index 08cbf2a..90db17b 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/feature/RMFeature.java
@@ -123,9 +123,7 @@ public class RMFeature extends AbstractFeature {
         provider.getInInterceptors().add(rmLogicalIn);
         provider.getInInterceptors().add(rmInCodec);
         provider.getInInterceptors().add(rmDelivery);
-        if (null != store) {
-            provider.getInInterceptors().add(rmCaptureIn);
-        }
+        provider.getInInterceptors().add(rmCaptureIn);
 
         provider.getOutInterceptors().add(rmLogicalOut);
         provider.getOutInterceptors().add(rmOutCodec);

http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
index 348117c..ad61b04 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/RMMessage.java
@@ -32,6 +32,7 @@ public class RMMessage {
     private String contentType;
     private long messageNumber;
     private String to;
+    private long createdTime;
     
     /**
      * Returns the message number of the message within its sequence.
@@ -120,4 +121,12 @@ public class RMMessage {
         this.contentType = contentType;
     }
 
+    public long getCreatedTime() {
+        return createdTime;
+    }
+
+    public void setCreatedTime(long createdTime) {
+        this.createdTime = createdTime;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
index 641df46..e3cfa0b 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
@@ -75,6 +75,7 @@ public class RMTxStore implements RMStore {
            {"LAST_MSG_NO", "DECIMAL(19, 0)"},
            {"ENDPOINT_ID", "VARCHAR(1024)"},
            {"ACKNOWLEDGED", "BLOB"},
+           {"TERMINATED", "CHAR(1)"},
            {"PROTOCOL_VERSION", "VARCHAR(256)"}};
     private static final String[] DEST_SEQUENCES_TABLE_KEYS = {"SEQ_ID"};
     private static final String[][] SRC_SEQUENCES_TABLE_COLS
@@ -90,6 +91,7 @@ public class RMTxStore implements RMStore {
         = {{"SEQ_ID", "VARCHAR(256) NOT NULL"},
            {"MSG_NO", "DECIMAL(19, 0) NOT NULL"},
            {"SEND_TO", "VARCHAR(256)"},
+           {"CREATED_TIME", "DECIMAL(19, 0)"},
            {"CONTENT", "BLOB"},
            {"CONTENT_TYPE", "VARCHAR(1024)"}};
     private static final String[] MESSAGES_TABLE_KEYS = {"SEQ_ID", "MSG_NO"};
@@ -119,27 +121,27 @@ public class RMTxStore implements RMStore {
     private static final String DELETE_SRC_SEQUENCE_STMT_STR =
         "DELETE FROM CXF_RM_SRC_SEQUENCES WHERE SEQ_ID = ?";
     private static final String UPDATE_DEST_SEQUENCE_STMT_STR =
-        "UPDATE CXF_RM_DEST_SEQUENCES SET LAST_MSG_NO = ?, ACKNOWLEDGED = ? WHERE SEQ_ID = ?";
+        "UPDATE CXF_RM_DEST_SEQUENCES SET LAST_MSG_NO = ?, TERMINATED = ?, ACKNOWLEDGED = ? WHERE SEQ_ID = ?";
     private static final String UPDATE_SRC_SEQUENCE_STMT_STR =
         "UPDATE CXF_RM_SRC_SEQUENCES SET CUR_MSG_NO = ?, LAST_MSG = ? WHERE SEQ_ID = ?";
     private static final String CREATE_MESSAGE_STMT_STR 
-        = "INSERT INTO {0} (SEQ_ID, MSG_NO, SEND_TO, CONTENT, CONTENT_TYPE) VALUES(?, ?, ?, ?, ?)";
+        = "INSERT INTO {0} (SEQ_ID, MSG_NO, SEND_TO, CREATED_TIME, CONTENT, CONTENT_TYPE) VALUES(?, ?, ?, ?, ?, ?)";
     private static final String DELETE_MESSAGE_STMT_STR =
         "DELETE FROM {0} WHERE SEQ_ID = ? AND MSG_NO = ?";
     private static final String SELECT_DEST_SEQUENCE_STMT_STR =
-        "SELECT ACKS_TO, LAST_MSG_NO, PROTOCOL_VERSION, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
+        "SELECT ACKS_TO, LAST_MSG_NO, PROTOCOL_VERSION, TERMINATED, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
         + "WHERE SEQ_ID = ?";
     private static final String SELECT_SRC_SEQUENCE_STMT_STR =
         "SELECT CUR_MSG_NO, LAST_MSG, EXPIRY, OFFERING_SEQ_ID, PROTOCOL_VERSION FROM CXF_RM_SRC_SEQUENCES "
         + "WHERE SEQ_ID = ?";
     private static final String SELECT_DEST_SEQUENCES_STMT_STR =
-        "SELECT SEQ_ID, ACKS_TO, LAST_MSG_NO, PROTOCOL_VERSION, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
+        "SELECT SEQ_ID, ACKS_TO, LAST_MSG_NO, PROTOCOL_VERSION, TERMINATED, ACKNOWLEDGED FROM CXF_RM_DEST_SEQUENCES "
         + "WHERE ENDPOINT_ID = ?";
     private static final String SELECT_SRC_SEQUENCES_STMT_STR =
         "SELECT SEQ_ID, CUR_MSG_NO, LAST_MSG, EXPIRY, OFFERING_SEQ_ID, PROTOCOL_VERSION "
         + "FROM CXF_RM_SRC_SEQUENCES WHERE ENDPOINT_ID = ?";
     private static final String SELECT_MESSAGES_STMT_STR =
-        "SELECT MSG_NO, SEND_TO, CONTENT, CONTENT_TYPE FROM {0} WHERE SEQ_ID = ?";
+        "SELECT MSG_NO, SEND_TO, CREATED_TIME, CONTENT, CONTENT_TYPE FROM {0} WHERE SEQ_ID = ?";
     private static final String ALTER_TABLE_STMT_STR =
         "ALTER TABLE {0} ADD {1} {2}";
     private static final String CREATE_INBOUND_MESSAGE_STMT_STR = 
@@ -395,13 +397,14 @@ public class RMTxStore implements RMStore {
                 EndpointReferenceType acksTo = RMUtils.createReference(res.getString(1));  
                 long lm = res.getLong(2);
                 ProtocolVariation pv = decodeProtocolVersion(res.getString(3));
-                InputStream is = res.getBinaryStream(4);
+                boolean t = res.getBoolean(4);
+                InputStream is = res.getBinaryStream(5);
                 SequenceAcknowledgement ack = null;
                 if (null != is) {
                     ack = PersistenceUtils.getInstance()
                         .deserialiseAcknowledgment(is); 
                 }
-                return new DestinationSequence(sid, acksTo, lm, ack, pv);
+                return new DestinationSequence(sid, acksTo, lm, t, ack, pv);
             }
         } catch (SQLException ex) {
             conex = ex;
@@ -521,13 +524,14 @@ public class RMTxStore implements RMStore {
                 EndpointReferenceType acksTo = RMUtils.createReference(res.getString(2));  
                 long lm = res.getLong(3);
                 ProtocolVariation pv = decodeProtocolVersion(res.getString(4));
-                InputStream is = res.getBinaryStream(5);
+                boolean t = res.getBoolean(5);
+                InputStream is = res.getBinaryStream(6);
                 SequenceAcknowledgement ack = null;
                 if (null != is) {
                     ack = PersistenceUtils.getInstance()
                         .deserialiseAcknowledgment(is); 
                 }
-                DestinationSequence seq = new DestinationSequence(sid, acksTo, lm, ack, pv);
+                DestinationSequence seq = new DestinationSequence(sid, acksTo, lm, t, ack, pv);
                 seqs.add(seq);                                                 
             }
         } catch (SQLException ex) {
@@ -596,11 +600,13 @@ public class RMTxStore implements RMStore {
             while (res.next()) {
                 long mn = res.getLong(1);
                 String to = res.getString(2);
-                Blob blob = res.getBlob(3);
-                String contentType = res.getString(4);
+                long ct = res.getLong(3);
+                Blob blob = res.getBlob(4);
+                String contentType = res.getString(5);
                 RMMessage msg = new RMMessage();
                 msg.setMessageNumber(mn);
                 msg.setTo(to);
+                msg.setCreatedTime(ct);
                 CachedOutputStream cos = new CachedOutputStream();
                 IOUtils.copyAndCloseInput(blob.getBinaryStream(), cos);
                 cos.flush();
@@ -752,8 +758,9 @@ public class RMTxStore implements RMStore {
             stmt.setString(1, id);
             stmt.setLong(2, nr);
             stmt.setString(3, to);
-            stmt.setBinaryStream(4, msgin);
-            stmt.setString(5, contentType);
+            stmt.setLong(4, msg.getCreatedTime());
+            stmt.setBinaryStream(5, msgin);
+            stmt.setString(6, contentType);
             stmt.execute();
             if (LOG.isLoggable(Level.FINE)) {
                 LOG.log(Level.FINE, "Successfully stored {0} message number {1} for sequence {2}",
@@ -805,10 +812,11 @@ public class RMTxStore implements RMStore {
             stmt = getStatement(con, UPDATE_DEST_SEQUENCE_STMT_STR);
 
             long lastMessageNr = seq.getLastMessageNumber();
-            stmt.setLong(1, lastMessageNr); 
+            stmt.setLong(1, lastMessageNr);
+            stmt.setString(2, seq.isTerminated() ? "1" : "0");
             InputStream is = PersistenceUtils.getInstance().serialiseAcknowledgment(seq.getAcknowledgment());
-            stmt.setBinaryStream(2, is, is.available()); 
-            stmt.setString(3, seq.getIdentifier().getValue());
+            stmt.setBinaryStream(3, is, is.available()); 
+            stmt.setString(4, seq.getIdentifier().getValue());
             stmt.execute();
         } finally {
             releaseResources(stmt, null);
@@ -884,12 +892,11 @@ public class RMTxStore implements RMStore {
     }
     
     protected void verifyTable(Connection con, String tableName, String[][] tableCols) {
-        List<String[]> newCols = new ArrayList<String[]>();
-        ResultSet rs = null;
         try {
             DatabaseMetaData metadata = con.getMetaData();
-            rs = metadata.getColumns(null, null, tableName, "%");
+            ResultSet rs = metadata.getColumns(null, null, tableName, "%");
             Set<String> dbCols = new HashSet<String>();
+            List<String[]> newCols = new ArrayList<String[]>();
             while (rs.next()) {
                 dbCols.add(rs.getString(4));
             }
@@ -898,26 +905,12 @@ public class RMTxStore implements RMStore {
                     newCols.add(col);
                 }
             }
-        } catch (SQLException ex) {
-            if (LOG.isLoggable(Level.FINE)) {
-                LOG.fine("Table " + tableName + " cannot be verified.");
-            }
-        } finally {
-            if (rs != null) {
-                try {
-                    rs.close();
-                } catch (SQLException e) {
-                    // ignore
+            if (newCols.size() > 0) {
+                // need to add the new columns
+                if (LOG.isLoggable(Level.FINE)) {
+                    LOG.log(Level.FINE, "Table " + tableName + " needs additional columns");
                 }
-            }
-        }
-        if (newCols.size() > 0) {
-            // need to add the new columns
-            if (LOG.isLoggable(Level.FINE)) {
-                LOG.log(Level.FINE, "Table " + tableName + " needs additional columns");
-            }
-                
-            try { 
+
                 for (String[] newCol : newCols) {
                     Statement st = con.createStatement();
                     try {
@@ -931,9 +924,9 @@ public class RMTxStore implements RMStore {
                         st.close();
                     }
                 }
-            } catch (SQLException ex) {
-                LOG.log(Level.WARNING, "Table " + tableName + " cannot be altered.", ex);
             }
+        } catch (SQLException ex) {
+            LOG.log(Level.WARNING, "Table " + tableName + " cannot be altered.", ex);
         }
     }
 
@@ -965,7 +958,6 @@ public class RMTxStore implements RMStore {
         for (int i = 0; i < SET_SCHEMA_STMT_STRS.length; i++) {
             try {
                 stmt.executeUpdate(MessageFormat.format(SET_SCHEMA_STMT_STRS[i], schemaName));
-                ex0 = null;
                 break;
             } catch (SQLException ex) {
                 ex.setNextException(ex0);

http://git-wip-us.apache.org/repos/asf/cxf/blob/0dd29509/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInInterceptor.java
index 8e1b25e..9215031 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RMSoapInInterceptor.java
@@ -139,7 +139,10 @@ public class RMSoapInInterceptor extends AbstractSoapInterceptor {
      * @return the RM properties
      */
     public RMProperties unmarshalRMProperties(SoapMessage message) { 
-        RMProperties rmps = new RMProperties();
+        RMProperties rmps = (RMProperties)message.get(RMContextUtils.getRMPropertiesKey(false));
+        if (rmps == null) {
+            rmps = new RMProperties();
+        }
         List<Header> headers = message.getHeaders();
         if (headers != null) {
             decodeHeaders(message, headers, rmps);