You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2017/02/20 09:23:29 UTC

camel git commit: CAMEL-10862: camel-consul - ConsulRoutePolicy - Make the host and port easier to configure

Repository: camel
Updated Branches:
  refs/heads/master 052fcc601 -> 0a487f8af


CAMEL-10862: camel-consul - ConsulRoutePolicy - Make the host and port easier to configure


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0a487f8a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0a487f8a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0a487f8a

Branch: refs/heads/master
Commit: 0a487f8af13e0120e275640998e22701b945b5da
Parents: 052fcc6
Author: lburgazzoli <lb...@gmail.com>
Authored: Mon Feb 20 10:21:56 2017 +0100
Committer: lburgazzoli <lb...@gmail.com>
Committed: Mon Feb 20 10:22:15 2017 +0100

----------------------------------------------------------------------
 .../camel/component/consul/ConsulConstants.java |  4 +
 .../consul/policy/ConsulRoutePolicy.java        | 97 ++++++++++++--------
 .../consul/policy/ConsulRoutePolicyMain.java    |  6 +-
 3 files changed, 69 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0a487f8a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
index 1265fa5..fd8ea39 100644
--- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
@@ -16,7 +16,11 @@
  */
 package org.apache.camel.component.consul;
 
+import com.orbitz.consul.Consul;
+
 public interface ConsulConstants {
+    String CONSUL_DEFAULT_URL = String.format("http://%s:%d", Consul.DEFAULT_HTTP_HOST, Consul.DEFAULT_HTTP_PORT);
+
     // Service Call EIP
     String CONSUL_SERVER_IP = "CamelConsulServerIp";
     String CONSUL_SERVER_PORT = "CamelConsulServerPort";

http://git-wip-us.apache.org/repos/asf/camel/blob/0a487f8a/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java
index 17c9c43..e7397a8 100644
--- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java
@@ -37,53 +37,49 @@ import org.apache.camel.CamelContextAware;
 import org.apache.camel.Route;
 import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.component.consul.ConsulConstants;
 import org.apache.camel.support.RoutePolicySupport;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @ManagedResource(description = "Route policy using Consul as clustered lock")
-public class ConsulRoutePolicy extends RoutePolicySupport implements CamelContextAware {
+public final class ConsulRoutePolicy extends RoutePolicySupport implements CamelContextAware {
     private static final Logger LOGGER = LoggerFactory.getLogger(ConsulRoutePolicy.class);
 
-    private final Object lock;
-    private final Consul consul;
-    private final SessionClient sessionClient;
-    private final KeyValueClient keyValueClient;
-    private final AtomicBoolean leader;
-    private final Set<Route> suspendedRoutes;
-    private final AtomicReference<BigInteger> index;
+    private final Object lock = new Object();
+    private final AtomicBoolean leader = new AtomicBoolean(false);
+    private final Set<Route> suspendedRoutes = new HashSet<>();
+    private final AtomicReference<BigInteger> index = new AtomicReference<>(BigInteger.valueOf(0));
 
     private Route route;
     private CamelContext camelContext;
     private String serviceName;
     private String servicePath;
-    private int ttl;
-    private int lockDelay;
     private ExecutorService executorService;
-    private boolean shouldStopConsumer;
+
+    private int ttl = 60;
+    private int lockDelay = 10;
+    private boolean shouldStopConsumer = true;
+    private String consulUrl = ConsulConstants.CONSUL_DEFAULT_URL;
+
+    private Consul consul;
+    private SessionClient sessionClient;
+    private KeyValueClient keyValueClient;
 
     private String sessionId;
 
     public ConsulRoutePolicy() {
-        this(Consul.builder().build());
     }
 
-    public ConsulRoutePolicy(Consul consul) {
-        this.consul = consul;
-        this.sessionClient = consul.sessionClient();
-        this.keyValueClient = consul.keyValueClient();
-        this.suspendedRoutes =  new HashSet<>();
-        this.leader = new AtomicBoolean(false);
-        this.lock = new Object();
-        this.index = new AtomicReference<>(BigInteger.valueOf(0));
-        this.serviceName = null;
-        this.servicePath = null;
-        this.ttl = 60;
-        this.lockDelay = 10;
-        this.executorService = null;
-        this.shouldStopConsumer = true;
-        this.sessionId = null;
+    public ConsulRoutePolicy(String consulUrl) {
+        this.consulUrl = consulUrl;
+    }
+
+    public ConsulRoutePolicy(ConsulConfiguration configuration) throws Exception {
+        this.consulUrl = configuration.getUrl();
+        this.consul = configuration.createConsulClient();
     }
 
     @Override
@@ -96,6 +92,14 @@ public class ConsulRoutePolicy extends RoutePolicySupport implements CamelContex
         this.camelContext = camelContext;
     }
 
+    public String getConsulUrl() {
+        return consulUrl;
+    }
+
+    public void setConsulUrl(String consulUrl) {
+        this.consulUrl = consulUrl;
+    }
+
     @Override
     public void onInit(Route route) {
         super.onInit(route);
@@ -125,6 +129,26 @@ public class ConsulRoutePolicy extends RoutePolicySupport implements CamelContex
 
     @Override
     protected void doStart() throws Exception {
+        ObjectHelper.notNull(camelContext, "camelContext");
+        ObjectHelper.notNull(serviceName, "serviceName");
+        ObjectHelper.notNull(servicePath, "servicePath");
+
+        if (consul == null) {
+            Consul.Builder builder = Consul.builder();
+            if (consulUrl != null) {
+                builder.withUrl(consulUrl);
+            }
+
+            consul = builder.build();
+        }
+
+        if (sessionClient == null) {
+            sessionClient = consul.sessionClient();
+        }
+        if (keyValueClient == null) {
+            keyValueClient = consul.keyValueClient();
+        }
+
         if (sessionId == null) {
             sessionId = sessionClient.createSession(
                 ImmutableSession.builder()
@@ -136,7 +160,7 @@ public class ConsulRoutePolicy extends RoutePolicySupport implements CamelContex
 
             LOGGER.debug("SessionID = {}", sessionId);
             if (executorService == null) {
-                executorService = getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "HazelcastRoutePolicy");
+                executorService = getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "ConsulRoutePolicy");
             }
 
             setLeader(keyValueClient.acquireLock(servicePath, sessionId));
@@ -154,10 +178,10 @@ public class ConsulRoutePolicy extends RoutePolicySupport implements CamelContex
         if (sessionId != null) {
             sessionClient.destroySession(sessionId);
             sessionId = null;
+        }
 
-            if (executorService != null) {
-                getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService);
-            }
+        if (executorService != null) {
+            getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService);
         }
     }
 
@@ -293,10 +317,10 @@ public class ConsulRoutePolicy extends RoutePolicySupport implements CamelContex
         @Override
         public void onComplete(ConsulResponse<Optional<Value>> consulResponse) {
             if (isRunAllowed()) {
-                Value response = consulResponse.getResponse().orNull();
-                if (response != null) {
-                    String sid = response.getSession().orNull();
-                    if (ObjectHelper.isEmpty(sid)) {
+                Optional<Value> value = consulResponse.getResponse();
+                if (value.isPresent()) {
+                    Optional<String> sid = value.get().getSession();
+                    if (sid.isPresent() && ObjectHelper.isNotEmpty(sid.get())) {
                         // If the key is not held by any session, try acquire a
                         // lock (become leader)
                         LOGGER.debug("Try to take leadership ...");
@@ -326,7 +350,8 @@ public class ConsulRoutePolicy extends RoutePolicySupport implements CamelContex
                 keyValueClient.getValue(
                     servicePath,
                     QueryOptions.blockSeconds(ttl / 3, index.get()).build(),
-                    this);
+                    this
+                );
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/0a487f8a/components/camel-consul/src/test/java/org/apache/camel/component/consul/policy/ConsulRoutePolicyMain.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/policy/ConsulRoutePolicyMain.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/policy/ConsulRoutePolicyMain.java
index 447f5b3..b93181b 100644
--- a/components/camel-consul/src/test/java/org/apache/camel/component/consul/policy/ConsulRoutePolicyMain.java
+++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/policy/ConsulRoutePolicyMain.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.consul.policy;
 
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.consul.ConsulConstants;
 import org.apache.camel.main.Main;
 
 public final class ConsulRoutePolicyMain {
@@ -29,15 +30,16 @@ public final class ConsulRoutePolicyMain {
         main.addRouteBuilder(new RouteBuilder() {
             public void configure() {
                 ConsulRoutePolicy policy = new ConsulRoutePolicy();
+                policy.setConsulUrl(ConsulConstants.CONSUL_DEFAULT_URL);
                 policy.setServiceName(args[0]);
                 policy.setTtl(15);
 
-                fromF("file:///tmp/camel?delete=true")
+                from("file:///tmp/camel?delete=true")
                     .routeId(args[1])
                     .routePolicy(policy)
                     .setHeader("ConsulRouteID", simple("${routeId}"))
                     .setHeader("ConsulServiceName", constant(args[0]))
-                    .to("log:org.apache.camel.component.etcd?level=INFO&showAll=true");
+                    .to("log:org.apache.camel.component.consul?level=INFO&showAll=true");
             }
         });