You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2017/08/08 14:43:21 UTC
[03/14] camel git commit: CAMEL-11331: Clock-drift-free version of
the protocol
http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/TimedLeaderNotifierTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/TimedLeaderNotifierTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/TimedLeaderNotifierTest.java
new file mode 100644
index 0000000..8380147
--- /dev/null
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/TimedLeaderNotifierTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.camel.component.kubernetes.ha.lock.KubernetesClusterEvent;
+import org.apache.camel.component.kubernetes.ha.lock.TimedLeaderNotifier;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test the behavior of the timed notifier.
+ */
+public class TimedLeaderNotifierTest {
+
+ private TimedLeaderNotifier notifier;
+
+ private volatile Optional<String> currentLeader;
+
+ private volatile Set<String> currentMembers;
+
+ @Before
+ public void init() throws Exception {
+ this.notifier = new TimedLeaderNotifier(e -> {
+ if (e instanceof KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) {
+ currentLeader = ((KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) e).getData();
+ } else if (e instanceof KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent) {
+ currentMembers = ((KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent) e).getData();
+ }
+ });
+ this.notifier.start();
+ }
+
+ @After
+ public void destroy() throws Exception {
+ this.notifier.stop();
+ }
+
+ @Test
+ public void testMultipleCalls() throws Exception {
+ Set<String> members = new TreeSet<>(Arrays.asList("one", "two", "three"));
+ notifier.refreshLeadership(Optional.of("one"), System.currentTimeMillis(), 50L, members);
+ notifier.refreshLeadership(Optional.of("two"), System.currentTimeMillis(), 50L, members);
+ notifier.refreshLeadership(Optional.of("three"), System.currentTimeMillis(), 5000L, members);
+ Thread.sleep(80);
+ assertEquals(Optional.of("three"), currentLeader);
+ assertEquals(members, currentMembers);
+ }
+
+ @Test
+ public void testExpiration() throws Exception {
+ Set<String> members = new TreeSet<>(Arrays.asList("one", "two", "three"));
+ notifier.refreshLeadership(Optional.of("one"), System.currentTimeMillis(), 50L, members);
+ notifier.refreshLeadership(Optional.of("two"), System.currentTimeMillis(), 50L, members);
+ Thread.sleep(160);
+ assertEquals(Optional.empty(), currentLeader);
+ assertEquals(members, currentMembers);
+ notifier.refreshLeadership(Optional.of("three"), System.currentTimeMillis(), 5000L, members);
+ Thread.sleep(80);
+ assertEquals(Optional.of("three"), currentLeader);
+ assertEquals(members, currentMembers);
+ }
+
+ @Test
+ public void testMemberChanging() throws Exception {
+ Set<String> members1 = Collections.singleton("one");
+ Set<String> members2 = new TreeSet<>(Arrays.asList("one", "two"));
+ notifier.refreshLeadership(Optional.of("one"), System.currentTimeMillis(), 50L, members1);
+ notifier.refreshLeadership(Optional.of("two"), System.currentTimeMillis(), 5000L, members2);
+ Thread.sleep(80);
+ assertEquals(Optional.of("two"), currentLeader);
+ assertEquals(members2, currentMembers);
+ }
+
+ @Test
+ public void testOldData() throws Exception {
+ Set<String> members = new TreeSet<>(Arrays.asList("one", "two", "three"));
+ notifier.refreshLeadership(Optional.of("one"), System.currentTimeMillis(), 1000L, members);
+ Thread.sleep(80);
+ notifier.refreshLeadership(Optional.of("two"), System.currentTimeMillis() - 1000, 900L, members);
+ Thread.sleep(80);
+ assertEquals(Optional.empty(), currentLeader);
+ }
+
+ @Test
+ public void testNewLeaderEmpty() throws Exception {
+ Set<String> members = new TreeSet<>(Arrays.asList("one", "two", "three"));
+ notifier.refreshLeadership(Optional.of("one"), System.currentTimeMillis(), 1000L, members);
+ Thread.sleep(80);
+ notifier.refreshLeadership(Optional.empty(), null, null, members);
+ Thread.sleep(80);
+ assertEquals(Optional.empty(), currentLeader);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java
index 6670f37..7d7147b 100644
--- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java
@@ -28,16 +28,21 @@ import org.apache.camel.ha.CamelClusterEventListener;
import org.apache.camel.ha.CamelClusterMember;
import org.apache.camel.ha.CamelClusterView;
import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Records leadership changes and allow to do assertions.
*/
public class LeaderRecorder implements CamelClusterEventListener.Leadership {
+ private static final Logger LOG = LoggerFactory.getLogger(LeaderRecorder.class);
+
private List<LeadershipInfo> leaderships = new CopyOnWriteArrayList<>();
@Override
public void leadershipChanged(CamelClusterView view, CamelClusterMember leader) {
+ LOG.info("Cluster view {} - leader changed to: {}", view.getLocalMember(), leader);
this.leaderships.add(new LeadershipInfo(leader != null ? leader.getId() : null, System.currentTimeMillis()));
}
http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java
index 6422e35..3dc2423 100644
--- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java
@@ -17,10 +17,16 @@
package org.apache.camel.component.kubernetes.ha.utils;
import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.PodListBuilder;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import io.fabric8.mockwebserver.utils.ResponseProvider;
@@ -41,7 +47,15 @@ public class LockTestServer extends KubernetesMockServer {
private Long delayRequests;
+ private Set<String> pods;
+
public LockTestServer(ConfigMapLockSimulator lockSimulator) {
+ this(lockSimulator, Collections.emptySet());
+ }
+
+ public LockTestServer(ConfigMapLockSimulator lockSimulator, Collection<String> initialPods) {
+
+ this.pods = new TreeSet<>(initialPods);
expect().get().withPath("/api/v1/namespaces/test/configmaps/" + lockSimulator.getConfigMapName()).andReply(new ResponseProvider<Object>() {
ThreadLocal<Integer> responseCode = new ThreadLocal<>();
@@ -132,8 +146,9 @@ public class LockTestServer extends KubernetesMockServer {
}).always();
// Other resources
- expect().get().withPath("/api/v1/namespaces/test/pods").andReturn(200, new PodListBuilder().withNewMetadata().withResourceVersion("1").and().build()).always();
- expect().get().withPath("/api/v1/namespaces/test/pods?resourceVersion=1&watch=true").andUpgradeToWebSocket().open().done().always();
+ expect().get().withPath("/api/v1/namespaces/test/pods").andReply(200, request -> new PodListBuilder().withNewMetadata().withResourceVersion("1").and().withItems(
+ getCurrentPods().stream().map(name -> new PodBuilder().withNewMetadata().withName(name).and().build()).collect(Collectors.toList())
+ ).build()).always();
}
@@ -145,6 +160,18 @@ public class LockTestServer extends KubernetesMockServer {
this.refuseRequests = refuseRequests;
}
+ public synchronized Collection<String> getCurrentPods() {
+ return new TreeSet<>(this.pods);
+ }
+
+ public synchronized void removePod(String pod) {
+ this.pods.remove(pod);
+ }
+
+ public synchronized void addPod(String pod) {
+ this.pods.add(pod);
+ }
+
public Long getDelayRequests() {
return delayRequests;
}