You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2008/08/21 15:28:32 UTC
svn commit: r687751 -
/servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
Author: gertv
Date: Thu Aug 21 06:28:31 2008
New Revision: 687751
URL: http://svn.apache.org/viewvc?rev=687751&view=rev
Log:
SM-1518: Aggregator keeps a list of closed aggregations causing a slight memory leak
Modified:
servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
Modified: servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java?rev=687751&r1=687750&r2=687751&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java Thu Aug 21 06:28:31 2008
@@ -29,9 +29,13 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.servicemix.eip.EIPEndpoint;
import org.apache.servicemix.common.JbiConstants;
import org.apache.servicemix.common.util.MessageUtil;
+import org.apache.servicemix.eip.EIPEndpoint;
+import org.apache.servicemix.store.Store;
+import org.apache.servicemix.store.StoreFactory;
+import org.apache.servicemix.store.memory.MemoryStore;
+import org.apache.servicemix.store.memory.MemoryStoreFactory;
import org.apache.servicemix.timers.Timer;
import org.apache.servicemix.timers.TimerListener;
@@ -41,7 +45,9 @@
* <a href="http://www.enterpriseintegrationpatterns.com/Aggregator.html">Aggregator</a>
* pattern.
*
- * TODO: keep list of closed aggregations for a certain time
+ * Closed aggregations are being kept in a {@link Store}. By default, we will use a simple
+ * {@link MemoryStore}, but you can set your own {@link StoreFactory} to use other implementations.
+ *
* TODO: distributed lock manager
* TODO: persistent / transactional timer
*
@@ -58,7 +64,8 @@
private boolean synchronous;
- private ConcurrentMap<String, Boolean> closedAggregates = new ConcurrentHashMap<String, Boolean>();
+ private Store closedAggregates;
+ private StoreFactory closedAggregatesStoreFactory;
private boolean copyProperties = true;
@@ -128,6 +135,24 @@
protected void processSync(MessageExchange exchange) throws Exception {
throw new IllegalStateException();
}
+
+ /**
+ * Access the currently configured {@link StoreFactory} for storing closed aggregations
+ */
+ public StoreFactory getClosedAggregatesStoreFactory() {
+ return closedAggregatesStoreFactory;
+ }
+
+ /**
+ * Set a new {@link StoreFactory} for creating the {@link Store} to hold closed aggregations
+ *
+ * If it hasn't been set, a simple {@link MemoryStoreFactory} will be used by default.
+ *
+ * @param closedAggregatesStoreFactory
+ */
+ public void setClosedAggregatesStoreFactory(StoreFactory closedAggregatesStoreFactory) {
+ this.closedAggregatesStoreFactory = closedAggregatesStoreFactory;
+ }
/* (non-Javadoc)
* @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
@@ -135,6 +160,15 @@
protected void processAsync(MessageExchange exchange) throws Exception {
throw new IllegalStateException();
}
+
+ @Override
+ public void start() throws Exception {
+ super.start();
+ if (closedAggregatesStoreFactory == null) {
+ closedAggregatesStoreFactory = new MemoryStoreFactory();
+ }
+ closedAggregates = closedAggregatesStoreFactory.open(getService().toString() + getEndpoint() + "-closed-aggregates");
+ }
/* (non-Javadoc)
* @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
@@ -261,19 +295,25 @@
*
* @param correlationId
* @return
+ * @throws Exception
*/
- protected boolean isAggregationClosed(String correlationId) {
+ protected boolean isAggregationClosed(String correlationId) throws Exception {
// TODO: implement this using a persistent / cached behavior
- return closedAggregates.containsKey(correlationId);
+ Object data = store.load(correlationId);
+ if (data != null) {
+ store.store(correlationId, data);
+ }
+ return data != null;
}
/**
* Mark an aggregation as closed
* @param correlationId
+ * @throws Exception
*/
- protected void closeAggregation(String correlationId) {
+ protected void closeAggregation(String correlationId) throws Exception {
// TODO: implement this using a persistent / cached behavior
- closedAggregates.put(correlationId, Boolean.TRUE);
+ closedAggregates.store(correlationId, Boolean.TRUE);
}
private boolean isSynchronous(MessageExchange exchange) {