You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ff...@apache.org on 2010/11/24 09:41:56 UTC

svn commit: r1038524 - in /cxf/branches/2.2.x-fixes: ./ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/ rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ systests/ws-specs/src/test/java/org/apache/c...

Author: ffang
Date: Wed Nov 24 08:41:56 2010
New Revision: 1038524

URL: http://svn.apache.org/viewvc?rev=1038524&view=rev
Log:
Merged revisions 1038519 via svnmerge from 
https://svn.apache.org/repos/asf/cxf/branches/2.3.x-fixes

................
  r1038519 | ffang | 2010-11-24 16:18:31 +0800 (三, 24 11 2010) | 9 lines
  
  Merged revisions 1038509 via svnmerge from 
  https://svn.apache.org/repos/asf/cxf/trunk
  
  ........
    r1038509 | ffang | 2010-11-24 15:42:49 +0800 (三, 24 11 2010) | 1 line
    
    [CXF-3114]WS-RM's RMTxStore's does not recover stored sequences after restart
  ........
................

Modified:
    cxf/branches/2.2.x-fixes/   (props changed)
    cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
    cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
    cxf/branches/2.2.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
    cxf/branches/2.2.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java

Propchange: cxf/branches/2.2.x-fixes/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?rev=1038524&r1=1038523&r2=1038524&view=diff
==============================================================================
--- cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Wed Nov 24 08:41:56 2010
@@ -394,12 +394,18 @@ public class RMManager implements Server
         String id = RMUtils.getEndpointIdentifier(endpoint);
         
         Collection<SourceSequence> sss = store.getSourceSequences(id);
-        if (null == sss || 0 == sss.size()) {                        
+        Collection<DestinationSequence> dss = store.getDestinationSequences(id);
+        if ((null == sss || 0 == sss.size()) && (null == dss || 0 == dss.size())) {                        
             return;
         }
         LOG.log(Level.FINE, "Number of source sequences: {0}", sss.size());
+        LOG.log(Level.FINE, "Number of destination sequences: {0}", dss.size());
         
-        RMEndpoint rme = null;
+        LOG.log(Level.FINE, "Recovering {0} endpoint with id: {1}",
+                new Object[] {null == conduit ? "client" : "server", id});
+        RMEndpoint rme = createReliableEndpoint(endpoint);
+        rme.initialise(conduit, null);
+        reliableEndpoints.put(endpoint, rme);
         for (SourceSequence ss : sss) {            
  
             Collection<RMMessage> ms = store.getMessages(ss.getIdentifier(), true);
@@ -408,13 +414,6 @@ public class RMManager implements Server
             }
             LOG.log(Level.FINE, "Number of messages in sequence: {0}", ms.size());
             
-            if (null == rme) {
-                LOG.log(Level.FINE, "Recovering {0} endpoint with id: {1}",
-                        new Object[] {null == conduit ? "client" : "server", id});
-                rme = createReliableEndpoint(endpoint);
-                rme.initialise(conduit, null);
-                reliableEndpoints.put(endpoint, rme);
-            }
             rme.getSource().addSequence(ss, false);
             
             for (RMMessage m : ms) {                
@@ -457,6 +456,10 @@ public class RMManager implements Server
                 retransmissionQueue.addUnacknowledged(message);
             }            
         }
+        
+        for (DestinationSequence ds : dss) {
+            rme.getDestination().addSequence(ds, false);        
+        }
         retransmissionQueue.start();
         
     }

Modified: cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java?rev=1038524&r1=1038523&r2=1038524&view=diff
==============================================================================
--- cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java (original)
+++ cxf/branches/2.2.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java Wed Nov 24 08:41:56 2010
@@ -604,7 +604,7 @@ public class RMTxStore implements RMStor
         }
         
         try {
-            connection.setAutoCommit(false);
+            connection.setAutoCommit(true);
             createTables();
         } catch (SQLException ex) {
             LogUtils.log(LOG, Level.SEVERE, "CONNECT_EXC", ex);
@@ -614,7 +614,14 @@ public class RMTxStore implements RMStor
                 LogUtils.log(LOG, Level.SEVERE, "CONNECT_EXC", se);
             }
             throw new RMStoreException(ex);
