You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2013/10/17 02:29:47 UTC

svn commit: r1532945 - in /cxf/trunk: rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java

Author: dkulp
Date: Thu Oct 17 00:29:46 2013
New Revision: 1532945

URL: http://svn.apache.org/r1532945
Log:
Fix some of the locking in RMManager to prevent some deadlocks on close where a shutdown would ask to terminate the sequence, but we could not process the sequence ack on the decoupled channel.

Modified:
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
    cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?rev=1532945&r1=1532944&r2=1532945&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Thu Oct 17 00:29:46 2013
@@ -20,9 +20,9 @@
 package org.apache.cxf.ws.rm;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Timer;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -109,7 +109,7 @@ public class RMManager {
     private RMStore store;
     private SequenceIdentifierGenerator idGenerator;
     private RetransmissionQueue retransmissionQueue;
-    private Map<Endpoint, RMEndpoint> reliableEndpoints = new HashMap<Endpoint, RMEndpoint>();
+    private Map<Endpoint, RMEndpoint> reliableEndpoints = new ConcurrentHashMap<Endpoint, RMEndpoint>();
     private AtomicReference<Timer> timer = new AtomicReference<Timer>();
     private RMConfiguration configuration;
     private SourcePolicyType sourcePolicy;
@@ -308,7 +308,7 @@ public class RMManager {
     
     // The real stuff ...
 
-    public synchronized RMEndpoint getReliableEndpoint(Message message) throws RMException {
+    public RMEndpoint getReliableEndpoint(Message message) throws RMException {
         Endpoint endpoint = message.getExchange().get(Endpoint.class);
         QName name = endpoint.getEndpointInfo().getName();
         if (LOG.isLoggable(Level.FINE)) {
@@ -332,6 +332,7 @@ public class RMManager {
                 addrUri = maps.getNamespaceURI();
             }
         }
+        
         RMConfiguration config = getConfiguration();
         if (rmUri != null) {
             config.setRMNamespace(rmUri);
@@ -364,22 +365,28 @@ public class RMManager {
         }
         RMEndpoint rme = reliableEndpoints.get(endpoint);
         if (null == rme) {
-            rme = createReliableEndpoint(endpoint);
-            org.apache.cxf.transport.Destination destination = message.getExchange().getDestination();
-            EndpointReferenceType replyTo = null;
-            if (null != destination) {
-                AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, false);
-                replyTo = maps.getReplyTo();
-            }
-            Endpoint ei = message.getExchange().get(Endpoint.class);
-            org.apache.cxf.transport.Destination dest 
-                = ei == null ? null : ei.getEndpointInfo()
-                    .getProperty(MAPAggregator.DECOUPLED_DESTINATION, 
-                             org.apache.cxf.transport.Destination.class);
-            config = RMPolicyUtilities.getRMConfiguration(config, message);
-            rme.initialise(config, message.getExchange().getConduit(message), replyTo, dest, message);
-            reliableEndpoints.put(endpoint, rme);
-            LOG.fine("Created new RMEndpoint.");
+            synchronized (endpoint) {
+                rme = reliableEndpoints.get(endpoint);
+                if (rme != null) {
+                    return rme;
+                }
+                rme = createReliableEndpoint(endpoint);                
+                org.apache.cxf.transport.Destination destination = message.getExchange().getDestination();
+                EndpointReferenceType replyTo = null;
+                if (null != destination) {
+                    AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, false);
+                    replyTo = maps.getReplyTo();
+                }
+                Endpoint ei = message.getExchange().get(Endpoint.class);
+                org.apache.cxf.transport.Destination dest 
+                    = ei == null ? null : ei.getEndpointInfo()
+                        .getProperty(MAPAggregator.DECOUPLED_DESTINATION, 
+                                 org.apache.cxf.transport.Destination.class);
+                config = RMPolicyUtilities.getRMConfiguration(config, message);
+                rme.initialise(config, message.getExchange().getConduit(message), replyTo, dest, message);
+                reliableEndpoints.put(endpoint, rme);
+                LOG.fine("Created new RMEndpoint.");
+            }
         }
         return rme;
     }
@@ -495,10 +502,8 @@ public class RMManager {
         // unregistring of this managed bean from the server is done by the bus itself
     }
     
-    synchronized void shutdownReliableEndpoint(Endpoint e) {
-        RMEndpoint rme = null;
-
-        rme = reliableEndpoints.get(e);
+    void shutdownReliableEndpoint(Endpoint e) {
+        RMEndpoint rme = reliableEndpoints.get(e);
         if (rme == null) {
             // not found
             return;
@@ -534,7 +539,9 @@ public class RMManager {
                 new Object[] {null == conduit ? "client" : "server", id});
         RMEndpoint rme = createReliableEndpoint(endpoint);
         rme.initialise(getConfiguration(), conduit, null, null, null);
-        reliableEndpoints.put(endpoint, rme);
+        synchronized (reliableEndpoints) {
+            reliableEndpoints.put(endpoint, rme);
+        }
         for (SourceSequence ss : sss) {            
             recoverSourceSequence(endpoint, conduit, rme.getSource(), ss);
         }

Modified: cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?rev=1532945&r1=1532944&r2=1532945&view=diff
==============================================================================
--- cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java (original)
+++ cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java Thu Oct 17 00:29:46 2013
@@ -1576,13 +1576,13 @@ public class SequenceTest extends Abstra
             //ensure we close the decoupled destination of the conduit,
             //so that release the port if the destination reference count hit zero
             if (greeter != null) {
-                ClientProxy.getClient(greeter).getConduit().close();
+                //ClientProxy.getClient(greeter).getConduit().close();
             }
             if (greeter instanceof Closeable) {
                 ((Closeable)greeter).close();
             }
             if (dispatch != null) {
-                ((DispatchImpl<?>)dispatch).getClient().getConduit().close();
+                //((DispatchImpl<?>)dispatch).getClient().getConduit().close();
             }
             if (dispatch instanceof Closeable) {
                 ((Closeable)dispatch).close();