You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2017/02/20 10:05:16 UTC

camel git commit: camel-etcd: add JMX api and make it easy to configure etcd endpoint uris

Repository: camel
Updated Branches:
  refs/heads/master fdde06923 -> e5824c357


camel-etcd: add JMX api and make it easy to configure etcd endpoint uris


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e5824c35
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e5824c35
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e5824c35

Branch: refs/heads/master
Commit: e5824c357e195153a4d6307814bbba5b8bae4543
Parents: fdde069
Author: lburgazzoli <lb...@gmail.com>
Authored: Mon Feb 20 11:04:14 2017 +0100
Committer: lburgazzoli <lb...@gmail.com>
Committed: Mon Feb 20 11:04:23 2017 +0100

----------------------------------------------------------------------
 .../camel/component/etcd/EtcdConfiguration.java |  19 +--
 .../apache/camel/component/etcd/EtcdHelper.java |  23 +++-
 .../component/etcd/policy/EtcdRoutePolicy.java  | 115 +++++++++++++++----
 .../etcd/policy/EtcdRoutePolicyMain.java        |   4 +-
 4 files changed, 118 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e5824c35/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java
index 4c97ceb..9ce6d2a 100644
--- a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java
+++ b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.component.etcd;
 
-import java.net.URI;
-
 import mousio.etcd4j.EtcdClient;
 import mousio.etcd4j.EtcdSecurityContext;
 import org.apache.camel.CamelContext;
