You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by am...@apache.org on 2013/05/29 17:25:16 UTC
svn commit: r1487514 - in /cxf/dosgi/trunk/dsw/cxf-topology-manager/src:
main/java/org/apache/cxf/dosgi/topologymanager/importer/
main/java/org/apache/cxf/dosgi/topologymanager/rsatracker/
test/java/org/apache/cxf/dosgi/topologymanager/importer/
Author: amichai
Date: Wed May 29 15:25:16 2013
New Revision: 1487514
URL: http://svn.apache.org/r1487514
Log:
DOSGI-174 Fix synchronization and resource leaks in TopologyManagerImport and related classes
Modified:
cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/EndpointListenerManager.java
cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/ListenerHookImpl.java
cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImport.java
cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/rsatracker/RemoteServiceAdminTracker.java
cxf/dosgi/trunk/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImportTest.java
Modified: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/EndpointListenerManager.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/EndpointListenerManager.java?rev=1487514&r1=1487513&r2=1487514&view=diff
==============================================================================
--- cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/EndpointListenerManager.java (original)
+++ cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/EndpointListenerManager.java Wed May 29 15:25:16 2013
@@ -37,9 +37,9 @@ public class EndpointListenerManager {
private static final Logger LOG = LoggerFactory.getLogger(EndpointListenerManager.class);
private final BundleContext bctx;
- private ServiceRegistration serviceRegistration;
- private List<String> filters = new ArrayList<String>();
- private EndpointListener endpointListener;
+ private volatile ServiceRegistration serviceRegistration;
+ private final List<String> filters = new ArrayList<String>();
+ private final EndpointListener endpointListener;
protected EndpointListenerManager(BundleContext bc, EndpointListener endpointListener) {
this.bctx = bc;
@@ -52,7 +52,9 @@ public class EndpointListenerManager {
}
public void stop() {
- serviceRegistration.unregister();
+ if (serviceRegistration != null) {
+ serviceRegistration.unregister();
+ }
}
protected void extendScope(String filter) {
@@ -84,8 +86,7 @@ public class EndpointListenerManager {
if (LOG.isDebugEnabled()) {
LOG.debug("Current filter: " + filters);
}
- // TODO: make a copy of the filter list
- p.put(EndpointListener.ENDPOINT_LISTENER_SCOPE, filters);
+ p.put(EndpointListener.ENDPOINT_LISTENER_SCOPE, new ArrayList<String>(filters));
}
return p;
Modified: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/ListenerHookImpl.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/ListenerHookImpl.java?rev=1487514&r1=1487513&r2=1487514&view=diff
==============================================================================
--- cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/ListenerHookImpl.java (original)
+++ cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/ListenerHookImpl.java Wed May 29 15:25:16 2013
@@ -56,11 +56,11 @@ public class ListenerHookImpl implements
SYSTEM_PACKAGES.add("java.net.ContentHandler");
}
- private BundleContext bctx;
- private ServiceInterestListener serviceInterestListener;
+ private final BundleContext bctx;
+ private final ServiceInterestListener serviceInterestListener;
public ListenerHookImpl(BundleContext bc, ServiceInterestListener serviceInterestListener) {
- bctx = bc;
+ this.bctx = bc;
this.serviceInterestListener = serviceInterestListener;
}
@@ -114,7 +114,7 @@ public class ListenerHookImpl implements
}
- private String getClassNameFromFilter(String filter) {
+ private static String getClassNameFromFilter(String filter) {
if (filter != null) {
Matcher matcher = CLASS_NAME_PATTERN.matcher(filter);
if (matcher.matches() && matcher.groupCount() >= 1) {
Modified: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImport.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImport.java?rev=1487514&r1=1487513&r2=1487514&view=diff
==============================================================================
--- cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImport.java (original)
+++ cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImport.java Wed May 29 15:25:16 2013
@@ -19,12 +19,11 @@
package org.apache.cxf.dosgi.topologymanager.importer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -90,8 +89,8 @@ public class TopologyManagerImport imple
public TopologyManagerImport(BundleContext bc, RemoteServiceAdminTracker rsaTracker) {
bctx = bc;
- this.remoteServiceAdminTracker = rsaTracker;
- this.remoteServiceAdminTracker.addListener(new RemoteServiceAdminLifeCycleListener() {
+ remoteServiceAdminTracker = rsaTracker;
+ remoteServiceAdminTracker.addListener(new RemoteServiceAdminLifeCycleListener() {
public void added(RemoteServiceAdmin rsa) {
triggerImportsForRemoteServiceAdmin(rsa);
}
@@ -111,7 +110,9 @@ public class TopologyManagerImport imple
}
public void stop() {
+ endpointListenerManager.stop();
execService.shutdown();
+ // this is called from Activator.stop(), which implicitly unregisters our registered services
}
/* (non-Javadoc)
@@ -130,10 +131,10 @@ public class TopologyManagerImport imple
if (importInterestsCounter.remove(filter) == 0) {
LOG.debug("last reference to import interest is gone -> removing interest filter: {}", filter);
endpointListenerManager.reduceScope(filter);
- List<ImportRegistration> irs = importedServices.remove(filter);
- if (irs != null) {
- for (ImportRegistration ir : irs) {
- if (ir != null) {
+ synchronized (importedServices) {
+ List<ImportRegistration> irs = importedServices.remove(filter);
+ if (irs != null) {
+ for (ImportRegistration ir : irs) {
ir.close();
}
}
@@ -177,8 +178,9 @@ public class TopologyManagerImport imple
List<EndpointDescription> ips = importPossibilities.get(filter);
if (ips != null) {
ips.remove(epd);
- } else {
- // should not happen
+ if (ips.isEmpty()) {
+ importPossibilities.remove(filter);
+ }
}
}
}
@@ -186,9 +188,8 @@ public class TopologyManagerImport imple
public void triggerImportsForRemoteServiceAdmin(RemoteServiceAdmin rsa) {
LOG.debug("New RSA detected trying to import services with it");
synchronized (importPossibilities) {
- Set<Map.Entry<String, List<EndpointDescription>>> entries = importPossibilities.entrySet();
- for (Entry<String, List<EndpointDescription>> entry : entries) {
- triggerImport(entry.getKey());
+ for (String filter : importPossibilities.keySet()) {
+ triggerImport(filter);
}
}
}
@@ -199,12 +200,8 @@ public class TopologyManagerImport imple
execService.execute(new Runnable() {
public void run() {
try {
- synchronized (importedServices) { // deadlock possibility ?
- synchronized (importPossibilities) {
- unexportNotAvailableServices(filter);
- importServices(filter);
- }
- }
+ unexportNotAvailableServices(filter);
+ importServices(filter);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
@@ -216,53 +213,65 @@ public class TopologyManagerImport imple
}
private void unexportNotAvailableServices(String filter) {
- List<ImportRegistration> importRegistrations = getImportedServices(filter);
- if (importRegistrations.isEmpty()) {
- return;
- }
-
- Iterator<ImportRegistration> it = importRegistrations.iterator();
- while (it.hasNext()) {
- ImportRegistration ir = it.next();
- EndpointDescription ep = ir.getImportReference().getImportedEndpoint();
- if (!isImportPossibilityAvailable(ep, filter)) {
- // unexport service
- ir.close();
- it.remove();
+ synchronized (importedServices) {
+ List<ImportRegistration> importRegistrations = importedServices.get(filter);
+ if (importRegistrations == null) {
+ return;
}
- }
- if (importRegistrations.isEmpty()) {
- importedServices.remove(filter);
+ Iterator<ImportRegistration> it = importRegistrations.iterator();
+ while (it.hasNext()) {
+ ImportRegistration ir = it.next();
+ EndpointDescription ep = ir.getImportReference().getImportedEndpoint();
+ if (!isImportPossibilityAvailable(ep, filter)) {
+ // unexport service
+ ir.close();
+ it.remove();
+ }
+ }
+
+ if (importRegistrations.isEmpty()) {
+ importedServices.remove(filter);
+ }
}
}
private boolean isImportPossibilityAvailable(EndpointDescription ep, String filter) {
- List<EndpointDescription> ips = importPossibilities.get(filter);
- return ips != null && ips.contains(ep);
+ synchronized (importPossibilities) {
+ List<EndpointDescription> ips = importPossibilities.get(filter);
+ return ips != null && ips.contains(ep);
+ }
}
- /**
- * TODO but optional: if the service is already imported and the endpoint is still
- * in the list of possible imports check if a "better" endpoint is now in the list ?
- *
- * @param filter
- */
- private void importServices(String filter) {
- List<ImportRegistration> importRegistrations = getImportedServices(filter);
- List<EndpointDescription> possibilities = importPossibilities.get(filter);
- if (possibilities == null) {
- return;
+ // return a copy to prevent sync issues
+ private List<EndpointDescription> getImportPossibilitiesCopy(String filter) {
+ synchronized (importPossibilities) {
+ List<EndpointDescription> possibilities = importPossibilities.get(filter);
+ return possibilities == null
+ ? Collections.<EndpointDescription>emptyList()
+ : new ArrayList<EndpointDescription>(possibilities);
}
- for (EndpointDescription epd : possibilities) {
- if (!alreadyImported(epd, importRegistrations)) {
- // service not imported yet -> import it now
- ImportRegistration ir = importService(epd);
- if (ir != null) {
- // import was successful
- importRegistrations.add(ir);
- if (!importAllAvailable) {
- return;
+ }
+
+ private void importServices(String filter) {
+ synchronized (importedServices) {
+ List<ImportRegistration> importRegistrations = importedServices.get(filter);
+ for (EndpointDescription epd : getImportPossibilitiesCopy(filter)) {
+ // TODO but optional: if the service is already imported and the endpoint is still
+ // in the list of possible imports check if a "better" endpoint is now in the list
+ if (!alreadyImported(epd, importRegistrations)) {
+ // service not imported yet -> import it now
+ ImportRegistration ir = importService(epd);
+ if (ir != null) {
+ // import was successful
+ if (importRegistrations == null) {
+ importRegistrations = new ArrayList<ImportRegistration>();
+ importedServices.put(filter, importRegistrations);
+ }
+ importRegistrations.add(ir);
+ if (!importAllAvailable) {
+ return;
+ }
}
}
}
@@ -270,9 +279,11 @@ public class TopologyManagerImport imple
}
private boolean alreadyImported(EndpointDescription epd, List<ImportRegistration> importRegistrations) {
- for (ImportRegistration ir : importRegistrations) {
- if (epd.equals(ir.getImportReference().getImportedEndpoint())) {
- return true;
+ if (importRegistrations != null) {
+ for (ImportRegistration ir : importRegistrations) {
+ if (epd.equals(ir.getImportReference().getImportedEndpoint())) {
+ return true;
+ }
}
}
return false;
@@ -296,21 +307,6 @@ public class TopologyManagerImport imple
}
/**
- * Returns the list of already imported services for the given filter
- *
- * @param filter
- * @return import registrations for filter (will never return null)
- */
- private List<ImportRegistration> getImportedServices(String filter) {
- List<ImportRegistration> irs = importedServices.get(filter);
- if (irs == null) {
- irs = new ArrayList<ImportRegistration>();
- importedServices.put(filter, irs);
- }
- return irs;
- }
-
- /**
* This method is called once a RemoteServiceAdminEvent for an removed import reference is received.
* However the current implementation has no special support for multiple topology managers, therefore this method
* does nothing for the moment.
Modified: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/rsatracker/RemoteServiceAdminTracker.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/rsatracker/RemoteServiceAdminTracker.java?rev=1487514&r1=1487513&r2=1487514&view=diff
==============================================================================
--- cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/rsatracker/RemoteServiceAdminTracker.java (original)
+++ cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/rsatracker/RemoteServiceAdminTracker.java Wed May 29 15:25:16 2013
@@ -20,6 +20,7 @@ package org.apache.cxf.dosgi.topologyman
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
@@ -35,7 +36,7 @@ public class RemoteServiceAdminTracker e
public RemoteServiceAdminTracker(BundleContext bc) {
super(bc, RemoteServiceAdmin.class.getName(), null);
- this.listeners = new ArrayList<RemoteServiceAdminLifeCycleListener>();
+ this.listeners = new CopyOnWriteArrayList<RemoteServiceAdminLifeCycleListener>();
}
public void addListener(RemoteServiceAdminLifeCycleListener listener) {
Modified: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImportTest.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImportTest.java?rev=1487514&r1=1487513&r2=1487514&view=diff
==============================================================================
--- cxf/dosgi/trunk/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImportTest.java (original)
+++ cxf/dosgi/trunk/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImportTest.java Wed May 29 15:25:16 2013
@@ -55,6 +55,8 @@ public class TopologyManagerImportTest {
BundleContext bc = c.createMock(BundleContext.class);
RemoteServiceAdminTracker rsaTracker = c.createMock(RemoteServiceAdminTracker.class);
ServiceRegistration sreg = c.createMock(ServiceRegistration.class);
+ sreg.unregister();
+ EasyMock.expectLastCall().once();
EasyMock.expect(bc.registerService((String)EasyMock.anyObject(),
EasyMock.anyObject(),
(Dictionary)EasyMock.anyObject())).andReturn(sreg).anyTimes();