You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2008/08/21 15:01:38 UTC

svn commit: r687735 - in /servicemix/smx3/branches/servicemix-3.2: core/servicemix-services/src/main/java/org/apache/servicemix/store/memory/ core/servicemix-services/src/test/java/org/apache/servicemix/store/memory/ deployables/serviceengines/servicem...

Author: gertv
Date: Thu Aug 21 06:01:36 2008
New Revision: 687735

URL: http://svn.apache.org/viewvc?rev=687735&view=rev
Log:
SM-1518: Aggregator keeps a list of closed aggregations causing a slight memory leak

Added:
    servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/test/java/org/apache/servicemix/store/memory/
    servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/test/java/org/apache/servicemix/store/memory/MemoryStoreFactoryTest.java
    servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/test/java/org/apache/servicemix/store/memory/TimeoutMemoryStoreTest.java
Modified:
    servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/store/memory/MemoryStoreFactory.java
    servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java

Modified: servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/store/memory/MemoryStoreFactory.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/store/memory/MemoryStoreFactory.java?rev=687735&r1=687734&r2=687735&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/store/memory/MemoryStoreFactory.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/store/memory/MemoryStoreFactory.java Thu Aug 21 06:01:36 2008
@@ -24,10 +24,17 @@
 import org.apache.servicemix.store.Store;
 import org.apache.servicemix.store.StoreFactory;
 
+/**
+ * {@link StoreFactory} for creating memory-based {@link Store} implementations
+ * 
+ * If a timeout has been specified, a {@link TimeoutMemoryStore} will be created,
+ * otherwise the factory will build a plain {@link MemoryStore}
+ */
 public class MemoryStoreFactory implements StoreFactory {
 
     private IdGenerator idGenerator = new IdGenerator();
     private Map<String, MemoryStore> stores = new HashMap<String, MemoryStore>();
+    private long timeout = -1;
     
     /* (non-Javadoc)
      * @see org.apache.servicemix.store.ExchangeStoreFactory#get(java.lang.String)
@@ -35,7 +42,11 @@
     public synchronized Store open(String name) throws IOException {
         MemoryStore store = stores.get(name);
         if (store == null) {
-            store = new MemoryStore(idGenerator);
+            if (timeout <= 0) {
+                store = new MemoryStore(idGenerator);
+            } else {
+                store = new TimeoutMemoryStore(idGenerator, timeout);
+            }
             stores.put(name, store);
         }
         return store;
@@ -48,4 +59,7 @@
         stores.remove(store);
     }
     
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
 }

Added: servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/test/java/org/apache/servicemix/store/memory/MemoryStoreFactoryTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/test/java/org/apache/servicemix/store/memory/MemoryStoreFactoryTest.java?rev=687735&view=auto
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/test/java/org/apache/servicemix/store/memory/MemoryStoreFactoryTest.java (added)
+++ servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/test/java/org/apache/servicemix/store/memory/MemoryStoreFactoryTest.java Thu Aug 21 06:01:36 2008
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.store.memory;
+
+import junit.framework.TestCase;
+
+/**
+ * Test case for {@link MemoryStoreFactory}
+ */
+public class MemoryStoreFactoryTest extends TestCase {
+    
+    private MemoryStoreFactory factory;
+    
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        factory = new MemoryStoreFactory();
+    }
+    
+    public void testOpen() throws Exception {
+        assertTrue(factory.open("store1") instanceof MemoryStore);
+        factory.setTimeout(500);
+        assertTrue(factory.open("store1") instanceof MemoryStore);
+        assertTrue(factory.open("store2") instanceof TimeoutMemoryStore);
+    }
+
+}

Added: servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/test/java/org/apache/servicemix/store/memory/TimeoutMemoryStoreTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/test/java/org/apache/servicemix/store/memory/TimeoutMemoryStoreTest.java?rev=687735&view=auto
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/test/java/org/apache/servicemix/store/memory/TimeoutMemoryStoreTest.java (added)
+++ servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/test/java/org/apache/servicemix/store/memory/TimeoutMemoryStoreTest.java Thu Aug 21 06:01:36 2008
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.store.memory;
+
+import junit.framework.TestCase;
+
+import org.apache.servicemix.store.Store;
+
+/**
+ * Test case for {@link TimeoutMemoryStore} 
+ */
+public class TimeoutMemoryStoreTest extends TestCase {
+    
+    private static final long TIMEOUT = 250l; 
+    
+    private Store store;
+    private final MemoryStoreFactory factory = new MemoryStoreFactory();
+    
+    public TimeoutMemoryStoreTest() {
+        super();
+        factory.setTimeout(TIMEOUT);
+    }
+    
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        store = factory.open("test");
+    }
+    
+    @Override
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        factory.close(store);
+    }
+    
+    public void testTimeout() throws Exception {
+        String id = store.store("Any kind of data...");
+        Object data = store.load(id);
+        assertNotNull(data);
+        //now store it again and load it after the timeout
+        store.store(id, data);
+        synchronized (this) {
+            wait(TIMEOUT * 2);
+        }
+        assertNull("Data should have been removed from store after timeout", store.load(id));
+    }
+
+}

Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java?rev=687735&r1=687734&r2=687735&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java Thu Aug 21 06:01:36 2008
@@ -31,6 +31,10 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.servicemix.eip.EIPEndpoint;
 import org.apache.servicemix.jbi.util.MessageUtil;
