You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ff...@apache.org on 2010/11/24 09:41:56 UTC
svn commit: r1038524 - in /cxf/branches/2.2.x-fixes: ./
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/
rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/
systests/ws-specs/src/test/java/org/apache/c...
Author: ffang
Date: Wed Nov 24 08:41:56 2010
New Revision: 1038524
URL: http://svn.apache.org/viewvc?rev=1038524&view=rev
Log:
Merged revisions 1038519 via svnmerge from
https://svn.apache.org/repos/asf/cxf/branches/2.3.x-fixes
................
r1038519 | ffang | 2010-11-24 16:18:31 +0800 (三, 24 11 2010) | 9 lines
Merged revisions 1038509 via svnmerge from
https://svn.apache.org/repos/asf/cxf/trunk
........
r1038509 | ffang | 2010-11-24 15:42:49 +0800 (三, 24 11 2010) | 1 line
[CXF-3114]WS-RM's RMTxStore's does not recover stored sequences after restart
........
................
Modified:
cxf/branches/2.2.x-fixes/ (props changed)
cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
cxf/branches/2.2.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
cxf/branches/2.2.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java
Propchange: cxf/branches/2.2.x-fixes/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?rev=1038524&r1=1038523&r2=1038524&view=diff
==============================================================================
--- cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Wed Nov 24 08:41:56 2010
@@ -394,12 +394,18 @@ public class RMManager implements Server
String id = RMUtils.getEndpointIdentifier(endpoint);
Collection<SourceSequence> sss = store.getSourceSequences(id);
- if (null == sss || 0 == sss.size()) {
+ Collection<DestinationSequence> dss = store.getDestinationSequences(id);
+ if ((null == sss || 0 == sss.size()) && (null == dss || 0 == dss.size())) {
return;
}
LOG.log(Level.FINE, "Number of source sequences: {0}", sss.size());
+ LOG.log(Level.FINE, "Number of destination sequences: {0}", dss.size());
- RMEndpoint rme = null;
+ LOG.log(Level.FINE, "Recovering {0} endpoint with id: {1}",
+ new Object[] {null == conduit ? "client" : "server", id});
+ RMEndpoint rme = createReliableEndpoint(endpoint);
+ rme.initialise(conduit, null);
+ reliableEndpoints.put(endpoint, rme);
for (SourceSequence ss : sss) {
Collection<RMMessage> ms = store.getMessages(ss.getIdentifier(), true);
@@ -408,13 +414,6 @@ public class RMManager implements Server
}
LOG.log(Level.FINE, "Number of messages in sequence: {0}", ms.size());
- if (null == rme) {
- LOG.log(Level.FINE, "Recovering {0} endpoint with id: {1}",
- new Object[] {null == conduit ? "client" : "server", id});
- rme = createReliableEndpoint(endpoint);
- rme.initialise(conduit, null);
- reliableEndpoints.put(endpoint, rme);
- }
rme.getSource().addSequence(ss, false);
for (RMMessage m : ms) {
@@ -457,6 +456,10 @@ public class RMManager implements Server
retransmissionQueue.addUnacknowledged(message);
}
}
+
+ for (DestinationSequence ds : dss) {
+ rme.getDestination().addSequence(ds, false);
+ }
retransmissionQueue.start();
}
Modified: cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java?rev=1038524&r1=1038523&r2=1038524&view=diff
==============================================================================
--- cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java (original)
+++ cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java Wed Nov 24 08:41:56 2010
@@ -604,7 +604,7 @@ public class RMTxStore implements RMStor
}
try {
- connection.setAutoCommit(false);
+ connection.setAutoCommit(true);
createTables();
} catch (SQLException ex) {
LogUtils.log(LOG, Level.SEVERE, "CONNECT_EXC", ex);
@@ -614,7 +614,14 @@ public class RMTxStore implements RMStor
LogUtils.log(LOG, Level.SEVERE, "CONNECT_EXC", se);
}
throw new RMStoreException(ex);
- }
+ } finally {
+ try {
+ connection.setAutoCommit(false);
+ } catch (SQLException ex) {
+ LogUtils.log(LOG, Level.SEVERE, "CONNECT_EXC", ex);
+ throw new RMStoreException(ex);
+ }
+ }
}
Connection getConnection() {
Modified: cxf/branches/2.2.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.2.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java?rev=1038524&r1=1038523&r2=1038524&view=diff
==============================================================================
--- cxf/branches/2.2.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java (original)
+++ cxf/branches/2.2.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java Wed Nov 24 08:41:56 2010
@@ -491,7 +491,7 @@ public class RMManagerTest extends Asser
InterfaceInfo ii = control.createMock(InterfaceInfo.class);
setUpEndpointForRecovery(endpoint, ei, si, bi, ii);
Conduit conduit = control.createMock(Conduit.class);
- setUpRecoverReliableEndpoint(endpoint, conduit, null, null);
+ setUpRecoverReliableEndpoint(endpoint, conduit, null, null, null);
control.replay();
manager.recoverReliableEndpoint(endpoint, conduit);
control.verify();
@@ -499,7 +499,8 @@ public class RMManagerTest extends Asser
control.reset();
setUpEndpointForRecovery(endpoint, ei, si, bi, ii);
SourceSequence ss = control.createMock(SourceSequence.class);
- setUpRecoverReliableEndpoint(endpoint, conduit, ss, null);
+ DestinationSequence ds = control.createMock(DestinationSequence.class);
+ setUpRecoverReliableEndpoint(endpoint, conduit, ss, ds, null);
control.replay();
manager.recoverReliableEndpoint(endpoint, conduit);
control.verify();
@@ -507,7 +508,7 @@ public class RMManagerTest extends Asser
control.reset();
setUpEndpointForRecovery(endpoint, ei, si, bi, ii);
RMMessage m = control.createMock(RMMessage.class);
- setUpRecoverReliableEndpoint(endpoint, conduit, ss, m);
+ setUpRecoverReliableEndpoint(endpoint, conduit, ss, ds, m);
control.replay();
manager.recoverReliableEndpoint(endpoint, conduit);
control.verify();
@@ -529,7 +530,7 @@ public class RMManagerTest extends Asser
void setUpRecoverReliableEndpoint(Endpoint endpoint,
Conduit conduit,
SourceSequence ss,
- RMMessage m) {
+ DestinationSequence ds, RMMessage m) {
RMStore store = control.createMock(RMStore.class);
RetransmissionQueue queue = control.createMock(RetransmissionQueue.class);
manager.setStore(store);
@@ -544,6 +545,14 @@ public class RMManagerTest extends Asser
return;
}
+ Collection<DestinationSequence> dss = new ArrayList<DestinationSequence>();
+ if (null != ds) {
+ dss.add(ds);
+ }
+ EasyMock.expect(store.getDestinationSequences("{S}s.{P}p")).andReturn(dss);
+ if (null == ds) {
+ return;
+ }
Collection<RMMessage> ms = new ArrayList<RMMessage>();
if (null != m) {
ms.add(m);
@@ -552,25 +561,29 @@ public class RMManagerTest extends Asser
id.setValue("S1");
EasyMock.expect(ss.getIdentifier()).andReturn(id).times(null == m ? 1 : 2);
EasyMock.expect(store.getMessages(id, true)).andReturn(ms);
- if (null == m) {
- return;
- }
+
manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>());
RMEndpoint rme = control.createMock(RMEndpoint.class);
EasyMock.expect(manager.createReliableEndpoint(endpoint)).andReturn(rme);
Source source = control.createMock(Source.class);
- EasyMock.expect(rme.getSource()).andReturn(source);
- source.addSequence(ss, false);
+ EasyMock.expect(rme.getSource()).andReturn(source).anyTimes();
+
+ Destination destination = control.createMock(Destination.class);
+ EasyMock.expect(rme.getDestination()).andReturn(destination);
+ destination.addSequence(ds, false);
EasyMock.expectLastCall();
Service service = control.createMock(Service.class);
- EasyMock.expect(endpoint.getService()).andReturn(service);
+ EasyMock.expect(endpoint.getService()).andReturn(service).anyTimes();
Binding binding = control.createMock(Binding.class);
- EasyMock.expect(endpoint.getBinding()).andReturn(binding);
+ EasyMock.expect(endpoint.getBinding()).andReturn(binding).anyTimes();
- EasyMock.expect(ss.isLastMessage()).andReturn(true);
- EasyMock.expect(ss.getCurrentMessageNr()).andReturn(BigInteger.TEN);
+ EasyMock.expect(ss.isLastMessage()).andReturn(true).anyTimes();
+ EasyMock.expect(ss.getCurrentMessageNr()).andReturn(BigInteger.TEN).anyTimes();
+ if (null == m) {
+ return;
+ }
EasyMock.expect(m.getMessageNumber()).andReturn(BigInteger.TEN).times(2);
if (null == conduit) {
EasyMock.expect(m.getTo()).andReturn("toAddress");
Modified: cxf/branches/2.2.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.2.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java?rev=1038524&r1=1038523&r2=1038524&view=diff
==============================================================================
--- cxf/branches/2.2.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java (original)
+++ cxf/branches/2.2.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java Wed Nov 24 08:41:56 2010
@@ -71,7 +71,8 @@ public class ServerPersistenceTest exten
private OutMessageRecorder out;
private InMessageRecorder in;
-
+ private Bus greeterBus;
+
@BeforeClass
public static void startServers() throws Exception {
RMTxStore.deleteDatabaseFiles();
@@ -104,7 +105,7 @@ public class ServerPersistenceTest exten
assertTrue("Failed to start greeter", control.startGreeter(SERVER_LOSS_CFG));
LOG.fine("Started greeter server.");
- Bus greeterBus = new SpringBusFactory().createBus(CFG);
+ greeterBus = new SpringBusFactory().createBus(CFG);
LOG.fine("Created bus " + greeterBus + " with cfg : " + CFG);
BusFactory.setDefaultBus(greeterBus);
@@ -132,7 +133,7 @@ public class ServerPersistenceTest exten
LOG.fine("Configured greeter client.");
- Response<GreetMeResponse> responses[] = cast(new Response[3]);
+ Response<GreetMeResponse> responses[] = cast(new Response[4]);
responses[0] = greeter.greetMeAsync("one");
responses[1] = greeter.greetMeAsync("two");
@@ -151,6 +152,12 @@ public class ServerPersistenceTest exten
verifyServerRecovery(responses);
+ out.getOutboundMessages().clear();
+ in.getInboundMessages().clear();
+
+ responses[3] = greeter.greetMeAsync("four");
+ verifyRetransmissionQueue();
+
greeterBus.shutdown(true);
control.stopGreeter(CFG);
@@ -163,7 +170,7 @@ public class ServerPersistenceTest exten
// wait another while to prove that response to second request is indeed lost
Thread.sleep(4000);
int nDone = 0;
- for (int i = 0; i < responses.length; i++) {
+ for (int i = 0; i < 3; i++) {
if (responses[i].isDone()) {
nDone++;
}
@@ -197,7 +204,7 @@ public class ServerPersistenceTest exten
int nDone = 0;
long waited = 0;
while (waited < 5000) {
- for (int i = 0; i < responses.length; i++) {
+ for (int i = 0; i < responses.length - 1; i++) {
if (responses[i].isDone()) {
nDone++;
}
@@ -228,6 +235,13 @@ public class ServerPersistenceTest exten
}
+ void verifyRetransmissionQueue() throws Exception {
+ awaitMessages(1, 3, 40000);
+
+ boolean empty = greeterBus.getExtension(RMManager.class).getRetransmissionQueue().isEmpty();
+ assertTrue("Retransmission Queue is not empty", empty);
+ }
+
protected void awaitMessages(int nExpectedOut, int nExpectedIn) {
awaitMessages(nExpectedOut, nExpectedIn, 10000);
}