You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/03/31 09:31:30 UTC
[2/2] git commit: CAMEL-7333: When route is removed then remove entries from RuntimeEndpointRegistry.
CAMEL-7333: When route is removed then remove entries from RuntimeEndpointRegistry.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9928f5de
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9928f5de
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9928f5de
Branch: refs/heads/camel-2.13.x
Commit: 9928f5de93972c6ca3cb15a1cc733609ba36f8a0
Parents: f0fbc84
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Mar 31 09:34:16 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 31 09:34:49 2014 +0200
----------------------------------------------------------------------
.../impl/DefaultRuntimeEndpointRegistry.java | 78 +++++++++++---------
.../camel/spi/RuntimeEndpointRegistry.java | 4 +-
2 files changed, 44 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/9928f5de/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java
index e4ca093..890b3d3 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultRuntimeEndpointRegistry.java
@@ -19,15 +19,17 @@ package org.apache.camel.impl;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EventObject;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.management.event.ExchangeSendingEvent;
-import org.apache.camel.management.event.RouteStartedEvent;
+import org.apache.camel.management.event.RouteAddedEvent;
+import org.apache.camel.management.event.RouteRemovedEvent;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.RuntimeEndpointRegistry;
import org.apache.camel.spi.UnitOfWork;
@@ -36,9 +38,9 @@ import org.apache.camel.util.LRUCache;
public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport implements RuntimeEndpointRegistry {
- // endpoint uri -> route ids
+ // route id -> endpoint urls
private Map<String, Set<String>> inputs;
- private Map<String, Set<String>> outputs;
+ private Map<String, Map<String, String>> outputs;
private int limit = 1000;
private boolean enabled = true;
@@ -54,9 +56,13 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme
public List<String> getAllEndpoints(boolean includeInputs) {
List<String> answer = new ArrayList<String>();
if (includeInputs) {
- answer.addAll(inputs.keySet());
+ for (Map.Entry<String, Set<String>> entry : inputs.entrySet()) {
+ answer.addAll(entry.getValue());
+ }
+ }
+ for (Map.Entry<String, Map<String, String>> entry : outputs.entrySet()) {
+ answer.addAll(entry.getValue().keySet());
}
- answer.addAll(outputs.keySet());
return Collections.unmodifiableList(answer);
}
@@ -64,16 +70,14 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme
public List<String> getEndpointsPerRoute(String routeId, boolean includeInputs) {
List<String> answer = new ArrayList<String>();
if (includeInputs) {
- for (Map.Entry<String, Set<String>> entry : inputs.entrySet()) {
- if (entry.getValue().contains(routeId)) {
- answer.add(entry.getKey());
- }
+ Set<String> uris = inputs.get(routeId);
+ if (uris != null) {
+ answer.addAll(uris);
}
}
- for (Map.Entry<String, Set<String>> entry : outputs.entrySet()) {
- if (entry.getValue().contains(routeId)) {
- answer.add(entry.getKey());
- }
+ Map<String, String> uris = outputs.get(routeId);
+ if (uris != null) {
+ answer.addAll(uris.keySet());
}
return Collections.unmodifiableList(answer);
}
@@ -96,18 +100,18 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme
@Override
public int size() {
- int total = inputs.size();
- total += outputs.size();
+ int total = inputs.values().size();
+ total += outputs.values().size();
return total;
}
@Override
protected void doStart() throws Exception {
if (inputs == null) {
- inputs = new LRUCache<String, Set<String>>(limit);
+ inputs = new HashMap<String, Set<String>>();
}
if (outputs == null) {
- outputs = new LRUCache<String, Set<String>>(limit);
+ outputs = new HashMap<String, Map<String, String>>();
}
}
@@ -118,31 +122,32 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme
@Override
public void notify(EventObject event) throws Exception {
- if (event instanceof RouteStartedEvent) {
- RouteStartedEvent rse = (RouteStartedEvent) event;
+ if (event instanceof RouteAddedEvent) {
+ RouteAddedEvent rse = (RouteAddedEvent) event;
Endpoint endpoint = rse.getRoute().getEndpoint();
String routeId = rse.getRoute().getId();
- Set<String> routes = inputs.get(endpoint);
- if (routeId != null && (routes == null || !routes.contains(routeId))) {
- if (routes == null) {
- routes = new ConcurrentSkipListSet<String>();
- }
- routes.add(routeId);
- inputs.put(endpoint.getEndpointUri(), routes);
- }
+ // a HashSet is fine for inputs as we only have a limited number of those
+ Set<String> uris = new HashSet<String>();
+ uris.add(endpoint.getEndpointUri());
+ inputs.put(routeId, uris);
+ // use an LRUCache for outputs as we could potentially have unlimited uris if dynamic routing is in use
+ // and therefore need to have the limit in use
+ outputs.put(routeId, new LRUCache<String, String>(limit));
+ } else if (event instanceof RouteRemovedEvent) {
+ RouteRemovedEvent rse = (RouteRemovedEvent) event;
+ String routeId = rse.getRoute().getId();
+ inputs.remove(routeId);
+ outputs.remove(routeId);
} else {
ExchangeSendingEvent ese = (ExchangeSendingEvent) event;
Endpoint endpoint = ese.getEndpoint();
String routeId = getRouteId(ese.getExchange());
+ String uri = endpoint.getEndpointUri();
- Set<String> routes = outputs.get(endpoint);
- if (routeId != null && (routes == null || !routes.contains(routeId))) {
- if (routes == null) {
- routes = new ConcurrentSkipListSet<String>();
- }
- routes.add(routeId);
- outputs.put(endpoint.getEndpointUri(), routes);
+ Map<String, String> uris = outputs.get(routeId);
+ if (uris != null && !uris.containsKey(uri)) {
+ uris.put(uri, uri);
}
}
}
@@ -164,6 +169,7 @@ public class DefaultRuntimeEndpointRegistry extends EventNotifierSupport impleme
@Override
public boolean isEnabled(EventObject event) {
return enabled && event instanceof ExchangeSendingEvent
- || event instanceof RouteStartedEvent;
+ || event instanceof RouteAddedEvent
+ || event instanceof RouteRemovedEvent;
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9928f5de/camel-core/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java b/camel-core/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java
index dcec8a0..b0d71d9 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/RuntimeEndpointRegistry.java
@@ -36,14 +36,14 @@ public interface RuntimeEndpointRegistry extends Service {
void setEnabled(boolean enabled);
/**
- * Maximum number of endpoints to keep in the cache.
+ * Maximum number of endpoints to keep in the cache per route.
* <p/>
* The default value is <tt>1000</tt>
*/
int getLimit();
/**
- * Sets the maximum number of endpoints to keep in the cache.
+ * Sets the maximum number of endpoints to keep in the cache per route.
*/
void setLimit(int limit);