You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ff...@apache.org on 2009/02/09 09:06:06 UTC

svn commit: r742323 - in /servicemix/components/engines/servicemix-eip/trunk/src: main/java/org/apache/servicemix/eip/patterns/ main/java/org/apache/servicemix/eip/support/ test/java/org/apache/servicemix/eip/

Author: ffang
Date: Mon Feb  9 08:06:05 2009
New Revision: 742323

URL: http://svn.apache.org/viewvc?rev=742323&view=rev
Log:
[SM-1792]LockManager impl causes memory leak in ServiceMix EIP

Modified:
    servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java
    servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
    servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/WireTapTest.java

Modified: servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java?rev=742323&r1=742322&r2=742323&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java Mon Feb  9 08:06:05 2009
@@ -25,7 +25,10 @@
 import javax.jbi.messaging.NormalizedMessage;
 import javax.jbi.messaging.RobustInOnly;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.servicemix.eip.EIPEndpoint;
+import org.apache.servicemix.eip.support.AbstractAggregator;
 import org.apache.servicemix.eip.support.ExchangeTarget;
 import org.apache.servicemix.common.util.MessageUtil;
 
@@ -42,6 +45,8 @@
  */
 public class StaticRecipientList extends EIPEndpoint {
 
+    private static final Log LOG = LogFactory.getLog(StaticRecipientList.class);
+    
     public static final String RECIPIENT_LIST_COUNT = "org.apache.servicemix.eip.recipientList.count";
     public static final String RECIPIENT_LIST_INDEX = "org.apache.servicemix.eip.recipientList.index";
     public static final String RECIPIENT_LIST_CORRID = "org.apache.servicemix.eip.recipientList.corrid";
@@ -154,10 +159,12 @@
         if (exchange.getRole() == MessageExchange.Role.CONSUMER) {
             String corrId = (String) exchange.getMessage("in").getProperty(RECIPIENT_LIST_CORRID);
             int count = (Integer) exchange.getMessage("in").getProperty(RECIPIENT_LIST_COUNT);
+            Integer acks = null;
             Lock lock = lockManager.getLock(corrId);
             lock.lock();
+            boolean removeLock = true;
             try {
-                Integer acks = (Integer) store.load(corrId + ".acks");
+                acks = (Integer) store.load(corrId + ".acks");
                 if (exchange.getStatus() == ExchangeStatus.DONE) {
                     // If the acks integer is not here anymore, the message response has been sent already
                     if (acks != null) {
@@ -166,6 +173,7 @@
                             done(me);
                         } else {
                             store.store(corrId + ".acks", Integer.valueOf(acks + 1));
+                            removeLock = false;
                         }
                     }
                 } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
@@ -179,6 +187,7 @@
                             done(me);
                         } else {
                             store.store(corrId + ".acks", Integer.valueOf(acks + 1));
+                            removeLock = false;
                         }
                     }
                 } else if (exchange.getFault() != null) {
@@ -194,13 +203,21 @@
                             done(me);
                         } else {
                             store.store(corrId + ".acks", Integer.valueOf(acks + 1));
+                            removeLock = false;
                         }
                     } else {
                         done(exchange);
                     }
                 }
             } finally {
-                lock.unlock();
+                try {
+                    lock.unlock();
+                } catch (Exception ex) {
+                    LOG.info("Caught exception while attempting to release lock", ex);
+                }
+                if (removeLock) {
+                    lockManager.removeLock(corrId);
+                }
             }
         } else {
             if (!(exchange instanceof InOnly) && !(exchange instanceof RobustInOnly)) {

Modified: servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java?rev=742323&r1=742322&r2=742323&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java Mon Feb  9 08:06:05 2009
@@ -206,7 +206,7 @@
      * a DONE status.
      *  
      * @param reportTimeoutAsErrors <code>boolean</code> indicating if exchanges received prior to a
-     * 			timeout should be sent back with an ERROR status
+     *             timeout should be sent back with an ERROR status
      */
     public void setReportTimeoutAsErrors(boolean reportTimeoutAsErrors) {
         this.reportTimeoutAsErrors = reportTimeoutAsErrors;
@@ -259,8 +259,8 @@
         }
         closedAggregates = closedAggregatesStoreFactory.open(getService().toString() + getEndpoint() + "-closed-aggregates");
         if (reportTimeoutAsErrors && !reportErrors) {
-        	throw new IllegalArgumentException(
-        			"ReportTimeoutAsErrors property may only be set if ReportTimeout property is also set!");
+            throw new IllegalArgumentException(
+                    "ReportTimeoutAsErrors property may only be set if ReportTimeout property is also set!");
         }
     }
 
@@ -312,6 +312,7 @@
         // Load existing aggregation
         Lock lock = getLockManager().getLock(correlationId);
         lock.lock();
+        boolean removeLock = true;
         try {
             Object aggregation = store.load(correlationId);
             Date timeout = null;
@@ -335,6 +336,7 @@
                     }
                     exchanges.add(exchange);
                     store.store(correlationId + "-exchanges", exchanges);
+                    removeLock = false;
                 }
                 if (addMessage(aggregation, in, exchange)) {
                     sendAggregate(processCorrelationId, correlationId, aggregation, false, isSynchronous(exchange));
@@ -351,6 +353,7 @@
                         }, timeout);
                         timers.put(correlationId, t);
                     }
