You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2017/08/08 14:43:26 UTC

[08/14] camel git commit: CAMEL-11331: Adding tests and fixing impl

CAMEL-11331: Adding tests and fixing impl


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/881b9331
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/881b9331
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/881b9331

Branch: refs/heads/master
Commit: 881b9331f0712b4e611087eb11e623016dd3de72
Parents: debfeed
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Fri Jul 14 11:21:50 2017 +0200
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Tue Aug 8 16:39:43 2017 +0200

----------------------------------------------------------------------
 components/camel-kubernetes/pom.xml             |   6 +
 .../kubernetes/ha/lock/ConfigMapLockUtils.java  |   2 +-
 ...ubernetesLeaseBasedLeadershipController.java |  15 +-
 .../ha/lock/KubernetesLockConfiguration.java    |   6 +-
 .../ha/KubernetesClusterServiceTest.java        | 291 +++++++++++++++++++
 .../ha/utils/ConfigMapLockSimulator.java        |  83 ++++++
 .../kubernetes/ha/utils/LeaderRecorder.java     | 115 ++++++++
 .../kubernetes/ha/utils/LockTestServer.java     | 175 +++++++++++
 .../kubernetes/ha/utils/LockTestServerTest.java |  97 +++++++
 parent/pom.xml                                  |   1 +
 10 files changed, 784 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/pom.xml b/components/camel-kubernetes/pom.xml