-        }   
+        } finally {
+            try {
+                connection.setAutoCommit(false);                
+            } catch (SQLException ex) {
+                LogUtils.log(LOG, Level.SEVERE, "CONNECT_EXC", ex);
+                throw new RMStoreException(ex);
+            }
+        }
     }   
     
     Connection getConnection() {

Modified: cxf/branches/2.2.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.2.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java?rev=1038524&r1=1038523&r2=1038524&view=diff
==============================================================================
--- cxf/branches/2.2.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java (original)
+++ cxf/branches/2.2.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java Wed Nov 24 08:41:56 2010
@@ -491,7 +491,7 @@ public class RMManagerTest extends Asser
         InterfaceInfo ii = control.createMock(InterfaceInfo.class);
         setUpEndpointForRecovery(endpoint, ei, si, bi, ii);          
         Conduit conduit = control.createMock(Conduit.class);        
-        setUpRecoverReliableEndpoint(endpoint, conduit, null, null);
+        setUpRecoverReliableEndpoint(endpoint, conduit, null, null, null);
         control.replay();
         manager.recoverReliableEndpoint(endpoint, conduit);
         control.verify();
@@ -499,7 +499,8 @@ public class RMManagerTest extends Asser
         control.reset();
         setUpEndpointForRecovery(endpoint, ei, si, bi, ii);
         SourceSequence ss = control.createMock(SourceSequence.class);
-        setUpRecoverReliableEndpoint(endpoint, conduit, ss, null);
+        DestinationSequence ds = control.createMock(DestinationSequence.class);
+        setUpRecoverReliableEndpoint(endpoint, conduit, ss, ds, null);
         control.replay();
         manager.recoverReliableEndpoint(endpoint, conduit);
         control.verify();
@@ -507,7 +508,7 @@ public class RMManagerTest extends Asser
         control.reset();
         setUpEndpointForRecovery(endpoint, ei, si, bi, ii);  
         RMMessage m = control.createMock(RMMessage.class);
-        setUpRecoverReliableEndpoint(endpoint, conduit, ss, m);        
+        setUpRecoverReliableEndpoint(endpoint, conduit, ss, ds, m);        
         control.replay();
         manager.recoverReliableEndpoint(endpoint, conduit);
         control.verify();        
@@ -529,7 +530,7 @@ public class RMManagerTest extends Asser
     void setUpRecoverReliableEndpoint(Endpoint endpoint,
                                       Conduit conduit, 
                                       SourceSequence ss, 
-                                      RMMessage m)  {                
+                                      DestinationSequence ds, RMMessage m)  {                
         RMStore store = control.createMock(RMStore.class);
         RetransmissionQueue queue = control.createMock(RetransmissionQueue.class);
         manager.setStore(store);
@@ -544,6 +545,14 @@ public class RMManagerTest extends Asser
             return;
         }         
         
+        Collection<DestinationSequence> dss = new ArrayList<DestinationSequence>();
+        if (null != ds) {
+            dss.add(ds);            
+        }
+        EasyMock.expect(store.getDestinationSequences("{S}s.{P}p")).andReturn(dss);
+        if (null == ds) {
+            return;
+        }
         Collection<RMMessage> ms = new ArrayList<RMMessage>();
         if (null != m) {
             ms.add(m);
@@ -552,25 +561,29 @@ public class RMManagerTest extends Asser
         id.setValue("S1");
         EasyMock.expect(ss.getIdentifier()).andReturn(id).times(null == m ? 1 : 2);
         EasyMock.expect(store.getMessages(id, true)).andReturn(ms);
-        if (null == m) {
-            return;
-        }
+        
         
         manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>());
         RMEndpoint rme = control.createMock(RMEndpoint.class);
         EasyMock.expect(manager.createReliableEndpoint(endpoint)).andReturn(rme);
         Source source = control.createMock(Source.class);
-        EasyMock.expect(rme.getSource()).andReturn(source);
-        source.addSequence(ss, false);
+        EasyMock.expect(rme.getSource()).andReturn(source).anyTimes();
+                
+        Destination destination = control.createMock(Destination.class);
+        EasyMock.expect(rme.getDestination()).andReturn(destination);
+        destination.addSequence(ds, false);
         EasyMock.expectLastCall();
         
         Service service = control.createMock(Service.class);
-        EasyMock.expect(endpoint.getService()).andReturn(service);
+        EasyMock.expect(endpoint.getService()).andReturn(service).anyTimes();
         Binding binding = control.createMock(Binding.class);
-        EasyMock.expect(endpoint.getBinding()).andReturn(binding);
+        EasyMock.expect(endpoint.getBinding()).andReturn(binding).anyTimes();
        
-        EasyMock.expect(ss.isLastMessage()).andReturn(true);
-        EasyMock.expect(ss.getCurrentMessageNr()).andReturn(BigInteger.TEN);
+        EasyMock.expect(ss.isLastMessage()).andReturn(true).anyTimes();
+        EasyMock.expect(ss.getCurrentMessageNr()).andReturn(BigInteger.TEN).anyTimes();
+        if (null == m) {
+            return;
+        }
         EasyMock.expect(m.getMessageNumber()).andReturn(BigInteger.TEN).times(2);
         if (null == conduit) {
             EasyMock.expect(m.getTo()).andReturn("toAddress");

Modified: cxf/branches/2.2.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.2.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java?rev=1038524&r1=1038523&r2=1038524&view=diff
==============================================================================
--- cxf/branches/2.2.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java (original)
+++ cxf/branches/2.2.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java Wed Nov 24 08:41:56 2010
@@ -71,7 +71,8 @@ public class ServerPersistenceTest exten
 
     private OutMessageRecorder out;
     private InMessageRecorder in;
-
+    private Bus greeterBus;
+    
     @BeforeClass
     public static void startServers() throws Exception {
         RMTxStore.deleteDatabaseFiles();
@@ -104,7 +105,7 @@ public class ServerPersistenceTest exten
         assertTrue("Failed to start greeter", control.startGreeter(SERVER_LOSS_CFG)); 
         LOG.fine("Started greeter server.");
         
-        Bus greeterBus = new SpringBusFactory().createBus(CFG);
+        greeterBus = new SpringBusFactory().createBus(CFG);
         LOG.fine("Created bus " + greeterBus + " with cfg : " + CFG);        
         BusFactory.setDefaultBus(greeterBus);
         
@@ -132,7 +133,7 @@ public class ServerPersistenceTest exten
         
         LOG.fine("Configured greeter client.");
 
-        Response<GreetMeResponse> responses[] = cast(new Response[3]);
+        Response<GreetMeResponse> responses[] = cast(new Response[4]);
         
         responses[0] = greeter.greetMeAsync("one");
         responses[1] = greeter.greetMeAsync("two");
@@ -151,6 +152,12 @@ public class ServerPersistenceTest exten
         
         verifyServerRecovery(responses);
         
+        out.getOutboundMessages().clear();
+        in.getInboundMessages().clear();
+        
+        responses[3] = greeter.greetMeAsync("four");
+        verifyRetransmissionQueue();
+        
         greeterBus.shutdown(true);
         
         control.stopGreeter(CFG);
@@ -163,7 +170,7 @@ public class ServerPersistenceTest exten
         // wait another while to prove that response to second request is indeed lost
         Thread.sleep(4000);
         int nDone = 0;
-        for (int i = 0; i < responses.length; i++) {
+        for (int i = 0; i < 3; i++) {
             if (responses[i].isDone()) {
                 nDone++;
             }
@@ -197,7 +204,7 @@ public class ServerPersistenceTest exten
         int nDone = 0;
         long waited = 0;
         while (waited < 5000) {
-            for (int i = 0; i < responses.length; i++) {
+            for (int i = 0; i < responses.length - 1; i++) {
                 if (responses[i].isDone()) {
                     nDone++;
                 }
@@ -228,6 +235,13 @@ public class ServerPersistenceTest exten
     }
   
     
+    void verifyRetransmissionQueue() throws Exception {
+        awaitMessages(1, 3, 40000);
+        
+        boolean empty = greeterBus.getExtension(RMManager.class).getRetransmissionQueue().isEmpty();
+        assertTrue("Retransmission Queue is not empty", empty);
+    }
+
     protected void awaitMessages(int nExpectedOut, int nExpectedIn) {
         awaitMessages(nExpectedOut, nExpectedIn, 10000);
     }