You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/12/09 07:37:31 UTC
svn commit: r1043844 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/util/DefaultTimeoutMap.java
main/java/org/apache/camel/util/TimeoutMapEntry.java
test/java/org/apache/camel/util/DefaultTimeoutMapTest.java
Author: davsclaus
Date: Thu Dec 9 06:37:30 2010
New Revision: 1043844
URL: http://svn.apache.org/viewvc?rev=1043844&view=rev
Log:
CAMEL-3416: DefaultTimeoutMap now requires an executor service in the constructor.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java
camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java?rev=1043844&r1=1043843&r2=1043844&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java Thu Dec 9 06:37:30 2010
@@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.camel.Service;
+import org.apache.camel.impl.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -38,10 +38,14 @@ import org.apache.commons.logging.LogFac
* <p/>
* This implementation supports thread safe and non thread safe, in the manner you can enable locking or not.
* By default locking is enabled and thus we are thread safe.
+ * <p/>
+ * You must provide a {@link java.util.concurrent.ScheduledExecutorService} in the constructor which is used
+ * to schedule a background task which checks for old entries to purge. Stopping this implementation clears the map
+ * but does not shut down the provided scheduler.
*
* @version $Revision$
*/
-public class DefaultTimeoutMap<K, V> implements TimeoutMap<K, V>, Runnable, Service {
+public class DefaultTimeoutMap<K, V> extends ServiceSupport implements TimeoutMap<K, V>, Runnable {
protected final transient Log log = LogFactory.getLog(getClass());
@@ -52,12 +56,8 @@ public class DefaultTimeoutMap<K, V> imp
private final Lock lock = new ReentrantLock();
private boolean useLock = true;
- public DefaultTimeoutMap() {
- this(null, 1000L);
- }
-
- public DefaultTimeoutMap(boolean useLock) {
- this(null, 1000L, useLock);
+ public DefaultTimeoutMap(ScheduledExecutorService executor) {
+ this(executor, 1000);
}
public DefaultTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
@@ -65,6 +65,7 @@ public class DefaultTimeoutMap<K, V> imp
}
public DefaultTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis, boolean useLock) {
+ ObjectHelper.notNull(executor, "ScheduledExecutorService");
this.executor = executor;
this.purgePollTime = requestMapPollTimeMillis;
this.useLock = useLock;
@@ -147,14 +148,18 @@ public class DefaultTimeoutMap<K, V> imp
* The timer task which purges old requests and schedules another poll
*/
public void run() {
- if (log.isTraceEnabled()) {
- log.trace("Running purge task to see if any entries has been timed out");
+ // only run if allowed
+ if (!isRunAllowed()) {
+ log.trace("Purge task not allowed to run");
+ return;
}
+
+ log.trace("Running purge task to see if any entries has been timed out");
try {
purge();
} catch (Throwable t) {
// must catch and log exception otherwise the executor will not schedule next run
- log.error("Exception occurred during purge task", t);
+ log.warn("Exception occurred during purge task. This exception will be ignored.", t);
}
}
@@ -175,7 +180,7 @@ public class DefaultTimeoutMap<K, V> imp
if (entry.getValue().getExpireTime() < now) {
if (isValidForEviction(entry.getValue())) {
if (log.isDebugEnabled()) {
- log.debug("Evicting inactive request for correlationID: " + entry);
+ log.debug("Evicting inactive entry ID: " + entry.getValue());
}
expired.add(entry.getValue());
}
@@ -237,9 +242,7 @@ public class DefaultTimeoutMap<K, V> imp
* lets schedule each time to allow folks to change the time at runtime
*/
protected void schedulePoll() {
- if (executor != null) {
- executor.scheduleWithFixedDelay(this, initialDelay, purgePollTime, TimeUnit.MILLISECONDS);
- }
+ executor.scheduleWithFixedDelay(this, initialDelay, purgePollTime, TimeUnit.MILLISECONDS);
}
/**
@@ -262,13 +265,17 @@ public class DefaultTimeoutMap<K, V> imp
return System.currentTimeMillis();
}
- public void start() throws Exception {
+ @Override
+ protected void doStart() throws Exception {
+ if (executor.isShutdown()) {
+ throw new IllegalStateException("The ScheduledExecutorService is shutdown");
+ }
}
- public void stop() throws Exception {
- if (executor != null) {
- executor.shutdown();
- }
+ @Override
+ protected void doStop() throws Exception {
+ // clear map if we stop
map.clear();
}
+
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java?rev=1043844&r1=1043843&r2=1043844&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/TimeoutMapEntry.java Thu Dec 9 06:37:30 2010
@@ -87,6 +87,6 @@ public class TimeoutMapEntry<K, V> imple
}
public String toString() {
- return "Entry for key: " + key;
+ return key + " (times out after " + timeout + " millis)";
}
}
\ No newline at end of file
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java?rev=1043844&r1=1043843&r2=1043844&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/util/DefaultTimeoutMapTest.java Thu Dec 9 06:37:30 2010
@@ -19,6 +19,7 @@ package org.apache.camel.util;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import junit.framework.TestCase;
import org.apache.camel.util.concurrent.ExecutorServiceHelper;
@@ -28,15 +29,17 @@ import org.apache.camel.util.concurrent.
*/
public class DefaultTimeoutMapTest extends TestCase {
+ private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
+
public void testDefaultTimeoutMap() {
- DefaultTimeoutMap map = new DefaultTimeoutMap();
+ DefaultTimeoutMap map = new DefaultTimeoutMap(executor);
assertTrue(map.currentTime() > 0);
assertEquals(0, map.size());
}
public void testDefaultTimeoutMapPurge() throws Exception {
- DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>();
+ DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>(executor);
assertTrue(map.currentTime() > 0);
assertEquals(0, map.size());
@@ -44,16 +47,13 @@ public class DefaultTimeoutMapTest exten
map.put("A", 123, 500);
assertEquals(1, map.size());
- Thread.sleep(2000);
-
- // will purge and remove old entries
- map.purge();
+ Thread.sleep(1500);
assertEquals(0, map.size());
}
- public void testDefaultTimeoutMapGetPurge() throws Exception {
- DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>();
+ public void testDefaultTimeoutMapForcePurge() throws Exception {
+ DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>(executor);
assertTrue(map.currentTime() > 0);
assertEquals(0, map.size());
@@ -61,19 +61,16 @@ public class DefaultTimeoutMapTest exten
map.put("A", 123, 500);
assertEquals(1, map.size());
- Thread.sleep(2000);
-
- assertEquals(123, (int)map.get("A"));
+ Thread.sleep(1500);
// will purge and remove old entries
map.purge();
- // but we just used get to get it so its refreshed
- assertEquals(1, map.size());
+ assertEquals(0, map.size());
}
public void testDefaultTimeoutMapGetRemove() throws Exception {
- DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>();
+ DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>(executor);
assertTrue(map.currentTime() > 0);
assertEquals(0, map.size());
@@ -90,7 +87,7 @@ public class DefaultTimeoutMapTest exten
}
public void testDefaultTimeoutMapGetKeys() throws Exception {
- DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>();
+ DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>(executor);
assertTrue(map.currentTime() > 0);
assertEquals(0, map.size());
@@ -115,7 +112,7 @@ public class DefaultTimeoutMapTest exten
Thread.sleep(2000);
- // should be gone now
+ // should have been timed out now
assertEquals(0, map.size());
assertSame(e, map.getExecutor());
@@ -125,7 +122,7 @@ public class DefaultTimeoutMapTest exten
final List<String> keys = new ArrayList<String>();
final List<Integer> values = new ArrayList<Integer>();
- DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>() {
+ DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>(executor) {
@Override
public boolean onEviction(String key, Integer value) {
keys.add(key);
@@ -143,7 +140,7 @@ public class DefaultTimeoutMapTest exten
// is not expired
map.put("F", 6, 8000);
- Thread.sleep(2000);
+ Thread.sleep(1500);
// force purge
map.purge();
@@ -166,7 +163,7 @@ public class DefaultTimeoutMapTest exten
final List<String> keys = new ArrayList<String>();
final List<Integer> values = new ArrayList<Integer>();
- DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>() {
+ DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>(executor) {
@Override
public boolean onEviction(String key, Integer value) {
// do not evict special key
@@ -202,4 +199,28 @@ public class DefaultTimeoutMapTest exten
assertEquals(Integer.valueOf(9), map.get("gold"));
}
+ public void testDefaultTimeoutMapStopStart() throws Exception {
+ DefaultTimeoutMap<String, Integer> map = new DefaultTimeoutMap<String, Integer>(executor);
+ map.put("A", 1, 5000);
+
+ assertEquals(1, map.size());
+ map.stop();
+
+ assertEquals(0, map.size());
+ map.put("A", 1, 500);
+
+ // should not timeout as the scheduler doesn't run
+
+ Thread.sleep(1000);
+ assertEquals(1, map.size());
+
+ // start
+ map.start();
+
+ // start and wait for scheduler to purge
+ Thread.sleep(2000);
+ // now it should be gone
+ assertEquals(0, map.size());
+ }
+
}