You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2008/12/02 16:03:52 UTC

svn commit: r722487 - in /servicemix/smx3/trunk: core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/ core/servicemix-core/src/test/java/org/apache/servicemix/jbi/monitoring/ distributions/apache-servicemix/src/main/release/conf/

Author: gertv
Date: Tue Dec  2 07:03:52 2008
New Revision: 722487

URL: http://svn.apache.org/viewvc?rev=722487&view=rev
Log:
SM-1714: Avoid memory leak through JMS connection factory and endpoint listener

Modified:
    servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/EndpointRegistry.java
    servicemix/smx3/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/monitoring/StatisticsServiceTest.java
    servicemix/smx3/trunk/distributions/apache-servicemix/src/main/release/conf/jndi.xml

Modified: servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/EndpointRegistry.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/EndpointRegistry.java?rev=722487&r1=722486&r2=722487&view=diff
==============================================================================
--- servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/EndpointRegistry.java (original)
+++ servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/EndpointRegistry.java Tue Dec  2 07:03:52 2008
@@ -24,6 +24,8 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 
 import javax.jbi.JBIException;
 import javax.jbi.component.ComponentContext;
@@ -65,6 +67,8 @@
     
     private List<EndpointProcessor> endpointProcessors;
     
+    private Executor executor = Executors.newSingleThreadExecutor();
+    
     /**
      * Constructor
      * 
@@ -78,6 +82,11 @@
         this.linkedEndpoints = new ConcurrentHashMap<String, ServiceEndpoint>();
         this.interfaceConnections = new ConcurrentHashMap<QName, InterfaceConnection>();
         this.endpointProcessors = getEndpointProcessors();
+        this.executor.execute(new Runnable() {
+            public void run() {
+                LOG.debug("Initializing endpoint event dispatch thread");
+            }
+        });
     }
     
     private List<EndpointProcessor> getEndpointProcessors() {
@@ -517,40 +526,42 @@
         }
     }
 
-    protected void fireEvent(ServiceEndpoint ep, int type) {
-        EndpointEvent event = new EndpointEvent(ep, type);
-        EndpointListener[] listeners = (EndpointListener[]) registry.getContainer().getListeners(EndpointListener.class);
-        for (int i = 0; i < listeners.length; i++) {
-            switch (type) {
-            case EndpointEvent.INTERNAL_ENDPOINT_REGISTERED:
-                listeners[i].internalEndpointRegistered(event);
-                break;
-            case EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED:
-                listeners[i].internalEndpointUnregistered(event);
-                break;
-            case EndpointEvent.EXTERNAL_ENDPOINT_REGISTERED:
-                listeners[i].externalEndpointRegistered(event);
-                break;
-            case EndpointEvent.EXTERNAL_ENDPOINT_UNREGISTERED:
-                listeners[i].externalEndpointUnregistered(event);
-                break;
-            case EndpointEvent.LINKED_ENDPOINT_REGISTERED:
-                listeners[i].linkedEndpointRegistered(event);
-                break;
-            case EndpointEvent.LINKED_ENDPOINT_UNREGISTERED:
-                listeners[i].linkedEndpointUnregistered(event);
-                break;
-            case EndpointEvent.REMOTE_ENDPOINT_REGISTERED:
-                listeners[i].remoteEndpointRegistered(event);
-                break;
-            case EndpointEvent.REMOTE_ENDPOINT_UNREGISTERED:
-                listeners[i].remoteEndpointUnregistered(event);
-                break;
-            default:
-                break;
+    protected synchronized void fireEvent(final ServiceEndpoint ep, final int type) {
+        executor.execute(new Runnable() {
+            public void run() {
+                EndpointEvent event = new EndpointEvent(ep, type);
+                EndpointListener[] listeners = (EndpointListener[]) registry.getContainer().getListeners(EndpointListener.class);
+                for (int i = 0; i < listeners.length; i++) {
+                    switch (type) {
+                    case EndpointEvent.INTERNAL_ENDPOINT_REGISTERED:
+                        listeners[i].internalEndpointRegistered(event);
+                        break;
+                    case EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED:
+                        listeners[i].internalEndpointUnregistered(event);
+                        break;
+                    case EndpointEvent.EXTERNAL_ENDPOINT_REGISTERED:
+                        listeners[i].externalEndpointRegistered(event);
+                        break;
+                    case EndpointEvent.EXTERNAL_ENDPOINT_UNREGISTERED:
+                        listeners[i].externalEndpointUnregistered(event);
+                        break;
+                    case EndpointEvent.LINKED_ENDPOINT_REGISTERED:
+                        listeners[i].linkedEndpointRegistered(event);
+                        break;
+                    case EndpointEvent.LINKED_ENDPOINT_UNREGISTERED:
+                        listeners[i].linkedEndpointUnregistered(event);
+                        break;
+                    case EndpointEvent.REMOTE_ENDPOINT_REGISTERED:
+                        listeners[i].remoteEndpointRegistered(event);
+                        break;
+                    case EndpointEvent.REMOTE_ENDPOINT_UNREGISTERED:
+                        listeners[i].remoteEndpointUnregistered(event);
+                        break;
+                    default:
+                        break;
+                    }
+                }
             }
-        }
-        
+        });
     }
-
 }

Modified: servicemix/smx3/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/monitoring/StatisticsServiceTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/monitoring/StatisticsServiceTest.java?rev=722487&r1=722486&r2=722487&view=diff
==============================================================================
--- servicemix/smx3/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/monitoring/StatisticsServiceTest.java (original)
+++ servicemix/smx3/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/monitoring/StatisticsServiceTest.java Tue Dec  2 07:03:52 2008
@@ -16,6 +16,8 @@
  */
 package org.apache.servicemix.jbi.monitoring;
 
