You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2017/06/27 14:43:31 UTC
camel git commit: camel-consul: create a consul cluster-service
Repository: camel
Updated Branches:
refs/heads/master 36878ba04 -> 828a0953c
camel-consul: create a consul cluster-service
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/828a0953
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/828a0953
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/828a0953
Branch: refs/heads/master
Commit: 828a0953c0705b5d8d446f2149959a5f53905fad
Parents: 36878ba
Author: lburgazzoli <lb...@gmail.com>
Authored: Tue Jun 27 16:42:36 2017 +0200
Committer: lburgazzoli <lb...@gmail.com>
Committed: Tue Jun 27 16:42:58 2017 +0200
----------------------------------------------------------------------
.../consul/ha/ConsulClusterConfiguration.java | 76 +++++
.../consul/ha/ConsulClusterService.java | 167 +++++++++++
.../component/consul/ha/ConsulClusterView.java | 277 +++++++++++++++++++
.../ha/ConsulClusteredRoutePolicyTest.java | 105 +++++++
.../src/test/resources/log4j2.properties | 5 +
5 files changed, 630 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/828a0953/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterConfiguration.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterConfiguration.java
new file mode 100644
index 0000000..f992487
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterConfiguration.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.ha;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.consul.ConsulConfiguration;
+
+/**
+ * Configuration of the Consul based cluster service: the settings inherited
+ * from {@link ConsulConfiguration} plus the ones that drive the session and
+ * the lock used for leader election.
+ */
+public class ConsulClusterConfiguration extends ConsulConfiguration {
+    private int sessionTtl = 60;
+    private int sessionLockDelay = 5;
+    private int sessionRefreshInterval = 5;
+    private String rootPath = "/camel";
+
+    // ***********************************************
+    // Session
+    // ***********************************************
+
+    /**
+     * @return the session time-to-live; the cluster view sends it to Consul
+     *         with an "s" (seconds) suffix
+     */
+    public int getSessionTtl() {
+        return sessionTtl;
+    }
+
+    /**
+     * @param sessionTtl the session time-to-live, in seconds
+     */
+    public void setSessionTtl(int sessionTtl) {
+        this.sessionTtl = sessionTtl;
+    }
+
+    /**
+     * @return the session lock delay; the cluster view sends it to Consul
+     *         with an "s" (seconds) suffix
+     */
+    public int getSessionLockDelay() {
+        return sessionLockDelay;
+    }
+
+    /**
+     * @param sessionLockDelay the session lock delay, in seconds
+     */
+    public void setSessionLockDelay(int sessionLockDelay) {
+        this.sessionLockDelay = sessionLockDelay;
+    }
+
+    /**
+     * @return the number of seconds the cluster view blocks on the watched
+     *         key before it renews the session and watches again
+     */
+    public int getSessionRefreshInterval() {
+        return sessionRefreshInterval;
+    }
+
+    /**
+     * @param sessionRefreshInterval the block time of a watch cycle, in seconds
+     */
+    public void setSessionRefreshInterval(int sessionRefreshInterval) {
+        this.sessionRefreshInterval = sessionRefreshInterval;
+    }
+
+    // ***********************************************
+    // Key/Value store
+    // ***********************************************
+
+    /**
+     * @return the key prefix under which the cluster view builds the lock
+     *         key of each namespace
+     */
+    public String getRootPath() {
+        return rootPath;
+    }
+
+    /**
+     * @param rootPath the key prefix used to build the lock keys
+     */
+    public void setRootPath(String rootPath) {
+        this.rootPath = rootPath;
+    }
+
+    // ***********************************************
+    // Copy
+    // ***********************************************
+
+    /**
+     * Clones this configuration.
+     *
+     * @return the clone, typed as {@link ConsulClusterConfiguration}
+     * @throws RuntimeCamelException if cloning is not supported
+     */
+    @Override
+    public ConsulClusterConfiguration copy() {
+        final Object duplicate;
+
+        try {
+            duplicate = super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeCamelException(e);
+        }
+
+        return (ConsulClusterConfiguration)duplicate;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/828a0953/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterService.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterService.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterService.java
new file mode 100644
index 0000000..c77b453
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterService.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.ha;
+
+import org.apache.camel.impl.ha.AbstractCamelClusterService;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.jsse.SSLContextParameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class ConsulClusterService extends AbstractCamelClusterService<ConsulClusterView> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConsulClusterService.class);
+
+ private ConsulClusterConfiguration configuration;
+
+ public ConsulClusterService() {
+ this.configuration = new ConsulClusterConfiguration();
+ }
+
+ public ConsulClusterService(ConsulClusterConfiguration configuration) {
+ this.configuration = configuration.copy();
+ }
+
+ // *********************************************
+ // Properties
+ // *********************************************
+
+ public ConsulClusterConfiguration getConfiguration() {
+ return configuration;
+ }
+
+ public void setConfiguration(ConsulClusterConfiguration configuration) {
+ this.configuration = configuration.copy();
+ }
+
+ public String getUrl() {
+ return configuration.getUrl();
+ }
+
+ public void setUrl(String url) {
+ configuration.setUrl(url);
+ }
+
+ public String getDatacenter() {
+ return configuration.getDatacenter();
+ }
+
+ public void setDatacenter(String datacenter) {
+ configuration.setDatacenter(datacenter);
+ }
+
+ public SSLContextParameters getSslContextParameters() {
+ return configuration.getSslContextParameters();
+ }
+
+ public void setSslContextParameters(SSLContextParameters sslContextParameters) {
+ configuration.setSslContextParameters(sslContextParameters);
+ }
+
+ public String getAclToken() {
+ return configuration.getAclToken();
+ }
+
+ public void setAclToken(String aclToken) {
+ configuration.setAclToken(aclToken);
+ }
+
+ public String getUserName() {
+ return configuration.getUserName();
+ }
+
+ public void setUserName(String userName) {
+ configuration.setUserName(userName);
+ }
+
+ public String getPassword() {
+ return configuration.getPassword();
+ }
+
+ public void setPassword(String password) {
+ configuration.setPassword(password);
+ }
+
+ public Long getConnectTimeoutMillis() {
+ return configuration.getConnectTimeoutMillis();
+ }
+
+ public void setConnectTimeoutMillis(Long connectTimeoutMillis) {
+ configuration.setConnectTimeoutMillis(connectTimeoutMillis);
+ }
+
+ public Long getReadTimeoutMillis() {
+ return configuration.getReadTimeoutMillis();
+ }
+
+ public void setReadTimeoutMillis(Long readTimeoutMillis) {
+ configuration.setReadTimeoutMillis(readTimeoutMillis);
+ }
+
+ public Long getWriteTimeoutMillis() {
+ return configuration.getWriteTimeoutMillis();
+ }
+
+ public void setWriteTimeoutMillis(Long writeTimeoutMillis) {
+ configuration.setWriteTimeoutMillis(writeTimeoutMillis);
+ }
+
+ public Integer getBlockSeconds() {
+ return configuration.getBlockSeconds();
+ }
+
+ public void setBlockSeconds(Integer blockSeconds) {
+ configuration.setBlockSeconds(blockSeconds);
+ }
+
+ public int getTtl() {
+ return configuration.getSessionTtl();
+ }
+
+ public void setTtl(int ttl) {
+ configuration.setSessionTtl(ttl);
+ }
+
+ public int getLockDelay() {
+ return configuration.getSessionLockDelay();
+ }
+
+ public void setLockDelay(int lockDelay) {
+ configuration.setSessionLockDelay(lockDelay);
+ }
+
+ public String getRootPath() {
+ return configuration.getRootPath();
+ }
+
+ public void setRootPath(String rootPath) {
+ configuration.setRootPath(rootPath);
+ }
+
+ // *********************************************
+ //
+ // *********************************************
+
+ @Override
+ protected ConsulClusterView createView(String namespace) throws Exception {
+
+ // Validate parameters
+ ObjectHelper.notNull(getCamelContext(), "Camel Context");
+ ObjectHelper.notNull(getRootPath(), "Consul root path");
+
+ return new ConsulClusterView(this, configuration, namespace);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/828a0953/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java
new file mode 100644
index 0000000..1956d9e
--- /dev/null
+++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/ha/ConsulClusterView.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.ha;
+
+import java.math.BigInteger;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import com.orbitz.consul.Consul;
+import com.orbitz.consul.KeyValueClient;
+import com.orbitz.consul.SessionClient;
+import com.orbitz.consul.async.ConsulResponseCallback;
+import com.orbitz.consul.model.ConsulResponse;
+import com.orbitz.consul.model.kv.Value;
+import com.orbitz.consul.model.session.ImmutableSession;
+import com.orbitz.consul.model.session.SessionInfo;
+import com.orbitz.consul.option.QueryOptions;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ha.CamelClusterMember;
+import org.apache.camel.impl.ha.AbstractCamelClusterView;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cluster view implementing leader election on top of a Consul session and a
+ * lock held on a key of the Consul key/value store.
+ *
+ * On start the view creates a session named after the namespace, tries to
+ * acquire the lock on {@code <rootPath>/<namespace>} and then keeps watching
+ * that key: whenever the key is not held by any session the view tries to
+ * acquire the lock again, otherwise it aligns the local member's master flag
+ * with the session currently holding the key.
+ */
+final class ConsulClusterView extends AbstractCamelClusterView {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ConsulClusterView.class);
+
+    private final ConsulClusterConfiguration configuration;
+    private final ConsulLocalMember localMember;
+    private final ConsulClusterMember nullMember;
+    private final Watcher watcher;
+    // Key the lock is held on; every lock/session lookup must use this key.
+    private final String path;
+
+    private Consul client;
+    private SessionClient sessionClient;
+    private KeyValueClient keyValueClient;
+    private String sessionId;
+
+    /**
+     * @param service the owning cluster service
+     * @param configuration the configuration shared with the service
+     * @param namespace the namespace of this view, also used as last segment
+     *        of the lock key
+     */
+    ConsulClusterView(ConsulClusterService service, ConsulClusterConfiguration configuration, String namespace) throws Exception {
+        super(service, namespace);
+
+        this.configuration = configuration;
+        this.localMember = new ConsulLocalMember();
+        this.nullMember = new ConsulClusterMember();
+        this.watcher = new Watcher();
+        this.path = configuration.getRootPath() + "/" + namespace;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        super.setCamelContext(camelContext);
+
+        this.configuration.setCamelContext(camelContext);
+    }
+
+    /**
+     * Looks up the session currently holding the lock key.
+     *
+     * @return the member owning the lock, or empty if the view has not been
+     *         started or no session holds the key
+     */
+    @Override
+    public Optional<CamelClusterMember> getMaster() {
+        if (keyValueClient == null) {
+            return Optional.empty();
+        }
+
+        // The lock lives on the namespace key, not on the root path.
+        return Optional.ofNullable(
+            keyValueClient.getSession(path)
+                .transform(ConsulClusterMember::new)
+                .orNull()
+        );
+    }
+
+    @Override
+    public CamelClusterMember getLocalMember() {
+        return this.localMember;
+    }
+
+    /**
+     * @return one member per Consul session whose name matches the namespace,
+     *         or an empty list if the view has not been started
+     */
+    @Override
+    public List<CamelClusterMember> getMembers() {
+        if (sessionClient == null) {
+            return Collections.emptyList();
+        }
+
+        // NOTE(review): if SessionInfo.getName() returns an Optional rather
+        // than a String this filter never matches - confirm against the
+        // consul-client version in use.
+        return sessionClient.listSessions().stream()
+            .filter(i -> i.getName().equals(getNamespace()))
+            .map(ConsulClusterMember::new)
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * Creates the clients and the session, makes a first attempt to acquire
+     * the lock and starts watching the lock key.
+     */
+    @Override
+    protected void doStart() throws Exception {
+        if (sessionId == null) {
+            client = configuration.createConsulClient();
+            sessionClient = client.sessionClient();
+            keyValueClient = client.keyValueClient();
+
+            sessionId = sessionClient.createSession(
+                ImmutableSession.builder()
+                    .name(getNamespace())
+                    .ttl(configuration.getSessionTtl() + "s")
+                    .lockDelay(configuration.getSessionLockDelay() + "s")
+                    .build()
+            ).getId();
+
+            LOGGER.debug("Acquired session with id '{}'", sessionId);
+            boolean lock = keyValueClient.acquireLock(this.path, sessionId);
+            LOGGER.debug("Acquire lock on path '{}' with id '{}' result '{}'", path, sessionId, lock);
+
+            localMember.setMaster(lock);
+            watcher.watch();
+        }
+    }
+
+    /**
+     * Releases the lock, destroys the session and gives up leadership.
+     */
+    @Override
+    protected void doStop() throws Exception {
+        if (sessionId != null) {
+            if (keyValueClient.releaseLock(this.path, sessionId)) {
+                LOGGER.debug("Successfully released lock on path '{}' with id '{}'", path, sessionId);
+            }
+
+            sessionClient.destroySession(sessionId);
+            localMember.setMaster(false);
+
+            // Forget the destroyed session, otherwise a later doStart() would
+            // neither create a new session nor restart the watcher.
+            sessionId = null;
+        }
+    }
+
+    // ***********************************************
+    // Members
+    // ***********************************************
+
+    /**
+     * The member representing this view; its id is the current session id.
+     */
+    private final class ConsulLocalMember implements CamelClusterMember {
+        private final AtomicBoolean master = new AtomicBoolean(false);
+
+        /**
+         * Updates the master flag and fires a leadership changed event only
+         * when the flag actually changes.
+         */
+        void setMaster(boolean master) {
+            if (master && this.master.compareAndSet(false, true)) {
+                LOGGER.debug("Leadership taken for session id {}", sessionId);
+                fireLeadershipChangedEvent(this);
+                return;
+            }
+            if (!master && this.master.compareAndSet(true, false)) {
+                LOGGER.debug("Leadership lost for session id {}", sessionId);
+                fireLeadershipChangedEvent(getMaster().orElse(nullMember));
+                return;
+            }
+        }
+
+        @Override
+        public boolean isMaster() {
+            return master.get();
+        }
+
+        @Override
+        public String getId() {
+            return sessionId;
+        }
+
+        @Override
+        public String toString() {
+            return "ConsulLocalMember{"
+                + "master=" + master
+                + '}';
+        }
+    }
+
+    /**
+     * A member identified by a Consul session id; the no-arg variant has no
+     * id and is used as placeholder when no master is known.
+     */
+    private final class ConsulClusterMember implements CamelClusterMember {
+        private final String id;
+
+        ConsulClusterMember() {
+            this.id = null;
+        }
+
+        ConsulClusterMember(SessionInfo info) {
+            this(info.getId());
+        }
+
+        ConsulClusterMember(String id) {
+            this.id = id;
+        }
+
+        @Override
+        public String getId() {
+            return id;
+        }
+
+        /**
+         * @return true if this member's session currently holds the lock key
+         */
+        @Override
+        public boolean isMaster() {
+            if (keyValueClient == null) {
+                return false;
+            }
+            if (id == null) {
+                return false;
+            }
+
+            // getSession returns an Optional: unwrap it before comparing,
+            // a String is never equal to an Optional.
+            return id.equals(keyValueClient.getSession(path).orNull());
+        }
+
+        @Override
+        public String toString() {
+            return "ConsulClusterMember{"
+                + "id='" + id + '\''
+                + '}';
+        }
+    }
+
+    // *************************************************************************
+    // Watch
+    // *************************************************************************
+
+    /**
+     * Callback of the read issued on the lock key; each completion (or
+     * failure) re-evaluates leadership and schedules the next watch cycle.
+     */
+    private class Watcher implements ConsulResponseCallback<com.google.common.base.Optional<Value>> {
+        // Index of the last response, handed to the next query.
+        private final AtomicReference<BigInteger> index;
+
+        public Watcher() {
+            this.index = new AtomicReference<>(new BigInteger("0"));
+        }
+
+        @Override
+        public void onComplete(ConsulResponse<com.google.common.base.Optional<Value>> consulResponse) {
+            if (isRunAllowed()) {
+                com.google.common.base.Optional<Value> value = consulResponse.getResponse();
+                if (value.isPresent()) {
+                    com.google.common.base.Optional<String> sid = value.get().getSession();
+                    if (!sid.isPresent()) {
+                        // If the key is not held by any session, try acquire a
+                        // lock (become leader)
+                        boolean lock = keyValueClient.acquireLock(path, sessionId);
+                        LOGGER.debug("Try to acquire lock on path '{}' with id '{}', result '{}'", path, sessionId, lock);
+
+                        localMember.setMaster(lock);
+                    } else {
+                        // Compare from the non-null side: the session id is
+                        // cleared when the view stops.
+                        localMember.setMaster(sid.get().equals(sessionId));
+                    }
+                }
+
+                index.set(consulResponse.getIndex());
+                watch();
+            }
+        }
+
+        @Override
+        public void onFailure(Throwable throwable) {
+            LOGGER.debug("", throwable);
+
+            // Read the field once so the value checked is the value used.
+            final String currentSessionId = sessionId;
+            if (currentSessionId != null) {
+                keyValueClient.releaseLock(path, currentSessionId);
+            }
+
+            localMember.setMaster(false);
+            watch();
+        }
+
+        /**
+         * Issues the next read on the lock key with this watcher as callback
+         * and renews the session; does nothing once the view is not allowed
+         * to run anymore.
+         */
+        public void watch() {
+            if (isRunAllowed()) {
+                // Watch for changes
+                keyValueClient.getValue(
+                    path,
+                    QueryOptions.blockSeconds(configuration.getSessionRefreshInterval(), index.get()).build(),
+                    this
+                );
+
+                // Refresh session
+                sessionClient.renewSession(sessionId);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/828a0953/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyTest.java b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyTest.java
new file mode 100644
index 0000000..cc6381c
--- /dev/null
+++ b/components/camel-consul/src/test/java/org/apache/camel/component/consul/ha/ConsulClusteredRoutePolicyTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.consul.ha;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Starts several Camel contexts sharing the same cluster namespace and checks
+ * that the clustered route of every context eventually gets to run.
+ */
+public class ConsulClusteredRoutePolicyTest {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ConsulClusteredRoutePolicyTest.class);
+    private static final List<String> CLIENTS = IntStream.range(0, 3).mapToObj(Integer::toString).collect(Collectors.toList());
+    // The routes of the different contexts add to this list from their own
+    // threads, so it has to be thread-safe.
+    private static final List<String> RESULTS = Collections.synchronizedList(new ArrayList<>());
+    private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(CLIENTS.size() * 2);
+    private static final CountDownLatch LATCH = new CountDownLatch(CLIENTS.size());
+
+    // ************************************
+    // Test
+    // ************************************
+
+    @Test
+    public void test() throws Exception {
+        for (String id : CLIENTS) {
+            SCHEDULER.submit(() -> run(id));
+        }
+
+        try {
+            // Fail explicitly on timeout instead of silently moving on.
+            Assert.assertTrue("Nodes did not complete in time", LATCH.await(1, TimeUnit.MINUTES));
+        } finally {
+            SCHEDULER.shutdownNow();
+        }
+
+        Assert.assertEquals(CLIENTS.size(), RESULTS.size());
+        Assert.assertTrue(RESULTS.containsAll(CLIENTS));
+    }
+
+    // ************************************
+    // Run a Camel node
+    // ************************************
+
+    /**
+     * Runs a Camel context with a clustered route until the route has fired
+     * once, then stops the context so another node can take over.
+     *
+     * @param id the suffix used for the node, context and route names
+     */
+    private static void run(String id) {
+        try {
+            CountDownLatch contextLatch = new CountDownLatch(1);
+
+            ConsulClusterService service = new ConsulClusterService();
+            service.setId("node-" + id);
+
+            DefaultCamelContext context = new DefaultCamelContext();
+            context.disableJMX();
+            context.setName("context-" + id);
+            context.addService(service);
+            context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("timer:consul?delay=1s&period=1s&repeatCount=1")
+                        .routeId("route-" + id)
+                        .process(e -> {
+                            LOGGER.debug("Node {} done", id);
+                            RESULTS.add(id);
+                            // Shutdown the context later on to give a chance to
+                            // other members to catch-up
+                            SCHEDULER.schedule(contextLatch::countDown, 2 + ThreadLocalRandom.current().nextInt(3), TimeUnit.SECONDS);
+                        });
+                }
+            });
+
+            // Start the context after some random time so the startup order
+            // changes for each test.
+            Thread.sleep(ThreadLocalRandom.current().nextInt(500));
+            context.start();
+
+            contextLatch.await();
+            context.stop();
+        } catch (InterruptedException e) {
+            // Keep the interrupt flag set for the executor.
+            Thread.currentThread().interrupt();
+            LOGGER.warn("", e);
+        } catch (Exception e) {
+            LOGGER.warn("", e);
+        } finally {
+            // Always report completion so a failing node does not make the
+            // test wait for the whole timeout; the assertions on RESULTS
+            // still detect the failure.
+            LATCH.countDown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/828a0953/components/camel-consul/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-consul/src/test/resources/log4j2.properties b/components/camel-consul/src/test/resources/log4j2.properties
index 7519263..a1bcd10 100644
--- a/components/camel-consul/src/test/resources/log4j2.properties
+++ b/components/camel-consul/src/test/resources/log4j2.properties
@@ -28,3 +28,8 @@ logger.consul.name = org.apache.camel.component.consul
logger.consul.level = DEBUG
rootLogger.level = INFO
rootLogger.appenderRef.out.ref = file
+
+logger.camel-ha.name = org.apache.camel.ha
+logger.camel-ha.level = DEBUG
+logger.camel-impl-ha.name = org.apache.camel.impl.ha
+logger.camel-impl-ha.level = DEBUG
\ No newline at end of file