You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/12/30 18:45:55 UTC

[camel] 02/03: CAMEL-12969 : Map based Service Usage counting to remove memory leak (#2695)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.23.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0ecf3a91287fd02751ace579b53c465cde25205a
Author: Bob Paulin <bo...@bobpaulin.com>
AuthorDate: Sun Dec 30 11:57:36 2018 -0600

    CAMEL-12969 : Map based Service Usage counting to remove memory leak (#2695)
---
 .../camel/core/osgi/OsgiServiceRegistry.java       | 58 ++++++++++++++++------
 1 file changed, 42 insertions(+), 16 deletions(-)

diff --git a/components/camel-core-osgi/src/main/java/org/apache/camel/core/osgi/OsgiServiceRegistry.java b/components/camel-core-osgi/src/main/java/org/apache/camel/core/osgi/OsgiServiceRegistry.java
index ce24bb0..8045c95 100644
--- a/components/camel-core-osgi/src/main/java/org/apache/camel/core/osgi/OsgiServiceRegistry.java
+++ b/components/camel-core-osgi/src/main/java/org/apache/camel/core/osgi/OsgiServiceRegistry.java
@@ -19,9 +19,9 @@ package org.apache.camel.core.osgi;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.Service;
 import org.apache.camel.spi.Registry;
@@ -30,6 +30,8 @@ import org.apache.camel.util.ObjectHelper;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Constants;
 import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceEvent;
+import org.osgi.framework.ServiceListener;
 import org.osgi.framework.ServiceReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,13 +39,14 @@ import org.slf4j.LoggerFactory;
 /**
  * The OsgiServiceRegistry support to get the service object from the bundle context
  */
-public class OsgiServiceRegistry extends LifecycleStrategySupport implements Registry, Service {
-    private static final Logger LOG = LoggerFactory.getLogger(OsgiCamelContextHelper.class);
+public class OsgiServiceRegistry extends LifecycleStrategySupport implements Registry, Service, ServiceListener {
+    private static final Logger LOG = LoggerFactory.getLogger(OsgiServiceRegistry.class);
     private final BundleContext bundleContext;
-    private final Queue<ServiceReference<?>> serviceReferenceQueue = new ConcurrentLinkedQueue<>();
+    private final Map<ServiceReference<?>, AtomicLong> serviceReferenceUsageMap = new ConcurrentHashMap<>();
     
     public OsgiServiceRegistry(BundleContext bc) {
         bundleContext = bc;
+        bundleContext.addServiceListener(this);
     }
 
     /**
@@ -57,7 +60,7 @@ public class OsgiServiceRegistry extends LifecycleStrategySupport implements Reg
             if (refs != null && refs.length > 0) {
                 // just return the first one
                 sr = refs[0];
-                serviceReferenceQueue.add(sr);
+                incrementServiceUsage(sr);
                 service = bundleContext.getService(sr);
             }
         } catch (Exception ex) {
@@ -88,9 +91,7 @@ public class OsgiServiceRegistry extends LifecycleStrategySupport implements Reg
             }
         }
         if (sr != null) {
-            // Need to keep the track of Service
-            // and call ungetService when the camel context is closed 
-            serviceReferenceQueue.add(sr);
+            incrementServiceUsage(sr);
             service = bundleContext.getService(sr);
         }
         return service;
@@ -105,7 +106,7 @@ public class OsgiServiceRegistry extends LifecycleStrategySupport implements Reg
                 for (ServiceReference<?> sr : refs) {
                     if (sr != null) {
                         Object service = bundleContext.getService(sr);
-                        serviceReferenceQueue.add(sr);
+                        incrementServiceUsage(sr);
                         if (service != null) {
                             String name = (String)sr.getProperty("name");
                             if (name != null) {
@@ -151,12 +152,37 @@ public class OsgiServiceRegistry extends LifecycleStrategySupport implements Reg
     public void stop() throws Exception {
         // Unget the OSGi service as OSGi uses reference counting
         // and we should do this as one of the last actions when stopping Camel
-        ServiceReference<?> sr = serviceReferenceQueue.poll();
-        while (sr != null) {
-            bundleContext.ungetService(sr);
-            sr = serviceReferenceQueue.poll();
+        this.serviceReferenceUsageMap.forEach(this::drainServiceUsage);
+        this.serviceReferenceUsageMap.clear();
+    }
+
+    void drainServiceUsage(ServiceReference<?> serviceReference, AtomicLong serviceUsageCount) {
+        if (serviceUsageCount != null && serviceReference != null) {
+            while(serviceUsageCount.decrementAndGet() >= 0) {
+                this.bundleContext.ungetService(serviceReference);
+            }
+        }
+    }
+    
+    void incrementServiceUsage(ServiceReference<?> sr) {
+        AtomicLong serviceUsageCount = this.serviceReferenceUsageMap.get(sr);
+        if (serviceUsageCount != null) {
+            serviceUsageCount.incrementAndGet();
+        } else {
+            this.serviceReferenceUsageMap.merge(sr, new AtomicLong(1), 
+                (existingServiceUsageCount, newServiceUsageCount)->{
+                        existingServiceUsageCount.getAndAdd(newServiceUsageCount.get());
+                    return existingServiceUsageCount;
+                });
+        }
+    }
+
+    @Override
+    public void serviceChanged(ServiceEvent event) {
+        if (event.getType() == ServiceEvent.UNREGISTERING) {
+            ServiceReference<?> serviceReference = event.getServiceReference();
+            AtomicLong serviceUsageCount = this.serviceReferenceUsageMap.remove(serviceReference);
+            drainServiceUsage(serviceReference, serviceUsageCount);
         }
-        // Clean up the OSGi Service Cache
-        serviceReferenceQueue.clear();
     }
 }