You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/04 11:22:35 UTC
svn commit: r918924 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/processor/aggregate/
main/java/org/apache/camel/util/
test/java/org/apache/camel/processor/aggregator/
test/java/org/apache/camel/util/
Author: davsclaus
Date: Thu Mar 4 10:22:34 2010
New Revision: 918924
URL: http://svn.apache.org/viewvc?rev=918924&view=rev
Log:
CAMEL-2521: TimeoutMap now evicts values sorted so first expired first and last expired last.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=918924&r1=918923&r2=918924&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Thu Mar 4 10:22:34 2010
@@ -41,7 +41,6 @@
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.TimeoutMap;
-import org.apache.camel.util.TimeoutMapEntry;
import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -436,17 +435,14 @@
super(executor, requestMapPollTimeMillis);
}
- protected boolean isValidForEviction(TimeoutMapEntry<Object, Exchange> entry) {
- Object key = entry.getKey();
- Exchange exchange = entry.getValue();
-
+ @Override
+ public void onEviction(Object key, Exchange exchange) { // eviction callback: flags the exchange as completed by "timeout" and passes it to onCompletion
if (log.isDebugEnabled()) {
log.debug("Completion timeout triggered for correlation key: " + key);
}
exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
onCompletion(key, exchange, true);
- return true;
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java?rev=918924&r1=918923&r2=918924&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java Thu Mar 4 10:22:34 2010
@@ -16,6 +16,10 @@
*/
package org.apache.camel.util;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -127,15 +131,44 @@
}
long now = currentTime();
+ List<TimeoutMapEntry<K, V>> expired = new ArrayList<TimeoutMapEntry<K,V>>();
+
lock.lock();
try {
+ // need to find the expired entries and add to the expired list
for (Map.Entry<K, TimeoutMapEntry<K, V>> entry : map.entrySet()) {
if (entry.getValue().getExpireTime() < now) {
if (isValidForEviction(entry.getValue())) {
if (log.isDebugEnabled()) {
log.debug("Evicting inactive request for correlationID: " + entry);
}
- map.remove(entry.getKey(), entry.getValue());
+ expired.add(entry.getValue());
+ }
+ }
+ }
+
+ // if we found any expired entries then sort them, fire onEviction for each, and remove them
+ if (!expired.isEmpty()) {
+ // sort by expire time so the entry that expired first is notified first
+ Collections.sort(expired, new Comparator<TimeoutMapEntry<K, V>>() {
+ public int compare(TimeoutMapEntry<K, V> a, TimeoutMapEntry<K, V> b) {
+ long diff = a.getExpireTime() - b.getExpireTime();
+ if (diff == 0) {
+ return 0;
+ }
+ return diff > 0 ? 1 : -1;
+ }
+ });
+
+ try {
+ // now fire eviction notification
+ for (TimeoutMapEntry<K, V> entry : expired) {
+ onEviction(entry.getKey(), entry.getValue());
+ }
+ } finally {
+ // and must remove from the map after we have fired the notifications
+ for (TimeoutMapEntry<K, V> entry : expired) {
+ map.remove(entry.getKey());
}
}
}
@@ -174,6 +207,10 @@
return true;
}
+ public void onEviction(K key, V value) {
+ // noop by default; subclasses override this to be notified of each evicted key/value pair
+ }
+
protected void updateExpireTime(TimeoutMapEntry entry) {
long now = currentTime();
entry.setExpireTime(entry.getTimeout() + now);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java?rev=918924&r1=918923&r2=918924&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java Thu Mar 4 10:22:34 2010
@@ -34,21 +34,37 @@
/**
* Returns a copy of the keys in the map
+ *
+ * @return the keys
*/
Object[] getKeys();
/**
* Returns the size of the map
+ *
+ * @return the size
*/
int size();
/**
* Adds the key value pair into the map such that some time after the given
* timeout the entry will be evicted
+ *
+ * @param key the key
+ * @param value the value
+ * @param timeoutMillis timeout in millis
*/
void put(K key, V value, long timeoutMillis);
/**
+ * Callback invoked when an entry has timed out and is being evicted
+ *
+ * @param key the key
+ * @param value the value
+ */
+ void onEviction(K key, V value);
+
+ /**
* Removes the object with the given key
*
* @param key key for the object to remove
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java?rev=918924&r1=918923&r2=918924&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java Thu Mar 4 10:22:34 2010
@@ -43,7 +43,7 @@
template.sendBodyAndHeader("direct:start", "180", "foo", "B");
// to force B to timeout first as A is added last
- Thread.sleep(100);
+ Thread.sleep(250);
template.sendBodyAndHeader("direct:start", "120", "foo", "A");
assertMockEndpointsSatisfied();
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java?rev=918924&r1=918923&r2=918924&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java Thu Mar 4 10:22:34 2010
@@ -16,6 +16,8 @@
*/
package org.apache.camel.util;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import junit.framework.TestCase;
@@ -118,4 +120,44 @@
assertSame(e, map.getExecutor());
}
+ public void testExpiredInCorrectOrder() throws Exception { // checks that purge() reports evictions ordered by expire time, earliest first
+ final List<String> keys = new ArrayList<String>(); // evicted keys, in callback order
+ final List<Integer> values = new ArrayList<Integer>(); // evicted values, in callback order
+
+ DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>() {
+ @Override
+ public void onEviction(String key, Integer value) { // record each eviction so the order can be asserted below
+ keys.add(key);
+ values.add(value);
+ }
+ };
+ assertEquals(0, map.size());
+
+ map.put("A", 1, 500); // longest of the short timeouts: expected to be evicted last
+ map.put("B", 2, 300);
+ map.put("C", 3, 400); // NOTE(review): C and E share a 400 ms timeout; the C-before-E assertion presumably relies on put order - confirm it cannot tie
+ map.put("D", 4, 200); // shortest timeout: expected to be evicted first
+ map.put("E", 5, 400);
+ // F's 8000 ms timeout is well beyond the 2000 ms sleep below, so it must not be expired
+ map.put("F", 6, 8000);
+
+ Thread.sleep(2000); // wait well past the 500 ms maximum timeout of A-E
+
+ // force purge
+ map.purge();
+
+ assertEquals("D", keys.get(0));
+ assertEquals(4, values.get(0).intValue());
+ assertEquals("B", keys.get(1));
+ assertEquals(2, values.get(1).intValue());
+ assertEquals("C", keys.get(2));
+ assertEquals(3, values.get(2).intValue());
+ assertEquals("E", keys.get(3));
+ assertEquals(5, values.get(3).intValue());
+ assertEquals("A", keys.get(4));
+ assertEquals(1, values.get(4).intValue());
+
+ assertEquals(1, map.size()); // only F is expected to remain
+ }
+
}