+import org.apache.servicemix.store.Store;
+import org.apache.servicemix.store.StoreFactory;
+import org.apache.servicemix.store.memory.MemoryStore;
+import org.apache.servicemix.store.memory.MemoryStoreFactory;
 import org.apache.servicemix.timers.Timer;
 import org.apache.servicemix.timers.TimerListener;
 
@@ -40,7 +44,9 @@
  * <a href="http://www.enterpriseintegrationpatterns.com/Aggregator.html">Aggregator</a>
  * pattern.
  *
- * TODO: keep list of closed aggregations for a certain time
+ * Closed aggregations are being kept in a {@link Store}.  By default, we will use a simple 
+ * {@link MemoryStore}, but you can set your own {@link StoreFactory} to use other implementations.
+ * 
  * TODO: distributed lock manager
  * TODO: persistent / transactional timer
  *
@@ -57,10 +63,12 @@
 
     private boolean synchronous;
 
-    private ConcurrentMap<String, Boolean> closedAggregates = new ConcurrentHashMap<String, Boolean>();
+    private Store closedAggregates;
+    private StoreFactory closedAggregatesStoreFactory;
 
     private ConcurrentMap<String, Timer> timers = new ConcurrentHashMap<String, Timer>();
 
+
     /**
      * @return the synchronous
      */
@@ -102,6 +110,24 @@
     public void setTarget(ExchangeTarget target) {
         this.target = target;
     }
+    
+    /**
+     * Access the currently configured {@link StoreFactory} for storing closed aggregations
+     */
+    public StoreFactory getClosedAggregatesStoreFactory() {
+        return closedAggregatesStoreFactory;
+    }
+
+    /**
+     * Set a new {@link StoreFactory} for creating the {@link Store} to hold closed aggregations
+     * 
+     * If it hasn't been set, a simple {@link MemoryStoreFactory} will be used by default.
+     * 
+     * @param closedAggregatesStoreFactory
+     */
+    public void setClosedAggregatesStoreFactory(StoreFactory closedAggregatesStoreFactory) {
+        this.closedAggregatesStoreFactory = closedAggregatesStoreFactory;
+    }
 
     /* (non-Javadoc)
      * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
@@ -116,6 +142,15 @@
     protected void processAsync(MessageExchange exchange) throws Exception {
         throw new IllegalStateException();
     }
+    
+    @Override
+    public void start() throws Exception {
+        super.start();
+        if (closedAggregatesStoreFactory == null) {
+            closedAggregatesStoreFactory = new MemoryStoreFactory();
+        }
+        closedAggregates = closedAggregatesStoreFactory.open(getService().toString() + getEndpoint() + "-closed-aggregates");
+    }
 
     /* (non-Javadoc)
      * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
@@ -249,19 +284,25 @@
      *
      * @param correlationId
      * @return
+     * @throws Exception 
      */
-    protected boolean isAggregationClosed(String correlationId) {
+    protected boolean isAggregationClosed(String correlationId) throws Exception {
         // TODO: implement this using a persistent / cached behavior
-        return closedAggregates.containsKey(correlationId);
+        Object data = store.load(correlationId);
+        if (data != null) {
+            store.store(correlationId, data);
+        }
+        return data != null;
     }
 
     /**
      * Mark an aggregation as closed
      * @param correlationId
+     * @throws Exception 
      */
-    protected void closeAggregation(String correlationId) {
+    protected void closeAggregation(String correlationId) throws Exception {
         // TODO: implement this using a persistent / cached behavior
-        closedAggregates.put(correlationId, Boolean.TRUE);
+        closedAggregates.store(correlationId, Boolean.TRUE);
     }
     
     /**