You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by am...@apache.org on 2013/05/29 17:25:16 UTC

svn commit: r1487514 - in /cxf/dosgi/trunk/dsw/cxf-topology-manager/src: main/java/org/apache/cxf/dosgi/topologymanager/importer/ main/java/org/apache/cxf/dosgi/topologymanager/rsatracker/ test/java/org/apache/cxf/dosgi/topologymanager/importer/

Author: amichai
Date: Wed May 29 15:25:16 2013
New Revision: 1487514

URL: http://svn.apache.org/r1487514
Log:
DOSGI-174 Fix synchronization and resource leaks in TopologyManagerImport and related classes

Modified:
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/EndpointListenerManager.java
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/ListenerHookImpl.java
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImport.java
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/rsatracker/RemoteServiceAdminTracker.java
    cxf/dosgi/trunk/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImportTest.java

Modified: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/EndpointListenerManager.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/EndpointListenerManager.java?rev=1487514&r1=1487513&r2=1487514&view=diff
==============================================================================
--- cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/EndpointListenerManager.java (original)
+++ cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/EndpointListenerManager.java Wed May 29 15:25:16 2013
@@ -37,9 +37,9 @@ public class EndpointListenerManager {
     private static final Logger LOG = LoggerFactory.getLogger(EndpointListenerManager.class);
     
     private final BundleContext bctx;
-    private ServiceRegistration serviceRegistration;
-    private List<String> filters = new ArrayList<String>();
-    private EndpointListener endpointListener;
+    private volatile ServiceRegistration serviceRegistration;
+    private final List<String> filters = new ArrayList<String>();
+    private final EndpointListener endpointListener;
 
     protected EndpointListenerManager(BundleContext bc, EndpointListener endpointListener) {
         this.bctx = bc;
@@ -52,7 +52,9 @@ public class EndpointListenerManager {
     }
     
     public void stop() {
-        serviceRegistration.unregister();
+        if (serviceRegistration != null) {
+            serviceRegistration.unregister();
+        }
     }
 
     protected void extendScope(String filter) {
@@ -84,8 +86,7 @@ public class EndpointListenerManager {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Current filter: " + filters);
             }
-            // TODO: make a copy of the filter list
-            p.put(EndpointListener.ENDPOINT_LISTENER_SCOPE, filters);
+            p.put(EndpointListener.ENDPOINT_LISTENER_SCOPE, new ArrayList<String>(filters));
         }
 
         return p;

Modified: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/ListenerHookImpl.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/ListenerHookImpl.java?rev=1487514&r1=1487513&r2=1487514&view=diff
==============================================================================
--- cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/ListenerHookImpl.java (original)
+++ cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/ListenerHookImpl.java Wed May 29 15:25:16 2013
@@ -56,11 +56,11 @@ public class ListenerHookImpl implements
         SYSTEM_PACKAGES.add("java.net.ContentHandler");
     }
 
-    private BundleContext bctx;
-    private ServiceInterestListener serviceInterestListener;
+    private final BundleContext bctx;
+    private final ServiceInterestListener serviceInterestListener;
 
     public ListenerHookImpl(BundleContext bc, ServiceInterestListener serviceInterestListener) {
-        bctx = bc;
+        this.bctx = bc;
         this.serviceInterestListener = serviceInterestListener;
     }
 
@@ -114,7 +114,7 @@ public class ListenerHookImpl implements
 
     }
 
