You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2015/11/05 11:57:08 UTC

cxf git commit: [CXF-6667] Closing a source sequence in WS-RM may lead to inconsistent sequence status

Repository: cxf
Updated Branches:
  refs/heads/master b97a0fed6 -> 3aede31ec


[CXF-6667] Closing a source sequence in WS-RM may lead to inconsistent sequence status


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/3aede31e
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/3aede31e
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/3aede31e

Branch: refs/heads/master
Commit: 3aede31ec2755468b6310591b4e3c467fb2b9ed2
Parents: b97a0fe
Author: Akitoshi Yoshida <ay...@apache.org>
Authored: Wed Nov 4 15:32:57 2015 +0100
Committer: Akitoshi Yoshida <ay...@apache.org>
Committed: Thu Nov 5 11:55:08 2015 +0100

----------------------------------------------------------------------
 .../cxf/ws/rm/RMCaptureOutInterceptor.java      |   6 +
 .../java/org/apache/cxf/ws/rm/RMEndpoint.java   |   1 -
 .../apache/cxf/systest/ws/rm/SequenceTest.java  | 125 +++++++++++++++++++
 3 files changed, 131 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/3aede31e/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
index 38e5f7c..298ddcd 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
@@ -199,6 +199,12 @@ public class RMCaptureOutInterceptor extends AbstractRMInterceptor<Message>  {
             getManager().initializeInterceptorChain(msg);
             //doneCaptureMessage(msg);
             captureMessage(msg);
+        } else if (isLastMessage) {
+            // got either the rm11 CS or the rm10 empty LM
+            RMStore store = getManager().getStore();
+            if (null != store) {
+                store.persistOutgoing(rmpsOut.getSourceSequence(), null);
+            }
         }
     }
     private void captureMessage(Message message) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/3aede31e/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
index 3146bac..e393124 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
@@ -816,7 +816,6 @@ public class RMEndpoint {
                         // REVISIT: this may be non-standard
                         // getProxy().ackRequested(seq);
                     } else {
-
                         getProxy().lastMessage(seq);
                     }
                 } catch (RMException ex) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/3aede31e/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
----------------------------------------------------------------------
diff --git a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
index f4124cf..346a0f6 100644
--- a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
+++ b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
@@ -23,9 +23,14 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringWriter;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.logging.Logger;
@@ -77,10 +82,15 @@ import org.apache.cxf.testutil.recorders.OutMessageRecorder;
 import org.apache.cxf.transport.http.HTTPConduit;
 import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
 import org.apache.cxf.ws.addressing.VersionTransformer.Names200408;
+import org.apache.cxf.ws.rm.DestinationSequence;
 import org.apache.cxf.ws.rm.RM10Constants;
 import org.apache.cxf.ws.rm.RMContextUtils;
 import org.apache.cxf.ws.rm.RMManager;
 import org.apache.cxf.ws.rm.RMProperties;
+import org.apache.cxf.ws.rm.SourceSequence;
+import org.apache.cxf.ws.rm.persistence.RMMessage;
+import org.apache.cxf.ws.rm.persistence.RMStore;
+import org.apache.cxf.ws.rm.v200702.Identifier;
 
 import org.junit.After;
 import org.junit.BeforeClass;