@@ -178,21 +176,6 @@ public class EtcdConfiguration {
     }
 
     public EtcdClient createClient() throws Exception {
-        String[] uris;
-        if (getUris() != null) {
-            uris = getUris().split(",");
-        } else {
-            uris = EtcdConstants.ETCD_DEFAULT_URIS.split(",");
-        }
-
-        URI[] etcdUriList = new URI[uris.length];
-
-        for (int i = 0; i < uris.length; i++) {
-            etcdUriList[i] = camelContext != null
-                ? URI.create(camelContext.resolvePropertyPlaceholders(uris[i]))
-                : URI.create(uris[i]);
-        }
-
         return new EtcdClient(
             new EtcdSecurityContext(
                 sslContextParameters != null
@@ -200,7 +183,7 @@ public class EtcdConfiguration {
                     : null,
                 userName,
                 password),
-            etcdUriList
+            EtcdHelper.resolveURIs(camelContext, getUris())
         );
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e5824c35/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdHelper.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdHelper.java
index a2954bc..b2a52aa 100644
--- a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdHelper.java
+++ b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdHelper.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.etcd;
 
+import java.net.URI;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
@@ -24,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import mousio.etcd4j.responses.EtcdErrorCode;
 import mousio.etcd4j.responses.EtcdException;
 import mousio.etcd4j.responses.EtcdKeysResponse;
+import org.apache.camel.CamelContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,8 +50,6 @@ public final class EtcdHelper  {
             .setSerializationInclusion(JsonInclude.Include.NON_NULL);
     }
 
-
-
     public static void setIndex(AtomicLong index, EtcdKeysResponse response) {
         if (response != null && response.node != null) {
             index.set(response.node.modifiedIndex + 1);
@@ -59,4 +59,23 @@ public final class EtcdHelper  {
             LOGGER.debug("Index received={}, next={}", response.node.modifiedIndex, index.get());
         }
     }
+
+    public static URI[] resolveURIs(CamelContext camelContext, String uriList) throws Exception {
+        String[] uris;
+        if (uriList != null) {
+            uris = uriList.split(",");
+        } else {
+            uris = EtcdConstants.ETCD_DEFAULT_URIS.split(",");
+        }
+
+        URI[] etcdUriList = new URI[uris.length];
+
+        for (int i = 0; i < uris.length; i++) {
+            etcdUriList[i] = camelContext != null
+                ? URI.create(camelContext.resolvePropertyPlaceholders(uris[i]))
+                : URI.create(uris[i]);
+        }
+
+        return etcdUriList;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e5824c35/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicy.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicy.java
index 57a582d..cf36df8 100644
--- a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicy.java
+++ b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicy.java
@@ -28,33 +28,50 @@ import mousio.etcd4j.EtcdClient;
 import mousio.etcd4j.responses.EtcdErrorCode;
 import mousio.etcd4j.responses.EtcdException;
 import mousio.etcd4j.responses.EtcdKeysResponse;
-import org.apache.camel.NonManagedService;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.Route;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.component.etcd.EtcdConfiguration;
+import org.apache.camel.component.etcd.EtcdConstants;
 import org.apache.camel.component.etcd.EtcdHelper;
 import org.apache.camel.support.RoutePolicySupport;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromise.IsSimplePromiseResponseHandler<EtcdKeysResponse>, NonManagedService {
+@ManagedResource(description = "Route policy using Etcd as clustered lock")
+public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromise.IsSimplePromiseResponseHandler<EtcdKeysResponse>, CamelContextAware {
     private static final Logger LOGGER = LoggerFactory.getLogger(EtcdRoutePolicy.class);
 
-    private final Object lock;
-    private final EtcdClient client;
-    private final boolean managedClient;
-    private final AtomicBoolean leader;
-    private final Set<Route> suspendedRoutes;
-    private final AtomicLong index;
+    private final Object lock = new Object();
+    private final AtomicBoolean leader = new AtomicBoolean(false);
+    private final Set<Route> suspendedRoutes = new HashSet<>();
+    private final AtomicLong index = new AtomicLong(0);
+
+    private int ttl = 60;
+    private int watchTimeout = 60 / 3;
+    private boolean shouldStopConsumer = true;
+
+    private Route route;
+    private CamelContext camelContext;
 
     private String serviceName;
     private String servicePath;
-    private int ttl;
-    private int watchTimeout;
-    private boolean shouldStopConsumer;
+    private EtcdClient client;
+    private boolean managedClient;
+    private String clientUris = EtcdConstants.ETCD_DEFAULT_URIS;
 
     public EtcdRoutePolicy() {
-        this(new EtcdClient(), true);
+        this.client = null;
+        this.managedClient = false;
+    }
+
+    public EtcdRoutePolicy(EtcdConfiguration configuration) throws Exception {
+        this.client = configuration.createClient();
+        this.managedClient = true;
     }
 
     public EtcdRoutePolicy(EtcdClient client) {
@@ -64,15 +81,26 @@ public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromi
     public EtcdRoutePolicy(EtcdClient client, boolean managedClient) {
         this.client = client;
         this.managedClient = managedClient;
-        this.suspendedRoutes =  new HashSet<>();
-        this.leader = new AtomicBoolean(false);
-        this.lock = new Object();
-        this.index = new AtomicLong(0);
-        this.serviceName = null;
-        this.servicePath = null;
-        this.ttl = 60;
-        this.watchTimeout = ttl / 3;
-        this.shouldStopConsumer = true;
+    }
+
+    public EtcdRoutePolicy(String clientUris) {
+        this.clientUris = clientUris;
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public void onInit(Route route) {
+        super.onInit(route);
+        this.route = route;
     }
 
     @Override
@@ -98,6 +126,14 @@ public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromi
 
     @Override
     protected void doStart() throws Exception {
+        ObjectHelper.notNull(camelContext, "camelContext");
+        ObjectHelper.notNull(clientUris, "clientUris");
+
+        if (client == null) {
+            client = new EtcdClient(EtcdHelper.resolveURIs(camelContext, clientUris));
+            managedClient = true;
+        }
+
         setLeader(tryTakeLeadership());
         watch();
 
@@ -178,14 +214,32 @@ public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromi
         return client;
     }
 
+    @ManagedAttribute(description = "The route id")
+    public String getRouteId() {
+        if (route != null) {
+            return route.getId();
+        }
+        return null;
+    }
+
+    @ManagedAttribute(description = "The consumer endpoint", mask = true)
+    public String getEndpointUrl() {
+        if (route != null && route.getConsumer() != null && route.getConsumer().getEndpoint() != null) {
+            return route.getConsumer().getEndpoint().toString();
+        }
+        return null;
+    }
+
     public String getServiceName() {
         return serviceName;
     }
 
+    @ManagedAttribute(description = "The etcd service name")
     public void setServiceName(String serviceName) {
         this.serviceName = serviceName;
     }
 
+    @ManagedAttribute(description = "The etcd service path")
     public String getServicePath() {
         return servicePath;
     }
@@ -194,6 +248,7 @@ public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromi
         this.servicePath = servicePath;
     }
 
+    @ManagedAttribute(description = "The time to live (seconds)")
     public int getTtl() {
         return ttl;
     }
@@ -202,6 +257,7 @@ public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromi
         this.ttl = ttl;
     }
 
+    @ManagedAttribute(description = "The watch timeout (seconds)")
     public int getWatchTimeout() {
         return watchTimeout;
     }
@@ -210,6 +266,7 @@ public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromi
         this.watchTimeout = watchTimeout;
     }
 
+    @ManagedAttribute(description = "Whether to stop consumer when starting up and failed to become master")
     public boolean isShouldStopConsumer() {
         return shouldStopConsumer;
     }
@@ -218,6 +275,20 @@ public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromi
         this.shouldStopConsumer = shouldStopConsumer;
     }
 
+    @ManagedAttribute(description = "Is this route the master or a slave")
+    public boolean isLeader() {
+        return leader.get();
+    }
+
+    @ManagedAttribute(description = "Etcd endpoints")
+    public String getClientUris() {
+        return clientUris;
+    }
+
+    public void setClientUris(String clientUris) {
+        this.clientUris = clientUris;
+    }
+
     // *************************************************************************
     // Watch
     // *************************************************************************
@@ -279,7 +350,7 @@ public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromi
 
             client.get(servicePath)
                 .waitForChange(index.get())
-                .timeout(ttl / 3, TimeUnit.SECONDS)
+                .timeout(watchTimeout, TimeUnit.SECONDS)
                 .send()
                 .addListener(this);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/camel/blob/e5824c35/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicyMain.java
----------------------------------------------------------------------
diff --git a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicyMain.java b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicyMain.java
index ad46647..7348b81 100644
--- a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicyMain.java
+++ b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicyMain.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.etcd.policy;
 
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.etcd.EtcdConstants;
 import org.apache.camel.main.Main;
 
 public final class EtcdRoutePolicyMain {
@@ -29,11 +30,12 @@ public final class EtcdRoutePolicyMain {
         main.addRouteBuilder(new RouteBuilder() {
             public void configure() {
                 EtcdRoutePolicy policy = new EtcdRoutePolicy();
+                policy.setClientUris(EtcdConstants.ETCD_DEFAULT_URIS);
                 policy.setServicePath("/camel/services/leader");
                 policy.setServiceName(args[1]);
                 policy.setTtl(15);
 
-                fromF("file:///tmp/camel?delete=true")
+                from("file:///tmp/camel?delete=true")
                     .routeId(args[1])
                     .routePolicy(policy)
                     .setHeader("EtcdRouteID", constant(args[1]))