You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ff...@apache.org on 2009/02/09 09:06:06 UTC
svn commit: r742323 - in
/servicemix/components/engines/servicemix-eip/trunk/src:
main/java/org/apache/servicemix/eip/patterns/
main/java/org/apache/servicemix/eip/support/
test/java/org/apache/servicemix/eip/
Author: ffang
Date: Mon Feb 9 08:06:05 2009
New Revision: 742323
URL: http://svn.apache.org/viewvc?rev=742323&view=rev
Log:
[SM-1792] LockManager impl causes memory leak in ServiceMix EIP
Modified:
servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java
servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/WireTapTest.java
Modified: servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java?rev=742323&r1=742322&r2=742323&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java Mon Feb 9 08:06:05 2009
@@ -25,7 +25,10 @@
import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.messaging.RobustInOnly;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.eip.EIPEndpoint;
+import org.apache.servicemix.eip.support.AbstractAggregator;
import org.apache.servicemix.eip.support.ExchangeTarget;
import org.apache.servicemix.common.util.MessageUtil;
@@ -42,6 +45,8 @@
*/
public class StaticRecipientList extends EIPEndpoint {
+ private static final Log LOG = LogFactory.getLog(StaticRecipientList.class);
+
public static final String RECIPIENT_LIST_COUNT = "org.apache.servicemix.eip.recipientList.count";
public static final String RECIPIENT_LIST_INDEX = "org.apache.servicemix.eip.recipientList.index";
public static final String RECIPIENT_LIST_CORRID = "org.apache.servicemix.eip.recipientList.corrid";
@@ -154,10 +159,12 @@
if (exchange.getRole() == MessageExchange.Role.CONSUMER) {
String corrId = (String) exchange.getMessage("in").getProperty(RECIPIENT_LIST_CORRID);
int count = (Integer) exchange.getMessage("in").getProperty(RECIPIENT_LIST_COUNT);
+ Integer acks = null;
Lock lock = lockManager.getLock(corrId);
lock.lock();
+ boolean removeLock = true;
try {
- Integer acks = (Integer) store.load(corrId + ".acks");
+ acks = (Integer) store.load(corrId + ".acks");
if (exchange.getStatus() == ExchangeStatus.DONE) {
// If the acks integer is not here anymore, the message response has been sent already
if (acks != null) {
@@ -166,6 +173,7 @@
done(me);
} else {
store.store(corrId + ".acks", Integer.valueOf(acks + 1));
+ removeLock = false;
}
}
} else if (exchange.getStatus() == ExchangeStatus.ERROR) {
@@ -179,6 +187,7 @@
done(me);
} else {
store.store(corrId + ".acks", Integer.valueOf(acks + 1));
+ removeLock = false;
}
}
} else if (exchange.getFault() != null) {
@@ -194,13 +203,21 @@
done(me);
} else {
store.store(corrId + ".acks", Integer.valueOf(acks + 1));
+ removeLock = false;
}
} else {
done(exchange);
}
}
} finally {
- lock.unlock();
+ try {
+ lock.unlock();
+ } catch (Exception ex) {
+ LOG.info("Caught exception while attempting to release lock", ex);
+ }
+ if (removeLock) {
+ lockManager.removeLock(corrId);
+ }
}
} else {
if (!(exchange instanceof InOnly) && !(exchange instanceof RobustInOnly)) {
Modified: servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java?rev=742323&r1=742322&r2=742323&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java Mon Feb 9 08:06:05 2009
@@ -206,7 +206,7 @@
* a DONE status.
*
* @param reportTimeoutAsErrors <code>boolean</code> indicating if exchanges received prior to a
- * timeout should be sent back with an ERROR status
+ * timeout should be sent back with an ERROR status
*/
public void setReportTimeoutAsErrors(boolean reportTimeoutAsErrors) {
this.reportTimeoutAsErrors = reportTimeoutAsErrors;
@@ -259,8 +259,8 @@
}
closedAggregates = closedAggregatesStoreFactory.open(getService().toString() + getEndpoint() + "-closed-aggregates");
if (reportTimeoutAsErrors && !reportErrors) {
- throw new IllegalArgumentException(
- "ReportTimeoutAsErrors property may only be set if ReportTimeout property is also set!");
+ throw new IllegalArgumentException(
+ "ReportTimeoutAsErrors property may only be set if ReportTimeout property is also set!");
}
}
@@ -312,6 +312,7 @@
// Load existing aggregation
Lock lock = getLockManager().getLock(correlationId);
lock.lock();
+ boolean removeLock = true;
try {
Object aggregation = store.load(correlationId);
Date timeout = null;
@@ -335,6 +336,7 @@
}
exchanges.add(exchange);
store.store(correlationId + "-exchanges", exchanges);
+ removeLock = false;
}
if (addMessage(aggregation, in, exchange)) {
sendAggregate(processCorrelationId, correlationId, aggregation, false, isSynchronous(exchange));
@@ -351,6 +353,7 @@
}, timeout);
timers.put(correlationId, t);
}
+ removeLock = false;
}
if (!reportErrors) {
done(exchange);
@@ -363,7 +366,14 @@
}
}
} finally {
- lock.unlock();
+ try {
+ lock.unlock();
+ } catch (Exception ex) {
+ LOG.info("Caught exception while attempting to release aggregation lock", ex);
+ }
+ if (removeLock) {
+ lockManager.removeLock(correlationId);
+ }
}
}
@@ -415,7 +425,7 @@
}
}
closeAggregation(correlationId);
- } else {
+ } else {
sendAggregate(processCorrelationId, correlationId, aggregation, true, isSynchronous());
}
} else if (!isAggregationClosed(correlationId)) {
@@ -428,7 +438,12 @@
} catch (Exception e) {
LOG.info("Caught exception while processing timeout aggregation", e);
} finally {
- lock.unlock();
+ try {
+ lock.unlock();
+ } catch (Exception ex) {
+ LOG.info("Caught exception while attempting to release timeout aggregation lock", ex);
+ }
+ lockManager.removeLock(correlationId);
}
}
Modified: servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/WireTapTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/WireTapTest.java?rev=742323&r1=742322&r2=742323&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/WireTapTest.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/test/java/org/apache/servicemix/eip/WireTapTest.java Mon Feb 9 08:06:05 2009
@@ -125,10 +125,19 @@
assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
assertNotNull(me.getFault());
client.done(me);
+
+ me = client.createRobustInOnlyExchange();
+ me.setService(new QName("wireTap"));
+ me.getInMessage().setContent(createSource("<hello/>"));
+ client.send(me);
+ me = (RobustInOnly) client.receive();
+ assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+ assertNotNull(me.getFault());
+ client.done(me);
- inReceiver.getMessageList().assertMessagesReceived(1);
+ inReceiver.getMessageList().assertMessagesReceived(2);
outReceiver.getMessageList().assertMessagesReceived(0);
- faultReceiver.getMessageList().assertMessagesReceived(1);
+ faultReceiver.getMessageList().assertMessagesReceived(2);
listener.assertExchangeCompleted();
}
@@ -144,9 +153,19 @@
assertNotNull(me.getFault());
client.fail(me, new Exception("I do not like faults"));
- inReceiver.getMessageList().assertMessagesReceived(1);
+ me = client.createRobustInOnlyExchange();
+ me.setService(new QName("wireTap"));
+ me.getInMessage().setContent(createSource("<hello/>"));
+ client.send(me);
+
+ me = (RobustInOnly) client.receive();
+ assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+ assertNotNull(me.getFault());
+ client.fail(me, new Exception("I do not like faults"));
+
+ inReceiver.getMessageList().assertMessagesReceived(2);
outReceiver.getMessageList().assertMessagesReceived(0);
- faultReceiver.getMessageList().assertMessagesReceived(1);
+ faultReceiver.getMessageList().assertMessagesReceived(2);
listener.assertExchangeCompleted();
}