@@ -1392,6 +1402,11 @@ public class SequenceTest extends AbstractBusClientServerTestBase {
     public void testTerminateOnShutdown() throws Exception {
         init("org/apache/cxf/systest/ws/rm/terminate-on-shutdown.xml", true);
 
+        RMManager manager = greeterBus.getExtension(RMManager.class);
+        // this test also verify the DB is correctly being updated during the shutdown
+        RMMemoryStore store = new RMMemoryStore();
+        manager.setStore(store);
+
         greeter.greetMeOneWay("neutrophil");
         greeter.greetMeOneWay("basophil");
         greeter.greetMeOneWay("eosinophil");
@@ -1422,6 +1437,10 @@ public class SequenceTest extends AbstractBusClientServerTestBase {
         mf.verifyActions(expectedActions, false);
         mf.verifyAcknowledgements(new boolean[] {false, true}, false);
         
+        // additional check to verify the operations performed on DB
+        assertEquals("sequences not released from DB", 0, store.ssmap.size());
+        assertEquals("messages not released from DB", 0, store.ommap.size());
+        assertEquals("sequence not closed in DB", 1, store.ssclosed.size());
     }    
 
     @Test
@@ -1693,4 +1712,110 @@ public class SequenceTest extends AbstractBusClientServerTestBase {
         }
         return null;
     }
+
+    private static class RMMemoryStore implements RMStore {
+        // during this particular test, the operations are expected to be invoked sequentially so use just HashMap
+        Map<Identifier, SourceSequence> ssmap = new HashMap<Identifier, SourceSequence>();
+        Map<Identifier, DestinationSequence> dsmap = new HashMap<Identifier, DestinationSequence>();
+        Map<Identifier, Collection<RMMessage>> ommap = new HashMap<Identifier, Collection<RMMessage>>();
+        Map<Identifier, Collection<RMMessage>> immap = new HashMap<Identifier, Collection<RMMessage>>();
+        Set<Identifier> ssclosed = new HashSet<Identifier>();
+
+        @Override
+        public void createSourceSequence(SourceSequence seq) {
+            ssmap.put(seq.getIdentifier(), seq);
+        }
+
+        @Override
+        public void createDestinationSequence(DestinationSequence seq) {
+            dsmap.put(seq.getIdentifier(), seq);
+        }
+
+        @Override
+        public SourceSequence getSourceSequence(Identifier seq) {
+            return ssmap.get(seq);
+        }
+
+        @Override
+        public DestinationSequence getDestinationSequence(Identifier seq) {
+            return dsmap.get(seq);
+        }
+
+        @Override
+        public void removeSourceSequence(Identifier seq) {
+            ssmap.remove(seq);
+        }
+
+        @Override
+        public void removeDestinationSequence(Identifier seq) {
+            dsmap.remove(seq);
+        }
+
+        @Override
+        public Collection<SourceSequence> getSourceSequences(String endpointIdentifier) {
+            return ssmap.values();
+        }
+
+        @Override
+        public Collection<DestinationSequence> getDestinationSequences(String endpointIdentifier) {
+            return dsmap.values();
+        }
+
+        @Override
+        public Collection<RMMessage> getMessages(Identifier sid, boolean outbound) {
+            return outbound ? ommap.get(sid) : immap.get(sid);
+        }
+
+        @Override
+        public void persistOutgoing(SourceSequence seq, RMMessage msg) {
+            Collection<RMMessage> cm = getMessages(seq.getIdentifier(), ommap);
+            if (msg != null) {
+                //  update the sequence status and add the message
+                cm.add(msg);
+            } else {
+                // update only the sequence status
+                if (seq.isLastMessage()) {
+                    ssclosed.add(seq.getIdentifier());
+                }
+            }
+        }
+
+        @Override
+        public void persistIncoming(DestinationSequence seq, RMMessage msg) {
+            Collection<RMMessage> cm = getMessages(seq.getIdentifier(), immap);
+            if (msg != null) {
+                //  update the sequence status and add the message
+                cm.add(msg);
+            } else {
+                // update only the sequence status
+            }
+        }
+
+        @Override
+        public void removeMessages(Identifier sid, Collection<Long> messageNrs, boolean outbound) {
+            removeMessages(sid, messageNrs, outbound ? ommap : immap);
+        }
+
+        private Collection<RMMessage> getMessages(Identifier seq, Map<Identifier, Collection<RMMessage>> map) {
+            Collection<RMMessage> cm = map.get(seq);
+            if (cm == null) {
+                cm = new LinkedList<RMMessage>();
+                map.put(seq, cm);
+            }
+            return cm;
+        }
+
+        private void removeMessages(Identifier sid, Collection<Long> messageNrs,
+                                    Map<Identifier, Collection<RMMessage>> map) {
+            for (Iterator<RMMessage> it = map.get(sid).iterator(); it.hasNext();) {
+                RMMessage m = it.next();
+                if (messageNrs.contains(m.getMessageNumber())) {
+                    it.remove();
+                }
+            }
+            if (map.get(sid).size() == 0) {
+                map.remove(sid);
+            }
+        }
+    }
 }