You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/03/31 09:31:29 UTC

[1/2] git commit: CAMEL-7333: When route is removed then remove entries from RuntimeEndpointRegistry.

Repository: camel
Updated Branches:
  refs/heads/camel-2.13.x f0fbc8430 -> 9928f5de9
  refs/heads/master 859a1ace0 -> 9f299e914


CAMEL-7333: When route is removed then remove entries from RuntimeEndpointRegistry.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9f299e91
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9f299e91
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9f299e91

Branch: refs/heads/master
Commit: 9f299e9140615914d3fd6b7f559306a3e7d1f60e
Parents: 859a1ac
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Mar 31 09:34:16 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 31 09:34:27 2014 +0200

----------------------------------------------------------------------
 .../impl/DefaultRuntimeEndpointRegistry.java    | 78 +++++++++++---------
 .../camel/spi/RuntimeEndpointRegistry.java      |  4 +-
 2 files changed, 44 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9f299e91/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java
index e4ca093..890b3d3 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java
@@ -19,15 +19,17 @@ package org.apache.camel.impl;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EventObject;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListSet;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.management.event.ExchangeSendingEvent;
-import org.apache.camel.management.event.RouteStartedEvent;
+import org.apache.camel.management.event.RouteAddedEvent;
+import org.apache.camel.management.event.RouteRemovedEvent;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.RuntimeEndpointRegistry;
 import org.apache.camel.spi.UnitOfWork;
