You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2015/11/05 11:57:08 UTC
cxf git commit: [CXF-6667] Closing a source sequence in WS-RM may lead to inconsistent sequence status
Repository: cxf
Updated Branches:
refs/heads/master b97a0fed6 -> 3aede31ec
[CXF-6667] Closing a source sequence in WS-RM may lead to inconsistent sequence status
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/3aede31e
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/3aede31e
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/3aede31e
Branch: refs/heads/master
Commit: 3aede31ec2755468b6310591b4e3c467fb2b9ed2
Parents: b97a0fe
Author: Akitoshi Yoshida <ay...@apache.org>
Authored: Wed Nov 4 15:32:57 2015 +0100
Committer: Akitoshi Yoshida <ay...@apache.org>
Committed: Thu Nov 5 11:55:08 2015 +0100
----------------------------------------------------------------------
.../cxf/ws/rm/RMCaptureOutInterceptor.java | 6 +
.../java/org/apache/cxf/ws/rm/RMEndpoint.java | 1 -
.../apache/cxf/systest/ws/rm/SequenceTest.java | 125 +++++++++++++++++++
3 files changed, 131 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/3aede31e/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
index 38e5f7c..298ddcd 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMCaptureOutInterceptor.java
@@ -199,6 +199,12 @@ public class RMCaptureOutInterceptor extends AbstractRMInterceptor<Message> {
getManager().initializeInterceptorChain(msg);
//doneCaptureMessage(msg);
captureMessage(msg);
+ } else if (isLastMessage) {
+ // got either the rm11 CS or the rm10 empty LM
+ RMStore store = getManager().getStore();
+ if (null != store) {
+ store.persistOutgoing(rmpsOut.getSourceSequence(), null);
+ }
}
}
private void captureMessage(Message message) {
http://git-wip-us.apache.org/repos/asf/cxf/blob/3aede31e/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
----------------------------------------------------------------------
diff --git a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
index 3146bac..e393124 100644
--- a/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
+++ b/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMEndpoint.java
@@ -816,7 +816,6 @@ public class RMEndpoint {
// REVISIT: this may be non-standard
// getProxy().ackRequested(seq);
} else {
-
getProxy().lastMessage(seq);
}
} catch (RMException ex) {
http://git-wip-us.apache.org/repos/asf/cxf/blob/3aede31e/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
----------------------------------------------------------------------
diff --git a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
index f4124cf..346a0f6 100644
--- a/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
+++ b/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
@@ -23,9 +23,14 @@ import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.logging.Logger;
@@ -77,10 +82,15 @@ import org.apache.cxf.testutil.recorders.OutMessageRecorder;
import org.apache.cxf.transport.http.HTTPConduit;
import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
import org.apache.cxf.ws.addressing.VersionTransformer.Names200408;
+import org.apache.cxf.ws.rm.DestinationSequence;
import org.apache.cxf.ws.rm.RM10Constants;
import org.apache.cxf.ws.rm.RMContextUtils;
import org.apache.cxf.ws.rm.RMManager;
import org.apache.cxf.ws.rm.RMProperties;
+import org.apache.cxf.ws.rm.SourceSequence;
+import org.apache.cxf.ws.rm.persistence.RMMessage;
+import org.apache.cxf.ws.rm.persistence.RMStore;
+import org.apache.cxf.ws.rm.v200702.Identifier;
import org.junit.After;
import org.junit.BeforeClass;
@@ -1392,6 +1402,11 @@ public class SequenceTest extends AbstractBusClientServerTestBase {
public void testTerminateOnShutdown() throws Exception {
init("org/apache/cxf/systest/ws/rm/terminate-on-shutdown.xml", true);
+ RMManager manager = greeterBus.getExtension(RMManager.class);
+ // this test also verifies that the DB is correctly updated during the shutdown
+ RMMemoryStore store = new RMMemoryStore();
+ manager.setStore(store);
+
greeter.greetMeOneWay("neutrophil");
greeter.greetMeOneWay("basophil");
greeter.greetMeOneWay("eosinophil");
@@ -1422,6 +1437,10 @@ public class SequenceTest extends AbstractBusClientServerTestBase {
mf.verifyActions(expectedActions, false);
mf.verifyAcknowledgements(new boolean[] {false, true}, false);
+ // additional check to verify the operations performed on DB
+ assertEquals("sequences not released from DB", 0, store.ssmap.size());
+ assertEquals("messages not released from DB", 0, store.ommap.size());
+ assertEquals("sequence not closed in DB", 1, store.ssclosed.size());
}
@Test
@@ -1693,4 +1712,110 @@ public class SequenceTest extends AbstractBusClientServerTestBase {
}
return null;
}
+
+    /**
+     * Minimal in-memory {@link RMStore} that lets this test observe which persistence
+     * operations were performed on the store.
+     * <p>
+     * Not thread-safe: plain {@link HashMap}s are used because, during this particular test,
+     * the store operations are expected to be invoked sequentially.
+     */
+    private static class RMMemoryStore implements RMStore {
+        // source sequences currently held by the store, keyed by sequence identifier
+        Map<Identifier, SourceSequence> ssmap = new HashMap<Identifier, SourceSequence>();
+        // destination sequences currently held by the store, keyed by sequence identifier
+        Map<Identifier, DestinationSequence> dsmap = new HashMap<Identifier, DestinationSequence>();
+        // outbound messages per sequence; an entry exists only while it holds at least one message
+        Map<Identifier, Collection<RMMessage>> ommap = new HashMap<Identifier, Collection<RMMessage>>();
+        // inbound messages per sequence; an entry exists only while it holds at least one message
+        Map<Identifier, Collection<RMMessage>> immap = new HashMap<Identifier, Collection<RMMessage>>();
+        // identifiers of source sequences persisted without a message while flagged as last message
+        Set<Identifier> ssclosed = new HashSet<Identifier>();
+
+        @Override
+        public void createSourceSequence(SourceSequence seq) {
+            ssmap.put(seq.getIdentifier(), seq);
+        }
+
+        @Override
+        public void createDestinationSequence(DestinationSequence seq) {
+            dsmap.put(seq.getIdentifier(), seq);
+        }
+
+        @Override
+        public SourceSequence getSourceSequence(Identifier seq) {
+            return ssmap.get(seq);
+        }
+
+        @Override
+        public DestinationSequence getDestinationSequence(Identifier seq) {
+            return dsmap.get(seq);
+        }
+
+        @Override
+        public void removeSourceSequence(Identifier seq) {
+            ssmap.remove(seq);
+        }
+
+        @Override
+        public void removeDestinationSequence(Identifier seq) {
+            dsmap.remove(seq);
+        }
+
+        /**
+         * Returns all stored source sequences; the endpoint identifier is ignored.
+         * The result is a live view of the underlying map.
+         */
+        @Override
+        public Collection<SourceSequence> getSourceSequences(String endpointIdentifier) {
+            return ssmap.values();
+        }
+
+        /**
+         * Returns all stored destination sequences; the endpoint identifier is ignored.
+         * The result is a live view of the underlying map.
+         */
+        @Override
+        public Collection<DestinationSequence> getDestinationSequences(String endpointIdentifier) {
+            return dsmap.values();
+        }
+
+        /**
+         * Returns the messages stored for the given sequence in the chosen direction,
+         * or {@code null} when nothing is stored for it.
+         */
+        @Override
+        public Collection<RMMessage> getMessages(Identifier sid, boolean outbound) {
+            return outbound ? ommap.get(sid) : immap.get(sid);
+        }
+
+        /**
+         * Stores an outgoing message, or records a status-only update when {@code msg} is null.
+         * A status-only update never touches the message map, so it cannot leave an empty
+         * entry behind in {@code ommap}.
+         */
+        @Override
+        public void persistOutgoing(SourceSequence seq, RMMessage msg) {
+            if (msg != null) {
+                // update the sequence status and add the message
+                getOrCreateMessages(seq.getIdentifier(), ommap).add(msg);
+            } else if (seq.isLastMessage()) {
+                // update only the sequence status
+                ssclosed.add(seq.getIdentifier());
+            }
+        }
+
+        /**
+         * Stores an incoming message. A null {@code msg} is a status-only update, for which
+         * this store keeps no state, so nothing is recorded.
+         */
+        @Override
+        public void persistIncoming(DestinationSequence seq, RMMessage msg) {
+            if (msg != null) {
+                // update the sequence status and add the message
+                getOrCreateMessages(seq.getIdentifier(), immap).add(msg);
+            }
+        }
+
+        @Override
+        public void removeMessages(Identifier sid, Collection<Long> messageNrs, boolean outbound) {
+            removeMessages(sid, messageNrs, outbound ? ommap : immap);
+        }
+
+        /** Returns the message collection for the sequence, creating and registering it if absent. */
+        private Collection<RMMessage> getOrCreateMessages(Identifier seq,
+                                                          Map<Identifier, Collection<RMMessage>> map) {
+            Collection<RMMessage> cm = map.get(seq);
+            if (cm == null) {
+                cm = new LinkedList<RMMessage>();
+                map.put(seq, cm);
+            }
+            return cm;
+        }
+
+        /**
+         * Removes the messages with the given numbers from the sequence's collection and drops
+         * the map entry once it becomes empty. Does nothing when the sequence has no entry.
+         */
+        private void removeMessages(Identifier sid, Collection<Long> messageNrs,
+                                    Map<Identifier, Collection<RMMessage>> map) {
+            Collection<RMMessage> cm = map.get(sid);
+            if (cm == null) {
+                // nothing stored for this sequence; avoid dereferencing a missing entry
+                return;
+            }
+            for (Iterator<RMMessage> it = cm.iterator(); it.hasNext();) {
+                RMMessage m = it.next();
+                if (messageNrs.contains(m.getMessageNumber())) {
+                    it.remove();
+                }
+            }
+            if (cm.isEmpty()) {
+                map.remove(sid);
+            }
+        }
+    }
}