You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by nf...@apache.org on 2017/08/08 14:43:21 UTC

[03/14] camel git commit: CAMEL-11331: Clock-drift-free version of the protocol

http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/TimedLeaderNotifierTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/TimedLeaderNotifierTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/TimedLeaderNotifierTest.java
new file mode 100644
index 0000000..8380147
--- /dev/null
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/TimedLeaderNotifierTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kubernetes.ha;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.camel.component.kubernetes.ha.lock.KubernetesClusterEvent;
+import org.apache.camel.component.kubernetes.ha.lock.TimedLeaderNotifier;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test the behavior of the timed notifier.
+ */
+public class TimedLeaderNotifierTest {
+
+    private TimedLeaderNotifier notifier;
+
+    private volatile Optional<String> currentLeader;
+
+    private volatile Set<String> currentMembers;
+
+    @Before
+    public void init() throws Exception {
+        this.notifier = new TimedLeaderNotifier(e -> {
+            if (e instanceof KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) {
+                currentLeader = ((KubernetesClusterEvent.KubernetesClusterLeaderChangedEvent) e).getData();
+            } else if (e instanceof KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent) {
+                currentMembers = ((KubernetesClusterEvent.KubernetesClusterMemberListChangedEvent) e).getData();
+            }
+        });
+        this.notifier.start();
+    }
+
+    @After
+    public void destroy() throws Exception {
+        this.notifier.stop();
+    }
+
+    @Test
+    public void testMultipleCalls() throws Exception {
+        Set<String> members = new TreeSet<>(Arrays.asList("one", "two", "three"));
+        notifier.refreshLeadership(Optional.of("one"), System.currentTimeMillis(), 50L, members);
+        notifier.refreshLeadership(Optional.of("two"), System.currentTimeMillis(), 50L, members);
+        notifier.refreshLeadership(Optional.of("three"), System.currentTimeMillis(), 5000L, members);
+        Thread.sleep(80);
+        assertEquals(Optional.of("three"), currentLeader);
+        assertEquals(members, currentMembers);
+    }
+
+    @Test
+    public void testExpiration() throws Exception {
+        Set<String> members = new TreeSet<>(Arrays.asList("one", "two", "three"));
+        notifier.refreshLeadership(Optional.of("one"), System.currentTimeMillis(), 50L, members);
+        notifier.refreshLeadership(Optional.of("two"), System.currentTimeMillis(), 50L, members);
+        Thread.sleep(160);
+        assertEquals(Optional.empty(), currentLeader);
+        assertEquals(members, currentMembers);
+        notifier.refreshLeadership(Optional.of("three"), System.currentTimeMillis(), 5000L, members);
+        Thread.sleep(80);
+        assertEquals(Optional.of("three"), currentLeader);
+        assertEquals(members, currentMembers);
+    }
+
+    @Test
+    public void testMemberChanging() throws Exception {
+        Set<String> members1 = Collections.singleton("one");
+        Set<String> members2 = new TreeSet<>(Arrays.asList("one", "two"));
+        notifier.refreshLeadership(Optional.of("one"), System.currentTimeMillis(), 50L, members1);
+        notifier.refreshLeadership(Optional.of("two"), System.currentTimeMillis(), 5000L, members2);
+        Thread.sleep(80);
+        assertEquals(Optional.of("two"), currentLeader);
+        assertEquals(members2, currentMembers);
+    }
+
+    @Test
+    public void testOldData() throws Exception {
+        Set<String> members = new TreeSet<>(Arrays.asList("one", "two", "three"));
+        notifier.refreshLeadership(Optional.of("one"), System.currentTimeMillis(), 1000L, members);
+        Thread.sleep(80);
+        notifier.refreshLeadership(Optional.of("two"), System.currentTimeMillis() - 1000, 900L, members);
+        Thread.sleep(80);
+        assertEquals(Optional.empty(), currentLeader);
+    }
+
+    @Test
+    public void testNewLeaderEmpty() throws Exception {
+        Set<String> members = new TreeSet<>(Arrays.asList("one", "two", "three"));
+        notifier.refreshLeadership(Optional.of("one"), System.currentTimeMillis(), 1000L, members);
+        Thread.sleep(80);
+        notifier.refreshLeadership(Optional.empty(), null, null, members);
+        Thread.sleep(80);
+        assertEquals(Optional.empty(), currentLeader);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java
index 6670f37..7d7147b 100644
--- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LeaderRecorder.java
@@ -28,16 +28,21 @@ import org.apache.camel.ha.CamelClusterEventListener;
 import org.apache.camel.ha.CamelClusterMember;
 import org.apache.camel.ha.CamelClusterView;
 import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Records leadership changes and allow to do assertions.
  */
 public class LeaderRecorder implements CamelClusterEventListener.Leadership {
 
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderRecorder.class);
+
     private List<LeadershipInfo> leaderships = new CopyOnWriteArrayList<>();
 
     @Override
     public void leadershipChanged(CamelClusterView view, CamelClusterMember leader) {
+        LOG.info("Cluster view {} - leader changed to: {}", view.getLocalMember(), leader);
         this.leaderships.add(new LeadershipInfo(leader != null ? leader.getId() : null, System.currentTimeMillis()));
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/f0b00ab9/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java
index 6422e35..3dc2423 100644
--- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java
+++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/ha/utils/LockTestServer.java
@@ -17,10 +17,16 @@
 package org.apache.camel.component.kubernetes.ha.utils;
 
 import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.PodBuilder;
 import io.fabric8.kubernetes.api.model.PodListBuilder;
 import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
 import io.fabric8.mockwebserver.utils.ResponseProvider;
@@ -41,7 +47,15 @@ public class LockTestServer extends KubernetesMockServer {
 
     private Long delayRequests;
 
+    private Set<String> pods;
+
     public LockTestServer(ConfigMapLockSimulator lockSimulator) {
+        this(lockSimulator, Collections.emptySet());
+    }
+
+    public LockTestServer(ConfigMapLockSimulator lockSimulator, Collection<String> initialPods) {
+
+        this.pods = new TreeSet<>(initialPods);
 
         expect().get().withPath("/api/v1/namespaces/test/configmaps/" + lockSimulator.getConfigMapName()).andReply(new ResponseProvider<Object>() {
             ThreadLocal<Integer> responseCode = new ThreadLocal<>();
@@ -132,8 +146,9 @@ public class LockTestServer extends KubernetesMockServer {
         }).always();
 
         // Other resources
-        expect().get().withPath("/api/v1/namespaces/test/pods").andReturn(200, new PodListBuilder().withNewMetadata().withResourceVersion("1").and().build()).always();
-        expect().get().withPath("/api/v1/namespaces/test/pods?resourceVersion=1&watch=true").andUpgradeToWebSocket().open().done().always();
+        expect().get().withPath("/api/v1/namespaces/test/pods").andReply(200, request -> new PodListBuilder().withNewMetadata().withResourceVersion("1").and().withItems(
+                getCurrentPods().stream().map(name -> new PodBuilder().withNewMetadata().withName(name).and().build()).collect(Collectors.toList())
+        ).build()).always();
     }
 
 
@@ -145,6 +160,18 @@ public class LockTestServer extends KubernetesMockServer {
         this.refuseRequests = refuseRequests;
     }
 
+    public synchronized Collection<String> getCurrentPods() {
+        return new TreeSet<>(this.pods);
+    }
+
+    public synchronized void removePod(String pod) {
+        this.pods.remove(pod);
+    }
+
+    public synchronized void addPod(String pod) {
+        this.pods.add(pod);
+    }
+
     public Long getDelayRequests() {
         return delayRequests;
     }