You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/10/01 11:18:17 UTC

[2/2] git commit: CAMEL-7885: Restarting a timer endpoint may not trigger at expected time the first time

CAMEL-7885: Restarting a timer endpoint may not trigger at expected time the first time


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3cc05a4b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3cc05a4b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3cc05a4b

Branch: refs/heads/master
Commit: 3cc05a4bd3ac92b543084181b1562a2ef90a3502
Parents: 9714db67
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Oct 1 11:17:14 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Oct 1 11:17:14 2014 +0200

----------------------------------------------------------------------
 .../camel/component/timer/TimerComponent.java   | 39 ++++++++++++++++---
 .../camel/component/timer/TimerConsumer.java    | 13 ++++++-
 .../camel/component/timer/TimerEndpoint.java    | 41 ++++++++++++++------
 3 files changed, 74 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3cc05a4b/camel-core/src/main/java/org/apache/camel/component/timer/TimerComponent.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/timer/TimerComponent.java b/camel-core/src/main/java/org/apache/camel/component/timer/TimerComponent.java
index aed97e0..ec67f01 100644
--- a/camel-core/src/main/java/org/apache/camel/component/timer/TimerComponent.java
+++ b/camel-core/src/main/java/org/apache/camel/component/timer/TimerComponent.java
@@ -22,6 +22,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Timer;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.impl.UriEndpointComponent;
@@ -34,14 +35,15 @@ import org.apache.camel.impl.UriEndpointComponent;
  */
 public class TimerComponent extends UriEndpointComponent {
     private final Map<String, Timer> timers = new HashMap<String, Timer>();
+    private final Map<String, AtomicInteger> refCounts = new HashMap<>();
 
     public TimerComponent() {
         super(TimerEndpoint.class);
     }
 
-    public Timer getTimer(TimerEndpoint endpoint) {
-        String key = endpoint.getTimerName();
-        if (!endpoint.isDaemon()) {
+    public Timer getTimer(TimerConsumer consumer) {
+        String key = consumer.getEndpoint().getTimerName();
+        if (!consumer.getEndpoint().isDaemon()) {
             key = "nonDaemon:" + key;
         }
 
@@ -50,14 +52,40 @@ public class TimerComponent extends UriEndpointComponent {
             answer = timers.get(key);
             if (answer == null) {
                 // the timer name is also the thread name, so lets resolve a name to be used
-                String name = endpoint.getCamelContext().getExecutorServiceManager().resolveThreadName("timer://" + endpoint.getTimerName());
-                answer = new Timer(name, endpoint.isDaemon());
+                String name = consumer.getEndpoint().getCamelContext().getExecutorServiceManager().resolveThreadName("timer://" + consumer.getEndpoint().getTimerName());
+                answer = new Timer(name, consumer.getEndpoint().isDaemon());
                 timers.put(key, answer);
+                // store new reference counter
+                refCounts.put(key, new AtomicInteger(1));
+            } else {
+                // increase reference counter
+                AtomicInteger counter = refCounts.get(key);
+                counter.incrementAndGet();
             }
         }
         return answer;
     }
 
+    public void removeTimer(TimerConsumer consumer) {
+        String key = consumer.getEndpoint().getTimerName();
+        if (!consumer.getEndpoint().isDaemon()) {
+            key = "nonDaemon:" + key;
+        }
+
+        synchronized (timers) {
+            // decrease reference counter
+            AtomicInteger counter = refCounts.get(key);
+            if (counter.decrementAndGet() <= 0) {
+                refCounts.remove(key);
+                // remove timer as its no longer in use
+                Timer timer = timers.remove(key);
+                if (timer != null) {
+                    timer.cancel();
+                }
+            }
+        }
+    }
+
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
         TimerEndpoint answer = new TimerEndpoint(uri, this, remaining);
@@ -89,5 +117,6 @@ public class TimerComponent extends UriEndpointComponent {
             timer.cancel();
         }
         timers.clear();
+        refCounts.clear();
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3cc05a4b/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java b/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
index 66617d2..d0b2e6b 100644
--- a/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.StartupListener;
@@ -47,6 +48,11 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener {
     }
 
     @Override
+    public TimerEndpoint getEndpoint() {
+        return (TimerEndpoint) super.getEndpoint();
+    }
+
+    @Override
     protected void doStart() throws Exception {
         task = new TimerTask() {
             // counter
@@ -81,7 +87,7 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener {
         // only configure task if CamelContext already started, otherwise the StartupListener
         // is configuring the task later
         if (!configured && endpoint.getCamelContext().getStatus().isStarted()) {
-            Timer timer = endpoint.getTimer();
+            Timer timer = endpoint.getTimer(this);
             configureTask(task, timer);
         }
     }
@@ -93,12 +99,15 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener {
         }
         task = null;
         configured = false;
+
+        // remove timer
+        endpoint.removeTimer(this);
     }
 
     @Override
     public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception {
         if (task != null && !configured) {
-            Timer timer = endpoint.getTimer();
+            Timer timer = endpoint.getTimer(this);
             configureTask(task, timer);
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/3cc05a4b/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java
index 5b4dc3f..89c170e 100644
--- a/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/timer/TimerEndpoint.java
@@ -64,6 +64,11 @@ public class TimerEndpoint extends DefaultEndpoint implements MultipleConsumersS
         this.timerName = timerName;
     }
 
+    @Override
+    public TimerComponent getComponent() {
+        return (TimerComponent) super.getComponent();
+    }
+
     public Producer createProducer() throws Exception {
         throw new RuntimeCamelException("Cannot produce to a TimerEndpoint: " + getEndpointUri());
     }
@@ -167,18 +172,6 @@ public class TimerEndpoint extends DefaultEndpoint implements MultipleConsumersS
         return true;
     }
 
-    public synchronized Timer getTimer() {
-        if (timer == null) {
-            TimerComponent tc = (TimerComponent)getComponent();
-            timer = tc.getTimer(this);
-        }
-        return timer;
-    }
-
-    public synchronized void setTimer(Timer timer) {
-        this.timer = timer;
-    }
-
     @ManagedAttribute(description = "Camel id")
     public String getCamelId() {
         return this.getCamelContext().getName();
@@ -198,4 +191,28 @@ public class TimerEndpoint extends DefaultEndpoint implements MultipleConsumersS
     public String getState() {
         return getStatus().name();
     }
+
+    protected TimerEndpoint(String endpointUri, Component component) {
+        super(endpointUri, component);
+    }
+
+    public Timer getTimer(TimerConsumer consumer) {
+        if (timer != null) {
+            // use custom timer
+            return timer;
+        }
+        return getComponent().getTimer(consumer);
+    }
+
+    public void setTimer(Timer timer) {
+        this.timer = timer;
+    }
+
+    public void removeTimer(TimerConsumer consumer) {
+        if (timer == null) {
+            // only remove timer if we are not using a custom timer
+            getComponent().removeTimer(consumer);
+        }
+    }
+
 }