You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2017/02/20 09:23:29 UTC
camel git commit: CAMEL-10862: camel-consul - ConsulRoutePolicy -
Allow to configure host port easier
Repository: camel
Updated Branches:
refs/heads/master 052fcc601 -> 0a487f8af
CAMEL-10862: camel-consul - ConsulRoutePolicy - Allow to configure host port easier
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0a487f8a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0a487f8a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0a487f8a
Branch: refs/heads/master
Commit: 0a487f8af13e0120e275640998e22701b945b5da
Parents: 052fcc6
Author: lburgazzoli <lb...@gmail.com>
Authored: Mon Feb 20 10:21:56 2017 +0100
Committer: lburgazzoli <lb...@gmail.com>
Committed: Mon Feb 20 10:22:15 2017 +0100
----------------------------------------------------------------------
.../camel/component/consul/ConsulConstants.java | 4 +
.../consul/policy/ConsulRoutePolicy.java | 97 ++++++++++++--------
.../consul/policy/ConsulRoutePolicyMain.java | 6 +-
3 files changed, 69 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/0a487f8a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
index 1265fa5..fd8ea39 100644
--- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ConsulConstants.java
@@ -16,7 +16,11 @@
*/
package org.apache.camel.component.consul;
+import com.orbitz.consul.Consul;
+
public interface ConsulConstants {
+ String CONSUL_DEFAULT_URL = String.format("http://%s:%d", Consul.DEFAULT_HTTP_HOST, Consul.DEFAULT_HTTP_PORT);
+
// Service Call EIP
String CONSUL_SERVER_IP = "CamelConsulServerIp";
String CONSUL_SERVER_PORT = "CamelConsulServerPort";
http://git-wip-us.apache.org/repos/asf/camel/blob/0a487f8a/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java
index 17c9c43..e7397a8 100644
--- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/policy/ConsulRoutePolicy.java
@@ -37,53 +37,49 @@ import org.apache.camel.CamelContextAware;
import org.apache.camel.Route;
import org.apache.camel.api.management.ManagedAttribute;
import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.component.consul.ConsulConfiguration;
+import org.apache.camel.component.consul.ConsulConstants;
import org.apache.camel.support.RoutePolicySupport;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ManagedResource(description = "Route policy using Consul as clustered lock")
-public class ConsulRoutePolicy extends RoutePolicySupport implements CamelContextAware {
+public final class ConsulRoutePolicy extends RoutePolicySupport implements CamelContextAware {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsulRoutePolicy.class);
- private final Object lock;
- private final Consul consul;
- private final SessionClient sessionClient;
- private final KeyValueClient keyValueClient;
- private final AtomicBoolean leader;
- private final Set<Route> suspendedRoutes;
- private final AtomicReference<BigInteger> index;
+ private final Object lock = new Object();
+ private final AtomicBoolean leader = new AtomicBoolean(false);
+ private final Set<Route> suspendedRoutes = new HashSet<>();
+ private final AtomicReference<BigInteger> index = new AtomicReference<>(BigInteger.valueOf(0));
private Route route;
private CamelContext camelContext;
private String serviceName;
private String servicePath;
- private int ttl;
- private int lockDelay;
private ExecutorService executorService;
- private boolean shouldStopConsumer;
+
+ private int ttl = 60;
+ private int lockDelay = 10;
+ private boolean shouldStopConsumer = true;
+ private String consulUrl = ConsulConstants.CONSUL_DEFAULT_URL;
+
+ private Consul consul;
+ private SessionClient sessionClient;
+ private KeyValueClient keyValueClient;
private String sessionId;
public ConsulRoutePolicy() {
- this(Consul.builder().build());
}
- public ConsulRoutePolicy(Consul consul) {
- this.consul = consul;
- this.sessionClient = consul.sessionClient();
- this.keyValueClient = consul.keyValueClient();
- this.suspendedRoutes = new HashSet<>();
- this.leader = new AtomicBoolean(false);
- this.lock = new Object();
- this.index = new AtomicReference<>(BigInteger.valueOf(0));
- this.serviceName = null;
- this.servicePath = null;
- this.ttl = 60;
- this.lockDelay = 10;
- this.executorService = null;
- this.shouldStopConsumer = true;
- this.sessionId = null;
+ public ConsulRoutePolicy(String consulUrl) {
+ this.consulUrl = consulUrl;
+ }
+
+ public ConsulRoutePolicy(ConsulConfiguration configuration) throws Exception {
+ this.consulUrl = configuration.getUrl();
+ this.consul = configuration.createConsulClient();
}
@Override
@@ -96,6 +92,14 @@ public class ConsulRoutePolicy extends RoutePolicySupport implements CamelContex
this.camelContext = camelContext;
}
+ public String getConsulUrl() {
+ return consulUrl;
+ }
+
+ public void setConsulUrl(String consulUrl) {
+ this.consulUrl = consulUrl;
+ }
+
@Override
public void onInit(Route route) {
super.onInit(route);
@@ -125,6 +129,26 @@ public class ConsulRoutePolicy extends RoutePolicySupport implements CamelContex
@Override
protected void doStart() throws Exception {
+ ObjectHelper.notNull(camelContext, "camelContext");
+ ObjectHelper.notNull(serviceName, "serviceName");
+ ObjectHelper.notNull(servicePath, "servicePath");
+
+ if (consul == null) {
+ Consul.Builder builder = Consul.builder();
+ if (consulUrl != null) {
+ builder.withUrl(consulUrl);
+ }
+
+ consul = builder.build();
+ }
+
+ if (sessionClient == null) {
+ sessionClient = consul.sessionClient();
+ }
+ if (keyValueClient == null) {
+ keyValueClient = consul.keyValueClient();
+ }
+
if (sessionId == null) {
sessionId = sessionClient.createSession(
ImmutableSession.builder()
@@ -136,7 +160,7 @@ public class ConsulRoutePolicy extends RoutePolicySupport implements CamelContex
LOGGER.debug("SessionID = {}", sessionId);
if (executorService == null) {
- executorService = getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "HazelcastRoutePolicy");
+ executorService = getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "ConsulRoutePolicy");
}
setLeader(keyValueClient.acquireLock(servicePath, sessionId));
@@ -154,10 +178,10 @@ public class ConsulRoutePolicy extends RoutePolicySupport implements CamelContex
if (sessionId != null) {
sessionClient.destroySession(sessionId);
sessionId = null;
+ }
- if (executorService != null) {
- getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService);
- }
+ if (executorService != null) {
+ getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService);
}
}
@@ -293,10 +317,10 @@ public class ConsulRoutePolicy extends RoutePolicySupport implements CamelContex
@Override
public void onComplete(ConsulResponse<Optional<Value>> consulResponse) {
if (isRunAllowed()) {
- Value response = consulResponse.getResponse().orNull();
- if (response != null) {
- String sid = response.getSession().orNull();
- if (ObjectHelper.isEmpty(sid)) {
+ Optional<Value> value = consulResponse.getResponse();
+ if (value.isPresent()) {
+ Optional<String> sid = value.get().getSession();
+ if (sid.isPresent() && ObjectHelper.isNotEmpty(sid.get())) {
// If the key is not held by any session, try acquire a
// lock (become leader)
LOGGER.debug("Try to take leadership ...");
@@ -326,7 +350,8 @@ public class ConsulRoutePolicy extends RoutePolicySupport implements CamelContex
keyValueClient.getValue(
servicePath,
QueryOptions.blockSeconds(ttl / 3, index.get()).build(),
- this);
+ this
+ );
}
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0a487f8a/components/camel-consul/src/test/java/org/apache/camel/component/consul/policy/ConsulRoutePolicyMain.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/policy/ConsulRoutePolicyMain.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/policy/ConsulRoutePolicyMain.java
index 447f5b3..b93181b 100644
--- a/components/camel-consul/src/test/java/org/apache/camel/component/consul/policy/ConsulRoutePolicyMain.java
+++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/policy/ConsulRoutePolicyMain.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.consul.policy;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.consul.ConsulConstants;
import org.apache.camel.main.Main;
public final class ConsulRoutePolicyMain {
@@ -29,15 +30,16 @@ public final class ConsulRoutePolicyMain {
main.addRouteBuilder(new RouteBuilder() {
public void configure() {
ConsulRoutePolicy policy = new ConsulRoutePolicy();
+ policy.setConsulUrl(ConsulConstants.CONSUL_DEFAULT_URL);
policy.setServiceName(args[0]);
policy.setTtl(15);
- fromF("file:///tmp/camel?delete=true")
+ from("file:///tmp/camel?delete=true")
.routeId(args[1])
.routePolicy(policy)
.setHeader("ConsulRouteID", simple("${routeId}"))
.setHeader("ConsulServiceName", constant(args[0]))
- .to("log:org.apache.camel.component.etcd?level=INFO&showAll=true");
+ .to("log:org.apache.camel.component.consul?level=INFO&showAll=true");
}
});