@@ -36,9 +38,9 @@ import org.apache.camel.util.LRUCache;
 
 public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport implements RuntimeEndpointRegistry {
 
-    // endpoint uri -> route ids
+    // route id -> endpoint urls
     private Map<String, Set<String>> inputs;
-    private Map<String, Set<String>> outputs;
+    private Map<String, Map<String, String>> outputs;
     private int limit = 1000;
     private boolean enabled = true;
 
@@ -54,9 +56,13 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme
     public List<String> getAllEndpoints(boolean includeInputs) {
         List<String> answer = new ArrayList<String>();
         if (includeInputs) {
-            answer.addAll(inputs.keySet());
+            for (Map.Entry<String, Set<String>> entry : inputs.entrySet()) {
+                answer.addAll(entry.getValue());
+            }
+        }
+        for (Map.Entry<String, Map<String, String>> entry : outputs.entrySet()) {
+            answer.addAll(entry.getValue().keySet());
         }
-        answer.addAll(outputs.keySet());
         return Collections.unmodifiableList(answer);
     }
 
@@ -64,16 +70,14 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme
     public List<String> getEndpointsPerRoute(String routeId, boolean includeInputs) {
         List<String> answer = new ArrayList<String>();
         if (includeInputs) {
-            for (Map.Entry<String, Set<String>> entry : inputs.entrySet()) {
-                if (entry.getValue().contains(routeId)) {
-                    answer.add(entry.getKey());
-                }
+            Set<String> uris = inputs.get(routeId);
+            if (uris != null) {
+                answer.addAll(uris);
             }
         }
-        for (Map.Entry<String, Set<String>> entry : outputs.entrySet()) {
-            if (entry.getValue().contains(routeId)) {
-                answer.add(entry.getKey());
-            }
+        Map<String, String> uris = outputs.get(routeId);
+        if (uris != null) {
+            answer.addAll(uris.keySet());
         }
         return Collections.unmodifiableList(answer);
     }
@@ -96,18 +100,18 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme
 
     @Override
     public int size() {
-        int total = inputs.size();
-        total += outputs.size();
+        int total = inputs.values().size();
+        total += outputs.values().size();
         return total;
     }
 
     @Override
     protected void doStart() throws Exception {
         if (inputs == null) {
-            inputs = new LRUCache<String, Set<String>>(limit);
+            inputs = new HashMap<String, Set<String>>();
         }
         if (outputs == null) {
-            outputs = new LRUCache<String, Set<String>>(limit);
+            outputs = new HashMap<String, Map<String, String>>();
         }
     }
 
@@ -118,31 +122,32 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme
 
     @Override
     public void notify(EventObject event) throws Exception {
-        if (event instanceof RouteStartedEvent) {
-            RouteStartedEvent rse = (RouteStartedEvent) event;
+        if (event instanceof RouteAddedEvent) {
+            RouteAddedEvent rse = (RouteAddedEvent) event;
             Endpoint endpoint = rse.getRoute().getEndpoint();
             String routeId = rse.getRoute().getId();
 
-            Set<String> routes = inputs.get(endpoint);
-            if (routeId != null && (routes == null || !routes.contains(routeId))) {
-                if (routes == null) {
-                    routes = new ConcurrentSkipListSet<String>();
-                }
-                routes.add(routeId);
-                inputs.put(endpoint.getEndpointUri(), routes);
-            }
+            // a HashSet is fine for inputs as we only have a limited number of those
+            Set<String> uris = new HashSet<String>();
+            uris.add(endpoint.getEndpointUri());
+            inputs.put(routeId, uris);
+            // use a LRUCache for outputs as we could potential have unlimited uris if dynamic routing is in use
+            // and therefore need to have the limit in use
+            outputs.put(routeId, new LRUCache<String, String>(limit));
+        } else if (event instanceof RouteRemovedEvent) {
+            RouteRemovedEvent rse = (RouteRemovedEvent) event;
+            String routeId = rse.getRoute().getId();
+            inputs.remove(routeId);
+            outputs.remove(routeId);
         } else {
             ExchangeSendingEvent ese = (ExchangeSendingEvent) event;
             Endpoint endpoint = ese.getEndpoint();
             String routeId = getRouteId(ese.getExchange());
+            String uri = endpoint.getEndpointUri();
 
-            Set<String> routes = outputs.get(endpoint);
-            if (routeId != null && (routes == null || !routes.contains(routeId))) {
-                if (routes == null) {
-                    routes = new ConcurrentSkipListSet<String>();
-                }
-                routes.add(routeId);
-                outputs.put(endpoint.getEndpointUri(), routes);
+            Map<String, String> uris = outputs.get(routeId);
+            if (uris != null && !uris.containsKey(uri)) {
+                uris.put(uri, uri);
             }
         }
     }
@@ -164,6 +169,7 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme
     @Override
     public boolean isEnabled(EventObject event) {
         return enabled && event instanceof ExchangeSendingEvent
-                || event instanceof RouteStartedEvent;
+                || event instanceof RouteAddedEvent
+                || event instanceof RouteRemovedEvent;
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9f299e91/camel-core/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java b/camel-core/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java
index dcec8a0..b0d71d9 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java
@@ -36,14 +36,14 @@ public interface RuntimeEndpointRegistry extends Service {
     void setEnabled(boolean enabled);
 
     /**
-     * Maximum number of endpoints to keep in the cache.
+     * Maximum number of endpoints to keep in the cache per route.
      * <p/>
      * The default value is <tt>1000</tt>
      */
     int getLimit();
 
     /**
-     * Sets the maximum number of endpoints to keep in the cache.
+     * Sets the maximum number of endpoints to keep in the cache per route.
      */
     void setLimit(int limit);
 


[2/2] git commit: CAMEL-7333: When route is removed then remove entries from RuntimeEndpointRegistry.

Posted by da...@apache.org.
CAMEL-7333: When route is removed then remove entries from RuntimeEndpointRegistry.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9928f5de
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9928f5de
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9928f5de

Branch: refs/heads/camel-2.13.x
Commit: 9928f5de93972c6ca3cb15a1cc733609ba36f8a0
Parents: f0fbc84
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Mar 31 09:34:16 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 31 09:34:49 2014 +0200

----------------------------------------------------------------------
 .../impl/DefaultRuntimeEndpointRegistry.java    | 78 +++++++++++---------
 .../camel/spi/RuntimeEndpointRegistry.java      |  4 +-
 2 files changed, 44 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9928f5de/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java
index e4ca093..890b3d3 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java
@@ -19,15 +19,17 @@ package org.apache.camel.impl;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EventObject;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListSet;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.management.event.ExchangeSendingEvent;
-import org.apache.camel.management.event.RouteStartedEvent;
+import org.apache.camel.management.event.RouteAddedEvent;
+import org.apache.camel.management.event.RouteRemovedEvent;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.RuntimeEndpointRegistry;
 import org.apache.camel.spi.UnitOfWork;
@@ -36,9 +38,9 @@ import org.apache.camel.util.LRUCache;
 
 public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport implements RuntimeEndpointRegistry {
 
-    // endpoint uri -> route ids
+    // route id -> endpoint urls
     private Map<String, Set<String>> inputs;
-    private Map<String, Set<String>> outputs;
+    private Map<String, Map<String, String>> outputs;
     private int limit = 1000;
     private boolean enabled = true;
 
@@ -54,9 +56,13 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme
     public List<String> getAllEndpoints(boolean includeInputs) {
         List<String> answer = new ArrayList<String>();
         if (includeInputs) {
-            answer.addAll(inputs.keySet());
+            for (Map.Entry<String, Set<String>> entry : inputs.entrySet()) {
+                answer.addAll(entry.getValue());
+            }
+        }
+        for (Map.Entry<String, Map<String, String>> entry : outputs.entrySet()) {
+            answer.addAll(entry.getValue().keySet());
         }
-        answer.addAll(outputs.keySet());
         return Collections.unmodifiableList(answer);
     }
 
@@ -64,16 +70,14 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme
     public List<String> getEndpointsPerRoute(String routeId, boolean includeInputs) {
         List<String> answer = new ArrayList<String>();
         if (includeInputs) {
-            for (Map.Entry<String, Set<String>> entry : inputs.entrySet()) {
-                if (entry.getValue().contains(routeId)) {
-                    answer.add(entry.getKey());
-                }
+            Set<String> uris = inputs.get(routeId);
+            if (uris != null) {
+                answer.addAll(uris);
             }
         }
-        for (Map.Entry<String, Set<String>> entry : outputs.entrySet()) {
-            if (entry.getValue().contains(routeId)) {
-                answer.add(entry.getKey());
-            }
+        Map<String, String> uris = outputs.get(routeId);
+        if (uris != null) {
+            answer.addAll(uris.keySet());
         }
         return Collections.unmodifiableList(answer);
     }
@@ -96,18 +100,18 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme
 
     @Override
     public int size() {
-        int total = inputs.size();
-        total += outputs.size();
+        int total = inputs.values().size();
+        total += outputs.values().size();
         return total;
     }
 
     @Override
     protected void doStart() throws Exception {
         if (inputs == null) {
-            inputs = new LRUCache<String, Set<String>>(limit);
+            inputs = new HashMap<String, Set<String>>();
         }
         if (outputs == null) {
-            outputs = new LRUCache<String, Set<String>>(limit);
+            outputs = new HashMap<String, Map<String, String>>();
         }
     }
 
@@ -118,31 +122,32 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme
 
     @Override
     public void notify(EventObject event) throws Exception {
-        if (event instanceof RouteStartedEvent) {
-            RouteStartedEvent rse = (RouteStartedEvent) event;
+        if (event instanceof RouteAddedEvent) {
+            RouteAddedEvent rse = (RouteAddedEvent) event;
             Endpoint endpoint = rse.getRoute().getEndpoint();
             String routeId = rse.getRoute().getId();
 
-            Set<String> routes = inputs.get(endpoint);
-            if (routeId != null && (routes == null || !routes.contains(routeId))) {
-                if (routes == null) {
-                    routes = new ConcurrentSkipListSet<String>();
-                }
-                routes.add(routeId);
-                inputs.put(endpoint.getEndpointUri(), routes);
-            }
+            // a HashSet is fine for inputs as we only have a limited number of those
+            Set<String> uris = new HashSet<String>();
+            uris.add(endpoint.getEndpointUri());
+            inputs.put(routeId, uris);
+            // use a LRUCache for outputs as we could potential have unlimited uris if dynamic routing is in use
+            // and therefore need to have the limit in use
+            outputs.put(routeId, new LRUCache<String, String>(limit));
+        } else if (event instanceof RouteRemovedEvent) {
+            RouteRemovedEvent rse = (RouteRemovedEvent) event;
+            String routeId = rse.getRoute().getId();
+            inputs.remove(routeId);
+            outputs.remove(routeId);
         } else {
             ExchangeSendingEvent ese = (ExchangeSendingEvent) event;
             Endpoint endpoint = ese.getEndpoint();
             String routeId = getRouteId(ese.getExchange());
+            String uri = endpoint.getEndpointUri();
 
-            Set<String> routes = outputs.get(endpoint);
-            if (routeId != null && (routes == null || !routes.contains(routeId))) {
-                if (routes == null) {
-                    routes = new ConcurrentSkipListSet<String>();
-                }
-                routes.add(routeId);
-                outputs.put(endpoint.getEndpointUri(), routes);
+            Map<String, String> uris = outputs.get(routeId);
+            if (uris != null && !uris.containsKey(uri)) {
+                uris.put(uri, uri);
             }
         }
     }
@@ -164,6 +169,7 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme
     @Override
     public boolean isEnabled(EventObject event) {
         return enabled && event instanceof ExchangeSendingEvent
-                || event instanceof RouteStartedEvent;
+                || event instanceof RouteAddedEvent
+                || event instanceof RouteRemovedEvent;
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9928f5de/camel-core/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java b/camel-core/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java
index dcec8a0..b0d71d9 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java
@@ -36,14 +36,14 @@ public interface RuntimeEndpointRegistry extends Service {
     void setEnabled(boolean enabled);
 
     /**
-     * Maximum number of endpoints to keep in the cache.
+     * Maximum number of endpoints to keep in the cache per route.
      * <p/>
      * The default value is <tt>1000</tt>
      */
     int getLimit();
 
     /**
-     * Sets the maximum number of endpoints to keep in the cache.
+     * Sets the maximum number of endpoints to keep in the cache per route.
      */
     void setLimit(int limit);