You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/12/09 07:37:31 UTC

svn commit: r1043844 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/util/DefaultTimeoutMap.java main/java/org/apache/camel/util/TimeoutMapEntry.java test/java/org/apache/camel/util/DefaultTimeoutMapTest.java

Author: davsclaus
Date: Thu Dec  9 06:37:30 2010
New Revision: 1043844

URL: http://svn.apache.org/viewvc?rev=1043844&view=rev
Log:
CAMEL-3416: DefaultTimeoutMap now forces a executor service in the ctr.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java?rev=1043844&r1=1043843&r2=1043844&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java Thu Dec  9 06:37:30 2010
@@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.camel.Service;
+import org.apache.camel.impl.ServiceSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -38,10 +38,14 @@ import org.apache.commons.logging.LogFac
  * <p/>
  * This implementation supports thread safe and non thread safe, in the manner you can enable locking or not.
  * By default locking is enabled and thus we are thread safe.
+ * <p/>
+ * You must provide a {@link java.util.concurrent.ScheduledExecutorService} in the constructor which is used
+ * to schedule a background task which check for old entries to purge. This implementation will shutdown the scheduler
+ * if its being stopped.
  *
  * @version $Revision$
  */
-public class DefaultTimeoutMap<K, V> implements TimeoutMap<K, V>, Runnable, Service {
+public class DefaultTimeoutMap<K, V> extends ServiceSupport implements TimeoutMap<K, V>, Runnable {
 
     protected final transient Log log = LogFactory.getLog(getClass());
 
@@ -52,12 +56,8 @@ public class DefaultTimeoutMap<K, V> imp
     private final Lock lock = new ReentrantLock();
     private boolean useLock = true;
 
-    public DefaultTimeoutMap() {
-        this(null, 1000L);
-    }
-
-    public DefaultTimeoutMap(boolean useLock) {
-        this(null, 1000L, useLock);
+    public DefaultTimeoutMap(ScheduledExecutorService executor) {
+        this(executor, 1000);
     }
 
     public DefaultTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
@@ -65,6 +65,7 @@ public class DefaultTimeoutMap<K, V> imp
     }
 
     public DefaultTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis, boolean useLock) {
+        ObjectHelper.notNull(executor, "ScheduledExecutorService");
         this.executor = executor;
         this.purgePollTime = requestMapPollTimeMillis;
         this.useLock = useLock;
@@ -147,14 +148,18 @@ public class DefaultTimeoutMap<K, V> imp
      * The timer task which purges old requests and schedules another poll
      */
     public void run() {
-        if (log.isTraceEnabled()) {
-            log.trace("Running purge task to see if any entries has been timed out");
+        // only run if allowed
+        if (!isRunAllowed()) {
+            log.trace("Purge task not allowed to run");
+            return;
         }
+
+        log.trace("Running purge task to see if any entries has been timed out");
         try {
             purge();
         } catch (Throwable t) {
             // must catch and log exception otherwise the executor will now schedule next run
-            log.error("Exception occurred during purge task", t);
+            log.warn("Exception occurred during purge task. This exception will be ignored.", t);
         }
     }
 
@@ -175,7 +180,7 @@ public class DefaultTimeoutMap<K, V> imp
                 if (entry.getValue().getExpireTime() < now) {
                     if (isValidForEviction(entry.getValue())) {
                         if (log.isDebugEnabled()) {
-                            log.debug("Evicting inactive request for correlationID: " + entry);
+                            log.debug("Evicting inactive entry ID: " + entry.getValue());
                         }
                         expired.add(entry.getValue());
                     }
@@ -237,9 +242,7 @@ public class DefaultTimeoutMap<K, V> imp
      * lets schedule each time to allow folks to change the time at runtime
      */
     protected void schedulePoll() {
-        if (executor != null) {
-            executor.scheduleWithFixedDelay(this, initialDelay, purgePollTime, TimeUnit.MILLISECONDS);
-        }
+        executor.scheduleWithFixedDelay(this, initialDelay, purgePollTime, TimeUnit.MILLISECONDS);
     }
 
     /**
@@ -262,13 +265,17 @@ public class DefaultTimeoutMap<K, V> imp
         return System.currentTimeMillis();
     }
 
-    public void start() throws Exception {
+    @Override
+    protected void doStart() throws Exception {
+        if (executor.isShutdown()) {
+            throw new IllegalStateException("The ScheduledExecutorService is shutdown");
+        }
     }
 
-    public void stop() throws Exception {
-        if (executor != null) {
-            executor.shutdown();
-        }
+    @Override
+    protected void doStop() throws Exception {
+        // clear map if we stop
         map.clear();
     }
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java?rev=1043844&r1=1043843&r2=1043844&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java Thu Dec  9 06:37:30 2010
@@ -87,6 +87,6 @@ public class TimeoutMapEntry<K, V> imple
     }
 
     public String toString() {
-        return "Entry for key: " + key;
+        return key + " (times out after " + timeout + " millis)";
     }
 }
\ No newline at end of file

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java?rev=1043844&r1=1043843&r2=1043844&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java Thu Dec  9 06:37:30 2010
@@ -19,6 +19,7 @@ package org.apache.camel.util;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import junit.framework.TestCase;
 import org.apache.camel.util.concurrent.ExecutorServiceHelper;
@@ -28,15 +29,17 @@ import org.apache.camel.util.concurrent.
  */
 public class DefaultTimeoutMapTest extends TestCase {
 
+    private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
+
     public void testDefaultTimeoutMap() {
-        DefaultTimeoutMap map = new DefaultTimeoutMap();
+        DefaultTimeoutMap map = new DefaultTimeoutMap(executor);
         assertTrue(map.currentTime() > 0);
 
         assertEquals(0, map.size());
     }
 
     public void testDefaultTimeoutMapPurge() throws Exception {
-        DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>();
+        DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>(executor);
         assertTrue(map.currentTime() > 0);
 
         assertEquals(0, map.size());
@@ -44,16 +47,13 @@ public class DefaultTimeoutMapTest exten
         map.put("A", 123, 500);
         assertEquals(1, map.size());
 
-        Thread.sleep(2000);
-
-        // will purge and remove old entries
-        map.purge();
+        Thread.sleep(1500);
 
         assertEquals(0, map.size());
     }
 
-    public void testDefaultTimeoutMapGetPurge() throws Exception {
-        DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>();
+    public void testDefaultTimeoutMapForcePurge() throws Exception {
+        DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>(executor);
         assertTrue(map.currentTime() > 0);
 
         assertEquals(0, map.size());
@@ -61,19 +61,16 @@ public class DefaultTimeoutMapTest exten
         map.put("A", 123, 500);
         assertEquals(1, map.size());
 
-        Thread.sleep(2000);
-
-        assertEquals(123, (int)map.get("A"));
+        Thread.sleep(1500);
 
         // will purge and remove old entries
         map.purge();
 
-        // but we just used get to get it so its refreshed
-        assertEquals(1, map.size());
+        assertEquals(0, map.size());
     }
 
     public void testDefaultTimeoutMapGetRemove() throws Exception {
-        DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>();
+        DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>(executor);
         assertTrue(map.currentTime() > 0);
 
         assertEquals(0, map.size());
@@ -90,7 +87,7 @@ public class DefaultTimeoutMapTest exten
     }
 
     public void testDefaultTimeoutMapGetKeys() throws Exception {
-        DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>();
+        DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>(executor);
         assertTrue(map.currentTime() > 0);
 
         assertEquals(0, map.size());
@@ -115,7 +112,7 @@ public class DefaultTimeoutMapTest exten
 
         Thread.sleep(2000);
 
-        // should be gone now
+        // should have been timed out now
         assertEquals(0, map.size());
 
         assertSame(e, map.getExecutor());
@@ -125,7 +122,7 @@ public class DefaultTimeoutMapTest exten
         final List<String> keys = new ArrayList<String>();
         final List<Integer> values = new ArrayList<Integer>();
 
-        DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>() {
+        DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>(executor) {
             @Override
             public boolean onEviction(String key, Integer value) {
                 keys.add(key);
@@ -143,7 +140,7 @@ public class DefaultTimeoutMapTest exten
         // is not expired
         map.put("F", 6, 8000);
 
-        Thread.sleep(2000);
+        Thread.sleep(1500);
 
         // force purge
         map.purge();
@@ -166,7 +163,7 @@ public class DefaultTimeoutMapTest exten
         final List<String> keys = new ArrayList<String>();
         final List<Integer> values = new ArrayList<Integer>();
 
-        DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>() {
+        DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>(executor) {
             @Override
             public boolean onEviction(String key, Integer value) {
                 // do not evict special key
@@ -202,4 +199,28 @@ public class DefaultTimeoutMapTest exten
         assertEquals(Integer.valueOf(9), map.get("gold"));
     }
 
+    public void testDefaultTimeoutMapStopStart() throws Exception {
+        DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>(executor);
+        map.put("A", 1, 5000);
+
+        assertEquals(1, map.size());
+        map.stop();
+
+        assertEquals(0, map.size());
+        map.put("A", 1, 500);
+
+        // should not timeout as the scheduler doesn't run
+
+        Thread.sleep(1000);
+        assertEquals(1, map.size());
+
+        // start
+        map.start();
+
+        // start and wait for scheduler to purge
+        Thread.sleep(2000);
+        // now it should be gone
+        assertEquals(0, map.size());
+    }
+
 }