You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2008/08/21 15:01:38 UTC
svn commit: r687735 - in /servicemix/smx3/branches/servicemix-3.2:
core/servicemix-services/src/main/java/org/apache/servicemix/store/memory/
core/servicemix-services/src/test/java/org/apache/servicemix/store/memory/
deployables/serviceengines/servicem...
Author: gertv
Date: Thu Aug 21 06:01:36 2008
New Revision: 687735
URL: http://svn.apache.org/viewvc?rev=687735&view=rev
Log:
SM-1518: Aggregator keeps a list of closed aggregations causing a slight memory leak
Added:
servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/test/java/org/apache/servicemix/store/memory/
servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/test/java/org/apache/servicemix/store/memory/MemoryStoreFactoryTest.java
servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/test/java/org/apache/servicemix/store/memory/TimeoutMemoryStoreTest.java
Modified:
servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/store/memory/MemoryStoreFactory.java
servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
Modified: servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/store/memory/MemoryStoreFactory.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/store/memory/MemoryStoreFactory.java?rev=687735&r1=687734&r2=687735&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/store/memory/MemoryStoreFactory.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/main/java/org/apache/servicemix/store/memory/MemoryStoreFactory.java Thu Aug 21 06:01:36 2008
@@ -24,10 +24,17 @@
import org.apache.servicemix.store.Store;
import org.apache.servicemix.store.StoreFactory;
+/**
+ * {@link StoreFactory} for creating memory-based {@link Store} implementations
+ *
+ * If a timeout has been specified, a {@link TimeoutMemoryStore} will be created,
+ * otherwise the factory will build a plain {@link MemoryStore}
+ */
public class MemoryStoreFactory implements StoreFactory {
private IdGenerator idGenerator = new IdGenerator();
private Map<String, MemoryStore> stores = new HashMap<String, MemoryStore>();
+ private long timeout = -1;
/* (non-Javadoc)
* @see org.apache.servicemix.store.ExchangeStoreFactory#get(java.lang.String)
@@ -35,7 +42,11 @@
public synchronized Store open(String name) throws IOException {
MemoryStore store = stores.get(name);
if (store == null) {
- store = new MemoryStore(idGenerator);
+ if (timeout <= 0) {
+ store = new MemoryStore(idGenerator);
+ } else {
+ store = new TimeoutMemoryStore(idGenerator, timeout);
+ }
stores.put(name, store);
}
return store;
@@ -48,4 +59,7 @@
stores.remove(store);
}
+ public void setTimeout(long timeout) { // values <= 0 make open() build a plain MemoryStore instead of a TimeoutMemoryStore
+ this.timeout = timeout; // only read when open() creates a store, so stores that are already open keep their type
+ }
}
Added: servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/test/java/org/apache/servicemix/store/memory/MemoryStoreFactoryTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/test/java/org/apache/servicemix/store/memory/MemoryStoreFactoryTest.java?rev=687735&view=auto
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/test/java/org/apache/servicemix/store/memory/MemoryStoreFactoryTest.java (added)
+++ servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/test/java/org/apache/servicemix/store/memory/MemoryStoreFactoryTest.java Thu Aug 21 06:01:36 2008
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.store.memory;
+
+import junit.framework.TestCase;
+
+/**
+ * Test case for {@link MemoryStoreFactory}: checks which store type open() hands out with and without a timeout
+ */
+public class MemoryStoreFactoryTest extends TestCase {
+
+ private MemoryStoreFactory factory;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ factory = new MemoryStoreFactory(); // fresh factory per test, no timeout configured yet
+ }
+
+ public void testOpen() throws Exception {
+ assertTrue(factory.open("store1") instanceof MemoryStore); // no timeout set: plain store
+ factory.setTimeout(500);
+ assertFalse(factory.open("store1") instanceof TimeoutMemoryStore); // already open: must stay the plain store (instanceof MemoryStore would also accept a TimeoutMemoryStore)
+ assertTrue(factory.open("store2") instanceof TimeoutMemoryStore); // first opened after the timeout was set
+ }
+
+}
Added: servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/test/java/org/apache/servicemix/store/memory/TimeoutMemoryStoreTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/test/java/org/apache/servicemix/store/memory/TimeoutMemoryStoreTest.java?rev=687735&view=auto
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/test/java/org/apache/servicemix/store/memory/TimeoutMemoryStoreTest.java (added)
+++ servicemix/smx3/branches/servicemix-3.2/core/servicemix-services/src/test/java/org/apache/servicemix/store/memory/TimeoutMemoryStoreTest.java Thu Aug 21 06:01:36 2008
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.store.memory;
+
+import junit.framework.TestCase;
+
+import org.apache.servicemix.store.Store;
+
+/**
+ * Test case for {@link TimeoutMemoryStore}: data that is not loaded within the timeout must no longer be available
+ */
+public class TimeoutMemoryStoreTest extends TestCase {
+
+ private static final long TIMEOUT = 250L;
+
+ private Store store;
+ private final MemoryStoreFactory factory = new MemoryStoreFactory();
+
+ public TimeoutMemoryStoreTest() {
+ super();
+ factory.setTimeout(TIMEOUT); // a positive timeout makes the factory hand out TimeoutMemoryStore instances
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ store = factory.open("test");
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ factory.close(store);
+ }
+
+ public void testTimeout() throws Exception {
+ String id = store.store("Any kind of data...");
+ Object data = store.load(id);
+ assertNotNull(data);
+ //now store it again and load it after the timeout
+ store.store(id, data);
+ // sleep rather than wait(): nothing ever notifies this monitor, and
+ // wait() is allowed to return spuriously before the timeout has elapsed
+ Thread.sleep(TIMEOUT * 2);
+ assertNull("Data should have been removed from store after timeout", store.load(id));
+ }
+
+}
Modified: servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java?rev=687735&r1=687734&r2=687735&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java Thu Aug 21 06:01:36 2008
@@ -31,6 +31,10 @@
import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.eip.EIPEndpoint;
import org.apache.servicemix.jbi.util.MessageUtil;
+import org.apache.servicemix.store.Store;
+import org.apache.servicemix.store.StoreFactory;
+import org.apache.servicemix.store.memory.MemoryStore;
+import org.apache.servicemix.store.memory.MemoryStoreFactory;
import org.apache.servicemix.timers.Timer;
import org.apache.servicemix.timers.TimerListener;
@@ -40,7 +44,9 @@
* <a href="http://www.enterpriseintegrationpatterns.com/Aggregator.html">Aggregator</a>
* pattern.
*
- * TODO: keep list of closed aggregations for a certain time
+ * Closed aggregations are kept in a {@link Store}. By default, a simple
+ * {@link MemoryStore} is used, but you can set your own {@link StoreFactory} to use other implementations.
+ *
* TODO: distributed lock manager
* TODO: persistent / transactional timer
*
@@ -57,10 +63,12 @@
private boolean synchronous;
- private ConcurrentMap<String, Boolean> closedAggregates = new ConcurrentHashMap<String, Boolean>();
+ private Store closedAggregates;
+ private StoreFactory closedAggregatesStoreFactory;
private ConcurrentMap<String, Timer> timers = new ConcurrentHashMap<String, Timer>();
+
/**
* @return the synchronous
*/
@@ -102,6 +110,24 @@
public void setTarget(ExchangeTarget target) {
this.target = target;
}
+
+ /**
+ * Returns the {@link StoreFactory} used for closed aggregations; {@code null} until one is set or {@link #start()} installs the default
+ */
+ public StoreFactory getClosedAggregatesStoreFactory() {
+ return closedAggregatesStoreFactory;
+ }
+
+ /**
+ * Set a new {@link StoreFactory} for creating the {@link Store} to hold closed aggregations.
+ *
+ * If it hasn't been set, a simple {@link MemoryStoreFactory} will be used by default.
+ * The store itself is opened from this factory in {@link #start()}, so set the factory before starting the endpoint.
+ * @param closedAggregatesStoreFactory the factory to open the closed-aggregations store from; {@code null} selects the default
+ */
+ public void setClosedAggregatesStoreFactory(StoreFactory closedAggregatesStoreFactory) {
+ this.closedAggregatesStoreFactory = closedAggregatesStoreFactory;
+ }
/* (non-Javadoc)
* @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
@@ -116,6 +142,15 @@
protected void processAsync(MessageExchange exchange) throws Exception {
throw new IllegalStateException();
}
+
+ @Override
+ public void start() throws Exception { // besides the inherited start-up, opens the store that records closed aggregations
+ super.start();
+ if (closedAggregatesStoreFactory == null) { // nothing configured: fall back to a MemoryStoreFactory
+ closedAggregatesStoreFactory = new MemoryStoreFactory(); // NOTE(review): created without a timeout, so open() returns a plain MemoryStore, not a TimeoutMemoryStore
+ } // the store name below is built from the service, the endpoint and a fixed suffix
+ closedAggregates = closedAggregatesStoreFactory.open(getService().toString() + getEndpoint() + "-closed-aggregates");
+ }
/* (non-Javadoc)
* @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
@@ -249,19 +284,25 @@
*
* @param correlationId
* @return
+ * @throws Exception
*/
- protected boolean isAggregationClosed(String correlationId) {
+ protected boolean isAggregationClosed(String correlationId) throws Exception {
// TODO: implement this using a persistent / cached behavior
- return closedAggregates.containsKey(correlationId);
+ Object data = closedAggregates.load(correlationId); // closed markers are written to closedAggregates by closeAggregation(), not to 'store'
+ if (data != null) {
+ closedAggregates.store(correlationId, data); // presumably load() removes the entry, so put the marker back - TODO confirm
+ }
+ return data != null; // closed exactly when a marker was found for this correlation id
}
/**
* Mark an aggregation as closed
* @param correlationId
+ * @throws Exception whatever {@code closedAggregates.store(...)} throws while recording the marker
*/
- protected void closeAggregation(String correlationId) {
+ protected void closeAggregation(String correlationId) throws Exception {
// TODO: implement this using a persistent / cached behavior
- closedAggregates.put(correlationId, Boolean.TRUE);
+ closedAggregates.store(correlationId, Boolean.TRUE); // the value is only a marker; isAggregationClosed() just checks that something is stored under the id
}
/**