You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2017/02/20 10:05:16 UTC
camel git commit: camel-etcd: add JMX api and make it easy to configure etcd endpoint uris
Repository: camel
Updated Branches:
refs/heads/master fdde06923 -> e5824c357
camel-etcd: add JMX api and make it easy to configure etcd endpoint uris
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e5824c35
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e5824c35
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e5824c35
Branch: refs/heads/master
Commit: e5824c357e195153a4d6307814bbba5b8bae4543
Parents: fdde069
Author: lburgazzoli <lb...@gmail.com>
Authored: Mon Feb 20 11:04:14 2017 +0100
Committer: lburgazzoli <lb...@gmail.com>
Committed: Mon Feb 20 11:04:23 2017 +0100
----------------------------------------------------------------------
.../camel/component/etcd/EtcdConfiguration.java | 19 +--
.../apache/camel/component/etcd/EtcdHelper.java | 23 +++-
.../component/etcd/policy/EtcdRoutePolicy.java | 115 +++++++++++++++----
.../etcd/policy/EtcdRoutePolicyMain.java | 4 +-
4 files changed, 118 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/e5824c35/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java
index 4c97ceb..9ce6d2a 100644
--- a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java
+++ b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java
@@ -16,8 +16,6 @@
*/
package org.apache.camel.component.etcd;
-import java.net.URI;
-
import mousio.etcd4j.EtcdClient;
import mousio.etcd4j.EtcdSecurityContext;
import org.apache.camel.CamelContext;
@@ -178,21 +176,6 @@ public class EtcdConfiguration {
}
public EtcdClient createClient() throws Exception {
- String[] uris;
- if (getUris() != null) {
- uris = getUris().split(",");
- } else {
- uris = EtcdConstants.ETCD_DEFAULT_URIS.split(",");
- }
-
- URI[] etcdUriList = new URI[uris.length];
-
- for (int i = 0; i < uris.length; i++) {
- etcdUriList[i] = camelContext != null
- ? URI.create(camelContext.resolvePropertyPlaceholders(uris[i]))
- : URI.create(uris[i]);
- }
-
return new EtcdClient(
new EtcdSecurityContext(
sslContextParameters != null
@@ -200,7 +183,7 @@ public class EtcdConfiguration {
: null,
userName,
password),
- etcdUriList
+ EtcdHelper.resolveURIs(camelContext, getUris())
);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e5824c35/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdHelper.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdHelper.java
index a2954bc..b2a52aa 100644
--- a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdHelper.java
+++ b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdHelper.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.etcd;
+import java.net.URI;
import java.util.concurrent.atomic.AtomicLong;
import com.fasterxml.jackson.annotation.JsonInclude;
@@ -24,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import mousio.etcd4j.responses.EtcdErrorCode;
import mousio.etcd4j.responses.EtcdException;
import mousio.etcd4j.responses.EtcdKeysResponse;
+import org.apache.camel.CamelContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,8 +50,6 @@ public final class EtcdHelper {
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
}
-
-
public static void setIndex(AtomicLong index, EtcdKeysResponse response) {
if (response != null && response.node != null) {
index.set(response.node.modifiedIndex + 1);
@@ -59,4 +59,23 @@ public final class EtcdHelper {
LOGGER.debug("Index received={}, next={}", response.node.modifiedIndex, index.get());
}
}
+
+ public static URI[] resolveURIs(CamelContext camelContext, String uriList) throws Exception {
+ String[] uris;
+ if (uriList != null) {
+ uris = uriList.split(",");
+ } else {
+ uris = EtcdConstants.ETCD_DEFAULT_URIS.split(",");
+ }
+
+ URI[] etcdUriList = new URI[uris.length];
+
+ for (int i = 0; i < uris.length; i++) {
+ etcdUriList[i] = camelContext != null
+ ? URI.create(camelContext.resolvePropertyPlaceholders(uris[i]))
+ : URI.create(uris[i]);
+ }
+
+ return etcdUriList;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e5824c35/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicy.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicy.java
index 57a582d..cf36df8 100644
--- a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicy.java
+++ b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicy.java
@@ -28,33 +28,50 @@ import mousio.etcd4j.EtcdClient;
import mousio.etcd4j.responses.EtcdErrorCode;
import mousio.etcd4j.responses.EtcdException;
import mousio.etcd4j.responses.EtcdKeysResponse;
-import org.apache.camel.NonManagedService;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
import org.apache.camel.Route;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.component.etcd.EtcdConfiguration;
+import org.apache.camel.component.etcd.EtcdConstants;
import org.apache.camel.component.etcd.EtcdHelper;
import org.apache.camel.support.RoutePolicySupport;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromise.IsSimplePromiseResponseHandler<EtcdKeysResponse>, NonManagedService {
+@ManagedResource(description = "Route policy using Etcd as clustered lock")
+public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromise.IsSimplePromiseResponseHandler<EtcdKeysResponse>, CamelContextAware {
private static final Logger LOGGER = LoggerFactory.getLogger(EtcdRoutePolicy.class);
- private final Object lock;
- private final EtcdClient client;
- private final boolean managedClient;
- private final AtomicBoolean leader;
- private final Set<Route> suspendedRoutes;
- private final AtomicLong index;
+ private final Object lock = new Object();
+ private final AtomicBoolean leader = new AtomicBoolean(false);
+ private final Set<Route> suspendedRoutes = new HashSet<>();
+ private final AtomicLong index = new AtomicLong(0);
+
+ private int ttl = 60;
+ private int watchTimeout = 60 / 3;
+ private boolean shouldStopConsumer = true;
+
+ private Route route;
+ private CamelContext camelContext;
private String serviceName;
private String servicePath;
- private int ttl;
- private int watchTimeout;
- private boolean shouldStopConsumer;
+ private EtcdClient client;
+ private boolean managedClient;
+ private String clientUris = EtcdConstants.ETCD_DEFAULT_URIS;
public EtcdRoutePolicy() {
- this(new EtcdClient(), true);
+ this.client = null;
+ this.managedClient = false;
+ }
+
+ public EtcdRoutePolicy(EtcdConfiguration configuration) throws Exception {
+ this.client = configuration.createClient();
+ this.managedClient = true;
}
public EtcdRoutePolicy(EtcdClient client) {
@@ -64,15 +81,26 @@ public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromi
public EtcdRoutePolicy(EtcdClient client, boolean managedClient) {
this.client = client;
this.managedClient = managedClient;
- this.suspendedRoutes = new HashSet<>();
- this.leader = new AtomicBoolean(false);
- this.lock = new Object();
- this.index = new AtomicLong(0);
- this.serviceName = null;
- this.servicePath = null;
- this.ttl = 60;
- this.watchTimeout = ttl / 3;
- this.shouldStopConsumer = true;
+ }
+
+ public EtcdRoutePolicy(String clientUris) {
+ this.clientUris = clientUris;
+ }
+
+ @Override
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ @Override
+ public void onInit(Route route) {
+ super.onInit(route);
+ this.route = route;
}
@Override
@@ -98,6 +126,14 @@ public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromi
@Override
protected void doStart() throws Exception {
+ ObjectHelper.notNull(camelContext, "camelContext");
+ ObjectHelper.notNull(clientUris, "clientUris");
+
+ if (client == null) {
+ client = new EtcdClient(EtcdHelper.resolveURIs(camelContext, clientUris));
+ managedClient = true;
+ }
+
setLeader(tryTakeLeadership());
watch();
@@ -178,14 +214,32 @@ public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromi
return client;
}
+ @ManagedAttribute(description = "The route id")
+ public String getRouteId() {
+ if (route != null) {
+ return route.getId();
+ }
+ return null;
+ }
+
+ @ManagedAttribute(description = "The consumer endpoint", mask = true)
+ public String getEndpointUrl() {
+ if (route != null && route.getConsumer() != null && route.getConsumer().getEndpoint() != null) {
+ return route.getConsumer().getEndpoint().toString();
+ }
+ return null;
+ }
+
public String getServiceName() {
return serviceName;
}
+ @ManagedAttribute(description = "The etcd service name")
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
+ @ManagedAttribute(description = "The etcd service path")
public String getServicePath() {
return servicePath;
}
@@ -194,6 +248,7 @@ public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromi
this.servicePath = servicePath;
}
+ @ManagedAttribute(description = "The time to live (seconds)")
public int getTtl() {
return ttl;
}
@@ -202,6 +257,7 @@ public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromi
this.ttl = ttl;
}
+ @ManagedAttribute(description = "The watch timeout (seconds)")
public int getWatchTimeout() {
return watchTimeout;
}
@@ -210,6 +266,7 @@ public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromi
this.watchTimeout = watchTimeout;
}
+ @ManagedAttribute(description = "Whether to stop consumer when starting up and failed to become master")
public boolean isShouldStopConsumer() {
return shouldStopConsumer;
}
@@ -218,6 +275,20 @@ public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromi
this.shouldStopConsumer = shouldStopConsumer;
}
+ @ManagedAttribute(description = "Is this route the master or a slave")
+ public boolean isLeader() {
+ return leader.get();
+ }
+
+ @ManagedAttribute(description = "Etcd endpoints")
+ public String getClientUris() {
+ return clientUris;
+ }
+
+ public void setClientUris(String clientUris) {
+ this.clientUris = clientUris;
+ }
+
// *************************************************************************
// Watch
// *************************************************************************
@@ -279,7 +350,7 @@ public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromi
client.get(servicePath)
.waitForChange(index.get())
- .timeout(ttl / 3, TimeUnit.SECONDS)
+ .timeout(watchTimeout, TimeUnit.SECONDS)
.send()
.addListener(this);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/camel/blob/e5824c35/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicyMain.java
----------------------------------------------------------------------
diff --git a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicyMain.java b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicyMain.java
index ad46647..7348b81 100644
--- a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicyMain.java
+++ b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicyMain.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.etcd.policy;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.etcd.EtcdConstants;
import org.apache.camel.main.Main;
public final class EtcdRoutePolicyMain {
@@ -29,11 +30,12 @@ public final class EtcdRoutePolicyMain {
main.addRouteBuilder(new RouteBuilder() {
public void configure() {
EtcdRoutePolicy policy = new EtcdRoutePolicy();
+ policy.setClientUris(EtcdConstants.ETCD_DEFAULT_URIS);
policy.setServicePath("/camel/services/leader");
policy.setServiceName(args[1]);
policy.setTtl(15);
- fromF("file:///tmp/camel?delete=true")
+ from("file:///tmp/camel?delete=true")
.routeId(args[1])
.routePolicy(policy)
.setHeader("EtcdRouteID", constant(args[1]))