You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2017/08/08 14:43:23 UTC
[05/14] camel git commit: CAMEL-11331: Implemented
KubernetesClusterService
CAMEL-11331: Implemented KubernetesClusterService
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/45481262
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/45481262
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/45481262
Branch: refs/heads/master
Commit: 45481262c44d4e7caa4749725e01687a53916668
Parents: 9fc6d0b
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Fri Jun 30 17:42:33 2017 +0200
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Tue Aug 8 16:39:43 2017 +0200
----------------------------------------------------------------------
components/camel-kubernetes/pom.xml | 6 +-
.../kubernetes/AbstractKubernetesEndpoint.java | 53 +---
.../kubernetes/KubernetesConfiguration.java | 15 +-
.../component/kubernetes/KubernetesHelper.java | 98 +++++++
.../kubernetes/ha/KubernetesClusterService.java | 151 +++++++++++
.../kubernetes/ha/KubernetesClusterView.java | 168 ++++++++++++
.../ha/lock/KubernetesClusterEvent.java | 46 ++++
.../ha/lock/KubernetesClusterEventHandler.java | 27 ++
.../ha/lock/KubernetesLeaderMonitor.java | 256 +++++++++++++++++++
.../ha/lock/KubernetesLeadershipController.java | 211 +++++++++++++++
.../ha/lock/KubernetesLockConfiguration.java | 153 +++++++++++
.../ha/lock/KubernetesMembersMonitor.java | 239 +++++++++++++++++
12 files changed, 1368 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/pom.xml b/components/camel-kubernetes/pom.xml
index e5409c8..c444068 100644
--- a/components/camel-kubernetes/pom.xml
+++ b/components/camel-kubernetes/pom.xml
@@ -44,12 +44,14 @@
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
- <version>${kubernetes-client-version}</version>
+ <version>2.3-SNAPSHOT</version>
+ <!--<version>${kubernetes-client-version}</version>-->
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>openshift-client</artifactId>
- <version>${kubernetes-client-version}</version>
+ <version>2.3-SNAPSHOT</version>
+ <!--<version>${kubernetes-client-version}</version>-->
</dependency>
<!-- testing -->
<dependency>
http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java
index f48bf6d..b7aeb37 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/AbstractKubernetesEndpoint.java
@@ -18,14 +18,10 @@ package org.apache.camel.component.kubernetes;
import java.util.concurrent.ExecutorService;
-import io.fabric8.kubernetes.client.Config;
-import io.fabric8.kubernetes.client.ConfigBuilder;
-import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.UriParam;
-import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,7 +50,7 @@ public abstract class AbstractKubernetesEndpoint extends DefaultEndpoint {
@Override
protected void doStart() throws Exception {
super.doStart();
- client = configuration.getKubernetesClient() != null ? configuration.getKubernetesClient() : createKubernetesClient();
+ client = KubernetesHelper.getKubernetesClient(configuration);
}
@Override
@@ -80,52 +76,5 @@ public abstract class AbstractKubernetesEndpoint extends DefaultEndpoint {
return configuration;
}
- private KubernetesClient createKubernetesClient() {
- LOG.debug("Create Kubernetes client with the following Configuration: " + configuration.toString());
- ConfigBuilder builder = new ConfigBuilder();
- builder.withMasterUrl(configuration.getMasterUrl());
- if ((ObjectHelper.isNotEmpty(configuration.getUsername())
- && ObjectHelper.isNotEmpty(configuration.getPassword()))
- && ObjectHelper.isEmpty(configuration.getOauthToken())) {
- builder.withUsername(configuration.getUsername());
- builder.withPassword(configuration.getPassword());
- }
- if (ObjectHelper.isNotEmpty(configuration.getOauthToken())) {
- builder.withOauthToken(configuration.getOauthToken());
- }
- if (ObjectHelper.isNotEmpty(configuration.getCaCertData())) {
- builder.withCaCertData(configuration.getCaCertData());
- }
- if (ObjectHelper.isNotEmpty(configuration.getCaCertFile())) {
- builder.withCaCertFile(configuration.getCaCertFile());
- }
- if (ObjectHelper.isNotEmpty(configuration.getClientCertData())) {
- builder.withClientCertData(configuration.getClientCertData());
- }
- if (ObjectHelper.isNotEmpty(configuration.getClientCertFile())) {
- builder.withClientCertFile(configuration.getClientCertFile());
- }
- if (ObjectHelper.isNotEmpty(configuration.getApiVersion())) {
- builder.withApiVersion(configuration.getApiVersion());
- }
- if (ObjectHelper.isNotEmpty(configuration.getClientKeyAlgo())) {
- builder.withClientKeyAlgo(configuration.getClientKeyAlgo());
- }
- if (ObjectHelper.isNotEmpty(configuration.getClientKeyData())) {
- builder.withClientKeyData(configuration.getClientKeyData());
- }
- if (ObjectHelper.isNotEmpty(configuration.getClientKeyFile())) {
- builder.withClientKeyFile(configuration.getClientKeyFile());
- }
- if (ObjectHelper.isNotEmpty(configuration.getClientKeyPassphrase())) {
- builder.withClientKeyPassphrase(configuration.getClientKeyPassphrase());
- }
- if (ObjectHelper.isNotEmpty(configuration.getTrustCerts())) {
- builder.withTrustCerts(configuration.getTrustCerts());
- }
-
- Config conf = builder.build();
- return new DefaultKubernetesClient(conf);
- }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java
index 89d0d9a..271ef71 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java
@@ -19,13 +19,14 @@ package org.apache.camel.component.kubernetes;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.camel.RuntimeCamelException;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriParams;
import org.apache.camel.spi.UriPath;
@UriParams
-public class KubernetesConfiguration {
+public class KubernetesConfiguration implements Cloneable {
@UriPath
@Metadata(required = "true")
@@ -395,6 +396,18 @@ public class KubernetesConfiguration {
this.resourceName = resourceName;
}
+ // ****************************************
+ // Copy
+ // ****************************************
+
+ public KubernetesConfiguration copy() {
+ try {
+ return (KubernetesConfiguration) super.clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeCamelException(e);
+ }
+ }
+
@Override
public String toString() {
return "KubernetesConfiguration [masterUrl=" + masterUrl + ", category=" + category + ", kubernetesClient="
http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesHelper.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesHelper.java
new file mode 100644
index 0000000..62235ad
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesHelper.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes;
+
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.ConfigBuilder;
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper methods for Kubernetes resources.
+ */
+public final class KubernetesHelper {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KubernetesHelper.class);
+
+ private KubernetesHelper() {
+ }
+
+ public static KubernetesClient getKubernetesClient(KubernetesConfiguration configuration) {
+ if (configuration.getKubernetesClient() != null) {
+ return configuration.getKubernetesClient();
+ } else if (configuration.getMasterUrl() != null) {
+ return createKubernetesClient(configuration);
+ } else {
+ LOG.info("Creating default kubernetes client without applying configuration");
+ return new DefaultKubernetesClient();
+ }
+ }
+
+ private static KubernetesClient createKubernetesClient(KubernetesConfiguration configuration) {
+ LOG.debug("Create Kubernetes client with the following Configuration: " + configuration.toString());
+
+ ConfigBuilder builder = new ConfigBuilder();
+ builder.withMasterUrl(configuration.getMasterUrl());
+ if ((ObjectHelper.isNotEmpty(configuration.getUsername())
+ && ObjectHelper.isNotEmpty(configuration.getPassword()))
+ && ObjectHelper.isEmpty(configuration.getOauthToken())) {
+ builder.withUsername(configuration.getUsername());
+ builder.withPassword(configuration.getPassword());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getOauthToken())) {
+ builder.withOauthToken(configuration.getOauthToken());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getCaCertData())) {
+ builder.withCaCertData(configuration.getCaCertData());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getCaCertFile())) {
+ builder.withCaCertFile(configuration.getCaCertFile());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getClientCertData())) {
+ builder.withClientCertData(configuration.getClientCertData());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getClientCertFile())) {
+ builder.withClientCertFile(configuration.getClientCertFile());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getApiVersion())) {
+ builder.withApiVersion(configuration.getApiVersion());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getClientKeyAlgo())) {
+ builder.withClientKeyAlgo(configuration.getClientKeyAlgo());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getClientKeyData())) {
+ builder.withClientKeyData(configuration.getClientKeyData());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getClientKeyFile())) {
+ builder.withClientKeyFile(configuration.getClientKeyFile());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getClientKeyPassphrase())) {
+ builder.withClientKeyPassphrase(configuration.getClientKeyPassphrase());
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getTrustCerts())) {
+ builder.withTrustCerts(configuration.getTrustCerts());
+ }
+
+ Config conf = builder.build();
+ return new DefaultKubernetesClient(conf);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
new file mode 100644
index 0000000..6d87d48
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha;
+
+import java.net.InetAddress;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.kubernetes.KubernetesConfiguration;
+import org.apache.camel.component.kubernetes.ha.lock.KubernetesLockConfiguration;
+import org.apache.camel.impl.ha.AbstractCamelClusterService;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * A Kubernetes based cluster service leveraging Kubernetes optimistic locks on resources (specifically ConfigMaps).
+ */
+public class KubernetesClusterService extends AbstractCamelClusterService<KubernetesClusterView> {
+
+ public static final String DEFAULT_CONFIGMAP_NAME = "leaders";
+
+ private KubernetesConfiguration configuration;
+
+ private KubernetesLockConfiguration lockConfiguration;
+
+ public KubernetesClusterService() {
+ this.configuration = new KubernetesConfiguration();
+ this.lockConfiguration = new KubernetesLockConfiguration();
+ }
+
+ public KubernetesClusterService(KubernetesConfiguration configuration) {
+ this.configuration = configuration.copy();
+ this.lockConfiguration = new KubernetesLockConfiguration();
+ }
+
+ public KubernetesClusterService(CamelContext camelContext, KubernetesConfiguration configuration) {
+ super(null, camelContext);
+ this.configuration = configuration.copy();
+ this.lockConfiguration = new KubernetesLockConfiguration();
+ }
+
+ @Override
+ protected KubernetesClusterView createView(String namespace) throws Exception {
+ KubernetesLockConfiguration lockConfig = configWithGroupNameAndDefaults(namespace);
+ return new KubernetesClusterView(this, configuration, lockConfig);
+ }
+
+ protected KubernetesLockConfiguration configWithGroupNameAndDefaults(String groupName) {
+ KubernetesLockConfiguration config = this.lockConfiguration.copy();
+
+ config.setGroupName(ObjectHelper.notNull(groupName, "groupName"));
+
+ // Check defaults (Namespace and podName can be null)
+ if (config.getConfigMapName() == null) {
+ config.setConfigMapName(DEFAULT_CONFIGMAP_NAME);
+ }
+ if (config.getPodName() == null) {
+ config.setPodName(System.getenv("HOSTNAME"));
+ if (config.getPodName() == null) {
+ try {
+ config.setPodName(InetAddress.getLocalHost().getHostName());
+ } catch (Exception e) {
+ throw new RuntimeCamelException("Unable to determine pod name", e);
+ }
+ }
+ }
+
+ return config;
+ }
+
+ public String getMasterUrl() {
+ return configuration.getMasterUrl();
+ }
+
+ /**
+ * Set the URL of the Kubernetes master (read from Kubernetes client properties by default).
+ */
+ public void setMasterUrl(String masterUrl) {
+ configuration.setMasterUrl(masterUrl);
+ }
+
+ public String getKubernetesNamespace() {
+ return this.lockConfiguration.getKubernetesResourcesNamespace();
+ }
+
+ /**
+ * Set the name of the Kubernetes namespace containing the pods and the ConfigMap (autodetected by default).
+ */
+ public void setKubernetesNamespace(String kubernetesNamespace) {
+ this.lockConfiguration.setKubernetesResourcesNamespace(kubernetesNamespace);
+ }
+
+ public String getConfigMapName() {
+ return this.lockConfiguration.getConfigMapName();
+ }
+
+ /**
+ * Set the name of the ConfigMap used to do optimistic locking (defaults to 'leaders').
+ */
+ public void setConfigMapName(String configMapName) {
+ this.lockConfiguration.setConfigMapName(configMapName);
+ }
+
+ public String getPodName() {
+ return this.lockConfiguration.getPodName();
+ }
+
+ /**
+ * Set the name of the current pod (autodetected from container host name by default).
+ */
+ public void setPodName(String podName) {
+ this.lockConfiguration.setPodName(podName);
+ }
+
+ public Map<String, String> getClusterLabels() {
+ return lockConfiguration.getClusterLabels();
+ }
+
+ /**
+ * Set the labels used to identify the pods composing the cluster.
+ */
+ public void setClusterLabels(Map<String, String> clusterLabels) {
+ lockConfiguration.setClusterLabels(clusterLabels);
+ }
+
+ public Long getWatchRefreshIntervalSeconds() {
+ return lockConfiguration.getWatchRefreshIntervalSeconds();
+ }
+
+ /**
+ * Indicates the maximum amount of time a Kubernetes watch should be kept active, before being recreated.
+ * Watch recreation can be disabled by setting a negative value (the default is used when the value is null).
+ */
+ public void setWatchRefreshIntervalSeconds(Long watchRefreshIntervalSeconds) {
+ lockConfiguration.setWatchRefreshIntervalSeconds(watchRefreshIntervalSeconds);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
new file mode 100644
index 0000000..9ac6a86
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import org.apache.camel.component.kubernetes.KubernetesConfiguration;
+import org.apache.camel.component.kubernetes.KubernetesHelper;
+import org.apache.camel.component.kubernetes.ha.lock.KubernetesClusterEvent;
+import org.apache.camel.component.kubernetes.ha.lock.KubernetesLeadershipController;
+import org.apache.camel.component.kubernetes.ha.lock.KubernetesLockConfiguration;
+import org.apache.camel.ha.CamelClusterMember;
+import org.apache.camel.impl.ha.AbstractCamelClusterView;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * The cluster view on a specific Camel cluster namespace (not to be confused with Kubernetes namespaces).
+ * Namespaces are represented as keys in a Kubernetes ConfigMap (values are the current leader pods).
+ */
+public class KubernetesClusterView extends AbstractCamelClusterView {
+
+ private KubernetesClient kubernetesClient;
+
+ private KubernetesConfiguration configuration;
+
+ private KubernetesLockConfiguration lockConfiguration;
+
+ private KubernetesClusterMember localMember;
+
+ private Map<String, KubernetesClusterMember> memberCache;
+
+ private volatile Optional<CamelClusterMember> currentLeader = Optional.empty();
+
+ private volatile List<CamelClusterMember> currentMembers = Collections.emptyList();
+
+ private KubernetesLeadershipController controller;
+
+ public KubernetesClusterView(KubernetesClusterService cluster, KubernetesConfiguration configuration, KubernetesLockConfiguration lockConfiguration) {
+ super(cluster, lockConfiguration.getGroupName());
+ this.configuration = configuration;
+ this.lockConfiguration = lockConfiguration;
+ this.localMember = new KubernetesClusterMember(lockConfiguration.getPodName());
+ this.memberCache = new HashMap<>();
+ }
+
+ @Override
+ public Optional<CamelClusterMember> getMaster() {
+ return currentLeader;
+ }
+
+ @Override
+ public CamelClusterMember getLocalMember() {
+ return localMember;
+ }
+
+ @Override
+ public List<CamelClusterMember> getMembers() {
+ return currentMembers;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ if (controller == null) {
+ this.kubernetesClient = KubernetesHelper.getKubernetesClient(configuration);
+
+ controller = new KubernetesLeadershipController(kubernetesClient, this.lockConfiguration, event -> {
+ if (event instanceof KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) {
+ // New leader
+ Optional<String> leader = KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent.class.cast(event).getData();
+ currentLeader = leader.map(this::toMember);
+ if (currentLeader.isPresent()) {
+ fireLeadershipChangedEvent(currentLeader.get());
+ }
+ } else if (event instanceof KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent) {
+ Set<String> members = KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent.class.cast(event).getData();
+ Set<String> oldMembers = currentMembers.stream().map(CamelClusterMember::getId).collect(Collectors.toSet());
+ currentMembers = members.stream().map(this::toMember).collect(Collectors.toList());
+
+ // Computing differences
+ Set<String> added = new HashSet<>(members);
+ added.removeAll(oldMembers);
+
+ Set<String> removed = new HashSet<>(oldMembers);
+ removed.removeAll(members);
+
+ for (String id : added) {
+ fireMemberAddedEvent(toMember(id));
+ }
+
+ for (String id : removed) {
+ fireMemberRemovedEvent(toMember(id));
+ }
+ }
+ });
+
+ controller.start();
+ }
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (controller != null) {
+ controller.stop();
+ controller = null;
+ kubernetesClient.close();
+ kubernetesClient = null;
+ }
+ }
+
+ protected KubernetesClusterMember toMember(String name) {
+ if (name.equals(localMember.getId())) {
+ return localMember;
+ }
+ return memberCache.computeIfAbsent(name, KubernetesClusterMember::new);
+ }
+
+ class KubernetesClusterMember implements CamelClusterMember {
+
+ private String podName;
+
+ public KubernetesClusterMember(String podName) {
+ this.podName = ObjectHelper.notNull(podName, "podName");
+ }
+
+ @Override
+ public boolean isMaster() {
+ return currentLeader.isPresent() && currentLeader.get().getId().equals(podName);
+ }
+
+ @Override
+ public String getId() {
+ return podName;
+ }
+
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("KubernetesClusterMember{");
+ sb.append("podName='").append(podName).append('\'');
+ sb.append('}');
+ return sb.toString();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesClusterEvent.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesClusterEvent.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesClusterEvent.java
new file mode 100644
index 0000000..59f8768
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesClusterEvent.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.lock;
+
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Super interface for events produced by the Kubernetes cluster.
+ */
+@FunctionalInterface
+public interface KubernetesClusterEvent {
+
+ Object getData();
+
+ /**
+ * Event signalling that the list of members of the Kubernetes cluster has changed.
+ */
+ interface KubernetesClusterMemberListChangedEvent extends KubernetesClusterEvent {
+ @Override
+ Set<String> getData();
+ }
+
+ /**
+ * Event signalling the presence of a new leader.
+ */
+ interface KubernetesClusterLeaderChangedEvent extends KubernetesClusterEvent {
+ @Override
+ Optional<String> getData();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesClusterEventHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesClusterEventHandler.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesClusterEventHandler.java
new file mode 100644
index 0000000..0962847
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesClusterEventHandler.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.lock;
+
+/**
+ * Interface for handling Kubernetes cluster events.
+ */
+@FunctionalInterface
+public interface KubernetesClusterEventHandler {
+
+ void onKubernetesClusterEvent(KubernetesClusterEvent event);
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaderMonitor.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaderMonitor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaderMonitor.java
new file mode 100644
index 0000000..5555fe1
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaderMonitor.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.lock;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watch;
+import io.fabric8.kubernetes.client.Watcher;
+
+import org.apache.camel.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Continuously monitors the ConfigMap to detect changes in leadership.
+ * It calls the registered event handlers only when the leader changes w.r.t. the previous invocation.
+ * <p>
+ * Polls, watch events and notifications are all processed on the executor given to the constructor
+ * (expected to be single-threaded, so that no additional locking is needed). {@link #stop()} and the
+ * watcher callbacks may be invoked from other threads, so the fields they share with the executor
+ * tasks are declared volatile.
+ */
+class KubernetesLeaderMonitor implements Service {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaderMonitor.class);
+
+    private final ScheduledExecutorService serializedExecutor;
+
+    private final KubernetesClient kubernetesClient;
+
+    private final KubernetesLockConfiguration lockConfiguration;
+
+    private final List<KubernetesClusterEventHandler> eventHandlers;
+
+    /** Currently active watch, or null while it is being (re)created. */
+    private volatile Watch watch;
+
+    /** Set by stop(): prevents the watch from being recreated and refreshes from running. */
+    private volatile boolean terminated;
+
+    /** True while refresh() is deliberately closing the watch. */
+    private volatile boolean refreshing;
+
+    /** Most recent ConfigMap seen so far, coming either from a poll or from a watch event. */
+    private ConfigMap latestConfigMap;
+
+    public KubernetesLeaderMonitor(ScheduledExecutorService serializedExecutor, KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration) {
+        this.serializedExecutor = serializedExecutor;
+        this.kubernetesClient = kubernetesClient;
+        this.lockConfiguration = lockConfiguration;
+        this.eventHandlers = new LinkedList<>();
+    }
+
+    /**
+     * Registers a handler that will be notified of leader changes.
+     * Handlers are expected to be registered before {@link #start()} is called.
+     */
+    public void addClusterEventHandler(KubernetesClusterEventHandler leaderEventHandler) {
+        this.eventHandlers.add(leaderEventHandler);
+    }
+
+    /**
+     * Starts the watch, runs an initial poll (retried on failure) and, when the configured
+     * refresh interval is positive, schedules the periodic refresh.
+     */
+    @Override
+    public void start() throws Exception {
+        this.terminated = false;
+        serializedExecutor.execute(this::startWatch);
+        serializedExecutor.execute(() -> doPoll(true));
+
+        long recreationDelay = lockConfiguration.getWatchRefreshIntervalSecondsOrDefault();
+        if (recreationDelay > 0) {
+            serializedExecutor.scheduleWithFixedDelay(this::refresh, recreationDelay, recreationDelay, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Marks the monitor as terminated and closes the active watch, if any.
+     */
+    @Override
+    public void stop() throws Exception {
+        this.terminated = true;
+        Watch activeWatch = this.watch;
+        if (activeWatch != null) {
+            activeWatch.close();
+        }
+    }
+
+    /**
+     * Polls the ConfigMap once (no retry) and closes the current watch so that it gets recreated.
+     * Does nothing once the monitor has been stopped.
+     */
+    public void refresh() {
+        serializedExecutor.execute(() -> {
+            if (!terminated) {
+                refreshing = true;
+                try {
+                    doPoll(false);
+
+                    Watch activeWatch = this.watch;
+                    if (activeWatch != null) {
+                        // It will be recreated by the onClose callback
+                        activeWatch.close();
+                    }
+                } finally {
+                    refreshing = false;
+                }
+            }
+        });
+    }
+
+    /**
+     * Creates the watch on the lock ConfigMap. On failure the creation is retried after 5 seconds.
+     */
+    private void startWatch() {
+        try {
+            LOG.debug("Starting ConfigMap watcher for monitoring the current leader");
+            this.watch = kubernetesClient.configMaps()
+                    .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
+                    .withName(this.lockConfiguration.getConfigMapName())
+                    .watch(new Watcher<ConfigMap>() {
+
+                        @Override
+                        public void eventReceived(Action action, ConfigMap configMap) {
+                            switch (action) {
+                            case MODIFIED:
+                            case DELETED:
+                            case ADDED:
+                                LOG.debug("Received update from watch on ConfigMap {}", configMap);
+                                // Hand over to the executor so that state is changed by a single thread
+                                serializedExecutor.execute(() -> checkAndNotify(configMap));
+                                break;
+                            default:
+                            }
+                        }
+
+                        @Override
+                        public void onClose(KubernetesClientException e) {
+                            if (!terminated) {
+                                KubernetesLeaderMonitor.this.watch = null;
+                                if (refreshing) {
+                                    LOG.info("Refreshing ConfigMap watcher...");
+                                    serializedExecutor.execute(KubernetesLeaderMonitor.this::startWatch);
+                                } else {
+                                    LOG.warn("ConfigMap watcher has been closed unexpectedly. Recreating it in 1 second...", e);
+                                    serializedExecutor.schedule(KubernetesLeaderMonitor.this::startWatch, 1, TimeUnit.SECONDS);
+                                }
+                            }
+                        }
+                    });
+        } catch (Exception ex) {
+            LOG.warn("Unable to watch for configmap changes. Retrying in 5 seconds...");
+            LOG.debug("Error while trying to watch the configmap", ex);
+
+            this.serializedExecutor.schedule(this::startWatch, 5, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Reads the ConfigMap and processes it.
+     *
+     * @param retry when true, a failed poll is rescheduled after 5 seconds; otherwise it is only logged
+     */
+    private void doPoll(boolean retry) {
+        LOG.debug("Starting poll to get configmap {}", this.lockConfiguration.getConfigMapName());
+        ConfigMap configMap;
+        try {
+            configMap = pollConfigMap();
+        } catch (Exception ex) {
+            if (retry) {
+                LOG.warn("ConfigMap poll failed. Retrying in 5 seconds...", ex);
+                this.serializedExecutor.schedule(() -> doPoll(true), 5, TimeUnit.SECONDS);
+            } else {
+                LOG.warn("ConfigMap poll failed", ex);
+            }
+            return;
+        }
+
+        checkAndNotify(configMap);
+    }
+
+    /**
+     * Keeps the newest between the stored ConfigMap and the candidate, and notifies all the
+     * handlers when the leader extracted from it differs from the previously known one.
+     */
+    private void checkAndNotify(ConfigMap candidateConfigMap) {
+        LOG.debug("Checking configMap {}", candidateConfigMap);
+        ConfigMap newConfigMap = newest(this.latestConfigMap, candidateConfigMap);
+        Optional<String> leader = extractLeader(newConfigMap);
+        Optional<String> oldLeader = extractLeader(this.latestConfigMap);
+
+        this.latestConfigMap = newConfigMap;
+
+        LOG.debug("The new leader is {}. Old leader was {}", leader, oldLeader);
+        if (!leader.equals(oldLeader)) {
+            LOG.debug("Notifying the new leader to all eventHandlers");
+            for (KubernetesClusterEventHandler eventHandler : eventHandlers) {
+                eventHandler.onKubernetesClusterEvent((KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) () -> leader);
+            }
+        } else {
+            LOG.debug("Leader has not changed");
+        }
+    }
+
+    private ConfigMap pollConfigMap() {
+        return kubernetesClient.configMaps()
+                .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
+                .withName(this.lockConfiguration.getConfigMapName())
+                .get();
+    }
+
+    /**
+     * Returns the value stored under the group name key of the ConfigMap, if any.
+     */
+    private Optional<String> extractLeader(ConfigMap configMap) {
+        Optional<String> leader = Optional.empty();
+        if (configMap != null && configMap.getData() != null) {
+            leader = Optional.ofNullable(configMap.getData().get(this.lockConfiguration.getGroupName()));
+        }
+        return leader;
+    }
+
+    /**
+     * Selects the most recent of two ConfigMaps.
+     * <p>
+     * When only one is present, it is returned. Otherwise the resource versions are compared and,
+     * if they are not available, the creation timestamps. On a tie, or when nothing can be compared,
+     * the second ConfigMap (the candidate) wins.
+     *
+     * @return the newest ConfigMap, or null when both arguments are null
+     */
+    private ConfigMap newest(ConfigMap configMap1, ConfigMap configMap2) {
+        if (configMap1 == null || configMap2 == null) {
+            return configMap1 != null ? configMap1 : configMap2;
+        }
+
+        ConfigMap newest = newestByResourceVersion(configMap1, configMap2);
+
+        if (newest == null) {
+            String ct1 = extractCreationTimestamp(configMap1);
+            String ct2 = extractCreationTimestamp(configMap2);
+            // timestamps are string-comparable
+            newest = newest(configMap1, configMap2, ct1, ct2);
+        }
+
+        // Nothing comparable: trust the candidate rather than losing the information
+        return newest != null ? newest : configMap2;
+    }
+
+    /**
+     * Compares two ConfigMaps by resource version. Versions are compared as numbers when both can
+     * be parsed, because a plain string comparison would rank "999" after "1000"; otherwise the
+     * comparison falls back to the string ordering.
+     *
+     * @return the newest ConfigMap, or null when one of the resource versions is missing
+     */
+    private ConfigMap newestByResourceVersion(ConfigMap configMap1, ConfigMap configMap2) {
+        String rv1 = extractResourceVersion(configMap1);
+        String rv2 = extractResourceVersion(configMap2);
+        if (rv1 == null || rv2 == null) {
+            return null;
+        }
+        try {
+            return newest(configMap1, configMap2, Long.valueOf(rv1), Long.valueOf(rv2));
+        } catch (NumberFormatException ex) {
+            return newest(configMap1, configMap2, rv1, rv2);
+        }
+    }
+
+    /**
+     * Returns configMap1 when cmp1 is strictly greater than cmp2, configMap2 otherwise;
+     * null when one of the comparison keys is missing.
+     */
+    private <T extends Comparable<T>> ConfigMap newest(ConfigMap configMap1, ConfigMap configMap2, T cmp1, T cmp2) {
+        if (cmp1 != null && cmp2 != null) {
+            int comp = cmp1.compareTo(cmp2);
+            if (comp > 0) {
+                return configMap1;
+            } else {
+                return configMap2;
+            }
+        }
+        return null;
+    }
+
+    private String extractResourceVersion(ConfigMap configMap) {
+        if (configMap != null && configMap.getMetadata() != null) {
+            return configMap.getMetadata().getResourceVersion();
+        }
+        return null;
+    }
+
+    private String extractCreationTimestamp(ConfigMap configMap) {
+        if (configMap != null && configMap.getMetadata() != null) {
+            return configMap.getMetadata().getCreationTimestamp();
+        }
+        return null;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java
new file mode 100644
index 0000000..ad2f9bc
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.lock;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import org.apache.camel.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Start the monitors and participate to leader election when no active leaders are present.
+ * It communicates changes in leadership and cluster members to the given event handler.
+ */
+public class KubernetesLeadershipController implements Service {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeadershipController.class);
+
+ private KubernetesClient kubernetesClient;
+
+ private KubernetesLockConfiguration lockConfiguration;
+
+ private ScheduledExecutorService executor;
+
+ private KubernetesLeaderMonitor leaderMonitor;
+
+ private KubernetesMembersMonitor membersMonitor;
+
+ private Optional<String> currentLeader;
+
+ private Set<String> currentMembers;
+
+ private KubernetesClusterEventHandler eventHandler;
+
+ public KubernetesLeadershipController(KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration, KubernetesClusterEventHandler eventHandler) {
+
+ this.kubernetesClient = kubernetesClient;
+ this.lockConfiguration = lockConfiguration;
+ this.eventHandler = eventHandler;
+
+ this.currentLeader = Optional.empty();
+ this.currentMembers = Collections.emptySet();
+ }
+
+ @Override
+ public void start() throws Exception {
+
+ if (executor == null) {
+ executor = Executors.newSingleThreadScheduledExecutor(); // No concurrency
+ leaderMonitor = new KubernetesLeaderMonitor(this.executor, this.kubernetesClient, this.lockConfiguration);
+ membersMonitor = new KubernetesMembersMonitor(this.executor, this.kubernetesClient, this.lockConfiguration);
+
+ leaderMonitor.addClusterEventHandler(e -> executor.execute(() -> onLeaderChanged(e)));
+ if (eventHandler != null) {
+ leaderMonitor.addClusterEventHandler(eventHandler);
+ }
+
+ membersMonitor.addClusterEventHandler(e -> executor.execute(() -> onMembersChanged(e)));
+ if (eventHandler != null) {
+ membersMonitor.addClusterEventHandler(eventHandler);
+ }
+
+ // Start all services
+ leaderMonitor.start();
+ membersMonitor.start();
+
+ // Fire a new election if possible
+ executor.execute(this::runLeaderElection);
+ }
+
+ }
+
+ @Override
+ public void stop() throws Exception {
+ if (executor != null) {
+ membersMonitor.stop();
+ leaderMonitor.stop();
+ executor.shutdown();
+ executor.shutdownNow();
+
+ membersMonitor = null;
+ leaderMonitor = null;
+ executor = null;
+ }
+ }
+
+ private void onLeaderChanged(KubernetesClusterEvent e) {
+ Optional<String> newLeader = KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent.class.cast(e).getData();
+ if (!newLeader.isPresent()) {
+ executor.execute(this::tryLeaderElection);
+ }
+ this.currentLeader = newLeader;
+ }
+
+ private void onMembersChanged(KubernetesClusterEvent e) {
+ Set<String> newMembers = KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent.class.cast(e).getData();
+ if (currentLeader.isPresent()) {
+ // Check if the current leader is still present in the list
+ if (!newMembers.contains(currentLeader.get()) && currentMembers.contains(currentLeader.get())) {
+ executor.execute(this::runLeaderElection);
+ }
+ }
+ this.currentMembers = newMembers;
+ }
+
+ private void runLeaderElection() {
+ boolean finished = false;
+ try {
+ finished = tryLeaderElection();
+ } catch (Exception ex) {
+ LOG.warn("Exception while trying to acquire the leadership", ex);
+ }
+
+ if (!finished) {
+ executor.schedule(this::runLeaderElection, 1, TimeUnit.SECONDS);
+ }
+ }
+
+ private boolean tryLeaderElection() {
+ LOG.debug("Starting leader election");
+ if (!currentMembers.contains(this.lockConfiguration.getPodName())) {
+ LOG.debug("The current pod ({}) is not in the list of participating pods {}. Cannot participate to the election", this.lockConfiguration.getPodName(), currentMembers);
+ return false;
+ }
+
+ ConfigMap configMap = kubernetesClient.configMaps()
+ .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
+ .withName(this.lockConfiguration.getConfigMapName())
+ .get();
+
+ if (configMap == null) {
+ // No configmap created so far
+ LOG.info("Lock configmap is not present in the Kubernetes namespace. A new ConfigMap will be created");
+
+ ConfigMap newConfigMap = new ConfigMapBuilder().
+ withNewMetadata()
+ .withName(this.lockConfiguration.getConfigMapName())
+ .addToLabels("provider", "camel")
+ .addToLabels("kind", "locks").
+ endMetadata()
+ .addToData(this.lockConfiguration.getGroupName(), this.lockConfiguration.getPodName())
+ .build();
+
+ try {
+ kubernetesClient.configMaps()
+ .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
+ .create(newConfigMap);
+ } catch (Exception ex) {
+ // Suppress exception
+ LOG.warn("Unable to create the ConfigMap, it may have been created by other cluster members concurrently. If the problem persists, check if the service account has the right "
+ + "permissions to create it");
+ LOG.debug("Exception while trying to create the ConfigMap", ex);
+ return false;
+ }
+ return true;
+ } else {
+ LOG.info("Lock configmap already present in the Kubernetes namespace. Checking...");
+ Map<String, String> leaders = configMap.getData();
+ Optional<String> oldLeader = leaders != null ? Optional.ofNullable(leaders.get(this.lockConfiguration.getGroupName())) : Optional.empty();
+
+ boolean noLeaderPresent = !oldLeader.isPresent() || !currentMembers.contains(oldLeader.get());
+ boolean alreadyLeader = oldLeader.isPresent() && oldLeader.get().equals(this.lockConfiguration.getPodName());
+
+ if (noLeaderPresent && !alreadyLeader) {
+ LOG.info("Trying to acquire the lock in configmap={}, key={}", this.lockConfiguration.getConfigMapName(), this.lockConfiguration.getGroupName());
+ ConfigMap newConfigMap = new ConfigMapBuilder(configMap)
+ .addToData(this.lockConfiguration.getGroupName(), this.lockConfiguration.getPodName())
+ .build();
+
+ kubernetesClient.configMaps()
+ .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
+ .withName(this.lockConfiguration.getConfigMapName())
+ .lockResourceVersion(configMap.getMetadata().getResourceVersion())
+ .replace(newConfigMap);
+
+ LOG.info("Lock acquired for configmap={}, key={}", this.lockConfiguration.getConfigMapName(), this.lockConfiguration.getGroupName());
+ } else if (!noLeaderPresent) {
+ LOG.info("A leader is already present for configmap={}, key={}: {}", this.lockConfiguration.getConfigMapName(), this.lockConfiguration.getGroupName(), oldLeader);
+ } else {
+ LOG.info("This pod ({}) is already the leader for configmap={}, key={}", this.lockConfiguration.getPodName(), this.lockConfiguration.getConfigMapName(), this.lockConfiguration
+ .getGroupName());
+ }
+ return true;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
new file mode 100644
index 0000000..f203c0a
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.lock;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+/**
+ * Configuration for Kubernetes Lock.
+ */
+public class KubernetesLockConfiguration implements Cloneable {
+
+ private static final long DEFAULT_WATCHER_REFRESH_INTERVAL_SECONDS = 1800;
+
+ /**
+ * Kubernetes namespace containing the pods and the ConfigMap used for locking.
+ */
+ private String kubernetesResourcesNamespace;
+
+ /**
+ * Name of the ConfigMap used for locking.
+ */
+ private String configMapName;
+
+ /**
+ * Name of the lock group (or namespace according to the Camel cluster convention) within the chosen ConfgMap.
+ */
+ private String groupName;
+
+ /**
+ * Name of the current pod (defaults to host name).
+ */
+ private String podName;
+
+ /**
+ * Labels used to identify the members of the cluster.
+ */
+ private Map<String, String> clusterLabels = new HashMap<>();
+
+ /**
+ * Indicates the maximum amount of time a Kubernetes watch should be kept active, before being recreated.
+ * Watch recreation can be disabled by putting a negative value (the default will be used in case of null).
+ */
+ private Long watchRefreshIntervalSeconds;
+
+ public KubernetesLockConfiguration() {
+ }
+
+ public String getKubernetesResourcesNamespaceOrDefault(KubernetesClient kubernetesClient) {
+ if (kubernetesResourcesNamespace != null) {
+ return kubernetesResourcesNamespace;
+ }
+ return kubernetesClient.getNamespace();
+ }
+
+ public String getKubernetesResourcesNamespace() {
+ return kubernetesResourcesNamespace;
+ }
+
+ public void setKubernetesResourcesNamespace(String kubernetesResourcesNamespace) {
+ this.kubernetesResourcesNamespace = kubernetesResourcesNamespace;
+ }
+
+ public String getConfigMapName() {
+ return configMapName;
+ }
+
+ public void setConfigMapName(String configMapName) {
+ this.configMapName = configMapName;
+ }
+
+ public String getGroupName() {
+ return groupName;
+ }
+
+ public void setGroupName(String groupName) {
+ this.groupName = groupName;
+ }
+
+ public String getPodName() {
+ return podName;
+ }
+
+ public void setPodName(String podName) {
+ this.podName = podName;
+ }
+
+ public Map<String, String> getClusterLabels() {
+ return clusterLabels;
+ }
+
+ public void addToClusterLabels(String key, String value) {
+ this.clusterLabels.put(key, value);
+ }
+
+ public void setClusterLabels(Map<String, String> clusterLabels) {
+ this.clusterLabels = clusterLabels;
+ }
+
+ public Long getWatchRefreshIntervalSeconds() {
+ return watchRefreshIntervalSeconds;
+ }
+
+ public long getWatchRefreshIntervalSecondsOrDefault() {
+ Long interval = watchRefreshIntervalSeconds;
+ if (interval == null) {
+ interval = DEFAULT_WATCHER_REFRESH_INTERVAL_SECONDS;
+ }
+ return interval;
+ }
+
+ public void setWatchRefreshIntervalSeconds(Long watchRefreshIntervalSeconds) {
+ this.watchRefreshIntervalSeconds = watchRefreshIntervalSeconds;
+ }
+
+ public KubernetesLockConfiguration copy() {
+ try {
+ KubernetesLockConfiguration copy = (KubernetesLockConfiguration) this.clone();
+ return copy;
+ } catch (CloneNotSupportedException e) {
+ throw new IllegalStateException("Cannot clone", e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("KubernetesLockConfiguration{");
+ sb.append("kubernetesResourcesNamespace='").append(kubernetesResourcesNamespace).append('\'');
+ sb.append(", configMapName='").append(configMapName).append('\'');
+ sb.append(", groupName='").append(groupName).append('\'');
+ sb.append(", podName='").append(podName).append('\'');
+ sb.append(", clusterLabels=").append(clusterLabels);
+ sb.append(", watchRefreshIntervalSeconds=").append(watchRefreshIntervalSeconds);
+ sb.append('}');
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/45481262/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java
new file mode 100644
index 0000000..d9173b2
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.lock;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watch;
+import io.fabric8.kubernetes.client.Watcher;
+
+import org.apache.camel.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Monitors the list of participants in a leader election and provides the most recently updated list.
+ * It calls the registered event handlers only when the member set changes w.r.t. the previous invocation.
+ * <p>
+ * The member set is computed as the result of the last poll, plus the pods added and minus the pods
+ * deleted by the watch events received since that poll. Polls, watch events and notifications are all
+ * processed on the executor given to the constructor (expected to be single-threaded).
+ * {@link #stop()} and the watcher callbacks may be invoked from other threads, so the fields they
+ * share with the executor tasks are declared volatile.
+ */
+class KubernetesMembersMonitor implements Service {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesMembersMonitor.class);
+
+    private final ScheduledExecutorService serializedExecutor;
+
+    private final KubernetesClient kubernetesClient;
+
+    private final KubernetesLockConfiguration lockConfiguration;
+
+    private final List<KubernetesClusterEventHandler> eventHandlers;
+
+    /** Currently active watch, or null while it is being (re)created. */
+    private volatile Watch watch;
+
+    /** Set by stop(): prevents the watch from being recreated and refreshes from running. */
+    private volatile boolean terminated;
+
+    /** True while refresh() is deliberately closing the watch. */
+    private volatile boolean refreshing;
+
+    /** Member set sent with the last notification. */
+    private Set<String> previousMembers = new HashSet<>();
+
+    /** Pod names returned by the last successful poll. */
+    private Set<String> basePoll = new HashSet<>();
+    /** Pod names deleted by watch events since the last poll. */
+    private Set<String> deleted = new HashSet<>();
+    /** Pod names added by watch events since the last poll. */
+    private Set<String> added = new HashSet<>();
+
+    public KubernetesMembersMonitor(ScheduledExecutorService serializedExecutor, KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration) {
+        this.serializedExecutor = serializedExecutor;
+        this.kubernetesClient = kubernetesClient;
+        this.lockConfiguration = lockConfiguration;
+        this.eventHandlers = new LinkedList<>();
+    }
+
+    /**
+     * Registers a handler that will be notified of member set changes.
+     * Handlers are expected to be registered before {@link #start()} is called.
+     */
+    public void addClusterEventHandler(KubernetesClusterEventHandler eventHandler) {
+        this.eventHandlers.add(eventHandler);
+    }
+
+    /**
+     * Runs an initial poll (retried on failure), creates the watch and, when the configured
+     * refresh interval is positive, schedules the periodic refresh.
+     */
+    @Override
+    public void start() throws Exception {
+        // Allow the watch to be recreated again if the monitor was previously stopped
+        this.terminated = false;
+        serializedExecutor.execute(() -> doPoll(true));
+        serializedExecutor.execute(this::createWatch);
+
+        long recreationDelay = lockConfiguration.getWatchRefreshIntervalSecondsOrDefault();
+        if (recreationDelay > 0) {
+            serializedExecutor.scheduleWithFixedDelay(this::refresh, recreationDelay, recreationDelay, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Creates the watch on the pods matching the cluster labels.
+     * On failure the creation is retried after 5 seconds.
+     */
+    private void createWatch() {
+        try {
+            LOG.debug("Starting cluster members watcher");
+            this.watch = kubernetesClient.pods()
+                    .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
+                    .withLabels(this.lockConfiguration.getClusterLabels())
+                    .watch(new Watcher<Pod>() {
+
+                        @Override
+                        public void eventReceived(Action action, Pod pod) {
+                            // Hand over to the executor so that state is changed by a single thread
+                            switch (action) {
+                            case DELETED:
+                                serializedExecutor.execute(() -> deleteAndNotify(podName(pod)));
+                                break;
+                            case ADDED:
+                                serializedExecutor.execute(() -> addAndNotify(podName(pod)));
+                                break;
+                            default:
+                            }
+                        }
+
+                        @Override
+                        public void onClose(KubernetesClientException e) {
+                            if (!terminated) {
+                                KubernetesMembersMonitor.this.watch = null;
+                                if (refreshing) {
+                                    LOG.info("Refreshing pod list watcher...");
+                                    serializedExecutor.execute(KubernetesMembersMonitor.this::createWatch);
+                                } else {
+                                    LOG.warn("Pod list watcher has been closed unexpectedly. Recreating it in 1 second...", e);
+                                    serializedExecutor.schedule(KubernetesMembersMonitor.this::createWatch, 1, TimeUnit.SECONDS);
+                                }
+                            }
+                        }
+                    });
+        } catch (Exception ex) {
+            LOG.warn("Unable to watch for pod list changes. Retrying in 5 seconds...");
+            LOG.debug("Error while trying to watch the pod list", ex);
+
+            serializedExecutor.schedule(this::createWatch, 5, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Marks the monitor as terminated and closes the active watch, if any.
+     */
+    @Override
+    public void stop() throws Exception {
+        this.terminated = true;
+        Watch activeWatch = this.watch;
+        if (activeWatch != null) {
+            activeWatch.close();
+        }
+    }
+
+    /**
+     * Polls the pods once (no retry) and closes the current watch so that it gets recreated.
+     * Does nothing once the monitor has been stopped.
+     */
+    public void refresh() {
+        serializedExecutor.execute(() -> {
+            if (!terminated) {
+                refreshing = true;
+                try {
+                    doPoll(false);
+
+                    Watch activeWatch = this.watch;
+                    if (activeWatch != null) {
+                        // It will be recreated by the onClose callback
+                        activeWatch.close();
+                    }
+                } finally {
+                    refreshing = false;
+                }
+            }
+        });
+    }
+
+    /**
+     * Reads the full pod list, makes it the new base set (discarding the changes accumulated
+     * from the watch events) and notifies the handlers if the member set changed.
+     *
+     * @param retry when true, a failed poll is rescheduled after 5 seconds; otherwise it is only logged
+     */
+    private void doPoll(boolean retry) {
+        LOG.debug("Starting poll to get all cluster members");
+        List<Pod> pods;
+        try {
+            pods = pollPods();
+        } catch (Exception ex) {
+            if (retry) {
+                LOG.warn("Pods poll failed. Retrying in 5 seconds...", ex);
+                this.serializedExecutor.schedule(() -> doPoll(true), 5, TimeUnit.SECONDS);
+            } else {
+                LOG.warn("Pods poll failed", ex);
+            }
+            return;
+        }
+
+        // Pods without a name are skipped
+        this.basePoll = pods.stream()
+                .map(p -> Optional.ofNullable(podName(p)))
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .collect(Collectors.toSet());
+
+        this.added = new HashSet<>();
+        this.deleted = new HashSet<>();
+
+        LOG.debug("Base list of members is {}", this.basePoll);
+
+        checkAndNotify();
+    }
+
+    private List<Pod> pollPods() {
+        return kubernetesClient.pods()
+                .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
+                .withLabels(this.lockConfiguration.getClusterLabels())
+                .list().getItems();
+    }
+
+    /**
+     * Returns the name of the pod, or null when the pod or its metadata are missing.
+     */
+    private String podName(Pod pod) {
+        if (pod != null && pod.getMetadata() != null) {
+            return pod.getMetadata().getName();
+        }
+        return null;
+    }
+
+    /**
+     * Computes the current member set and notifies all the handlers when it differs
+     * from the one sent with the previous notification.
+     */
+    private void checkAndNotify() {
+        Set<String> newMembers = new HashSet<>(basePoll);
+        newMembers.addAll(added);
+        newMembers.removeAll(deleted);
+
+        LOG.debug("Current list of members is: {}", newMembers);
+
+        if (!newMembers.equals(this.previousMembers)) {
+            LOG.debug("List of members changed: sending notifications");
+            this.previousMembers = newMembers;
+
+            for (KubernetesClusterEventHandler eventHandler : eventHandlers) {
+                eventHandler.onKubernetesClusterEvent((KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent) () -> newMembers);
+            }
+        } else {
+            LOG.debug("List of members has not changed");
+        }
+    }
+
+    /**
+     * Records a pod addition received from the watch. Null names are ignored.
+     */
+    private void addAndNotify(String member) {
+        LOG.debug("Adding new member to the list: {}", member);
+        if (member != null) {
+            // A pod can be recreated with the same name: the latest event must win,
+            // otherwise an earlier deletion would keep hiding it until the next poll
+            this.deleted.remove(member);
+            this.added.add(member);
+            checkAndNotify();
+        }
+    }
+
+    /**
+     * Records a pod deletion received from the watch. Null names are ignored.
+     */
+    private void deleteAndNotify(String member) {
+        LOG.debug("Deleting member to the list: {}", member);
+        if (member != null) {
+            // Keep the two sets disjoint, so that the latest event for a pod always wins
+            this.added.remove(member);
+            this.deleted.add(member);
+            checkAndNotify();
+        }
+    }
+
+}