You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2017/08/08 14:43:24 UTC
[06/14] camel git commit: CAMEL-11331: Lease based implementation of Kubernetes lock
CAMEL-11331: Lease based implementation of Kubernetes lock
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e4cab329
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e4cab329
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e4cab329
Branch: refs/heads/master
Commit: e4cab32911c2d825568d1de93fbf42ccbc341c92
Parents: 4548126
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Fri Jul 7 17:05:31 2017 +0200
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Tue Aug 8 16:39:43 2017 +0200
----------------------------------------------------------------------
.../kubernetes/ha/KubernetesClusterService.java | 117 +++++-
.../kubernetes/ha/KubernetesClusterView.java | 6 +-
.../kubernetes/ha/lock/ConfigMapLockUtils.java | 106 ++++++
.../ha/lock/KubernetesLeaderMonitor.java | 256 -------------
.../ha/lock/KubernetesLeadershipController.java | 211 -----------
...ubernetesLeaseBasedLeadershipController.java | 374 +++++++++++++++++++
.../ha/lock/KubernetesLockConfiguration.java | 99 ++++-
.../ha/lock/KubernetesMembersMonitor.java | 4 +-
.../kubernetes/ha/lock/LeaderInfo.java | 90 +++++
9 files changed, 767 insertions(+), 496 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
index 6d87d48..a868d16 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterService.java
@@ -19,6 +19,8 @@ package org.apache.camel.component.kubernetes.ha;
import java.net.InetAddress;
import java.util.Map;
+import io.fabric8.kubernetes.client.KubernetesClient;
+
import org.apache.camel.CamelContext;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.kubernetes.KubernetesConfiguration;
@@ -31,8 +33,6 @@ import org.apache.camel.util.ObjectHelper;
*/
public class KubernetesClusterService extends AbstractCamelClusterService<KubernetesClusterView> {
- public static final String DEFAULT_CONFIGMAP_NAME = "leaders";
-
private KubernetesConfiguration configuration;
private KubernetesLockConfiguration lockConfiguration;
@@ -64,10 +64,7 @@ public class KubernetesClusterService extends AbstractCamelClusterService<Kubern
config.setGroupName(ObjectHelper.notNull(groupName, "groupName"));
- // Check defaults (Namespace and podName can be null)
- if (config.getConfigMapName() == null) {
- config.setConfigMapName(DEFAULT_CONFIGMAP_NAME);
- }
+ // Determine the pod name if not provided
if (config.getPodName() == null) {
config.setPodName(System.getenv("HOSTNAME"));
if (config.getPodName() == null) {
@@ -79,6 +76,33 @@ public class KubernetesClusterService extends AbstractCamelClusterService<Kubern
}
}
+ ObjectHelper.notNull(config.getConfigMapName(), "configMapName");
+ ObjectHelper.notNull(config.getClusterLabels(), "clusterLabels");
+
+ if (config.getJitterFactor() < 1) {
+ throw new IllegalStateException("jitterFactor must be >= 1 (found: " + config.getJitterFactor() + ")");
+ }
+ if (config.getRetryOnErrorIntervalSeconds() <= 0) {
+ throw new IllegalStateException("retryOnErrorIntervalSeconds must be > 0 (found: " + config.getRetryOnErrorIntervalSeconds() + ")");
+ }
+ if (config.getRetryPeriodSeconds() <= 0) {
+ throw new IllegalStateException("retryPeriodSeconds must be > 0 (found: " + config.getRetryPeriodSeconds() + ")");
+ }
+ if (config.getRenewDeadlineSeconds() <= 0) {
+ throw new IllegalStateException("renewDeadlineSeconds must be > 0 (found: " + config.getRenewDeadlineSeconds() + ")");
+ }
+ if (config.getLeaseDurationSeconds() <= 0) {
+ throw new IllegalStateException("leaseDurationSeconds must be > 0 (found: " + config.getLeaseDurationSeconds() + ")");
+ }
+ if (config.getLeaseDurationSeconds() <= config.getRenewDeadlineSeconds()) {
+ throw new IllegalStateException("leaseDurationSeconds must be greater than renewDeadlineSeconds "
+ + "(" + config.getLeaseDurationSeconds() + " is not greater than " + config.getRenewDeadlineSeconds() + ")");
+ }
+ if (config.getRenewDeadlineSeconds() <= config.getJitterFactor() * config.getRetryPeriodSeconds()) {
+ throw new IllegalStateException("renewDeadlineSeconds must be greater than jitterFactor*retryPeriodSeconds "
+ + "(" + config.getRenewDeadlineSeconds() + " is not greater than " + config.getJitterFactor() + "*" + config.getRetryPeriodSeconds() + ")");
+ }
+
return config;
}
@@ -137,15 +161,88 @@ public class KubernetesClusterService extends AbstractCamelClusterService<Kubern
lockConfiguration.setClusterLabels(clusterLabels);
}
- public Long getWatchRefreshIntervalSeconds() {
- return lockConfiguration.getWatchRefreshIntervalSeconds();
+ public void addToClusterLabels(String key, String value) {
+ lockConfiguration.addToClusterLabels(key, value);
+ }
+
+ public String getKubernetesResourcesNamespace() {
+ return lockConfiguration.getKubernetesResourcesNamespace();
+ }
+
+ /**
+ * Kubernetes namespace containing the pods and the ConfigMap used for locking.
+ */
+ public void setKubernetesResourcesNamespace(String kubernetesResourcesNamespace) {
+ lockConfiguration.setKubernetesResourcesNamespace(kubernetesResourcesNamespace);
+ }
+
+ public long getRetryOnErrorIntervalSeconds() {
+ return lockConfiguration.getRetryOnErrorIntervalSeconds();
}
/**
* Indicates the maximum amount of time a Kubernetes watch should be kept active, before being recreated.
- * Watch recreation can be disabled by putting a negative value (the default will be used in case of null).
+ * Watch recreation can be disabled by putting value <= 0.
+ */
+ public void setRetryOnErrorIntervalSeconds(long retryOnErrorIntervalSeconds) {
+ lockConfiguration.setRetryOnErrorIntervalSeconds(retryOnErrorIntervalSeconds);
+ }
+
+ public double getJitterFactor() {
+ return lockConfiguration.getJitterFactor();
+ }
+
+ /**
+ * A jitter factor to apply in order to prevent all pods from trying to become leaders at the same instant.
*/
- public void setWatchRefreshIntervalSeconds(Long watchRefreshIntervalSeconds) {
+ public void setJitterFactor(double jitterFactor) {
+ lockConfiguration.setJitterFactor(jitterFactor);
+ }
+
+ public long getLeaseDurationSeconds() {
+ return lockConfiguration.getLeaseDurationSeconds();
+ }
+
+ /**
+ * The default duration of the lease for the current leader.
+ */
+ public void setLeaseDurationSeconds(long leaseDurationSeconds) {
+ lockConfiguration.setLeaseDurationSeconds(leaseDurationSeconds);
+ }
+
+ public long getRenewDeadlineSeconds() {
+ return lockConfiguration.getRenewDeadlineSeconds();
+ }
+
+ /**
+ * The deadline after which the leader must stop trying to renew its leadership (and yield it).
+ */
+ public void setRenewDeadlineSeconds(long renewDeadlineSeconds) {
+ lockConfiguration.setRenewDeadlineSeconds(renewDeadlineSeconds);
+ }
+
+ public long getRetryPeriodSeconds() {
+ return lockConfiguration.getRetryPeriodSeconds();
+ }
+
+ /**
+ * The time between two subsequent attempts to acquire/renew the leadership (or after the lease expiration).
+ * It is randomized using the jitter factor in case of new leader election (not renewal).
+ */
+ public void setRetryPeriodSeconds(long retryPeriodSeconds) {
+ lockConfiguration.setRetryPeriodSeconds(retryPeriodSeconds);
+ }
+
+ public long getWatchRefreshIntervalSeconds() {
+ return lockConfiguration.getWatchRefreshIntervalSeconds();
+ }
+
+ /**
+ * Set this to a positive value in order to recreate watchers after a certain amount of time,
+ * to avoid having stale watchers.
+ */
+ public void setWatchRefreshIntervalSeconds(long watchRefreshIntervalSeconds) {
lockConfiguration.setWatchRefreshIntervalSeconds(watchRefreshIntervalSeconds);
}
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
index 9ac6a86..e324b3f 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterView.java
@@ -30,7 +30,7 @@ import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.camel.component.kubernetes.KubernetesConfiguration;
import org.apache.camel.component.kubernetes.KubernetesHelper;
import org.apache.camel.component.kubernetes.ha.lock.KubernetesClusterEvent;
-import org.apache.camel.component.kubernetes.ha.lock.KubernetesLeadershipController;
+import org.apache.camel.component.kubernetes.ha.lock.KubernetesLeaseBasedLeadershipController;
import org.apache.camel.component.kubernetes.ha.lock.KubernetesLockConfiguration;
import org.apache.camel.ha.CamelClusterMember;
import org.apache.camel.impl.ha.AbstractCamelClusterView;
@@ -56,7 +56,7 @@ public class KubernetesClusterView extends AbstractCamelClusterView {
private volatile List<CamelClusterMember> currentMembers = Collections.emptyList();
- private KubernetesLeadershipController controller;
+ private KubernetesLeaseBasedLeadershipController controller;
public KubernetesClusterView(KubernetesClusterService cluster, KubernetesConfiguration configuration, KubernetesLockConfiguration lockConfiguration) {
super(cluster, lockConfiguration.getGroupName());
@@ -86,7 +86,7 @@ public class KubernetesClusterView extends AbstractCamelClusterView {
if (controller == null) {
this.kubernetesClient = KubernetesHelper.getKubernetesClient(configuration);
- controller = new KubernetesLeadershipController(kubernetesClient, this.lockConfiguration, event -> {
+ controller = new KubernetesLeaseBasedLeadershipController(kubernetesClient, this.lockConfiguration, event -> {
if (event instanceof KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) {
// New leader
Optional<String> leader = KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent.class.cast(event).getData();
http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java
new file mode 100644
index 0000000..84718f3
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.lock;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public final class ConfigMapLockUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConfigMapLockUtils.class);
+
+ private static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ssX";
+
+ private static final String LEADER_PREFIX = "leader.pod.";
+
+ private static final String TIMESTAMP_PREFIX = "leader.timestamp.";
+
+ private ConfigMapLockUtils() {
+ }
+
+ public static ConfigMap createNewConfigMap(String configMapName, LeaderInfo leaderInfo) {
+ return new ConfigMapBuilder().
+ withNewMetadata()
+ .withName(configMapName)
+ .addToLabels("provider", "camel")
+ .addToLabels("kind", "locks").
+ endMetadata()
+ .addToData(LEADER_PREFIX + leaderInfo.getGroupName(), leaderInfo.getLeader())
+ .addToData(TIMESTAMP_PREFIX + leaderInfo.getGroupName(), formatDate(leaderInfo.getTimestamp()))
+ .build();
+ }
+
+ public static ConfigMap getConfigMapWithNewLeader(ConfigMap configMap, LeaderInfo leaderInfo) {
+ return new ConfigMapBuilder(configMap)
+ .addToData(LEADER_PREFIX + leaderInfo.getGroupName(), leaderInfo.getLeader())
+ .addToData(TIMESTAMP_PREFIX + leaderInfo.getGroupName(), formatDate(leaderInfo.getTimestamp()))
+ .build();
+ }
+
+ public static LeaderInfo getLeaderInfo(ConfigMap configMap, String group) {
+ return new LeaderInfo(group, getLeader(configMap, group), getTimestamp(configMap, group));
+ }
+
+ private static String getLeader(ConfigMap configMap, String group) {
+ return getConfigMapValue(configMap, LEADER_PREFIX + group);
+ }
+
+ private static String formatDate(Date date) {
+ if (date == null) {
+ return null;
+ }
+ try {
+ return new SimpleDateFormat(DATE_TIME_FORMAT).format(date);
+ } catch (Exception e) {
+ LOG.warn("Unable to format date '" + date + "' using format " + DATE_TIME_FORMAT, e);
+ }
+
+ return null;
+ }
+
+ private static Date getTimestamp(ConfigMap configMap, String group) {
+ String timestamp = getConfigMapValue(configMap, TIMESTAMP_PREFIX + group);
+ if (timestamp == null) {
+ return null;
+ }
+
+ try {
+ return new SimpleDateFormat(DATE_TIME_FORMAT).parse(timestamp);
+ } catch (Exception e) {
+ LOG.warn("Unable to parse time string '" + timestamp + "' using format " + DATE_TIME_FORMAT, e);
+ }
+
+ return null;
+ }
+
+ private static String getConfigMapValue(ConfigMap configMap, String key) {
+ if (configMap == null || configMap.getData() == null) {
+ return null;
+ }
+ return configMap.getData().get(key);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaderMonitor.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaderMonitor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaderMonitor.java
deleted file mode 100644
index 5555fe1..0000000
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaderMonitor.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kubernetes.ha.lock;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import io.fabric8.kubernetes.api.model.ConfigMap;
-import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.KubernetesClientException;
-import io.fabric8.kubernetes.client.Watch;
-import io.fabric8.kubernetes.client.Watcher;
-
-import org.apache.camel.Service;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Monitors continuously the configmap to detect changes in leadership.
- * It calls the callback eventHandlers only when the leader changes w.r.t. the previous invocation.
- */
-class KubernetesLeaderMonitor implements Service {
-
- private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaderMonitor.class);
-
- private ScheduledExecutorService serializedExecutor;
-
- private KubernetesClient kubernetesClient;
-
- private KubernetesLockConfiguration lockConfiguration;
-
- private List<KubernetesClusterEventHandler> eventHandlers;
-
- private Watch watch;
-
- private boolean terminated;
-
- private boolean refreshing;
-
- private ConfigMap latestConfigMap;
-
- public KubernetesLeaderMonitor(ScheduledExecutorService serializedExecutor, KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration) {
- this.serializedExecutor = serializedExecutor;
- this.kubernetesClient = kubernetesClient;
- this.lockConfiguration = lockConfiguration;
- this.eventHandlers = new LinkedList<>();
- }
-
- public void addClusterEventHandler(KubernetesClusterEventHandler leaderEventHandler) {
- this.eventHandlers.add(leaderEventHandler);
- }
-
- @Override
- public void start() throws Exception {
- this.terminated = false;
- serializedExecutor.execute(this::startWatch);
- serializedExecutor.execute(() -> doPoll(true));
-
- long recreationDelay = lockConfiguration.getWatchRefreshIntervalSecondsOrDefault();
- if (recreationDelay > 0) {
- serializedExecutor.scheduleWithFixedDelay(this::refresh, recreationDelay, recreationDelay, TimeUnit.SECONDS);
- }
- }
-
- @Override
- public void stop() throws Exception {
- this.terminated = true;
- Watch watch = this.watch;
- if (watch != null) {
- watch.close();
- }
- }
-
- public void refresh() {
- serializedExecutor.execute(() -> {
- if (!terminated) {
- refreshing = true;
- try {
- doPoll(false);
-
- Watch w = this.watch;
- if (w != null) {
- // It will be recreated
- w.close();
- }
- } finally {
- refreshing = false;
- }
- }
- });
- }
-
- private void startWatch() {
- try {
- LOG.debug("Starting ConfigMap watcher for monitoring the current leader");
- this.watch = kubernetesClient.configMaps()
- .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
- .withName(this.lockConfiguration.getConfigMapName())
- .watch(new Watcher<ConfigMap>() {
-
- @Override
- public void eventReceived(Action action, ConfigMap configMap) {
- switch (action) {
- case MODIFIED:
- case DELETED:
- case ADDED:
- LOG.debug("Received update from watch on ConfigMap {}", configMap);
- serializedExecutor.execute(() -> checkAndNotify(configMap));
- break;
- default:
- }
- }
-
- @Override
- public void onClose(KubernetesClientException e) {
- if (!terminated) {
- KubernetesLeaderMonitor.this.watch = null;
- if (refreshing) {
- LOG.info("Refreshing ConfigMap watcher...");
- serializedExecutor.execute(KubernetesLeaderMonitor.this::startWatch);
- } else {
- LOG.warn("ConfigMap watcher has been closed unexpectedly. Recreating it in 1 second...", e);
- serializedExecutor.schedule(KubernetesLeaderMonitor.this::startWatch, 1, TimeUnit.SECONDS);
- }
- }
- }
- });
- } catch (Exception ex) {
- LOG.warn("Unable to watch for configmap changes. Retrying in 5 seconds...");
- LOG.debug("Error while trying to watch the configmap", ex);
-
- this.serializedExecutor.schedule(this::startWatch, 5, TimeUnit.SECONDS);
- }
- }
-
- private void doPoll(boolean retry) {
- LOG.debug("Starting poll to get configmap {}", this.lockConfiguration.getConfigMapName());
- ConfigMap configMap;
- try {
- configMap = pollConfigMap();
- } catch (Exception ex) {
- if (retry) {
- LOG.warn("ConfigMap poll failed. Retrying in 5 seconds...", ex);
- this.serializedExecutor.schedule(() -> doPoll(true), 5, TimeUnit.SECONDS);
- } else {
- LOG.warn("ConfigMap poll failed", ex);
- }
- return;
- }
-
- checkAndNotify(configMap);
- }
-
- private void checkAndNotify(ConfigMap candidateConfigMap) {
- LOG.debug("Checking configMap {}", candidateConfigMap);
- ConfigMap newConfigMap = newest(this.latestConfigMap, candidateConfigMap);
- Optional<String> leader = extractLeader(newConfigMap);
- Optional<String> oldLeader = extractLeader(this.latestConfigMap);
-
- this.latestConfigMap = newConfigMap;
-
- LOG.debug("The new leader is {}. Old leader was {}", leader, oldLeader);
- if (!leader.equals(oldLeader)) {
- LOG.debug("Notifying the new leader to all eventHandlers");
- for (KubernetesClusterEventHandler eventHandler : eventHandlers) {
- eventHandler.onKubernetesClusterEvent((KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) () -> leader);
- }
- } else {
- LOG.debug("Leader has not changed");
- }
- }
-
- private ConfigMap pollConfigMap() {
- return kubernetesClient.configMaps()
- .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
- .withName(this.lockConfiguration.getConfigMapName())
- .get();
- }
-
- private Optional<String> extractLeader(ConfigMap configMap) {
- Optional<String> leader = Optional.empty();
- if (configMap != null && configMap.getData() != null) {
- leader = Optional.ofNullable(configMap.getData().get(this.lockConfiguration.getGroupName()));
- }
- return leader;
- }
-
- private ConfigMap newest(ConfigMap configMap1, ConfigMap configMap2) {
- ConfigMap newest = null;
-
- if (configMap1 != null && configMap2 == null) {
- newest = configMap1;
- } else if (configMap1 == null && configMap2 != null) {
- newest = configMap2;
- }
-
- if (newest == null) {
- String rv1 = extractResourceVersion(configMap1);
- String rv2 = extractResourceVersion(configMap2);
- newest = newest(configMap1, configMap2, rv1, rv2);
- }
-
- if (newest == null) {
- String ct1 = extractCreationTimestamp(configMap1);
- String ct2 = extractCreationTimestamp(configMap2);
- // timestamps are string-comparable
- newest = newest(configMap1, configMap2, ct1, ct2);
- }
-
- return newest;
- }
-
- private <T extends Comparable<T>> ConfigMap newest(ConfigMap configMap1, ConfigMap configMap2, T cmp1, T cmp2) {
- if (cmp1 != null && cmp2 != null) {
- int comp = cmp1.compareTo(cmp2);
- if (comp > 0) {
- return configMap1;
- } else {
- return configMap2;
- }
- }
- return null;
- }
-
- private String extractResourceVersion(ConfigMap configMap) {
- if (configMap != null && configMap.getMetadata() != null) {
- return configMap.getMetadata().getResourceVersion();
- }
- return null;
- }
-
- private String extractCreationTimestamp(ConfigMap configMap) {
- if (configMap != null && configMap.getMetadata() != null) {
- return configMap.getMetadata().getCreationTimestamp();
- }
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java
deleted file mode 100644
index ad2f9bc..0000000
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeadershipController.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kubernetes.ha.lock;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import io.fabric8.kubernetes.api.model.ConfigMap;
-import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
-import io.fabric8.kubernetes.client.KubernetesClient;
-
-import org.apache.camel.Service;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Start the monitors and participate to leader election when no active leaders are present.
- * It communicates changes in leadership and cluster members to the given event handler.
- */
-public class KubernetesLeadershipController implements Service {
-
- private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeadershipController.class);
-
- private KubernetesClient kubernetesClient;
-
- private KubernetesLockConfiguration lockConfiguration;
-
- private ScheduledExecutorService executor;
-
- private KubernetesLeaderMonitor leaderMonitor;
-
- private KubernetesMembersMonitor membersMonitor;
-
- private Optional<String> currentLeader;
-
- private Set<String> currentMembers;
-
- private KubernetesClusterEventHandler eventHandler;
-
- public KubernetesLeadershipController(KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration, KubernetesClusterEventHandler eventHandler) {
-
- this.kubernetesClient = kubernetesClient;
- this.lockConfiguration = lockConfiguration;
- this.eventHandler = eventHandler;
-
- this.currentLeader = Optional.empty();
- this.currentMembers = Collections.emptySet();
- }
-
- @Override
- public void start() throws Exception {
-
- if (executor == null) {
- executor = Executors.newSingleThreadScheduledExecutor(); // No concurrency
- leaderMonitor = new KubernetesLeaderMonitor(this.executor, this.kubernetesClient, this.lockConfiguration);
- membersMonitor = new KubernetesMembersMonitor(this.executor, this.kubernetesClient, this.lockConfiguration);
-
- leaderMonitor.addClusterEventHandler(e -> executor.execute(() -> onLeaderChanged(e)));
- if (eventHandler != null) {
- leaderMonitor.addClusterEventHandler(eventHandler);
- }
-
- membersMonitor.addClusterEventHandler(e -> executor.execute(() -> onMembersChanged(e)));
- if (eventHandler != null) {
- membersMonitor.addClusterEventHandler(eventHandler);
- }
-
- // Start all services
- leaderMonitor.start();
- membersMonitor.start();
-
- // Fire a new election if possible
- executor.execute(this::runLeaderElection);
- }
-
- }
-
- @Override
- public void stop() throws Exception {
- if (executor != null) {
- membersMonitor.stop();
- leaderMonitor.stop();
- executor.shutdown();
- executor.shutdownNow();
-
- membersMonitor = null;
- leaderMonitor = null;
- executor = null;
- }
- }
-
- private void onLeaderChanged(KubernetesClusterEvent e) {
- Optional<String> newLeader = KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent.class.cast(e).getData();
- if (!newLeader.isPresent()) {
- executor.execute(this::tryLeaderElection);
- }
- this.currentLeader = newLeader;
- }
-
- private void onMembersChanged(KubernetesClusterEvent e) {
- Set<String> newMembers = KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent.class.cast(e).getData();
- if (currentLeader.isPresent()) {
- // Check if the current leader is still present in the list
- if (!newMembers.contains(currentLeader.get()) && currentMembers.contains(currentLeader.get())) {
- executor.execute(this::runLeaderElection);
- }
- }
- this.currentMembers = newMembers;
- }
-
- private void runLeaderElection() {
- boolean finished = false;
- try {
- finished = tryLeaderElection();
- } catch (Exception ex) {
- LOG.warn("Exception while trying to acquire the leadership", ex);
- }
-
- if (!finished) {
- executor.schedule(this::runLeaderElection, 1, TimeUnit.SECONDS);
- }
- }
-
- private boolean tryLeaderElection() {
- LOG.debug("Starting leader election");
- if (!currentMembers.contains(this.lockConfiguration.getPodName())) {
- LOG.debug("The current pod ({}) is not in the list of participating pods {}. Cannot participate to the election", this.lockConfiguration.getPodName(), currentMembers);
- return false;
- }
-
- ConfigMap configMap = kubernetesClient.configMaps()
- .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
- .withName(this.lockConfiguration.getConfigMapName())
- .get();
-
- if (configMap == null) {
- // No configmap created so far
- LOG.info("Lock configmap is not present in the Kubernetes namespace. A new ConfigMap will be created");
-
- ConfigMap newConfigMap = new ConfigMapBuilder().
- withNewMetadata()
- .withName(this.lockConfiguration.getConfigMapName())
- .addToLabels("provider", "camel")
- .addToLabels("kind", "locks").
- endMetadata()
- .addToData(this.lockConfiguration.getGroupName(), this.lockConfiguration.getPodName())
- .build();
-
- try {
- kubernetesClient.configMaps()
- .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
- .create(newConfigMap);
- } catch (Exception ex) {
- // Suppress exception
- LOG.warn("Unable to create the ConfigMap, it may have been created by other cluster members concurrently. If the problem persists, check if the service account has the right "
- + "permissions to create it");
- LOG.debug("Exception while trying to create the ConfigMap", ex);
- return false;
- }
- return true;
- } else {
- LOG.info("Lock configmap already present in the Kubernetes namespace. Checking...");
- Map<String, String> leaders = configMap.getData();
- Optional<String> oldLeader = leaders != null ? Optional.ofNullable(leaders.get(this.lockConfiguration.getGroupName())) : Optional.empty();
-
- boolean noLeaderPresent = !oldLeader.isPresent() || !currentMembers.contains(oldLeader.get());
- boolean alreadyLeader = oldLeader.isPresent() && oldLeader.get().equals(this.lockConfiguration.getPodName());
-
- if (noLeaderPresent && !alreadyLeader) {
- LOG.info("Trying to acquire the lock in configmap={}, key={}", this.lockConfiguration.getConfigMapName(), this.lockConfiguration.getGroupName());
- ConfigMap newConfigMap = new ConfigMapBuilder(configMap)
- .addToData(this.lockConfiguration.getGroupName(), this.lockConfiguration.getPodName())
- .build();
-
- kubernetesClient.configMaps()
- .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
- .withName(this.lockConfiguration.getConfigMapName())
- .lockResourceVersion(configMap.getMetadata().getResourceVersion())
- .replace(newConfigMap);
-
- LOG.info("Lock acquired for configmap={}, key={}", this.lockConfiguration.getConfigMapName(), this.lockConfiguration.getGroupName());
- } else if (!noLeaderPresent) {
- LOG.info("A leader is already present for configmap={}, key={}: {}", this.lockConfiguration.getConfigMapName(), this.lockConfiguration.getGroupName(), oldLeader);
- } else {
- LOG.info("This pod ({}) is already the leader for configmap={}, key={}", this.lockConfiguration.getPodName(), this.lockConfiguration.getConfigMapName(), this.lockConfiguration
- .getGroupName());
- }
- return true;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java
new file mode 100644
index 0000000..b385925
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.lock;
+
+import java.util.Date;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import org.apache.camel.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Monitors the current status and participates in the leader election when no active leaders are present.
+ * It communicates changes in leadership and cluster members to the given event handler.
+ */
+public class KubernetesLeaseBasedLeadershipController implements Service {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaseBasedLeadershipController.class);
+
+ // Extra delay, in milliseconds, added to each computed wait time when a cycle is rescheduled.
+ private static final long FIXED_ADDITIONAL_DELAY = 100;
+
+ private KubernetesClient kubernetesClient;
+
+ private KubernetesLockConfiguration lockConfiguration;
+
+ // May be null: start() only registers it with the members monitor when it is set.
+ private KubernetesClusterEventHandler eventHandler;
+
+ // Single-thread executor running the leadership cycles; it is also handed to the members monitor.
+ private ScheduledExecutorService serializedExecutor;
+ // Single-thread executor on which checkAndNotifyNewLeader is executed.
+ private ScheduledExecutorService eventDispatcherExecutor;
+
+ // Created by start() and stopped by stop().
+ private KubernetesMembersMonitor membersMonitor;
+
+ // Leader computed by the last run of checkAndNotifyNewLeader (empty when none was active).
+ private Optional<String> currentLeader = Optional.empty();
+
+ // Leadership information most recently extracted from a pulled, created or replaced lock ConfigMap.
+ private volatile LeaderInfo latestLeaderInfo;
+
+ /**
+  * Creates a controller that is not started yet; call {@link #start()} to begin monitoring.
+  *
+  * @param kubernetesClient the client used to read and write the lock ConfigMap
+  * @param lockConfiguration the lock settings (ConfigMap name, group, pod name, timings)
+  * @param eventHandler the handler to notify; start() registers it with the members monitor only when non-null
+  */
+ public KubernetesLeaseBasedLeadershipController(KubernetesClient kubernetesClient, KubernetesLockConfiguration lockConfiguration, KubernetesClusterEventHandler eventHandler) {
+ this.kubernetesClient = kubernetesClient;
+ this.lockConfiguration = lockConfiguration;
+ this.eventHandler = eventHandler;
+ }
+
+    /**
+     * Starts the controller: creates the two executors and the members monitor, starts the monitor and
+     * queues the detection of the initial leadership state. Does nothing when already started.
+     *
+     * @throws Exception if the members monitor cannot be started
+     */
+    @Override
+    public void start() throws Exception {
+        if (serializedExecutor != null) {
+            // Already started
+            return;
+        }
+
+        LOG.debug("Starting leadership controller...");
+        serializedExecutor = Executors.newSingleThreadScheduledExecutor();
+        eventDispatcherExecutor = Executors.newSingleThreadScheduledExecutor();
+
+        membersMonitor = new KubernetesMembersMonitor(this.serializedExecutor, this.kubernetesClient, this.lockConfiguration);
+        if (eventHandler != null) {
+            membersMonitor.addClusterEventHandler(eventHandler);
+        }
+        membersMonitor.start();
+
+        // The state machine begins with the detection of the current leader
+        serializedExecutor.execute(this::initialization);
+    }
+
+    /**
+     * Stops the controller: interrupts the leadership cycles, gives pending notifications up to two
+     * seconds to complete, then stops the members monitor.
+     * <p>
+     * The internal references are always cleared, even when one of the steps fails, so that a
+     * subsequent {@link #start()} creates fresh executors instead of silently doing nothing.
+     *
+     * @throws Exception if the wait for pending notifications is interrupted or the members monitor fails to stop
+     */
+    @Override
+    public void stop() throws Exception {
+        LOG.debug("Stopping leadership controller...");
+        try {
+            if (serializedExecutor != null) {
+                serializedExecutor.shutdownNow();
+            }
+            if (eventDispatcherExecutor != null) {
+                eventDispatcherExecutor.shutdown();
+                try {
+                    eventDispatcherExecutor.awaitTermination(2, TimeUnit.SECONDS);
+                } finally {
+                    // Force the termination also when the wait has been interrupted
+                    eventDispatcherExecutor.shutdownNow();
+                }
+            }
+        } finally {
+            try {
+                // The monitor must be stopped even if the executors could not be shut down cleanly
+                if (membersMonitor != null) {
+                    membersMonitor.stop();
+                }
+            } finally {
+                membersMonitor = null;
+                eventDispatcherExecutor = null;
+                serializedExecutor = null;
+            }
+        }
+    }
+
+    /**
+     * Reads the lock ConfigMap (retrying until it succeeds) and enters either the keep-leadership
+     * or the acquire-leadership state, depending on who the active leader is.
+     */
+    private void initialization() {
+        LOG.debug("Reading (with retry) the configmap {} to detect the current leader", this.lockConfiguration.getConfigMapName());
+        refreshConfigMapFromCluster(true);
+
+        if (!isCurrentPodTheActiveLeader()) {
+            LOG.info("The current pod ({}) is not the leader of the group '{}' in ConfigMap '{}' at this time", this.lockConfiguration.getPodName(), this.lockConfiguration
+                .getGroupName(), this.lockConfiguration.getConfigMapName());
+            serializedExecutor.execute(this::acquireLeadershipCycle);
+            return;
+        }
+
+        // The local pod is still recorded as the active leader
+        serializedExecutor.execute(this::onLeadershipAcquired);
+    }
+
+    /**
+     * Called when the local pod becomes leader: triggers the leader-change notification and
+     * schedules the first run of the keep-leadership cycle.
+     */
+    private void onLeadershipAcquired() {
+        LOG.info("The current pod ({}) is now the leader of the group '{}' in ConfigMap '{}'", this.lockConfiguration.getPodName(), this.lockConfiguration
+            .getGroupName(), this.lockConfiguration.getConfigMapName());
+
+        this.eventDispatcherExecutor.execute(this::checkAndNotifyNewLeader);
+
+        // No renewal has been attempted yet: the acquisition time is used for both arguments
+        Date acquisitionTime = this.latestLeaderInfo.getTimestamp();
+        long waitMillis = computeNextRenewWaitTime(acquisitionTime, acquisitionTime) + FIXED_ADDITIONAL_DELAY;
+        serializedExecutor.schedule(this::keepLeadershipCycle, waitMillis, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * One iteration of the keep-leadership state: refreshes the local view of the lock, gives up the
+     * leadership when the renew deadline has passed or another pod is recorded as leader, otherwise
+     * tries to renew the lease and schedules the next iteration.
+     */
+    private void keepLeadershipCycle() {
+        // Update the local view, if the cluster can be reached
+        refreshConfigMapFromCluster(false);
+
+        LeaderInfo observed = this.latestLeaderInfo;
+        boolean leadershipLost = observed.isTimeElapsedSeconds(lockConfiguration.getRenewDeadlineSeconds())
+            || !observed.isLeader(this.lockConfiguration.getPodName());
+        if (leadershipLost) {
+            // Time over for renewal or leadership lost
+            LOG.debug("The current pod ({}) has lost the leadership", this.lockConfiguration.getPodName());
+            serializedExecutor.execute(this::onLeadershipLost);
+            return;
+        }
+
+        boolean success = tryAcquireOrRenewLeadership();
+        LOG.debug("Attempted to renew the lease. Success={}", success);
+
+        // The renewal attempt may have changed the leader info, so it is read again
+        Date lastRenewal = this.latestLeaderInfo.getTimestamp();
+        long waitMillis = computeNextRenewWaitTime(lastRenewal, new Date()) + FIXED_ADDITIONAL_DELAY;
+        serializedExecutor.schedule(this::keepLeadershipCycle, waitMillis, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Computes the delay, in milliseconds, before the next renewal attempt while in keep-leadership state.
+     * <p>
+     * Attempts are aligned to multiples of the retry period counted from the last successful renewal;
+     * the first slot that is not before the last attempt is chosen, capped to the renew deadline.
+     *
+     * @param lastRenewal the time of the last successful renewal
+     * @param lastRenewalAttempt the time of the last renewal attempt
+     * @return the non-negative delay in milliseconds from now
+     */
+    private long computeNextRenewWaitTime(Date lastRenewal, Date lastRenewalAttempt) {
+        long renewalMillis = lastRenewal.getTime();
+        long attemptMillis = lastRenewalAttempt.getTime();
+        long deadlineMillis = renewalMillis + this.lockConfiguration.getRenewDeadlineSeconds() * 1000;
+        long retryPeriodMillis = this.lockConfiguration.getRetryPeriodSeconds() * 1000;
+
+        // Start from the first retry slot and advance until the last attempt or the deadline is reached
+        long nextRetryMillis = renewalMillis + retryPeriodMillis;
+        while (nextRetryMillis < attemptMillis && nextRetryMillis < deadlineMillis) {
+            nextRetryMillis += retryPeriodMillis;
+        }
+
+        long targetMillis = Math.min(nextRetryMillis, deadlineMillis);
+        long delay = Math.max(0, targetMillis - System.currentTimeMillis());
+        LOG.debug("Next renewal timeout event will be fired in {} seconds", delay / 1000);
+        return delay;
+    }
+
+
+    /**
+     * Called when the local pod is no longer leader: triggers the leader-change notification and
+     * switches to the acquire-leadership state.
+     */
+    private void onLeadershipLost() {
+        String podName = this.lockConfiguration.getPodName();
+        String groupName = this.lockConfiguration.getGroupName();
+        String configMapName = this.lockConfiguration.getConfigMapName();
+        LOG.info("The local pod ({}) is no longer the leader of the group '{}' in ConfigMap '{}'", podName, groupName, configMapName);
+
+        this.eventDispatcherExecutor.execute(this::checkAndNotifyNewLeader);
+        serializedExecutor.execute(this::acquireLeadershipCycle);
+    }
+
+    /**
+     * One iteration of the acquire-leadership state: waits for the current lease to expire, then tries
+     * to take the leadership. On success the controller moves to the keep-leadership state, otherwise
+     * another iteration is scheduled.
+     */
+    private void acquireLeadershipCycle() {
+        // Update the local view, if the cluster can be reached
+        refreshConfigMapFromCluster(false);
+
+        // Notify about changes in current leader if any
+        this.eventDispatcherExecutor.execute(this::checkAndNotifyNewLeader);
+
+        boolean leaseExpired = this.latestLeaderInfo.isTimeElapsedSeconds(lockConfiguration.getLeaseDurationSeconds());
+        if (leaseExpired) {
+            // The lease is over: fire an election
+            if (tryAcquireOrRenewLeadership()) {
+                LOG.debug("Leadership acquired for ConfigMap {}. Notification in progress...", this.lockConfiguration.getConfigMapName());
+                serializedExecutor.execute(this::onLeadershipAcquired);
+                return;
+            }
+
+            // Notify about changes in current leader if any
+            this.eventDispatcherExecutor.execute(this::checkAndNotifyNewLeader);
+
+            LOG.debug("Cannot acquire the leadership for ConfigMap {}", this.lockConfiguration.getConfigMapName());
+        }
+
+        // Either the lease is still valid or the election has been lost: try again later
+        long waitMillis = computeNextElectionWaitTime(this.latestLeaderInfo.getTimestamp()) + FIXED_ADDITIONAL_DELAY;
+        serializedExecutor.schedule(this::acquireLeadershipCycle, waitMillis, TimeUnit.MILLISECONDS);
+    }
+
+ private long computeNextElectionWaitTime(Date lastRenewal) {
+ if (lastRenewal == null) {
+ LOG.debug("Error detected while getting leadership info, next election timeout event will be fired in {} seconds", this.lockConfiguration.getRetryOnErrorIntervalSeconds());
+ return this.lockConfiguration.getRetryOnErrorIntervalSeconds();
+ }
+ long time = lastRenewal.getTime() + this.lockConfiguration.getLeaseDurationSeconds() * 1000
+ + jitter(this.lockConfiguration.getRetryPeriodSeconds() * 1000, this.lockConfiguration.getJitterFactor());
+
+ long delay = Math.max(0, time - System.currentTimeMillis());
+ LOG.debug("Next election timeout event will be fired in {} seconds", delay / 1000);
+ return delay;
+ }
+
+    /**
+     * Scales the given value by a random multiplier between 1 and {@code factor}.
+     *
+     * @param num the base value
+     * @param factor the jitter factor bounding the multiplier
+     * @return the scaled value, truncated to a long
+     */
+    private long jitter(long num, double factor) {
+        double multiplier = 1 + Math.random() * (factor - 1);
+        return (long) (num * multiplier);
+    }
+
+ /**
+  * Tries to write the local pod as leader into the lock ConfigMap.
+  * <p>
+  * If the ConfigMap does not exist it is created with the local pod as leader. If it exists, it is
+  * replaced only when the local pod is already recorded as leader or the lease of the recorded leader
+  * has expired. The latest leader info is updated on every path except when the initial pull fails.
+  *
+  * @return {@code true} if the write succeeded or, after a failed write, the refreshed state still
+  *         reports the local pod as active leader; {@code false} otherwise
+  */
+ private boolean tryAcquireOrRenewLeadership() {
+ LOG.debug("Trying to acquire or renew the leadership...");
+
+ ConfigMap configMap;
+ try {
+ configMap = pullConfigMap();
+ } catch (Exception e) {
+ LOG.warn("Unable to retrieve the current ConfigMap " + this.lockConfiguration.getConfigMapName() + " from Kubernetes", e);
+ return false;
+ }
+
+ // Info to set in the configmap to become leaders
+ LeaderInfo newLeaderInfo = new LeaderInfo(this.lockConfiguration.getGroupName(), this.lockConfiguration.getPodName(), new Date());
+
+ if (configMap == null) {
+ // No configmap created so far
+ LOG.debug("Lock configmap is not present in the Kubernetes namespace. A new ConfigMap will be created");
+ ConfigMap newConfigMap = ConfigMapLockUtils.createNewConfigMap(this.lockConfiguration.getConfigMapName(), newLeaderInfo);
+
+ try {
+ kubernetesClient.configMaps()
+ .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
+ .create(newConfigMap);
+ } catch (Exception ex) {
+ // Suppress exception
+ LOG.warn("Unable to create the ConfigMap, it may have been created by other cluster members concurrently. If the problem persists, check if the service account has the right "
+ + "permissions to create it");
+ LOG.debug("Exception while trying to create the ConfigMap", ex);
+
+ // Try to get the configMap and return the current leader
+ refreshConfigMapFromCluster(false);
+ return isCurrentPodTheActiveLeader();
+ }
+
+ LOG.debug("ConfigMap {} successfully created and local pod is leader", this.lockConfiguration.getConfigMapName());
+ updateLatestLeaderInfo(newConfigMap);
+ return true;
+ } else {
+ LOG.debug("Lock configmap already present in the Kubernetes namespace. Checking...");
+ LeaderInfo leaderInfo = ConfigMapLockUtils.getLeaderInfo(configMap, this.lockConfiguration.getGroupName());
+
+ boolean weWereLeader = leaderInfo.isLeader(this.lockConfiguration.getPodName());
+ boolean leaseExpired = leaderInfo.isTimeElapsedSeconds(this.lockConfiguration.getLeaseDurationSeconds());
+
+ if (weWereLeader || leaseExpired) {
+ // Renew the lease or set the new leader
+ try {
+ ConfigMap updatedConfigMap = ConfigMapLockUtils.getConfigMapWithNewLeader(configMap, newLeaderInfo);
+ // NOTE(review): lockResourceVersion presumably makes the replace fail when the ConfigMap has been
+ // modified since it was pulled (optimistic locking) - confirm against the fabric8 client documentation
+ kubernetesClient.configMaps()
+ .inNamespace(this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient))
+ .withName(this.lockConfiguration.getConfigMapName())
+ .lockResourceVersion(configMap.getMetadata().getResourceVersion())
+ .replace(updatedConfigMap);
+
+ LOG.debug("ConfigMap {} successfully updated and local pod is leader", this.lockConfiguration.getConfigMapName());
+ updateLatestLeaderInfo(updatedConfigMap);
+ return true;
+ } catch (Exception ex) {
+ LOG.warn("An attempt to become leader has failed. It's possible that the leadership has been taken by another pod");
+ LOG.debug("Error received during configmap lock replace", ex);
+
+ // Try to get the configMap and return the current leader
+ refreshConfigMapFromCluster(false);
+ return isCurrentPodTheActiveLeader();
+ }
+ } else {
+ // Another pod is the leader and lease is not expired
+ LOG.debug("Another pod is the current leader and lease has not expired yet");
+ updateLatestLeaderInfo(configMap);
+ return false;
+ }
+ }
+ }
+
+
+    /**
+     * Pulls the lock ConfigMap from the cluster and updates the latest leader info.
+     * <p>
+     * The retry is implemented as a loop (not as a recursive call), so that a long outage of the
+     * Kubernetes API cannot grow the stack of the controller thread.
+     *
+     * @param retry if {@code true}, a failed pull is repeated (after the retry-on-error interval) until
+     *              it succeeds; if {@code false}, a failure is only logged and the leader info is left untouched
+     * @throws RuntimeException if the thread is interrupted while waiting for the next retry
+     */
+    private void refreshConfigMapFromCluster(boolean retry) {
+        while (true) {
+            LOG.debug("Retrieving configmap {}", this.lockConfiguration.getConfigMapName());
+            try {
+                updateLatestLeaderInfo(pullConfigMap());
+                return;
+            } catch (Exception ex) {
+                if (!retry) {
+                    LOG.warn("Cannot retrieve the ConfigMap: pull failed", ex);
+                    return;
+                }
+                LOG.warn("ConfigMap pull failed. Retrying in " + this.lockConfiguration.getRetryOnErrorIntervalSeconds() + " seconds...", ex);
+            }
+
+            try {
+                Thread.sleep(this.lockConfiguration.getRetryOnErrorIntervalSeconds() * 1000);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag and abort: the controller is being stopped
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("Controller Thread interrupted, shutdown in progress", e);
+            }
+        }
+    }
+
+    /**
+     * Tells whether, according to the latest leader info, the local pod is the leader and its renew
+     * deadline has not passed yet.
+     *
+     * @return {@code true} if the local pod is the active leader, {@code false} otherwise (also when no info is available)
+     */
+    private boolean isCurrentPodTheActiveLeader() {
+        LeaderInfo info = latestLeaderInfo;
+        if (info == null) {
+            return false;
+        }
+        if (!info.isLeader(this.lockConfiguration.getPodName())) {
+            return false;
+        }
+        return !info.isTimeElapsedSeconds(this.lockConfiguration.getRenewDeadlineSeconds());
+    }
+
+    /**
+     * Reads the lock ConfigMap from the configured (or default) namespace.
+     *
+     * @return the result of the client {@code get()} call for the configured ConfigMap name
+     */
+    private ConfigMap pullConfigMap() {
+        String namespace = this.lockConfiguration.getKubernetesResourcesNamespaceOrDefault(kubernetesClient);
+        String configMapName = this.lockConfiguration.getConfigMapName();
+        return kubernetesClient.configMaps().inNamespace(namespace).withName(configMapName).get();
+    }
+
+
+    /**
+     * Replaces the latest leader info with the one extracted from the given ConfigMap for the configured group.
+     *
+     * @param configMap the ConfigMap to extract the leader info from
+     */
+    private void updateLatestLeaderInfo(ConfigMap configMap) {
+        LOG.debug("Updating internal status about the current leader");
+        String groupName = this.lockConfiguration.getGroupName();
+        LeaderInfo extracted = ConfigMapLockUtils.getLeaderInfo(configMap, groupName);
+        this.latestLeaderInfo = extracted;
+    }
+
+ private void checkAndNotifyNewLeader() {
+ LOG.debug("Checking if the current leader has changed to notify the event handler...");
+ LeaderInfo newLeaderInfo = this.latestLeaderInfo;
+ if (newLeaderInfo == null) {
+ return;
+ }
+
+ long leaderInfoDurationSeconds = newLeaderInfo.isLeader(this.lockConfiguration.getPodName())
+ ? this.lockConfiguration.getRenewDeadlineSeconds()
+ : this.lockConfiguration.getLeaseDurationSeconds();
+
+ Optional<String> newLeader;
+ if (newLeaderInfo.getLeader() != null && !newLeaderInfo.isTimeElapsedSeconds(leaderInfoDurationSeconds)) {
+ newLeader = Optional.of(newLeaderInfo.getLeader());
+ } else {
+ newLeader = Optional.empty();
+ }
+
+ // Sending notifications in case of leader change
+ if (!newLeader.equals(this.currentLeader)) {
+ LOG.debug("Current leader has changed from {} to {}. Sending notifications...", this.currentLeader, newLeader);
+ this.currentLeader = newLeader;
+ eventHandler.onKubernetesClusterEvent((KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) () -> newLeader);
+ } else {
+ LOG.debug("Current leader unchanged: {}", this.currentLeader);
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
index f203c0a..37e0251 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
@@ -26,7 +26,16 @@ import io.fabric8.kubernetes.client.KubernetesClient;
*/
public class KubernetesLockConfiguration implements Cloneable {
- private static final long DEFAULT_WATCHER_REFRESH_INTERVAL_SECONDS = 1800;
+ public static final String DEFAULT_CONFIGMAP_NAME = "leaders";
+
+
+ public static final double DEFAULT_JITTER_FACTOR = 1.2;
+ public static final long DEFAULT_LEASE_DURATION_SECONDS = 20;
+ public static final long DEFAULT_RENEW_DEADLINE_SECONDS = 15;
+ public static final long DEFAULT_RETRY_PERIOD_SECONDS = 6;
+
+ public static final long DEFAULT_RETRY_ON_ERROR_INTERVAL_SECONDS = 5;
+ public static final long DEFAULT_WATCH_REFRESH_INTERVAL_SECONDS = 1800;
/**
* Kubernetes namespace containing the pods and the ConfigMap used for locking.
@@ -36,7 +45,7 @@ public class KubernetesLockConfiguration implements Cloneable {
/**
* Name of the ConfigMap used for locking.
*/
- private String configMapName;
+ private String configMapName = DEFAULT_CONFIGMAP_NAME;
/**
* Name of the lock group (or namespace according to the Camel cluster convention) within the chosen ConfgMap.
@@ -55,9 +64,36 @@ public class KubernetesLockConfiguration implements Cloneable {
/**
- * Indicates the maximum amount of time a Kubernetes watch should be kept active, before being recreated.
- * Watch recreation can be disabled by putting a negative value (the default will be used in case of null).
+ * Indicates the time (in seconds) to wait before retrying after an error occurred while reading the
+ * lock ConfigMap or the leadership information from the cluster.
+ */
+ private long retryOnErrorIntervalSeconds = DEFAULT_RETRY_ON_ERROR_INTERVAL_SECONDS;
+
+ /**
+ * A jitter factor to apply in order to prevent all pods from trying to become leaders at the same instant.
+ */
+ private double jitterFactor = DEFAULT_JITTER_FACTOR;
+
+ /**
+ * The default duration of the lease for the current leader.
+ */
+ private long leaseDurationSeconds = DEFAULT_LEASE_DURATION_SECONDS;
+
+ /**
+ * The deadline after which the leader must stop trying to renew its leadership (and yield it).
+ */
+ private long renewDeadlineSeconds = DEFAULT_RENEW_DEADLINE_SECONDS;
+
+ /**
+ * The time between two subsequent attempts to acquire/renew the leadership (or after the lease expiration).
+ * It is randomized using the jitter factor in case of new leader election (not renewal).
*/
- private Long watchRefreshIntervalSeconds;
+ private long retryPeriodSeconds = DEFAULT_RETRY_PERIOD_SECONDS;
+
+ /**
+ * Set this to a positive value in order to recreate watchers after a certain amount of time
+ * (to prevent them becoming stale).
+ */
+ private long watchRefreshIntervalSeconds = DEFAULT_WATCH_REFRESH_INTERVAL_SECONDS;
public KubernetesLockConfiguration() {
}
@@ -113,19 +149,51 @@ public class KubernetesLockConfiguration implements Cloneable {
this.clusterLabels = clusterLabels;
}
- public Long getWatchRefreshIntervalSeconds() {
- return watchRefreshIntervalSeconds;
+ public long getRetryOnErrorIntervalSeconds() {
+ return retryOnErrorIntervalSeconds;
}
- public long getWatchRefreshIntervalSecondsOrDefault() {
- Long interval = watchRefreshIntervalSeconds;
- if (interval == null) {
- interval = DEFAULT_WATCHER_REFRESH_INTERVAL_SECONDS;
- }
- return interval;
+ public void setRetryOnErrorIntervalSeconds(long retryOnErrorIntervalSeconds) {
+ this.retryOnErrorIntervalSeconds = retryOnErrorIntervalSeconds;
+ }
+
+ public double getJitterFactor() {
+ return jitterFactor;
+ }
+
+ public void setJitterFactor(double jitterFactor) {
+ this.jitterFactor = jitterFactor;
+ }
+
+ public long getLeaseDurationSeconds() {
+ return leaseDurationSeconds;
+ }
+
+ public void setLeaseDurationSeconds(long leaseDurationSeconds) {
+ this.leaseDurationSeconds = leaseDurationSeconds;
+ }
+
+ public long getRenewDeadlineSeconds() {
+ return renewDeadlineSeconds;
+ }
+
+ public void setRenewDeadlineSeconds(long renewDeadlineSeconds) {
+ this.renewDeadlineSeconds = renewDeadlineSeconds;
+ }
+
+ public long getRetryPeriodSeconds() {
+ return retryPeriodSeconds;
+ }
+
+ public void setRetryPeriodSeconds(long retryPeriodSeconds) {
+ this.retryPeriodSeconds = retryPeriodSeconds;
+ }
+
+ public long getWatchRefreshIntervalSeconds() {
+ return watchRefreshIntervalSeconds;
}
- public void setWatchRefreshIntervalSeconds(Long watchRefreshIntervalSeconds) {
+ public void setWatchRefreshIntervalSeconds(long watchRefreshIntervalSeconds) {
this.watchRefreshIntervalSeconds = watchRefreshIntervalSeconds;
}
@@ -146,6 +214,11 @@ public class KubernetesLockConfiguration implements Cloneable {
sb.append(", groupName='").append(groupName).append('\'');
sb.append(", podName='").append(podName).append('\'');
sb.append(", clusterLabels=").append(clusterLabels);
+ sb.append(", retryOnErrorIntervalSeconds=").append(retryOnErrorIntervalSeconds);
+ sb.append(", jitterFactor=").append(jitterFactor);
+ sb.append(", leaseDurationSeconds=").append(leaseDurationSeconds);
+ sb.append(", renewDeadlineSeconds=").append(renewDeadlineSeconds);
+ sb.append(", retryPeriodSeconds=").append(retryPeriodSeconds);
sb.append(", watchRefreshIntervalSeconds=").append(watchRefreshIntervalSeconds);
sb.append('}');
return sb.toString();
http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java
index d9173b2..586a11f 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesMembersMonitor.java
@@ -41,8 +41,6 @@ import org.slf4j.LoggerFactory;
*/
class KubernetesMembersMonitor implements Service {
- private static final long DEFAULT_WATCHER_REFRESH_INTERVAL_SECONDS = 1800;
-
private static final Logger LOG = LoggerFactory.getLogger(KubernetesMembersMonitor.class);
private ScheduledExecutorService serializedExecutor;
@@ -81,7 +79,7 @@ class KubernetesMembersMonitor implements Service {
serializedExecutor.execute(() -> doPoll(true));
serializedExecutor.execute(this::createWatch);
- long recreationDelay = lockConfiguration.getWatchRefreshIntervalSecondsOrDefault();
+ long recreationDelay = lockConfiguration.getWatchRefreshIntervalSeconds();
if (recreationDelay > 0) {
serializedExecutor.scheduleWithFixedDelay(this::refresh, recreationDelay, recreationDelay, TimeUnit.SECONDS);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e4cab329/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/LeaderInfo.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/LeaderInfo.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/LeaderInfo.java
new file mode 100644
index 0000000..50d1603
--- /dev/null
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/LeaderInfo.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.lock;
+
+import java.util.Date;
+
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Overview of a leadership status.
+ */
+public class LeaderInfo {
+
+    private String groupName;
+
+    private String leader;
+
+    private Date timestamp;
+
+    /**
+     * Creates an empty instance: all properties are {@code null} until set.
+     */
+    public LeaderInfo() {
+    }
+
+    /**
+     * Creates an instance with all the properties set.
+     *
+     * @param groupName the name of the group the information refers to
+     * @param leader the name of the leader pod
+     * @param timestamp the time associated with the leadership
+     */
+    public LeaderInfo(String groupName, String leader, Date timestamp) {
+        this.groupName = groupName;
+        this.leader = leader;
+        this.timestamp = timestamp;
+    }
+
+    /**
+     * Tells whether at least the given number of seconds have passed since the timestamp.
+     *
+     * @param timeSeconds the amount of time, in seconds
+     * @return {@code true} if the time has elapsed or no timestamp is set, {@code false} otherwise
+     */
+    public boolean isTimeElapsedSeconds(long timeSeconds) {
+        if (timestamp != null) {
+            long now = System.currentTimeMillis();
+            long expiration = timestamp.getTime() + timeSeconds * 1000;
+            return expiration <= now;
+        }
+        // Without a timestamp the time is always considered elapsed
+        return true;
+    }
+
+    /**
+     * Tells whether the given pod is the leader recorded in this object.
+     *
+     * @param pod the name of the pod, must not be {@code null}
+     * @return {@code true} if the pod name equals the leader name
+     */
+    public boolean isLeader(String pod) {
+        ObjectHelper.notNull(pod, "pod");
+        return pod.equals(this.leader);
+    }
+
+    public String getGroupName() {
+        return this.groupName;
+    }
+
+    public void setGroupName(String groupName) {
+        this.groupName = groupName;
+    }
+
+    public String getLeader() {
+        return this.leader;
+    }
+
+    public void setLeader(String leader) {
+        this.leader = leader;
+    }
+
+    public Date getTimestamp() {
+        return this.timestamp;
+    }
+
+    public void setTimestamp(Date timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    @Override
+    public String toString() {
+        return "LeaderInfo{"
+            + "groupName='" + groupName + '\''
+            + ", leader='" + leader + '\''
+            + ", timestamp=" + timestamp
+            + '}';
+    }
+
+}