You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/10/01 11:18:17 UTC
[2/2] git commit: CAMEL-7885: Restarting a timer endpoint may not
trigger at expected time the first time
CAMEL-7885: Restarting a timer endpoint may not trigger at expected time the first time
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3cc05a4b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3cc05a4b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3cc05a4b
Branch: refs/heads/master
Commit: 3cc05a4bd3ac92b543084181b1562a2ef90a3502
Parents: 9714db67
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Oct 1 11:17:14 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Oct 1 11:17:14 2014 +0200
----------------------------------------------------------------------
.../camel/component/timer/TimerComponent.java | 39 ++++++++++++++++---
.../camel/component/timer/TimerConsumer.java | 13 ++++++-
.../camel/component/timer/TimerEndpoint.java | 41 ++++++++++++++------
3 files changed, 74 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3cc05a4b/camel-core/src/main/java/org/apache/camel/component/timer/TimerComponent.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/timer/TimerComponent.java b/camel-core/src/main/java/org/apache/camel/component/timer/TimerComponent.java
index aed97e0..ec67f01 100644
--- a/camel-core/src/main/java/org/apache/camel/component/timer/TimerComponent.java
+++ b/camel-core/src/main/java/org/apache/camel/component/timer/TimerComponent.java
@@ -22,6 +22,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.UriEndpointComponent;
@@ -34,14 +35,15 @@ import org.apache.camel.impl.UriEndpointComponent;
*/
public class TimerComponent extends UriEndpointComponent {
private final Map<String, Timer> timers = new HashMap<String, Timer>();
+ private final Map<String, AtomicInteger> refCounts = new HashMap<>();
public TimerComponent() {
super(TimerEndpoint.class);
}
- public Timer getTimer(TimerEndpoint endpoint) {
- String key = endpoint.getTimerName();
- if (!endpoint.isDaemon()) {
+ public Timer getTimer(TimerConsumer consumer) {
+ String key = consumer.getEndpoint().getTimerName();
+ if (!consumer.getEndpoint().isDaemon()) {
key = "nonDaemon:" + key;
}
@@ -50,14 +52,40 @@ public class TimerComponent extends UriEndpointComponent {
answer = timers.get(key);
if (answer == null) {
// the timer name is also the thread name, so lets resolve a name to be used
- String name = endpoint.getCamelContext().getExecutorServiceManager().resolveThreadName("timer://" + endpoint.getTimerName());
- answer = new Timer(name, endpoint.isDaemon());
+ String name = consumer.getEndpoint().getCamelContext().getExecutorServiceManager().resolveThreadName("timer://" + consumer.getEndpoint().getTimerName());
+ answer = new Timer(name, consumer.getEndpoint().isDaemon());
timers.put(key, answer);
+ // store new reference counter
+ refCounts.put(key, new AtomicInteger(1));
+ } else {
+ // increase reference counter
+ AtomicInteger counter = refCounts.get(key);
+ counter.incrementAndGet();
}
}
return answer;
}
+ public void removeTimer(TimerConsumer consumer) {
+ String key = consumer.getEndpoint().getTimerName();
+ if (!consumer.getEndpoint().isDaemon()) {
+ key = "nonDaemon:" + key;
+ }
+
+ synchronized (timers) {
+ // decrease reference counter
+ AtomicInteger counter = refCounts.get(key);
+ if (counter.decrementAndGet() <= 0) {
+ refCounts.remove(key);
+ // remove timer as its no longer in use
+ Timer timer = timers.remove(key);
+ if (timer != null) {
+ timer.cancel();
+ }
+ }
+ }
+ }
+
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
TimerEndpoint answer = new TimerEndpoint(uri, this, remaining);
@@ -89,5 +117,6 @@ public class TimerComponent extends UriEndpointComponent {
timer.cancel();
}
timers.clear();
+ refCounts.clear();
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3cc05a4b/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java b/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
index 66617d2..d0b2e6b 100644
--- a/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.StartupListener;
@@ -47,6 +48,11 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener {
}
@Override
+ public TimerEndpoint getEndpoint() {
+ return (TimerEndpoint) super.getEndpoint();
+ }
+
+ @Override
protected void doStart() throws Exception {
task = new TimerTask() {
// counter
@@ -81,7 +87,7 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener {
// only configure task if CamelContext already started, otherwise the StartupListener
// is configuring the task later
if (!configured && endpoint.getCamelContext().getStatus().isStarted()) {
- Timer timer = endpoint.getTimer();
+ Timer timer = endpoint.getTimer(this);
configureTask(task, timer);
}
}
@@ -93,12 +99,15 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener {
}
task = null;
configured = false;
+
+ // remove timer
+ endpoint.removeTimer(this);
}
@Override
public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception {
if (task != null && !configured) {
- Timer timer = endpoint.getTimer();
+ Timer timer = endpoint.getTimer(this);
configureTask(task, timer);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3cc05a4b/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java
index 5b4dc3f..89c170e 100644
--- a/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java
@@ -64,6 +64,11 @@ public class TimerEndpoint extends DefaultEndpoint implements MultipleConsumersS
this.timerName = timerName;
}
+ @Override
+ public TimerComponent getComponent() {
+ return (TimerComponent) super.getComponent();
+ }
+
public Producer createProducer() throws Exception {
throw new RuntimeCamelException("Cannot produce to a TimerEndpoint: " + getEndpointUri());
}
@@ -167,18 +172,6 @@ public class TimerEndpoint extends DefaultEndpoint implements MultipleConsumersS
return true;
}
- public synchronized Timer getTimer() {
- if (timer == null) {
- TimerComponent tc = (TimerComponent)getComponent();
- timer = tc.getTimer(this);
- }
- return timer;
- }
-
- public synchronized void setTimer(Timer timer) {
- this.timer = timer;
- }
-
@ManagedAttribute(description = "Camel id")
public String getCamelId() {
return this.getCamelContext().getName();
@@ -198,4 +191,28 @@ public class TimerEndpoint extends DefaultEndpoint implements MultipleConsumersS
public String getState() {
return getStatus().name();
}
+
+ protected TimerEndpoint(String endpointUri, Component component) {
+ super(endpointUri, component);
+ }
+
+ public Timer getTimer(TimerConsumer consumer) {
+ if (timer != null) {
+ // use custom timer
+ return timer;
+ }
+ return getComponent().getTimer(consumer);
+ }
+
+ public void setTimer(Timer timer) {
+ this.timer = timer;
+ }
+
+ public void removeTimer(TimerConsumer consumer) {
+ if (timer == null) {
+ // only remove timer if we are not using a custom timer
+ getComponent().removeTimer(consumer);
+ }
+ }
+
}