index c444068..38fa037 100644
--- a/components/camel-kubernetes/pom.xml
+++ b/components/camel-kubernetes/pom.xml
@@ -67,6 +67,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>io.fabric8</groupId>
+      <artifactId>mockwebserver</artifactId>
+      <version>${mockwebserver-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.camel</groupId>
       <artifactId>camel-test-spring</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java
index 84718f3..70fa860 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java
@@ -26,7 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- *
+ * Utilities for managing ConfigMaps that contain lock information.
  */
 public final class ConfigMapLockUtils {
 

http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java
index 8e96a72..42be2e7 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java
@@ -163,8 +163,9 @@ public class KubernetesLeaseBasedLeadershipController implements Service {
 
         long time = Math.min(timeRetry, timeDeadline);
         long delay = Math.max(0, time - System.currentTimeMillis());
-        LOG.debug("Next renewal timeout event will be fired in {} seconds", delay / 1000);
-        return delay;
+        long delayJittered = jitter(delay, lockConfiguration.getJitterFactor());
+        LOG.debug("Next renewal timeout event will be fired in {} seconds", delayJittered / 1000);
+        return delayJittered;
     }
 
 
@@ -340,10 +341,18 @@ public class KubernetesLeaseBasedLeadershipController implements Service {
     private void updateLatestLeaderInfo(ConfigMap configMap) {
         LOG.debug("Updating internal status about the current leader");
         this.latestLeaderInfo = ConfigMapLockUtils.getLeaderInfo(configMap, this.lockConfiguration.getGroupName());
+
+        // Dispatch a leader check now, then schedule another one after the renew deadline (this pod is the leader) or the lease duration (another pod is the leader), plus FIXED_ADDITIONAL_DELAY
+        this.eventDispatcherExecutor.execute(this::checkAndNotifyNewLeader);
+        if (this.latestLeaderInfo.isLeader(this.lockConfiguration.getPodName())) {
+            this.eventDispatcherExecutor.schedule(this::checkAndNotifyNewLeader, this.lockConfiguration.getRenewDeadlineSeconds() * 1000 + FIXED_ADDITIONAL_DELAY, TimeUnit.MILLISECONDS);
+        } else if (this.latestLeaderInfo.getLeader() != null) {
+            this.eventDispatcherExecutor.schedule(this::checkAndNotifyNewLeader, this.lockConfiguration.getLeaseDurationSeconds() * 1000 + FIXED_ADDITIONAL_DELAY, TimeUnit.MILLISECONDS);
+        }
     }
 
     private void checkAndNotifyNewLeader() {
-        LOG.debug("Checking if the current leader has changed to notify the event handler...");
+        LOG.info("Checking if the current leader has changed to notify the event handler...");
         LeaderInfo newLeaderInfo = this.latestLeaderInfo;
         if (newLeaderInfo == null) {
             return;

http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
index 37e0251..6461708 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
@@ -30,9 +30,9 @@ public class KubernetesLockConfiguration implements Cloneable {
 
 
     public static final double DEFAULT_JITTER_FACTOR = 1.2;
-    public static final long DEFAULT_LEASE_DURATION_SECONDS = 20;
-    public static final long DEFAULT_RENEW_DEADLINE_SECONDS = 15;
-    public static final long DEFAULT_RETRY_PERIOD_SECONDS = 6;
+    public static final long DEFAULT_LEASE_DURATION_SECONDS = 60;
+    public static final long DEFAULT_RENEW_DEADLINE_SECONDS = 45;
+    public static final long DEFAULT_RETRY_PERIOD_SECONDS = 9;
 
     public static final long DEFAULT_RETRY_ON_ERROR_INTERVAL_SECONDS = 5;
     public static final long DEFAULT_WATCH_REFRESH_INTERVAL_SECONDS = 1800;

http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java
new file mode 100644
index 0000000..4baebc6
--- /dev/null
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+
+import org.apache.camel.component.kubernetes.KubernetesConfiguration;
+import org.apache.camel.component.kubernetes.ha.utils.ConfigMapLockSimulator;
+import org.apache.camel.component.kubernetes.ha.utils.LeaderRecorder;
+import org.apache.camel.component.kubernetes.ha.utils.LockTestServer;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test leader election scenarios using a mock server.
+ * <p>
+ * Every member registered through {@code addMember} gets its own {@link LockTestServer};
+ * all servers share a single {@link ConfigMapLockSimulator}. Requests of one pod can
+ * therefore be refused or delayed without affecting the other pods.
+ */
+public class KubernetesClusterServiceTest extends CamelTestSupport {
+
+    private static final int LEASE_TIME_SECONDS = 5;
+
+    private ConfigMapLockSimulator lockSimulator;
+
+    private Map<String, LockTestServer> lockServers;
+
+    /**
+     * Creates a fresh lock simulator and an empty server registry before each test.
+     */
+    @Before
+    public void prepareLock() {
+        this.lockSimulator = new ConfigMapLockSimulator("leaders");
+        this.lockServers = new HashMap<>();
+    }
+
+    /**
+     * Destroys every mock server created during the test. Failures are ignored on purpose.
+     */
+    @After
+    public void shutdownLock() {
+        for (LockTestServer server : this.lockServers.values()) {
+            try {
+                server.destroy();
+            } catch (Exception ignored) {
+                // can happen in case of delay
+            }
+        }
+    }
+
+    @Test
+    public void testSimpleLeaderElection() throws Exception {
+        LeaderRecorder mypod1 = addMember("mypod1");
+        LeaderRecorder mypod2 = addMember("mypod2");
+        context.start();
+
+        mypod1.waitForAnyLeader(2, TimeUnit.SECONDS);
+        mypod2.waitForAnyLeader(2, TimeUnit.SECONDS);
+
+        String leader = mypod1.getCurrentLeader();
+        assertTrue(leader.startsWith("mypod"));
+        assertEquals("Leaders should be equals", mypod2.getCurrentLeader(), leader);
+    }
+
+    @Test
+    public void testMultipleMembersLeaderElection() throws Exception {
+        int number = 5;
+        List<LeaderRecorder> members = IntStream.range(0, number).mapToObj(i -> addMember("mypod" + i)).collect(Collectors.toList());
+        context.start();
+
+        for (LeaderRecorder member : members) {
+            member.waitForAnyLeader(2, TimeUnit.SECONDS);
+        }
+
+        Set<String> leaders = members.stream().map(LeaderRecorder::getCurrentLeader).collect(Collectors.toSet());
+        assertEquals(1, leaders.size());
+        String leader = leaders.iterator().next();
+        assertTrue(leader.startsWith("mypod"));
+    }
+
+    @Test
+    public void testSimpleLeaderElectionWithExistingConfigMap() throws Exception {
+        lockSimulator.setConfigMap(new ConfigMapBuilder()
+                .withNewMetadata()
+                .withName("leaders")
+                .and().build(), true);
+
+        LeaderRecorder mypod1 = addMember("mypod1");
+        LeaderRecorder mypod2 = addMember("mypod2");
+        context.start();
+
+        mypod1.waitForAnyLeader(2, TimeUnit.SECONDS);
+        mypod2.waitForAnyLeader(2, TimeUnit.SECONDS);
+
+        String leader = mypod1.getCurrentLeader();
+        assertTrue(leader.startsWith("mypod"));
+        assertEquals("Leaders should be equals", mypod2.getCurrentLeader(), leader);
+    }
+
+    @Test
+    public void testLeadershipLoss() throws Exception {
+        LeaderRecorder mypod1 = addMember("mypod1");
+        LeaderRecorder mypod2 = addMember("mypod2");
+        context.start();
+
+        mypod1.waitForAnyLeader(2, TimeUnit.SECONDS);
+        mypod2.waitForAnyLeader(2, TimeUnit.SECONDS);
+
+        String firstLeader = mypod1.getCurrentLeader();
+
+        refuseRequestsFromPod(firstLeader);
+
+        assertLeadershipHandover(firstLeader, mypod1, mypod2);
+    }
+
+    @Test
+    public void testSlowLeaderLosingLeadership() throws Exception {
+        LeaderRecorder mypod1 = addMember("mypod1");
+        LeaderRecorder mypod2 = addMember("mypod2");
+        context.start();
+
+        mypod1.waitForAnyLeader(2, TimeUnit.SECONDS);
+        mypod2.waitForAnyLeader(2, TimeUnit.SECONDS);
+
+        String firstLeader = mypod1.getCurrentLeader();
+
+        delayRequestsFromPod(firstLeader, 10, TimeUnit.SECONDS);
+
+        assertLeadershipHandover(firstLeader, mypod1, mypod2);
+    }
+
+    @Test
+    public void testRecoveryAfterFailure() throws Exception {
+        LeaderRecorder mypod1 = addMember("mypod1");
+        LeaderRecorder mypod2 = addMember("mypod2");
+        context.start();
+
+        mypod1.waitForAnyLeader(2, TimeUnit.SECONDS);
+        mypod2.waitForAnyLeader(2, TimeUnit.SECONDS);
+
+        String firstLeader = mypod1.getCurrentLeader();
+
+        // Outages of 1 second are shorter than the renew deadline set in addMember (3 seconds)
+        for (int i = 0; i < 3; i++) {
+            refuseRequestsFromPod(firstLeader);
+            Thread.sleep(1000);
+            allowRequestsFromPod(firstLeader);
+            Thread.sleep(2000);
+        }
+
+        assertEquals(firstLeader, mypod1.getCurrentLeader());
+        assertEquals(firstLeader, mypod2.getCurrentLeader());
+    }
+
+    @Test
+    public void testSharedConfigMap() throws Exception {
+        LeaderRecorder a1 = addMember("a1");
+        LeaderRecorder a2 = addMember("a2");
+        LeaderRecorder b1 = addMember("b1", "app2");
+        LeaderRecorder b2 = addMember("b2", "app2");
+        context.start();
+
+        // Every recorder must have seen a leader before the assertions below read it
+        a1.waitForAnyLeader(2, TimeUnit.SECONDS);
+        a2.waitForAnyLeader(2, TimeUnit.SECONDS);
+        b1.waitForAnyLeader(2, TimeUnit.SECONDS);
+        b2.waitForAnyLeader(2, TimeUnit.SECONDS);
+
+        assertNotNull(a1.getCurrentLeader());
+        assertTrue(a1.getCurrentLeader().startsWith("a"));
+        assertEquals(a1.getCurrentLeader(), a2.getCurrentLeader());
+        assertNotNull(b1.getCurrentLeader());
+        assertTrue(b1.getCurrentLeader().startsWith("b"));
+        assertEquals(b1.getCurrentLeader(), b2.getCurrentLeader());
+
+        assertNotEquals(a1.getCurrentLeader(), b2.getCurrentLeader());
+    }
+
+    /**
+     * Verifies that leadership moves away from {@code firstLeader} to the other pod and
+     * that the timing constraints of the switch are respected.
+     *
+     * @param firstLeader the pod that held the leadership before it was disturbed
+     * @param mypod1 the recorder attached to "mypod1"
+     * @param mypod2 the recorder attached to "mypod2"
+     */
+    private void assertLeadershipHandover(String firstLeader, LeaderRecorder mypod1, LeaderRecorder mypod2) {
+        boolean firstPodLed = firstLeader.equals("mypod1");
+        LeaderRecorder formerLeaderRecorder = firstPodLed ? mypod1 : mypod2;
+        LeaderRecorder formerLoserRecorder = firstPodLed ? mypod2 : mypod1;
+
+        formerLeaderRecorder.waitForALeaderChange(7, TimeUnit.SECONDS);
+        formerLoserRecorder.waitForANewLeader(firstLeader, 7, TimeUnit.SECONDS);
+
+        String secondLeader = formerLoserRecorder.getCurrentLeader();
+        assertNotEquals("The firstLeader should be different from the new one", firstLeader, secondLeader);
+
+        Long lossTimestamp = formerLeaderRecorder.getLastTimeOf(l -> l == null);
+        Long gainTimestamp = formerLoserRecorder.getLastTimeOf(secondLeader::equals);
+
+        // getLastTimeOf returns null when no event matches: fail with a message instead of an NPE on unboxing
+        assertNotNull("The former leader should have recorded a leadership loss", lossTimestamp);
+        assertNotNull("The former loser should have recorded the new leader", gainTimestamp);
+
+        assertTrue("At least 2 seconds must elapse from leadership loss and regain (see renewDeadlineSeconds)", gainTimestamp >= lossTimestamp + 2000);
+        checkLeadershipChangeDistance(LEASE_TIME_SECONDS, TimeUnit.SECONDS, mypod1, mypod2);
+    }
+
+    private void delayRequestsFromPod(String pod, long delay, TimeUnit unit) {
+        this.lockServers.get(pod).setDelayRequests(TimeUnit.MILLISECONDS.convert(delay, unit));
+    }
+
+    private void refuseRequestsFromPod(String pod) {
+        this.lockServers.get(pod).setRefuseRequests(true);
+    }
+
+    private void allowRequestsFromPod(String pod) {
+        this.lockServers.get(pod).setRefuseRequests(false);
+    }
+
+    /**
+     * Merges the events of all recorders in timestamp order and asserts that, whenever the
+     * observed leader switches from one pod to a different one, at least {@code minimum}
+     * elapsed since the previous leader was last seen.
+     *
+     * @param minimum the minimum distance between the last sighting of the old leader and the new one
+     * @param unit the unit of {@code minimum}
+     * @param recorders the recorders whose events are checked
+     */
+    private void checkLeadershipChangeDistance(long minimum, TimeUnit unit, LeaderRecorder... recorders) {
+        List<LeaderRecorder.LeadershipInfo> infos = Arrays.stream(recorders)
+                .flatMap(lr -> lr.getLeadershipInfo().stream())
+                .sorted((li1, li2) -> Long.compare(li1.getChangeTimestamp(), li2.getChangeTimestamp()))
+                .collect(Collectors.toList());
+
+        long minimumMillis = TimeUnit.MILLISECONDS.convert(minimum, unit);
+
+        LeaderRecorder.LeadershipInfo currentLeaderLastSeen = null;
+        for (LeaderRecorder.LeadershipInfo info : infos) {
+            if (currentLeaderLastSeen == null || currentLeaderLastSeen.getLeader() == null) {
+                currentLeaderLastSeen = info;
+            } else if (Objects.equals(info.getLeader(), currentLeaderLastSeen.getLeader())) {
+                currentLeaderLastSeen = info;
+            } else if (info.getLeader() != null) {
+                // switch to a different, non-null leader
+                long delay = info.getChangeTimestamp() - currentLeaderLastSeen.getChangeTimestamp();
+                assertTrue("Lease time not elapsed between switch", delay >= minimumMillis);
+                currentLeaderLastSeen = info;
+            }
+            // a null leader following a known one does not move the reference point
+        }
+    }
+
+    private LeaderRecorder addMember(String name) {
+        return addMember(name, "app");
+    }
+
+    /**
+     * Registers a new cluster member backed by its own mock server.
+     *
+     * @param name the pod name of the member; must not have been registered already
+     * @param namespace the name of the cluster view the returned recorder listens on
+     * @return the recorder receiving leadership events for the member
+     */
+    private LeaderRecorder addMember(String name, String namespace) {
+        assertNull(this.lockServers.get(name));
+
+        LockTestServer lockServer = new LockTestServer(lockSimulator);
+        this.lockServers.put(name, lockServer);
+
+        KubernetesConfiguration configuration = new KubernetesConfiguration();
+        configuration.setKubernetesClient(lockServer.createClient());
+
+        KubernetesClusterService member = new KubernetesClusterService(configuration);
+        member.setKubernetesNamespace("test");
+        member.setPodName(name);
+        member.setLeaseDurationSeconds(LEASE_TIME_SECONDS);
+        member.setRenewDeadlineSeconds(3); // 5-3 = at least 2 seconds for switching on leadership loss
+        member.setRetryPeriodSeconds(1);
+        member.setRetryOnErrorIntervalSeconds(1);
+        member.setJitterFactor(1.2);
+
+        LeaderRecorder recorder = new LeaderRecorder();
+        try {
+            member.getView(namespace).addEventListener(recorder);
+            context().addService(member);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+        return recorder;
+    }
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/ConfigMapLockSimulator.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/ConfigMapLockSimulator.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/ConfigMapLockSimulator.java
new file mode 100644
index 0000000..1c3d7d0
--- /dev/null
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/ConfigMapLockSimulator.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.utils;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * In-memory stand-in for the ConfigMap used as the central lock in leader election tests.
+ * <p>
+ * It keeps a single ConfigMap and a numeric resource version that is increased on every
+ * accepted write.
+ */
+public class ConfigMapLockSimulator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigMapLockSimulator.class);
+
+    private String configMapName;
+
+    private ConfigMap currentMap;
+
+    private long versionCounter = 1000000;
+
+    public ConfigMapLockSimulator(String configMapName) {
+        this.configMapName = configMapName;
+    }
+
+    public String getConfigMapName() {
+        return configMapName;
+    }
+
+    /**
+     * Stores the given map as the current one, stamped with the next resource version.
+     *
+     * @param map the ConfigMap to store
+     * @param insert {@code true} to create the map (no map may be stored yet),
+     *               {@code false} to update it (a map must already be stored)
+     * @return {@code true} if the map was stored; {@code false} if the insert/update
+     *         precondition failed or the map carries a resource version different from the current one
+     */
+    public synchronized boolean setConfigMap(ConfigMap map, boolean insert) {
+        boolean alreadyStored = currentMap != null;
+        if (insert) {
+            // Insert
+            if (alreadyStored) {
+                LOG.error("Current map should have been null");
+                return false;
+            }
+        } else if (!alreadyStored) {
+            // Update
+            LOG.error("Current map should not have been null");
+            return false;
+        }
+
+        String version = null;
+        if (map.getMetadata() != null) {
+            version = map.getMetadata().getResourceVersion();
+        }
+        if (version != null) {
+            long requestedVersion = Long.parseLong(version);
+            if (requestedVersion != versionCounter) {
+                LOG.warn("Current resource version is {} while the update is related to version {}", versionCounter, requestedVersion);
+                return false;
+            }
+        }
+
+        this.currentMap = new ConfigMapBuilder(map)
+                .editOrNewMetadata()
+                .withResourceVersion(String.valueOf(++versionCounter))
+                .endMetadata()
+                .build();
+        return true;
+    }
+
+    /**
+     * Returns a copy of the stored map built with {@link ConfigMapBuilder}.
+     *
+     * @return a copy of the current map, or {@code null} if none has been stored
+     */
+    public synchronized ConfigMap getConfigMap() {
+        return currentMap == null ? null : new ConfigMapBuilder(currentMap).build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java
new file mode 100644
index 0000000..6670f37
--- /dev/null
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.utils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+import org.apache.camel.ha.CamelClusterEventListener;
+import org.apache.camel.ha.CamelClusterMember;
+import org.apache.camel.ha.CamelClusterView;
+import org.junit.Assert;
+
+/**
+ * Records leadership changes and allows assertions to be made on them.
+ * <p>
+ * Events are kept in a {@link CopyOnWriteArrayList}, in the order they are received.
+ */
+public class LeaderRecorder implements CamelClusterEventListener.Leadership {
+
+    private final List<LeadershipInfo> leaderships = new CopyOnWriteArrayList<>();
+
+    @Override
+    public void leadershipChanged(CamelClusterView view, CamelClusterMember leader) {
+        this.leaderships.add(new LeadershipInfo(leader != null ? leader.getId() : null, System.currentTimeMillis()));
+    }
+
+    /**
+     * @return a read-only view of the recorded events, oldest first
+     */
+    public List<LeadershipInfo> getLeadershipInfo() {
+        return Collections.unmodifiableList(leaderships);
+    }
+
+    public void waitForAnyLeader(long time, TimeUnit unit) {
+        waitForLeader(leader -> leader != null, time, unit);
+    }
+
+    public void waitForALeaderChange(long time, TimeUnit unit) {
+        String current = getCurrentLeader();
+        waitForLeader(leader -> !Objects.equals(current, leader), time, unit);
+    }
+
+    public void waitForANewLeader(String current, long time, TimeUnit unit) {
+        waitForLeader(leader -> leader != null && !Objects.equals(current, leader), time, unit);
+    }
+
+    /**
+     * Polls every 50 milliseconds until the current leader satisfies the predicate, and
+     * fails the test when the timeout is exceeded.
+     *
+     * @param as the condition the current leader (possibly {@code null}) must satisfy
+     * @param time the maximum time to wait
+     * @param unit the unit of {@code time}
+     */
+    public void waitForLeader(Predicate<String> as, long time, TimeUnit unit) {
+        long timeoutMillis = TimeUnit.MILLISECONDS.convert(time, unit);
+        long start = System.currentTimeMillis();
+        while (!as.test(getCurrentLeader())) {
+            if (System.currentTimeMillis() - start > timeoutMillis) {
+                Assert.fail("Timeout while waiting for condition");
+            }
+            doWait(50);
+        }
+    }
+
+    private void doWait(long millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException e) {
+            // restore the interrupt flag so that callers up the stack can still observe it
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * @return the leader of the most recent event, or {@code null} if that event carried
+     *         no leader or nothing has been recorded yet
+     */
+    public String getCurrentLeader() {
+        if (leaderships.size() > 0) {
+            return leaderships.get(leaderships.size() - 1).getLeader();
+        }
+        return null;
+    }
+
+    /**
+     * @param p the condition to match against the leader of each event
+     * @return the timestamp of the most recent event whose leader matches, or {@code null} if none does
+     */
+    public Long getLastTimeOf(Predicate<String> p) {
+        List<LeadershipInfo> lst = new ArrayList<>(leaderships);
+        Collections.reverse(lst);
+        for (LeadershipInfo info : lst) {
+            if (p.test(info.getLeader())) {
+                return info.getChangeTimestamp();
+            }
+        }
+        return null;
+    }
+
+    /**
+     * A single recorded event: the leader id (possibly {@code null}) and the time it was received.
+     */
+    public static class LeadershipInfo {
+        private final String leader;
+        private final long changeTimestamp;
+
+        public LeadershipInfo(String leader, long changeTimestamp) {
+            this.leader = leader;
+            this.changeTimestamp = changeTimestamp;
+        }
+
+        public String getLeader() {
+            return leader;
+        }
+
+        public long getChangeTimestamp() {
+            return changeTimestamp;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java
new file mode 100644
index 0000000..6422e35
--- /dev/null
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.utils;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.PodListBuilder;
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+import io.fabric8.mockwebserver.utils.ResponseProvider;
+
+import okhttp3.mockwebserver.RecordedRequest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Test server to interact with Kubernetes for locking on a ConfigMap.
+ * <p>
+ * GET, POST and PUT requests on the ConfigMap of the given {@link ConfigMapLockSimulator} are answered
+ * using the state held by the simulator. Failures and slow responses can be simulated through
+ * {@link #setRefuseRequests(boolean)} and {@link #setDelayRequests(Long)}.
+ */
+public class LockTestServer extends KubernetesMockServer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LockTestServer.class);
+
+    // Shared instance: avoids building a new mapper for every incoming request
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    // The two switches below are changed through the setters while requests may be served by other threads,
+    // hence they are volatile to make every change visible to the response providers
+    private volatile boolean refuseRequests;
+
+    private volatile Long delayRequests;
+
+    /**
+     * Registers the expectations for the ConfigMap used as lock and for the pod resources.
+     *
+     * @param lockSimulator the simulator providing the name and the state of the ConfigMap
+     */
+    public LockTestServer(ConfigMapLockSimulator lockSimulator) {
+
+        // Read the ConfigMap: 200 with the current map, 404 when it does not exist, 500 when refusing requests
+        expect().get().withPath("/api/v1/namespaces/test/configmaps/" + lockSimulator.getConfigMapName()).andReply(new ResponseProvider<Object>() {
+            // Filled by getBody; NOTE(review): relies on getBody being invoked before getStatusCode on the same thread - confirm
+            ThreadLocal<Integer> responseCode = new ThreadLocal<>();
+
+            @Override
+            public int getStatusCode() {
+                return responseCode.get();
+            }
+
+            @Override
+            public Object getBody(RecordedRequest recordedRequest) {
+                delayIfNecessary();
+                if (refuseRequests) {
+                    responseCode.set(500);
+                    return "";
+                }
+
+                ConfigMap map = lockSimulator.getConfigMap();
+                if (map != null) {
+                    responseCode.set(200);
+                    return map;
+                } else {
+                    responseCode.set(404);
+                    return "";
+                }
+            }
+        }).always();
+
+        // Create the ConfigMap: 201 when the simulator accepts the insert, 500 otherwise
+        expect().post().withPath("/api/v1/namespaces/test/configmaps").andReply(new ResponseProvider<Object>() {
+            ThreadLocal<Integer> responseCode = new ThreadLocal<>();
+
+            @Override
+            public int getStatusCode() {
+                return responseCode.get();
+            }
+
+            @Override
+            public Object getBody(RecordedRequest recordedRequest) {
+                delayIfNecessary();
+                if (refuseRequests) {
+                    responseCode.set(500);
+                    return "";
+                }
+
+                ConfigMap map = convert(recordedRequest);
+                if (map == null || map.getMetadata() == null || !lockSimulator.getConfigMapName().equals(map.getMetadata().getName())) {
+                    throw new IllegalArgumentException("Illegal configMap received");
+                }
+
+                boolean done = lockSimulator.setConfigMap(map, true);
+                if (done) {
+                    responseCode.set(201);
+                    return lockSimulator.getConfigMap();
+                } else {
+                    responseCode.set(500);
+                    return "";
+                }
+            }
+        }).always();
+
+        // Replace the ConfigMap: 200 when the simulator accepts the update, 409 otherwise
+        expect().put().withPath("/api/v1/namespaces/test/configmaps/" + lockSimulator.getConfigMapName()).andReply(new ResponseProvider<Object>() {
+            ThreadLocal<Integer> responseCode = new ThreadLocal<>();
+
+            @Override
+            public int getStatusCode() {
+                return responseCode.get();
+            }
+
+            @Override
+            public Object getBody(RecordedRequest recordedRequest) {
+                delayIfNecessary();
+                if (refuseRequests) {
+                    responseCode.set(500);
+                    return "";
+                }
+
+                ConfigMap map = convert(recordedRequest);
+
+                boolean done = lockSimulator.setConfigMap(map, false);
+                if (done) {
+                    responseCode.set(200);
+                    return lockSimulator.getConfigMap();
+                } else {
+                    responseCode.set(409);
+                    return "";
+                }
+            }
+        }).always();
+
+        // Other resources
+        expect().get().withPath("/api/v1/namespaces/test/pods").andReturn(200, new PodListBuilder().withNewMetadata().withResourceVersion("1").and().build()).always();
+        expect().get().withPath("/api/v1/namespaces/test/pods?resourceVersion=1&watch=true").andUpgradeToWebSocket().open().done().always();
+    }
+
+    /**
+     * @return {@code true} when the ConfigMap endpoints answer every request with a 500 status code
+     */
+    public boolean isRefuseRequests() {
+        return refuseRequests;
+    }
+
+    /**
+     * @param refuseRequests {@code true} to make the ConfigMap endpoints answer every request with a 500 status code
+     */
+    public void setRefuseRequests(boolean refuseRequests) {
+        this.refuseRequests = refuseRequests;
+    }
+
+    /**
+     * @return the delay in milliseconds applied to the ConfigMap endpoints, or {@code null} when no delay is applied
+     */
+    public Long getDelayRequests() {
+        return delayRequests;
+    }
+
+    /**
+     * @param delayRequests the delay in milliseconds to apply before answering on the ConfigMap endpoints,
+     *                      or {@code null} to answer without delay
+     */
+    public void setDelayRequests(Long delayRequests) {
+        this.delayRequests = delayRequests;
+    }
+
+    /**
+     * Sleeps for the configured delay, if any.
+     */
+    private void delayIfNecessary() {
+        // Read the field only once: it can be reset to null by another thread between the check and the sleep
+        Long delay = delayRequests;
+        if (delay != null) {
+            try {
+                Thread.sleep(delay);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag cleared by sleep() before propagating the failure
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Deserializes the body of the request into a ConfigMap.
+     *
+     * @param request the request carrying the JSON representation of the ConfigMap
+     * @return the ConfigMap read from the request body
+     * @throws IllegalArgumentException if the body cannot be read as a ConfigMap
+     */
+    private ConfigMap convert(RecordedRequest request) {
+        try {
+            return MAPPER.readValue(request.getBody().readByteArray(), ConfigMap.class);
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Erroneous data", e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServerTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServerTest.java
new file mode 100644
index 0000000..282b83f
--- /dev/null
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServerTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.utils;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Basic tests on the lock test server.
+ */
+public class LockTestServerTest {
+
+    /**
+     * Runs the create/replace scenario against a fresh server, releasing the client and the server at the end.
+     */
+    @Test
+    public void test() {
+        ConfigMapLockSimulator lock = new ConfigMapLockSimulator("xxx");
+        LockTestServer server = new LockTestServer(lock);
+        try {
+            KubernetesClient client = server.createClient();
+            try {
+                verifyLockOperations(client);
+            } finally {
+                client.close();
+            }
+        } finally {
+            // Stop the mock server even when an assertion fails, so that it does not outlive the test
+            server.destroy();
+        }
+    }
+
+    /**
+     * Checks creation, duplicate creation, replacement and version-locked replacement of the "xxx" ConfigMap.
+     */
+    private void verifyLockOperations(KubernetesClient client) {
+        assertNull(client.configMaps().withName("xxx").get());
+
+        client.configMaps().withName("xxx").createNew()
+                .withNewMetadata()
+                .withName("xxx")
+                .and().done();
+
+        try {
+            client.configMaps().withName("xxx").createNew()
+                    .withNewMetadata()
+                    .withName("xxx")
+                    .and().done();
+            Assert.fail("Should have failed for duplicate insert");
+        } catch (Exception ignored) {
+            // expected: the second insert must be rejected (the AssertionError raised by fail() is not caught here)
+        }
+
+        client.configMaps().withName("xxx")
+                .createOrReplaceWithNew()
+                .editOrNewMetadata()
+                .withName("xxx")
+                .addToLabels("a", "b")
+                .and().done();
+
+        ConfigMap map = client.configMaps().withName("xxx").get();
+        assertEquals("b", map.getMetadata().getLabels().get("a"));
+
+        client.configMaps().withName("xxx")
+                .lockResourceVersion(map.getMetadata().getResourceVersion())
+                .replace(new ConfigMapBuilder(map)
+                        .editOrNewMetadata()
+                        .withName("xxx")
+                        .addToLabels("c", "d")
+                        .and()
+                        .build());
+
+        ConfigMap newMap = client.configMaps().withName("xxx").get();
+        assertEquals("d", newMap.getMetadata().getLabels().get("c"));
+
+        try {
+            // Same resource version as the previous replace: it is outdated now
+            client.configMaps().withName("xxx")
+                    .lockResourceVersion(map.getMetadata().getResourceVersion())
+                    .replace(new ConfigMapBuilder(map)
+                            .editOrNewMetadata()
+                            .withName("xxx")
+                            .addToLabels("e", "f")
+                            .and()
+                            .build());
+            Assert.fail("Should have failed for wrong version");
+        } catch (Exception ignored) {
+            // expected: a replace locked on an outdated resource version must be rejected
+        }
+
+        ConfigMap newMap2 = client.configMaps().withName("xxx").get();
+        assertNull(newMap2.getMetadata().getLabels().get("e"));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 8030a2a..516033f 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -491,6 +491,7 @@
     <mina2-version>2.0.16</mina2-version>
     <minimal-json-version>0.9.4</minimal-json-version>
     <mock-javamail-version>1.9</mock-javamail-version>
+    <mockwebserver-version>0.0.13</mockwebserver-version>
     <mockito-version>1.10.19</mockito-version>
     <mongo-java-driver-version>3.5.0</mongo-java-driver-version>
     <mongo-java-driver32-version>3.2.2</mongo-java-driver32-version>