+                    removeLock = false;
                 }
                 if (!reportErrors) {
                     done(exchange);
@@ -363,7 +366,14 @@
                 }
             }
         } finally {
-            lock.unlock();
+            try {
+                lock.unlock();
+            } catch (Exception ex) {
+                LOG.info("Caught exception while attempting to release aggregation lock", ex);
+            }
+            if (removeLock) {
+                lockManager.removeLock(correlationId);
+            }
         }
     }
 
@@ -415,7 +425,7 @@
                         }
                     }
                     closeAggregation(correlationId);
-            	} else {
+                } else {
                     sendAggregate(processCorrelationId, correlationId, aggregation, true, isSynchronous());
                 }
             } else if (!isAggregationClosed(correlationId)) {
@@ -428,7 +438,12 @@
         } catch (Exception e) {
             LOG.info("Caught exception while processing timeout aggregation", e);
         } finally {
-            lock.unlock();
+            try {
+                lock.unlock();
+            } catch (Exception ex) {
+                LOG.info("Caught exception while attempting to release timeout aggregation lock", ex);
+            } 
+            lockManager.removeLock(correlationId);
         }
     }
 

Modified: servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/WireTapTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/WireTapTest.java?rev=742323&r1=742322&r2=742323&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/WireTapTest.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/WireTapTest.java Mon Feb  9 08:06:05 2009
@@ -125,10 +125,19 @@
         assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
         assertNotNull(me.getFault());
         client.done(me);
+
+        me = client.createRobustInOnlyExchange();
+        me.setService(new QName("wireTap"));
+        me.getInMessage().setContent(createSource("<hello/>"));
+        client.send(me);
+        me = (RobustInOnly) client.receive();
+        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+        assertNotNull(me.getFault());
+        client.done(me);
         
-        inReceiver.getMessageList().assertMessagesReceived(1);
+        inReceiver.getMessageList().assertMessagesReceived(2);
         outReceiver.getMessageList().assertMessagesReceived(0);
-        faultReceiver.getMessageList().assertMessagesReceived(1);
+        faultReceiver.getMessageList().assertMessagesReceived(2);
         
         listener.assertExchangeCompleted();
     }
@@ -144,9 +153,19 @@
         assertNotNull(me.getFault());
         client.fail(me, new Exception("I do not like faults"));
         
-        inReceiver.getMessageList().assertMessagesReceived(1);
+        me = client.createRobustInOnlyExchange();
+        me.setService(new QName("wireTap"));
+        me.getInMessage().setContent(createSource("<hello/>"));
+        client.send(me);
+        
+        me = (RobustInOnly) client.receive(); 
+        assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+        assertNotNull(me.getFault());
+        client.fail(me, new Exception("I do not like faults"));
+        
+        inReceiver.getMessageList().assertMessagesReceived(2);
         outReceiver.getMessageList().assertMessagesReceived(0);
-        faultReceiver.getMessageList().assertMessagesReceived(1);
+        faultReceiver.getMessageList().assertMessagesReceived(2);
         
         listener.assertExchangeCompleted();
     }