You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2013/10/17 02:29:47 UTC
svn commit: r1532945 - in /cxf/trunk:
rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
Author: dkulp
Date: Thu Oct 17 00:29:46 2013
New Revision: 1532945
URL: http://svn.apache.org/r1532945
Log:
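Fix the locking in RMManager to prevent deadlocks on close: a shutdown would ask to terminate the sequence, but the sequence acknowledgement arriving on the decoupled channel could not be processed while the manager was locked. getReliableEndpoint and shutdownReliableEndpoint are no longer synchronized on the manager; endpoint creation now synchronizes on the individual endpoint, and the endpoint map is a ConcurrentHashMap.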
Modified:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?rev=1532945&r1=1532944&r2=1532945&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Thu Oct 17 00:29:46 2013
@@ -20,9 +20,9 @@
package org.apache.cxf.ws.rm;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -109,7 +109,7 @@ public class RMManager {
private RMStore store;
private SequenceIdentifierGenerator idGenerator;
private RetransmissionQueue retransmissionQueue;
- private Map<Endpoint, RMEndpoint> reliableEndpoints = new HashMap<Endpoint, RMEndpoint>();
+ private Map<Endpoint, RMEndpoint> reliableEndpoints = new ConcurrentHashMap<Endpoint, RMEndpoint>();
private AtomicReference<Timer> timer = new AtomicReference<Timer>();
private RMConfiguration configuration;
private SourcePolicyType sourcePolicy;
@@ -308,7 +308,7 @@ public class RMManager {
// The real stuff ...
- public synchronized RMEndpoint getReliableEndpoint(Message message) throws RMException {
+ public RMEndpoint getReliableEndpoint(Message message) throws RMException {
Endpoint endpoint = message.getExchange().get(Endpoint.class);
QName name = endpoint.getEndpointInfo().getName();
if (LOG.isLoggable(Level.FINE)) {
@@ -332,6 +332,7 @@ public class RMManager {
addrUri = maps.getNamespaceURI();
}
}
+
RMConfiguration config = getConfiguration();
if (rmUri != null) {
config.setRMNamespace(rmUri);
@@ -364,22 +365,28 @@ public class RMManager {
}
RMEndpoint rme = reliableEndpoints.get(endpoint);
if (null == rme) {
- rme = createReliableEndpoint(endpoint);
- org.apache.cxf.transport.Destination destination = message.getExchange().getDestination();
- EndpointReferenceType replyTo = null;
- if (null != destination) {
- AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, false);
- replyTo = maps.getReplyTo();
- }
- Endpoint ei = message.getExchange().get(Endpoint.class);
- org.apache.cxf.transport.Destination dest
- = ei == null ? null : ei.getEndpointInfo()
- .getProperty(MAPAggregator.DECOUPLED_DESTINATION,
- org.apache.cxf.transport.Destination.class);
- config = RMPolicyUtilities.getRMConfiguration(config, message);
- rme.initialise(config, message.getExchange().getConduit(message), replyTo, dest, message);
- reliableEndpoints.put(endpoint, rme);
- LOG.fine("Created new RMEndpoint.");
+ synchronized (endpoint) {
+ rme = reliableEndpoints.get(endpoint);
+ if (rme != null) {
+ return rme;
+ }
+ rme = createReliableEndpoint(endpoint);
+ org.apache.cxf.transport.Destination destination = message.getExchange().getDestination();
+ EndpointReferenceType replyTo = null;
+ if (null != destination) {
+ AddressingProperties maps = RMContextUtils.retrieveMAPs(message, false, false);
+ replyTo = maps.getReplyTo();
+ }
+ Endpoint ei = message.getExchange().get(Endpoint.class);
+ org.apache.cxf.transport.Destination dest
+ = ei == null ? null : ei.getEndpointInfo()
+ .getProperty(MAPAggregator.DECOUPLED_DESTINATION,
+ org.apache.cxf.transport.Destination.class);
+ config = RMPolicyUtilities.getRMConfiguration(config, message);
+ rme.initialise(config, message.getExchange().getConduit(message), replyTo, dest, message);
+ reliableEndpoints.put(endpoint, rme);
+ LOG.fine("Created new RMEndpoint.");
+ }
}
return rme;
}
@@ -495,10 +502,8 @@ public class RMManager {
// unregistring of this managed bean from the server is done by the bus itself
}
- synchronized void shutdownReliableEndpoint(Endpoint e) {
- RMEndpoint rme = null;
-
- rme = reliableEndpoints.get(e);
+ void shutdownReliableEndpoint(Endpoint e) {
+ RMEndpoint rme = reliableEndpoints.get(e);
if (rme == null) {
// not found
return;
@@ -534,7 +539,9 @@ public class RMManager {
new Object[] {null == conduit ? "client" : "server", id});
RMEndpoint rme = createReliableEndpoint(endpoint);
rme.initialise(getConfiguration(), conduit, null, null, null);
- reliableEndpoints.put(endpoint, rme);
+ synchronized (reliableEndpoints) {
+ reliableEndpoints.put(endpoint, rme);
+ }
for (SourceSequence ss : sss) {
recoverSourceSequence(endpoint, conduit, rme.getSource(), ss);
}
Modified: cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?rev=1532945&r1=1532944&r2=1532945&view=diff
==============================================================================
--- cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java (original)
+++ cxf/trunk/systests/ws-rm/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java Thu Oct 17 00:29:46 2013
@@ -1576,13 +1576,13 @@ public class SequenceTest extends Abstra
//ensure we close the decoupled destination of the conduit,
//so that release the port if the destination reference count hit zero
if (greeter != null) {
- ClientProxy.getClient(greeter).getConduit().close();
+ //ClientProxy.getClient(greeter).getConduit().close();
}
if (greeter instanceof Closeable) {
((Closeable)greeter).close();
}
if (dispatch != null) {
- ((DispatchImpl<?>)dispatch).getClient().getConduit().close();
+ //((DispatchImpl<?>)dispatch).getClient().getConduit().close();
}
if (dispatch instanceof Closeable) {
((Closeable)dispatch).close();