You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2008/08/21 15:28:32 UTC

svn commit: r687751 - /servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java

Author: gertv
Date: Thu Aug 21 06:28:31 2008
New Revision: 687751

URL: http://svn.apache.org/viewvc?rev=687751&view=rev
Log:
SM-1518: Aggregator keeps a list of closed aggregations causing a slight memory leak

Modified:
    servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java

Modified: servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java?rev=687751&r1=687750&r2=687751&view=diff
==============================================================================
--- servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java (original)
+++ servicemix/components/engines/servicemix-eip/trunk/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java Thu Aug 21 06:28:31 2008
@@ -29,9 +29,13 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.servicemix.eip.EIPEndpoint;
 import org.apache.servicemix.common.JbiConstants;
 import org.apache.servicemix.common.util.MessageUtil;
+import org.apache.servicemix.eip.EIPEndpoint;
+import org.apache.servicemix.store.Store;
+import org.apache.servicemix.store.StoreFactory;
+import org.apache.servicemix.store.memory.MemoryStore;
+import org.apache.servicemix.store.memory.MemoryStoreFactory;
 import org.apache.servicemix.timers.Timer;
 import org.apache.servicemix.timers.TimerListener;
 
@@ -41,7 +45,9 @@
  * <a href="http://www.enterpriseintegrationpatterns.com/Aggregator.html">Aggregator</a>
  * pattern.
  *
- * TODO: keep list of closed aggregations for a certain time
+ * Closed aggregations are being kept in a {@link Store}.  By default, we will use a simple 
+ * {@link MemoryStore}, but you can set your own {@link StoreFactory} to use other implementations.
+ * 
  * TODO: distributed lock manager
  * TODO: persistent / transactional timer
  *
@@ -58,7 +64,8 @@
     
     private boolean synchronous;
 
-    private ConcurrentMap<String, Boolean> closedAggregates = new ConcurrentHashMap<String, Boolean>();
+    private Store closedAggregates;
+    private StoreFactory closedAggregatesStoreFactory;
 
     private boolean copyProperties = true;
 
@@ -128,6 +135,24 @@
     protected void processSync(MessageExchange exchange) throws Exception {
         throw new IllegalStateException();
     }
+    
+    /**
+     * Access the currently configured {@link StoreFactory} for storing closed aggregations
+     */
+    public StoreFactory getClosedAggregatesStoreFactory() {
+        return closedAggregatesStoreFactory;
+    }
+
+    /**
+     * Set a new {@link StoreFactory} for creating the {@link Store} to hold closed aggregations
+     * 
+     * If it hasn't been set, a simple {@link MemoryStoreFactory} will be used by default.
+     * 
+     * @param closedAggregatesStoreFactory
+     */
+    public void setClosedAggregatesStoreFactory(StoreFactory closedAggregatesStoreFactory) {
+        this.closedAggregatesStoreFactory = closedAggregatesStoreFactory;
+    }
 
     /* (non-Javadoc)
      * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
@@ -135,6 +160,15 @@
     protected void processAsync(MessageExchange exchange) throws Exception {
         throw new IllegalStateException();
     }
+    
+    @Override
+    public void start() throws Exception {
+        super.start();
+        if (closedAggregatesStoreFactory == null) {
+            closedAggregatesStoreFactory = new MemoryStoreFactory();
+        }
+        closedAggregates = closedAggregatesStoreFactory.open(getService().toString() + getEndpoint() + "-closed-aggregates");
+    }
 
     /* (non-Javadoc)
      * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
@@ -261,19 +295,25 @@
      *
      * @param correlationId
      * @return
+     * @throws Exception 
      */
-    protected boolean isAggregationClosed(String correlationId) {
+    protected boolean isAggregationClosed(String correlationId) throws Exception {
         // TODO: implement this using a persistent / cached behavior
-        return closedAggregates.containsKey(correlationId);
+        Object data = store.load(correlationId);
+        if (data != null) {
+            store.store(correlationId, data);
+        }
+        return data != null;
     }
 
     /**
      * Mark an aggregation as closed
      * @param correlationId
+     * @throws Exception 
      */
-    protected void closeAggregation(String correlationId) {
+    protected void closeAggregation(String correlationId) throws Exception {
         // TODO: implement this using a persistent / cached behavior
-        closedAggregates.put(correlationId, Boolean.TRUE);
+        closedAggregates.store(correlationId, Boolean.TRUE);
     }
 
     private boolean isSynchronous(MessageExchange exchange) {