You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2017/08/08 14:43:26 UTC
[08/14] camel git commit: CAMEL-11331: Adding tests and fixing impl
CAMEL-11331: Adding tests and fixing impl
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/881b9331
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/881b9331
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/881b9331
Branch: refs/heads/master
Commit: 881b9331f0712b4e611087eb11e623016dd3de72
Parents: debfeed
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Fri Jul 14 11:21:50 2017 +0200
Committer: Nicola Ferraro <ni...@gmail.com>
Committed: Tue Aug 8 16:39:43 2017 +0200
----------------------------------------------------------------------
components/camel-kubernetes/pom.xml | 6 +
.../kubernetes/ha/lock/ConfigMapLockUtils.java | 2 +-
...ubernetesLeaseBasedLeadershipController.java | 15 +-
.../ha/lock/KubernetesLockConfiguration.java | 6 +-
.../ha/KubernetesClusterServiceTest.java | 291 +++++++++++++++++++
.../ha/utils/ConfigMapLockSimulator.java | 83 ++++++
.../kubernetes/ha/utils/LeaderRecorder.java | 115 ++++++++
.../kubernetes/ha/utils/LockTestServer.java | 175 +++++++++++
.../kubernetes/ha/utils/LockTestServerTest.java | 97 +++++++
parent/pom.xml | 1 +
10 files changed, 784 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/pom.xml b/components/camel-kubernetes/pom.xml
index c444068..38fa037 100644
--- a/components/camel-kubernetes/pom.xml
+++ b/components/camel-kubernetes/pom.xml
@@ -67,6 +67,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>io.fabric8</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <version>${mockwebserver-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test-spring</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java
index 84718f3..70fa860 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/ConfigMapLockUtils.java
@@ -26,7 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- *
+ * Utilities for managing ConfigMaps that contain lock information.
*/
public final class ConfigMapLockUtils {
http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java
index 8e96a72..42be2e7 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLeaseBasedLeadershipController.java
@@ -163,8 +163,9 @@ public class KubernetesLeaseBasedLeadershipController implements Service {
long time = Math.min(timeRetry, timeDeadline);
long delay = Math.max(0, time - System.currentTimeMillis());
- LOG.debug("Next renewal timeout event will be fired in {} seconds", delay / 1000);
- return delay;
+ long delayJittered = jitter(delay, lockConfiguration.getJitterFactor());
+ LOG.debug("Next renewal timeout event will be fired in {} seconds", delayJittered / 1000);
+ return delayJittered;
}
@@ -340,10 +341,18 @@ public class KubernetesLeaseBasedLeadershipController implements Service {
private void updateLatestLeaderInfo(ConfigMap configMap) {
LOG.debug("Updating internal status about the current leader");
this.latestLeaderInfo = ConfigMapLockUtils.getLeaderInfo(configMap, this.lockConfiguration.getGroupName());
+
+ // Notify about changes in current leader if any: once right away, then once more after the renew deadline (when this pod is the recorded leader) or after the lease duration (when another leader is recorded), each plus FIXED_ADDITIONAL_DELAY
+ this.eventDispatcherExecutor.execute(this::checkAndNotifyNewLeader);
+ if (this.latestLeaderInfo.isLeader(this.lockConfiguration.getPodName())) {
+ this.eventDispatcherExecutor.schedule(this::checkAndNotifyNewLeader, this.lockConfiguration.getRenewDeadlineSeconds() * 1000 + FIXED_ADDITIONAL_DELAY, TimeUnit.MILLISECONDS);
+ } else if (this.latestLeaderInfo.getLeader() != null) {
+ this.eventDispatcherExecutor.schedule(this::checkAndNotifyNewLeader, this.lockConfiguration.getLeaseDurationSeconds() * 1000 + FIXED_ADDITIONAL_DELAY, TimeUnit.MILLISECONDS);
+ }
}
private void checkAndNotifyNewLeader() {
- LOG.debug("Checking if the current leader has changed to notify the event handler...");
+ LOG.info("Checking if the current leader has changed to notify the event handler...");
LeaderInfo newLeaderInfo = this.latestLeaderInfo;
if (newLeaderInfo == null) {
return;
http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
index 37e0251..6461708 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/ha/lock/KubernetesLockConfiguration.java
@@ -30,9 +30,9 @@ public class KubernetesLockConfiguration implements Cloneable {
public static final double DEFAULT_JITTER_FACTOR = 1.2;
- public static final long DEFAULT_LEASE_DURATION_SECONDS = 20;
- public static final long DEFAULT_RENEW_DEADLINE_SECONDS = 15;
- public static final long DEFAULT_RETRY_PERIOD_SECONDS = 6;
+ public static final long DEFAULT_LEASE_DURATION_SECONDS = 60;
+ public static final long DEFAULT_RENEW_DEADLINE_SECONDS = 45;
+ public static final long DEFAULT_RETRY_PERIOD_SECONDS = 9;
public static final long DEFAULT_RETRY_ON_ERROR_INTERVAL_SECONDS = 5;
public static final long DEFAULT_WATCH_REFRESH_INTERVAL_SECONDS = 1800;
http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java
new file mode 100644
index 0000000..4baebc6
--- /dev/null
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/KubernetesClusterServiceTest.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+
+import org.apache.camel.component.kubernetes.KubernetesConfiguration;
+import org.apache.camel.component.kubernetes.ha.utils.ConfigMapLockSimulator;
+import org.apache.camel.component.kubernetes.ha.utils.LeaderRecorder;
+import org.apache.camel.component.kubernetes.ha.utils.LockTestServer;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test leader election scenarios using a mock server.
+ */
+public class KubernetesClusterServiceTest extends CamelTestSupport {
+
+    /** Lease duration, in seconds, configured on every member created by {@link #addMember(String, String)}. */
+    private static final int LEASE_TIME_SECONDS = 5;
+
+    /** Lock state shared by all the mock servers created during a single test. */
+    private ConfigMapLockSimulator lockSimulator;
+
+    /** Mock servers indexed by the pod name of the member that talks to them. */
+    private Map<String, LockTestServer> lockServers;
+
+    @Before
+    public void prepareLock() {
+        this.lockSimulator = new ConfigMapLockSimulator("leaders");
+        this.lockServers = new HashMap<>();
+    }
+
+    @After
+    public void shutdownLock() {
+        for (LockTestServer server : this.lockServers.values()) {
+            try {
+                server.destroy();
+            } catch (Exception e) {
+                // can happen in case of delay
+            }
+        }
+    }
+
+    @Test
+    public void testSimpleLeaderElection() throws Exception {
+        LeaderRecorder mypod1 = addMember("mypod1");
+        LeaderRecorder mypod2 = addMember("mypod2");
+        context.start();
+
+        mypod1.waitForAnyLeader(2, TimeUnit.SECONDS);
+        mypod2.waitForAnyLeader(2, TimeUnit.SECONDS);
+
+        String leader = mypod1.getCurrentLeader();
+        assertTrue(leader.startsWith("mypod"));
+        assertEquals("Leaders should be equals", mypod2.getCurrentLeader(), leader);
+    }
+
+    @Test
+    public void testMultipleMembersLeaderElection() throws Exception {
+        int number = 5;
+        List<LeaderRecorder> members = IntStream.range(0, number).mapToObj(i -> addMember("mypod" + i)).collect(Collectors.toList());
+        context.start();
+
+        for (LeaderRecorder member : members) {
+            member.waitForAnyLeader(2, TimeUnit.SECONDS);
+        }
+
+        Set<String> leaders = members.stream().map(LeaderRecorder::getCurrentLeader).collect(Collectors.toSet());
+        assertEquals(1, leaders.size());
+        String leader = leaders.iterator().next();
+        assertTrue(leader.startsWith("mypod"));
+    }
+
+    @Test
+    public void testSimpleLeaderElectionWithExistingConfigMap() throws Exception {
+        lockSimulator.setConfigMap(new ConfigMapBuilder()
+            .withNewMetadata()
+            .withName("leaders")
+            .and().build(), true);
+
+        LeaderRecorder mypod1 = addMember("mypod1");
+        LeaderRecorder mypod2 = addMember("mypod2");
+        context.start();
+
+        mypod1.waitForAnyLeader(2, TimeUnit.SECONDS);
+        mypod2.waitForAnyLeader(2, TimeUnit.SECONDS);
+
+        String leader = mypod1.getCurrentLeader();
+        assertTrue(leader.startsWith("mypod"));
+        assertEquals("Leaders should be equals", mypod2.getCurrentLeader(), leader);
+    }
+
+    @Test
+    public void testLeadershipLoss() throws Exception {
+        LeaderRecorder mypod1 = addMember("mypod1");
+        LeaderRecorder mypod2 = addMember("mypod2");
+        context.start();
+
+        mypod1.waitForAnyLeader(2, TimeUnit.SECONDS);
+        mypod2.waitForAnyLeader(2, TimeUnit.SECONDS);
+
+        String firstLeader = mypod1.getCurrentLeader();
+
+        refuseRequestsFromPod(firstLeader);
+
+        assertLeadershipMovedFrom(firstLeader, mypod1, mypod2);
+    }
+
+    @Test
+    public void testSlowLeaderLosingLeadership() throws Exception {
+        LeaderRecorder mypod1 = addMember("mypod1");
+        LeaderRecorder mypod2 = addMember("mypod2");
+        context.start();
+
+        mypod1.waitForAnyLeader(2, TimeUnit.SECONDS);
+        mypod2.waitForAnyLeader(2, TimeUnit.SECONDS);
+
+        String firstLeader = mypod1.getCurrentLeader();
+
+        delayRequestsFromPod(firstLeader, 10, TimeUnit.SECONDS);
+
+        assertLeadershipMovedFrom(firstLeader, mypod1, mypod2);
+    }
+
+    @Test
+    public void testRecoveryAfterFailure() throws Exception {
+        LeaderRecorder mypod1 = addMember("mypod1");
+        LeaderRecorder mypod2 = addMember("mypod2");
+        context.start();
+
+        mypod1.waitForAnyLeader(2, TimeUnit.SECONDS);
+        mypod2.waitForAnyLeader(2, TimeUnit.SECONDS);
+
+        String firstLeader = mypod1.getCurrentLeader();
+
+        for (int i = 0; i < 3; i++) {
+            refuseRequestsFromPod(firstLeader);
+            Thread.sleep(1000);
+            allowRequestsFromPod(firstLeader);
+            Thread.sleep(2000);
+        }
+
+        assertEquals(firstLeader, mypod1.getCurrentLeader());
+        assertEquals(firstLeader, mypod2.getCurrentLeader());
+    }
+
+    @Test
+    public void testSharedConfigMap() throws Exception {
+        LeaderRecorder a1 = addMember("a1");
+        LeaderRecorder a2 = addMember("a2");
+        LeaderRecorder b1 = addMember("b1", "app2");
+        LeaderRecorder b2 = addMember("b2", "app2");
+        context.start();
+
+        a1.waitForAnyLeader(2, TimeUnit.SECONDS);
+        a2.waitForAnyLeader(2, TimeUnit.SECONDS);
+        b1.waitForAnyLeader(2, TimeUnit.SECONDS);
+        // Every recorder must be awaited before its state is asserted (b2 was previously never awaited)
+        b2.waitForAnyLeader(2, TimeUnit.SECONDS);
+
+        assertNotNull(a1.getCurrentLeader());
+        assertTrue(a1.getCurrentLeader().startsWith("a"));
+        assertEquals(a1.getCurrentLeader(), a2.getCurrentLeader());
+        assertNotNull(b1.getCurrentLeader());
+        assertTrue(b1.getCurrentLeader().startsWith("b"));
+        assertEquals(b1.getCurrentLeader(), b2.getCurrentLeader());
+
+        assertNotEquals(a1.getCurrentLeader(), b2.getCurrentLeader());
+    }
+
+    /**
+     * Waits until the leadership has moved away from {@code firstLeader} and verifies the timing of the switch.
+     * Must be called after the requests of {@code firstLeader} have been refused or delayed.
+     *
+     * @param firstLeader the pod name ("mypod1" or "mypod2") that was leader before the disruption
+     * @param mypod1 the recorder attached to "mypod1"
+     * @param mypod2 the recorder attached to "mypod2"
+     */
+    private void assertLeadershipMovedFrom(String firstLeader, LeaderRecorder mypod1, LeaderRecorder mypod2) {
+        LeaderRecorder formerLeaderRecorder = firstLeader.equals("mypod1") ? mypod1 : mypod2;
+        LeaderRecorder formerLoserRecorder = firstLeader.equals("mypod1") ? mypod2 : mypod1;
+
+        formerLeaderRecorder.waitForALeaderChange(7, TimeUnit.SECONDS);
+        formerLoserRecorder.waitForANewLeader(firstLeader, 7, TimeUnit.SECONDS);
+
+        String secondLeader = formerLoserRecorder.getCurrentLeader();
+        assertNotEquals("The firstLeader should be different from the new one", firstLeader, secondLeader);
+
+        Long lossTimestamp = formerLeaderRecorder.getLastTimeOf(l -> l == null);
+        Long gainTimestamp = formerLoserRecorder.getLastTimeOf(secondLeader::equals);
+
+        // getLastTimeOf returns null when no matching event was recorded: fail explicitly instead of with an NPE on unboxing
+        assertNotNull("The former leader never recorded a leadership loss", lossTimestamp);
+        assertNotNull("The former loser never recorded the new leader", gainTimestamp);
+
+        assertTrue("At least 2 seconds must elapse from leadership loss and regain (see renewDeadlineSeconds)", gainTimestamp >= lossTimestamp + 2000);
+        checkLeadershipChangeDistance(LEASE_TIME_SECONDS, TimeUnit.SECONDS, mypod1, mypod2);
+    }
+
+    private void delayRequestsFromPod(String pod, long delay, TimeUnit unit) {
+        this.lockServers.get(pod).setDelayRequests(TimeUnit.MILLISECONDS.convert(delay, unit));
+    }
+
+    private void refuseRequestsFromPod(String pod) {
+        this.lockServers.get(pod).setRefuseRequests(true);
+    }
+
+    private void allowRequestsFromPod(String pod) {
+        this.lockServers.get(pod).setRefuseRequests(false);
+    }
+
+    /**
+     * Merges the events of all the given recorders in timestamp order and asserts that, whenever the
+     * observed leader switches from one non-null pod to a different non-null pod, at least
+     * {@code minimum} has elapsed since the previous leader was last seen.
+     *
+     * @param minimum the minimum distance between the last sighting of a leader and the first sighting of its successor
+     * @param unit the unit of {@code minimum}
+     * @param recorders the recorders whose events are merged
+     */
+    private void checkLeadershipChangeDistance(long minimum, TimeUnit unit, LeaderRecorder... recorders) {
+        List<LeaderRecorder.LeadershipInfo> infos = Arrays.stream(recorders)
+            .flatMap(lr -> lr.getLeadershipInfo().stream())
+            .sorted((li1, li2) -> Long.compare(li1.getChangeTimestamp(), li2.getChangeTimestamp()))
+            .collect(Collectors.toList());
+
+        long minimumMillis = TimeUnit.MILLISECONDS.convert(minimum, unit);
+
+        LeaderRecorder.LeadershipInfo currentLeaderLastSeen = null;
+        for (LeaderRecorder.LeadershipInfo info : infos) {
+            if (currentLeaderLastSeen == null || currentLeaderLastSeen.getLeader() == null) {
+                currentLeaderLastSeen = info;
+            } else if (Objects.equals(info.getLeader(), currentLeaderLastSeen.getLeader())) {
+                // same leader seen again: only refresh the last-seen timestamp
+                currentLeaderLastSeen = info;
+            } else if (info.getLeader() != null) {
+                // switch to a different, non-null leader
+                long delay = info.getChangeTimestamp() - currentLeaderLastSeen.getChangeTimestamp();
+                assertTrue("Lease time not elapsed between switch", delay >= minimumMillis);
+                currentLeaderLastSeen = info;
+            }
+            // a null leader following a non-null one is ignored, so the distance is measured from the last sighting of the old leader
+        }
+    }
+
+    private LeaderRecorder addMember(String name) {
+        return addMember(name, "app");
+    }
+
+    /**
+     * Creates a cluster service member backed by its own mock server, attaches a recorder to the
+     * requested view and registers the member as a service of the Camel context.
+     *
+     * @param name the pod name of the member; must not have been used before in the same test
+     * @param namespace the name of the cluster view the recorder listens to
+     * @return the recorder receiving the leadership events of the view
+     */
+    private LeaderRecorder addMember(String name, String namespace) {
+        assertNull(this.lockServers.get(name));
+
+        LockTestServer lockServer = new LockTestServer(lockSimulator);
+        this.lockServers.put(name, lockServer);
+
+        KubernetesConfiguration configuration = new KubernetesConfiguration();
+        configuration.setKubernetesClient(lockServer.createClient());
+
+        KubernetesClusterService member = new KubernetesClusterService(configuration);
+        member.setKubernetesNamespace("test");
+        member.setPodName(name);
+        member.setLeaseDurationSeconds(LEASE_TIME_SECONDS);
+        member.setRenewDeadlineSeconds(3); // 5-3 = at least 2 seconds for switching on leadership loss
+        member.setRetryPeriodSeconds(1);
+        member.setRetryOnErrorIntervalSeconds(1);
+        member.setJitterFactor(1.2);
+
+        LeaderRecorder recorder = new LeaderRecorder();
+        try {
+            member.getView(namespace).addEventListener(recorder);
+            context().addService(member);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+        return recorder;
+    }
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/ConfigMapLockSimulator.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/ConfigMapLockSimulator.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/ConfigMapLockSimulator.java
new file mode 100644
index 0000000..1c3d7d0
--- /dev/null
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/ConfigMapLockSimulator.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.utils;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Central lock for testing leader election.
+ */
+public class ConfigMapLockSimulator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigMapLockSimulator.class);
+
+    /** Name of the simulated ConfigMap, as given to the constructor. */
+    private String configMapName;
+
+    /** Latest accepted ConfigMap, or {@code null} while nothing has been inserted. */
+    private ConfigMap currentMap;
+
+    /** Resource version of the latest accepted ConfigMap; incremented on every accepted write. */
+    private long versionCounter = 1000000;
+
+    public ConfigMapLockSimulator(String configMapName) {
+        this.configMapName = configMapName;
+    }
+
+    public String getConfigMapName() {
+        return this.configMapName;
+    }
+
+    /**
+     * Tries to store the given ConfigMap as the current one.
+     * <p>
+     * An insert is rejected when a map is already present, an update is rejected when none is present,
+     * and any write carrying a resource version different from the current counter is rejected.
+     * An accepted write is stored with a freshly incremented resource version.
+     *
+     * @param map the ConfigMap to store
+     * @param insert {@code true} for an insert, {@code false} for an update
+     * @return {@code true} if the map has been stored, {@code false} if the write has been rejected
+     */
+    public synchronized boolean setConfigMap(ConfigMap map, boolean insert) {
+        boolean present = this.currentMap != null;
+        if (insert) {
+            if (present) {
+                LOG.error("Current map should have been null");
+                return false;
+            }
+        } else if (!present) {
+            LOG.error("Current map should not have been null");
+            return false;
+        }
+
+        String requestedVersion = null;
+        if (map.getMetadata() != null) {
+            requestedVersion = map.getMetadata().getResourceVersion();
+        }
+        if (requestedVersion != null) {
+            long requested = Long.parseLong(requestedVersion);
+            if (requested != this.versionCounter) {
+                LOG.warn("Current resource version is {} while the update is related to version {}", this.versionCounter, requested);
+                return false;
+            }
+        }
+
+        // The counter is bumped while the builder chain is evaluated, exactly where the new version is stamped
+        this.currentMap = new ConfigMapBuilder(map)
+            .editOrNewMetadata()
+            .withResourceVersion(String.valueOf(++this.versionCounter))
+            .endMetadata()
+            .build();
+        return true;
+    }
+
+    /**
+     * Returns a copy of the current ConfigMap.
+     *
+     * @return a ConfigMap rebuilt from the current one, or {@code null} if none has been stored yet
+     */
+    public synchronized ConfigMap getConfigMap() {
+        return this.currentMap == null ? null : new ConfigMapBuilder(this.currentMap).build();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java
new file mode 100644
index 0000000..6670f37
--- /dev/null
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.utils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+import org.apache.camel.ha.CamelClusterEventListener;
+import org.apache.camel.ha.CamelClusterMember;
+import org.apache.camel.ha.CamelClusterView;
+import org.junit.Assert;
+
+/**
+ * Records leadership changes and allows assertions to be made on them.
+ */
+public class LeaderRecorder implements CamelClusterEventListener.Leadership {
+
+    /** Interval, in milliseconds, between two evaluations of a wait condition. */
+    private static final long POLL_INTERVAL_MILLIS = 50;
+
+    /** Recorded events in arrival order; this class only ever appends to it. */
+    private final List<LeadershipInfo> leaderships = new CopyOnWriteArrayList<>();
+
+    /**
+     * Records the id of the new leader (or {@code null} when there is none) together with the current time.
+     */
+    @Override
+    public void leadershipChanged(CamelClusterView view, CamelClusterMember leader) {
+        this.leaderships.add(new LeadershipInfo(leader != null ? leader.getId() : null, System.currentTimeMillis()));
+    }
+
+    public List<LeadershipInfo> getLeadershipInfo() {
+        return leaderships;
+    }
+
+    /** Waits until the latest recorded leader is non-null. */
+    public void waitForAnyLeader(long time, TimeUnit unit) {
+        waitForLeader(leader -> leader != null, time, unit);
+    }
+
+    /** Waits until the latest recorded leader differs from the one recorded when this method was called. */
+    public void waitForALeaderChange(long time, TimeUnit unit) {
+        String current = getCurrentLeader();
+        waitForLeader(leader -> !Objects.equals(current, leader), time, unit);
+    }
+
+    /** Waits until the latest recorded leader is non-null and differs from {@code current}. */
+    public void waitForANewLeader(String current, long time, TimeUnit unit) {
+        waitForLeader(leader -> leader != null && !Objects.equals(current, leader), time, unit);
+    }
+
+    /**
+     * Polls the latest recorded leader until the predicate accepts it.
+     *
+     * @param as the condition on the latest recorded leader (which may be {@code null})
+     * @param time the maximum time to wait
+     * @param unit the unit of {@code time}
+     * @throws AssertionError through {@link Assert#fail(String)} when the condition is not met in time
+     */
+    public void waitForLeader(Predicate<String> as, long time, TimeUnit unit) {
+        // loop-invariant: convert the timeout only once
+        long timeoutMillis = TimeUnit.MILLISECONDS.convert(time, unit);
+        long start = System.currentTimeMillis();
+        while (!as.test(getCurrentLeader())) {
+            if (System.currentTimeMillis() - start > timeoutMillis) {
+                Assert.fail("Timeout while waiting for condition");
+            }
+            doWait(POLL_INTERVAL_MILLIS);
+        }
+    }
+
+    private void doWait(long millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException e) {
+            // restore the interrupt flag so that callers up the stack can still observe the interruption
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Returns the leader of the most recent event.
+     *
+     * @return the latest recorded leader id, or {@code null} if no event has been recorded or the latest event has no leader
+     */
+    public String getCurrentLeader() {
+        if (!leaderships.isEmpty()) {
+            return leaderships.get(leaderships.size() - 1).getLeader();
+        }
+        return null;
+    }
+
+    /**
+     * Returns the timestamp of the most recent event whose leader matches the predicate.
+     *
+     * @param p the condition on the recorded leader id (which may be {@code null})
+     * @return the timestamp of the latest matching event, or {@code null} if no event matches
+     */
+    public Long getLastTimeOf(Predicate<String> p) {
+        // work on a snapshot and walk it from the newest event to the oldest
+        List<LeadershipInfo> snapshot = new ArrayList<>(leaderships);
+        for (int i = snapshot.size() - 1; i >= 0; i--) {
+            LeadershipInfo info = snapshot.get(i);
+            if (p.test(info.getLeader())) {
+                return info.getChangeTimestamp();
+            }
+        }
+        return null;
+    }
+
+    /**
+     * A leadership event: the leader id (possibly {@code null}) and the time it was recorded.
+     */
+    public static class LeadershipInfo {
+        private final String leader;
+        private final long changeTimestamp;
+
+        public LeadershipInfo(String leader, long changeTimestamp) {
+            this.leader = leader;
+            this.changeTimestamp = changeTimestamp;
+        }
+
+        public String getLeader() {
+            return leader;
+        }
+
+        public long getChangeTimestamp() {
+            return changeTimestamp;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java
new file mode 100644
index 0000000..6422e35
--- /dev/null
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.utils;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.PodListBuilder;
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+import io.fabric8.mockwebserver.utils.ResponseProvider;
+
+import okhttp3.mockwebserver.RecordedRequest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Test server to interact with Kubernetes for locking on a ConfigMap.
+ */
+public class LockTestServer extends KubernetesMockServer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LockTestServer.class);
+
+    /** Shared mapper used to parse request bodies; created once instead of once per request. */
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    /**
+     * When {@code true}, the ConfigMap endpoints answer with a 500 status.
+     * Volatile because it is set by the test code and read by the response providers,
+     * which presumably run on the mock server's own threads.
+     */
+    private volatile boolean refuseRequests;
+
+    /**
+     * Delay, in milliseconds, applied before answering a ConfigMap request; {@code null} means no delay.
+     * Volatile for the same reason as {@link #refuseRequests}.
+     */
+    private volatile Long delayRequests;
+
+    /**
+     * Registers the GET, POST and PUT expectations for the ConfigMap managed by the given simulator
+     * (in namespace "test"), plus fixed answers for the pod list and pod watch endpoints.
+     * <p>
+     * NOTE(review): each provider stores the status code in a ThreadLocal from {@code getBody} and reads it back in
+     * {@code getStatusCode}; this assumes the mock server calls {@code getBody} first, on the same thread - confirm.
+     *
+     * @param lockSimulator the simulator holding the ConfigMap state served by this server
+     */
+    public LockTestServer(ConfigMapLockSimulator lockSimulator) {
+
+        expect().get().withPath("/api/v1/namespaces/test/configmaps/" + lockSimulator.getConfigMapName()).andReply(new ResponseProvider<Object>() {
+            ThreadLocal<Integer> responseCode = new ThreadLocal<>();
+
+            @Override
+            public int getStatusCode() {
+                return responseCode.get();
+            }
+
+            @Override
+            public Object getBody(RecordedRequest recordedRequest) {
+                delayIfNecessary();
+                if (refuseRequests) {
+                    responseCode.set(500);
+                    return "";
+                }
+
+                ConfigMap map = lockSimulator.getConfigMap();
+                if (map != null) {
+                    responseCode.set(200);
+                    return map;
+                } else {
+                    responseCode.set(404);
+                    return "";
+                }
+            }
+        }).always();
+
+        expect().post().withPath("/api/v1/namespaces/test/configmaps").andReply(new ResponseProvider<Object>() {
+            ThreadLocal<Integer> responseCode = new ThreadLocal<>();
+
+            @Override
+            public int getStatusCode() {
+                return responseCode.get();
+            }
+
+            @Override
+            public Object getBody(RecordedRequest recordedRequest) {
+                delayIfNecessary();
+                if (refuseRequests) {
+                    responseCode.set(500);
+                    return "";
+                }
+
+                ConfigMap map = convert(recordedRequest);
+                if (map == null || map.getMetadata() == null || !lockSimulator.getConfigMapName().equals(map.getMetadata().getName())) {
+                    throw new IllegalArgumentException("Illegal configMap received");
+                }
+
+                boolean done = lockSimulator.setConfigMap(map, true);
+                if (done) {
+                    responseCode.set(201);
+                    return lockSimulator.getConfigMap();
+                } else {
+                    responseCode.set(500);
+                    return "";
+                }
+            }
+        }).always();
+
+        expect().put().withPath("/api/v1/namespaces/test/configmaps/" + lockSimulator.getConfigMapName()).andReply(new ResponseProvider<Object>() {
+            ThreadLocal<Integer> responseCode = new ThreadLocal<>();
+
+            @Override
+            public int getStatusCode() {
+                return responseCode.get();
+            }
+
+            @Override
+            public Object getBody(RecordedRequest recordedRequest) {
+                delayIfNecessary();
+                if (refuseRequests) {
+                    responseCode.set(500);
+                    return "";
+                }
+
+                ConfigMap map = convert(recordedRequest);
+
+                boolean done = lockSimulator.setConfigMap(map, false);
+                if (done) {
+                    responseCode.set(200);
+                    return lockSimulator.getConfigMap();
+                } else {
+                    // rejected update (missing map or resource version mismatch in the simulator)
+                    responseCode.set(409);
+                    return "";
+                }
+            }
+        }).always();
+
+        // Other resources
+        expect().get().withPath("/api/v1/namespaces/test/pods").andReturn(200, new PodListBuilder().withNewMetadata().withResourceVersion("1").and().build()).always();
+        expect().get().withPath("/api/v1/namespaces/test/pods?resourceVersion=1&watch=true").andUpgradeToWebSocket().open().done().always();
+    }
+
+
+    public boolean isRefuseRequests() {
+        return refuseRequests;
+    }
+
+    public void setRefuseRequests(boolean refuseRequests) {
+        this.refuseRequests = refuseRequests;
+    }
+
+    public Long getDelayRequests() {
+        return delayRequests;
+    }
+
+    public void setDelayRequests(Long delayRequests) {
+        this.delayRequests = delayRequests;
+    }
+
+    /**
+     * Sleeps for the configured delay, if any, before a ConfigMap request is answered.
+     */
+    private void delayIfNecessary() {
+        // read the field once: it may be changed concurrently between the null check and the unboxing
+        Long delay = this.delayRequests;
+        if (delay != null) {
+            try {
+                Thread.sleep(delay);
+            } catch (InterruptedException e) {
+                // restore the interrupt flag before propagating the failure
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Parses the body of the request as a ConfigMap.
+     *
+     * @throws IllegalArgumentException if the body cannot be read as a ConfigMap
+     */
+    private ConfigMap convert(RecordedRequest request) {
+        try {
+            return MAPPER.readValue(request.getBody().readByteArray(), ConfigMap.class);
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Erroneous data", e);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServerTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServerTest.java
new file mode 100644
index 0000000..282b83f
--- /dev/null
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServerTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha.utils;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Basic tests on the lock test server.
+ */
+public class LockTestServerTest {
+
+    /**
+     * Exercises the mock server through a client: lookup of a missing ConfigMap, creation,
+     * rejected duplicate creation, replacement, and replacement locked on a current and on a stale resource version.
+     */
+    @Test
+    public void test() {
+        ConfigMapLockSimulator lock = new ConfigMapLockSimulator("xxx");
+        LockTestServer server = new LockTestServer(lock);
+        try {
+            KubernetesClient client = server.createClient();
+
+            assertNull(client.configMaps().withName("xxx").get());
+
+            client.configMaps().withName("xxx").createNew()
+                .withNewMetadata()
+                .withName("xxx")
+                .and().done();
+
+            try {
+                client.configMaps().withName("xxx").createNew()
+                    .withNewMetadata()
+                    .withName("xxx")
+                    .and().done();
+                Assert.fail("Should have failed for duplicate insert");
+            } catch (Exception e) {
+                // expected: the duplicate insert must be rejected
+            }
+
+            client.configMaps().withName("xxx")
+                .createOrReplaceWithNew()
+                .editOrNewMetadata()
+                .withName("xxx")
+                .addToLabels("a", "b")
+                .and().done();
+
+            ConfigMap map = client.configMaps().withName("xxx").get();
+            assertEquals("b", map.getMetadata().getLabels().get("a"));
+
+
+            client.configMaps().withName("xxx")
+                .lockResourceVersion(map.getMetadata().getResourceVersion())
+                .replace(new ConfigMapBuilder(map)
+                    .editOrNewMetadata()
+                    .withName("xxx")
+                    .addToLabels("c", "d")
+                    .and()
+                    .build());
+
+            ConfigMap newMap = client.configMaps().withName("xxx").get();
+            assertEquals("d", newMap.getMetadata().getLabels().get("c"));
+
+            try {
+                // 'map' still carries the resource version that the previous replace has made stale
+                client.configMaps().withName("xxx")
+                    .lockResourceVersion(map.getMetadata().getResourceVersion())
+                    .replace(new ConfigMapBuilder(map)
+                        .editOrNewMetadata()
+                        .withName("xxx")
+                        .addToLabels("e", "f")
+                        .and()
+                        .build());
+                Assert.fail("Should have failed for wrong version");
+            } catch (Exception ex) {
+                // expected: the replace on a stale version must be rejected
+            }
+
+            ConfigMap newMap2 = client.configMaps().withName("xxx").get();
+            assertNull(newMap2.getMetadata().getLabels().get("e"));
+        } finally {
+            // release the mock server even when an assertion fails
+            try {
+                server.destroy();
+            } catch (Exception ignored) {
+                // best-effort cleanup: a shutdown failure must not hide the test outcome
+            }
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/881b9331/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 8030a2a..516033f 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -491,6 +491,7 @@
<mina2-version>2.0.16</mina2-version>
<minimal-json-version>0.9.4</minimal-json-version>
<mock-javamail-version>1.9</mock-javamail-version>
+ <mockwebserver-version>0.0.13</mockwebserver-version>
<mockito-version>1.10.19</mockito-version>
<mongo-java-driver-version>3.5.0</mongo-java-driver-version>
<mongo-java-driver32-version>3.2.2</mongo-java-driver32-version>