-    private String getClassNameFromFilter(String filter) {
+    private static String getClassNameFromFilter(String filter) {
         if (filter != null) {
             Matcher matcher = CLASS_NAME_PATTERN.matcher(filter);
             if (matcher.matches() && matcher.groupCount() >= 1) {

Modified: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImport.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImport.java?rev=1487514&r1=1487513&r2=1487514&view=diff
==============================================================================
--- cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImport.java (original)
+++ cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImport.java Wed May 29 15:25:16 2013
@@ -19,12 +19,11 @@
 package org.apache.cxf.dosgi.topologymanager.importer;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -90,8 +89,8 @@ public class TopologyManagerImport imple
 
     public TopologyManagerImport(BundleContext bc, RemoteServiceAdminTracker rsaTracker) {
         bctx = bc;
-        this.remoteServiceAdminTracker = rsaTracker;
-        this.remoteServiceAdminTracker.addListener(new RemoteServiceAdminLifeCycleListener() {
+        remoteServiceAdminTracker = rsaTracker;
+        remoteServiceAdminTracker.addListener(new RemoteServiceAdminLifeCycleListener() {
             public void added(RemoteServiceAdmin rsa) {
                 triggerImportsForRemoteServiceAdmin(rsa);
             }
@@ -111,7 +110,9 @@ public class TopologyManagerImport imple
     }
 
     public void stop() {
+        endpointListenerManager.stop();
         execService.shutdown();
+        // this is called from Activator.stop(), which implicitly unregisters our registered services
     }
     
     /* (non-Javadoc)
@@ -130,10 +131,10 @@ public class TopologyManagerImport imple
         if (importInterestsCounter.remove(filter) == 0) {
             LOG.debug("last reference to import interest is gone -> removing interest filter: {}", filter);
             endpointListenerManager.reduceScope(filter);
-            List<ImportRegistration> irs = importedServices.remove(filter);
-            if (irs != null) {
-                for (ImportRegistration ir : irs) {
-                    if (ir != null) {
+            synchronized (importedServices) {
+                List<ImportRegistration> irs = importedServices.remove(filter);
+                if (irs != null) {
+                    for (ImportRegistration ir : irs) {
                         ir.close();
                     }
                 }
@@ -177,8 +178,9 @@ public class TopologyManagerImport imple
             List<EndpointDescription> ips = importPossibilities.get(filter);
             if (ips != null) {
                 ips.remove(epd);
-            } else {
-                // should not happen
+                if (ips.isEmpty()) {
+                    importPossibilities.remove(filter);
+                }
             }
         }
     }
@@ -186,9 +188,8 @@ public class TopologyManagerImport imple
     public void triggerImportsForRemoteServiceAdmin(RemoteServiceAdmin rsa) {
         LOG.debug("New RSA detected trying to import services with it");
         synchronized (importPossibilities) {
-            Set<Map.Entry<String, List<EndpointDescription>>> entries = importPossibilities.entrySet();
-            for (Entry<String, List<EndpointDescription>> entry : entries) {
-                triggerImport(entry.getKey());
+            for (String filter : importPossibilities.keySet()) {
+                triggerImport(filter);
             }
         }
     }
@@ -199,12 +200,8 @@ public class TopologyManagerImport imple
         execService.execute(new Runnable() {
             public void run() {
                 try {
-                    synchronized (importedServices) { // deadlock possibility ?
-                        synchronized (importPossibilities) {
-                            unexportNotAvailableServices(filter);
-                            importServices(filter);
-                        }
-                    }
+                    unexportNotAvailableServices(filter);
+                    importServices(filter);
                 } catch (Exception e) {
                     LOG.error(e.getMessage(), e);
                 }
@@ -216,53 +213,65 @@ public class TopologyManagerImport imple
     }
     
     private void unexportNotAvailableServices(String filter) {
-        List<ImportRegistration> importRegistrations = getImportedServices(filter);
-        if (importRegistrations.isEmpty()) {
-            return;
-        }
-            
-        Iterator<ImportRegistration> it = importRegistrations.iterator();
-        while (it.hasNext()) {
-            ImportRegistration ir = it.next();
-            EndpointDescription ep = ir.getImportReference().getImportedEndpoint();
-            if (!isImportPossibilityAvailable(ep, filter)) {
-                // unexport service
-                ir.close();
-                it.remove();
+        synchronized (importedServices) {
+            List<ImportRegistration> importRegistrations = importedServices.get(filter);
+            if (importRegistrations == null) {
+                return;
             }
-        }
 
-        if (importRegistrations.isEmpty()) {
-            importedServices.remove(filter);
+            Iterator<ImportRegistration> it = importRegistrations.iterator();
+            while (it.hasNext()) {
+                ImportRegistration ir = it.next();
+                EndpointDescription ep = ir.getImportReference().getImportedEndpoint();
+                if (!isImportPossibilityAvailable(ep, filter)) {
+                    // unexport service
+                    ir.close();
+                    it.remove();
+                }
+            }
+
+            if (importRegistrations.isEmpty()) {
+                importedServices.remove(filter);
+            }
         }
     }
     
     private boolean isImportPossibilityAvailable(EndpointDescription ep, String filter) {
-        List<EndpointDescription> ips = importPossibilities.get(filter);
-        return ips != null && ips.contains(ep);
+        synchronized (importPossibilities) {
+            List<EndpointDescription> ips = importPossibilities.get(filter);
+            return ips != null && ips.contains(ep);
+        }
     }
 
-    /**
-     * TODO but optional: if the service is already imported and the endpoint is still
-     * in the list of possible imports check if a "better" endpoint is now in the list ?
-     * 
-     * @param filter
-     */
-    private void importServices(String filter) {        
-        List<ImportRegistration> importRegistrations = getImportedServices(filter);
-        List<EndpointDescription> possibilities = importPossibilities.get(filter);
-        if (possibilities == null) {
-            return;
+    // return a copy to prevent sync issues
+    private List<EndpointDescription> getImportPossibilitiesCopy(String filter) {
+        synchronized (importPossibilities) {
+            List<EndpointDescription> possibilities = importPossibilities.get(filter);
+            return possibilities == null
+                ? Collections.<EndpointDescription>emptyList()
+                : new ArrayList<EndpointDescription>(possibilities);
         }
-        for (EndpointDescription epd : possibilities) {
-            if (!alreadyImported(epd, importRegistrations)) {
-                // service not imported yet -> import it now
-                ImportRegistration ir = importService(epd);
-                if (ir != null) {
-                    // import was successful
-                    importRegistrations.add(ir);
-                    if (!importAllAvailable) {
-                        return;
+    }
+
+    private void importServices(String filter) {
+        synchronized (importedServices) {
+            List<ImportRegistration> importRegistrations = importedServices.get(filter);
+            for (EndpointDescription epd : getImportPossibilitiesCopy(filter)) {
+                // TODO but optional: if the service is already imported and the endpoint is still
+                // in the list of possible imports check if a "better" endpoint is now in the list
+                if (!alreadyImported(epd, importRegistrations)) {
+                    // service not imported yet -> import it now
+                    ImportRegistration ir = importService(epd);
+                    if (ir != null) {
+                        // import was successful
+                        if (importRegistrations == null) {
+                            importRegistrations = new ArrayList<ImportRegistration>();
+                            importedServices.put(filter, importRegistrations);
+                        }
+                        importRegistrations.add(ir);
+                        if (!importAllAvailable) {
+                            return;
+                        }
                     }
                 }
             }
@@ -270,9 +279,11 @@ public class TopologyManagerImport imple
     }
 
     private boolean alreadyImported(EndpointDescription epd, List<ImportRegistration> importRegistrations) {
-        for (ImportRegistration ir : importRegistrations) {
-            if (epd.equals(ir.getImportReference().getImportedEndpoint())) {
-                return true;
+        if (importRegistrations != null) {
+            for (ImportRegistration ir : importRegistrations) {
+                if (epd.equals(ir.getImportReference().getImportedEndpoint())) {
+                    return true;
+                }
             }
         }
         return false;
@@ -296,21 +307,6 @@ public class TopologyManagerImport imple
     }
 
     /**
-     * Returns the list of already imported services for the given filter
-     *  
-     * @param filter
-     * @return import registrations for filter (will never return null)
-     */
-    private List<ImportRegistration> getImportedServices(String filter) {
-        List<ImportRegistration> irs = importedServices.get(filter);
-        if (irs == null) {
-            irs = new ArrayList<ImportRegistration>();
-            importedServices.put(filter, irs);
-        }
-        return irs;
-    }
-
-    /**
      * This method is called once a RemoteServiceAdminEvent for an removed import reference is received.
      * However the current implementation has no special support for multiple topology managers, therefore this method
      * does nothing for the moment.

Modified: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/rsatracker/RemoteServiceAdminTracker.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/rsatracker/RemoteServiceAdminTracker.java?rev=1487514&r1=1487513&r2=1487514&view=diff
==============================================================================
--- cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/rsatracker/RemoteServiceAdminTracker.java (original)
+++ cxf/dosgi/trunk/dsw/cxf-topology-manager/src/main/java/org/apache/cxf/dosgi/topologymanager/rsatracker/RemoteServiceAdminTracker.java Wed May 29 15:25:16 2013
@@ -20,6 +20,7 @@ package org.apache.cxf.dosgi.topologyman
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceReference;
@@ -35,7 +36,7 @@ public class RemoteServiceAdminTracker e
 
     public RemoteServiceAdminTracker(BundleContext bc) {
         super(bc, RemoteServiceAdmin.class.getName(), null);
-        this.listeners = new ArrayList<RemoteServiceAdminLifeCycleListener>();
+        this.listeners = new CopyOnWriteArrayList<RemoteServiceAdminLifeCycleListener>();
     }
 
     public void addListener(RemoteServiceAdminLifeCycleListener listener) {

Modified: cxf/dosgi/trunk/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImportTest.java
URL: http://svn.apache.org/viewvc/cxf/dosgi/trunk/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImportTest.java?rev=1487514&r1=1487513&r2=1487514&view=diff
==============================================================================
--- cxf/dosgi/trunk/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImportTest.java (original)
+++ cxf/dosgi/trunk/dsw/cxf-topology-manager/src/test/java/org/apache/cxf/dosgi/topologymanager/importer/TopologyManagerImportTest.java Wed May 29 15:25:16 2013
@@ -55,6 +55,8 @@ public class TopologyManagerImportTest {
         BundleContext bc = c.createMock(BundleContext.class);
         RemoteServiceAdminTracker rsaTracker = c.createMock(RemoteServiceAdminTracker.class);
         ServiceRegistration sreg = c.createMock(ServiceRegistration.class);
+        sreg.unregister();
+        EasyMock.expectLastCall().once();
         EasyMock.expect(bc.registerService((String)EasyMock.anyObject(),
                                            EasyMock.anyObject(), 
                                            (Dictionary)EasyMock.anyObject())).andReturn(sreg).anyTimes();