+import java.util.concurrent.CountDownLatch;
+
 import javax.jbi.JBIException;
 import javax.jbi.servicedesc.ServiceEndpoint;
 import javax.xml.namespace.QName;
@@ -24,6 +26,8 @@
 
 import org.apache.servicemix.jbi.container.ActivationSpec;
 import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.jbi.event.EndpointAdapter;
+import org.apache.servicemix.jbi.event.EndpointEvent;
 import org.apache.servicemix.jbi.framework.ComponentContextImpl;
 import org.apache.servicemix.jbi.management.BaseSystemService;
 import org.apache.servicemix.jbi.messaging.DeliveryChannelImplTest.TestComponent;
@@ -51,13 +55,25 @@
         service = new StatisticsService();
     }
     
-    public void testAddEndpointStatsByListener() throws JBIException {
+    public void testAddEndpointStatsByListener() throws JBIException, InterruptedException {
+        // setup a latch to keep track of events being fired by a background thread
+        final CountDownLatch latch = new CountDownLatch(1);
+        container.addListener(new EndpointAdapter() {
+            @Override
+            public void internalEndpointRegistered(EndpointEvent event) {
+                latch.countDown();
+            } 
+        });
+        
         // initialize and start the StatisticsService
         service.init(container);
         service.start();
-
+        
         // now register a new endpoint
         ServiceEndpoint endpoint = registerEndpoint();
+        
+        // ensure that the event has been fired by the event dispatch thread
+        latch.await();
 
         // StatisticsService should know about the endpoint/component through listener callbacks
         assertNotNull(service.getComponentStats().get(COMPONENT));

Modified: servicemix/smx3/trunk/distributions/apache-servicemix/src/main/release/conf/jndi.xml
URL: http://svn.apache.org/viewvc/servicemix/smx3/trunk/distributions/apache-servicemix/src/main/release/conf/jndi.xml?rev=722487&r1=722486&r2=722487&view=diff
==============================================================================
--- servicemix/smx3/trunk/distributions/apache-servicemix/src/main/release/conf/jndi.xml (original)
+++ servicemix/smx3/trunk/distributions/apache-servicemix/src/main/release/conf/jndi.xml Tue Dec  2 07:03:52 2008
@@ -45,9 +45,7 @@
     
     <!-- wsn2005 jms connection factory -->
     <entry key="java:comp/env/jms/wsnotificationCF">
-      <bean class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
-        <constructor-arg value="${activemq.url}" />
-      </bean>
+      <amq:connectionFactory brokerURL="${activemq.url}" />
     </entry>
 
     <!-- transaction manager -->