You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/03/04 11:22:35 UTC

svn commit: r918924 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/processor/aggregate/ main/java/org/apache/camel/util/ test/java/org/apache/camel/processor/aggregator/ test/java/org/apache/camel/util/

Author: davsclaus
Date: Thu Mar  4 10:22:34 2010
New Revision: 918924

URL: http://svn.apache.org/viewvc?rev=918924&view=rev
Log:
CAMEL-2521: TimeoutMap now evicts values sorted so first expired first and last expired last.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=918924&r1=918923&r2=918924&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Thu Mar  4 10:22:34 2010
@@ -41,7 +41,6 @@
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.util.TimeoutMap;
-import org.apache.camel.util.TimeoutMapEntry;
 import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -436,17 +435,14 @@
             super(executor, requestMapPollTimeMillis);
         }
 
-        protected boolean isValidForEviction(TimeoutMapEntry<Object, Exchange> entry) {
-            Object key = entry.getKey();
-            Exchange exchange = entry.getValue();
-
+        @Override
+        public void onEviction(Object key, Exchange exchange) {
             if (log.isDebugEnabled()) {
                 log.debug("Completion timeout triggered for correlation key: " + key);
             }
 
             exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
             onCompletion(key, exchange, true);
-            return true;
         }
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java?rev=918924&r1=918923&r2=918924&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java Thu Mar  4 10:22:34 2010
@@ -16,6 +16,10 @@
  */
 package org.apache.camel.util;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -127,15 +131,44 @@
         }
         long now = currentTime();
 
+        List<TimeoutMapEntry<K, V>> expired = new ArrayList<TimeoutMapEntry<K,V>>();
+
         lock.lock();
         try {
+            // need to find the expired entries and add to the expired list
             for (Map.Entry<K, TimeoutMapEntry<K, V>> entry : map.entrySet()) {
                 if (entry.getValue().getExpireTime() < now) {
                     if (isValidForEviction(entry.getValue())) {
                         if (log.isDebugEnabled()) {
                             log.debug("Evicting inactive request for correlationID: " + entry);
                         }
-                        map.remove(entry.getKey(), entry.getValue());
+                        expired.add(entry.getValue());
+                    }
+                }
+            }
+
+            // if we found any expired then we need to sort, onEviction and remove
+            if (!expired.isEmpty()) {
+                // sort according to the expired time so we got the first expired first
+                Collections.sort(expired, new Comparator<TimeoutMapEntry<K, V>>() {
+                    public int compare(TimeoutMapEntry<K, V> a, TimeoutMapEntry<K, V> b) {
+                        long diff = a.getExpireTime() - b.getExpireTime();
+                        if (diff == 0) {
+                            return 0;
+                        }
+                        return diff > 0 ? 1 : -1;
+                    }
+                });
+
+                try {
+                    // now fire eviction notification
+                    for (TimeoutMapEntry<K, V> entry : expired) {
+                        onEviction(entry.getKey(), entry.getValue());
+                    }
+                } finally {
+                    // and must remove from list after we have fired the notifications
+                    for (TimeoutMapEntry<K, V> entry : expired) {
+                        map.remove(entry.getKey());
                     }
                 }
             }
@@ -174,6 +207,10 @@
         return true;
     }
 
+    public void onEviction(K key, V value) {
+        // noop
+    }
+
     protected void updateExpireTime(TimeoutMapEntry entry) {
         long now = currentTime();
         entry.setExpireTime(entry.getTimeout() + now);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java?rev=918924&r1=918923&r2=918924&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMap.java Thu Mar  4 10:22:34 2010
@@ -34,21 +34,37 @@
 
     /**
      * Returns a copy of the keys in the map
+     *
+     * @return the keys
      */
     Object[] getKeys();
 
     /**
      * Returns the size of the map
+     *
+     * @return the size
      */
     int size();
 
     /**
      * Adds the key value pair into the map such that some time after the given
      * timeout the entry will be evicted
+     *
+     * @param key   the key
+     * @param value the value
+     * @param timeoutMillis  timeout in millis
      */
     void put(K key, V value, long timeoutMillis);
 
     /**
+     * Callback when the value has been evicted
+     *
+     * @param key the key
+     * @param value the value
+     */
+    void onEviction(K key, V value);
+
+    /**
      * Removes the object with the given key
      *
      * @param key  key for the object to remove

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java?rev=918924&r1=918923&r2=918924&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java Thu Mar  4 10:22:34 2010
@@ -43,7 +43,7 @@
         template.sendBodyAndHeader("direct:start", "180", "foo", "B");
 
         // to force B to timeout first as A is added last
-        Thread.sleep(100);
+        Thread.sleep(250);
         template.sendBodyAndHeader("direct:start", "120", "foo", "A");
 
         assertMockEndpointsSatisfied();

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java?rev=918924&r1=918923&r2=918924&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java Thu Mar  4 10:22:34 2010
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.util;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 
 import junit.framework.TestCase;
@@ -118,4 +120,44 @@
         assertSame(e, map.getExecutor());
     }
 
+    public void testExpiredInCorrectOrder() throws Exception {
+        final List<String> keys = new ArrayList<String>();
+        final List<Integer> values = new ArrayList<Integer>();
+
+        DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>() {
+            @Override
+            public void onEviction(String key, Integer value) {
+                keys.add(key);
+                values.add(value);
+            }
+        };
+        assertEquals(0, map.size());
+
+        map.put("A", 1, 500);
+        map.put("B", 2, 300);
+        map.put("C", 3, 400);
+        map.put("D", 4, 200);
+        map.put("E", 5, 400);
+        // is not expired
+        map.put("F", 6, 8000);
+
+        Thread.sleep(2000);
+
+        // force purge
+        map.purge();
+
+        assertEquals("D", keys.get(0));
+        assertEquals(4, values.get(0).intValue());
+        assertEquals("B", keys.get(1));
+        assertEquals(2, values.get(1).intValue());
+        assertEquals("C", keys.get(2));
+        assertEquals(3, values.get(2).intValue());
+        assertEquals("E", keys.get(3));
+        assertEquals(5, values.get(3).intValue());
+        assertEquals("A", keys.get(4));
+        assertEquals(1, values.get(4).intValue());
+
+        assertEquals(1, map.size());
+    }
+
 }