You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2017/06/27 08:27:42 UTC

[2/2] camel git commit: CAMEL-11457: camel-atomix - No new leader when all nodes are killed forcefully

CAMEL-11457: camel-atomix - No new leader when all nodes are killed forcefully


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/57eb512d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/57eb512d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/57eb512d

Branch: refs/heads/master
Commit: 57eb512dbe06eca9c756c17ade4d1f7e85f27ea6
Parents: 3d99fc3
Author: lburgazzoli <lb...@gmail.com>
Authored: Tue Jun 27 10:13:20 2017 +0200
Committer: lburgazzoli <lb...@gmail.com>
Committed: Tue Jun 27 10:27:18 2017 +0200

----------------------------------------------------------------------
 .../src/main/docs/atomix-map-component.adoc     |   3 +-
 .../main/docs/atomix-messaging-component.adoc   |   3 +-
 .../main/docs/atomix-multimap-component.adoc    |   3 +-
 .../src/main/docs/atomix-queue-component.adoc   |   3 +-
 .../src/main/docs/atomix-set-component.adoc     |   3 +-
 .../src/main/docs/atomix-value-component.adoc   |   3 +-
 .../component/atomix/AtomixConfiguration.java   |  17 +++
 .../atomix/ha/AtomixClusterClientService.java   |   8 ++
 .../atomix/ha/AtomixClusterService.java         |   8 ++
 .../component/atomix/ha/AtomixClusterView.java  |  40 +++---
 .../atomix/ha/AtomixClientRoutePolicyTest.java  | 106 ++--------------
 .../ha/AtomixClientRoutePolicyTestSupport.java  | 121 +++++++++++++++++++
 .../AtomixEphemeralClientRoutePolicyTest.java   |  34 ++++++
 .../atomix/ha/AtomixRoutePolicyTest.java        |  33 ++---
 14 files changed, 247 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/57eb512d/components/camel-atomix/src/main/docs/atomix-map-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/docs/atomix-map-component.adoc b/components/camel-atomix/src/main/docs/atomix-map-component.adoc
index 820354a..5c9cd96 100644
--- a/components/camel-atomix/src/main/docs/atomix-map-component.adoc
+++ b/components/camel-atomix/src/main/docs/atomix-map-component.adoc
@@ -55,7 +55,7 @@ with the following path and query parameters:
 | **resourceName** | *Required* The distributed resource name |  | String
 |=======================================================================
 
-#### Query Parameters (17 parameters):
+#### Query Parameters (18 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -73,6 +73,7 @@ with the following path and query parameters:
 | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
 | **defaultResourceConfig** (advanced) | The cluster wide default resource configuration. |  | Properties
 | **defaultResourceOptions** (advanced) | The local default resource options. |  | Properties
+| **ephemeral** (advanced) | Sets whether the local member should join groups as an ephemeral member rather than as a PersistentMember. If set to true the local member receives an auto-generated ID, so the locally configured ID is ignored. | false | boolean
 | **readConsistency** (advanced) | The read consistency level. |  | ReadConsistency
 | **resourceConfigs** (advanced) | Cluster wide resources configuration. |  | Map
 | **resourceOptions** (advanced) | Local resources configurations |  | Map

http://git-wip-us.apache.org/repos/asf/camel/blob/57eb512d/components/camel-atomix/src/main/docs/atomix-messaging-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/docs/atomix-messaging-component.adoc b/components/camel-atomix/src/main/docs/atomix-messaging-component.adoc
index f5fe1ac..5efec51 100644
--- a/components/camel-atomix/src/main/docs/atomix-messaging-component.adoc
+++ b/components/camel-atomix/src/main/docs/atomix-messaging-component.adoc
@@ -54,7 +54,7 @@ with the following path and query parameters:
 | **resourceName** | *Required* The distributed resource name |  | String
 |=======================================================================
 
-#### Query Parameters (18 parameters):
+#### Query Parameters (19 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -73,6 +73,7 @@ with the following path and query parameters:
 | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
 | **defaultResourceConfig** (advanced) | The cluster wide default resource configuration. |  | Properties
 | **defaultResourceOptions** (advanced) | The local default resource options. |  | Properties
+| **ephemeral** (advanced) | Sets whether the local member should join groups as an ephemeral member rather than as a PersistentMember. If set to true the local member receives an auto-generated ID, so the locally configured ID is ignored. | false | boolean
 | **readConsistency** (advanced) | The read consistency level. |  | ReadConsistency
 | **resourceConfigs** (advanced) | Cluster wide resources configuration. |  | Map
 | **resourceOptions** (advanced) | Local resources configurations |  | Map

http://git-wip-us.apache.org/repos/asf/camel/blob/57eb512d/components/camel-atomix/src/main/docs/atomix-multimap-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/docs/atomix-multimap-component.adoc b/components/camel-atomix/src/main/docs/atomix-multimap-component.adoc
index cd83d3d..7672baf 100644
--- a/components/camel-atomix/src/main/docs/atomix-multimap-component.adoc
+++ b/components/camel-atomix/src/main/docs/atomix-multimap-component.adoc
@@ -54,7 +54,7 @@ with the following path and query parameters:
 | **resourceName** | *Required* The distributed resource name |  | String
 |=======================================================================
 
-#### Query Parameters (17 parameters):
+#### Query Parameters (18 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -72,6 +72,7 @@ with the following path and query parameters:
 | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
 | **defaultResourceConfig** (advanced) | The cluster wide default resource configuration. |  | Properties
 | **defaultResourceOptions** (advanced) | The local default resource options. |  | Properties
+| **ephemeral** (advanced) | Sets whether the local member should join groups as an ephemeral member rather than as a PersistentMember. If set to true the local member receives an auto-generated ID, so the locally configured ID is ignored. | false | boolean
 | **readConsistency** (advanced) | The read consistency level. |  | ReadConsistency
 | **resourceConfigs** (advanced) | Cluster wide resources configuration. |  | Map
 | **resourceOptions** (advanced) | Local resources configurations |  | Map

http://git-wip-us.apache.org/repos/asf/camel/blob/57eb512d/components/camel-atomix/src/main/docs/atomix-queue-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/docs/atomix-queue-component.adoc b/components/camel-atomix/src/main/docs/atomix-queue-component.adoc
index 511fcff..60bb8b9 100644
--- a/components/camel-atomix/src/main/docs/atomix-queue-component.adoc
+++ b/components/camel-atomix/src/main/docs/atomix-queue-component.adoc
@@ -54,7 +54,7 @@ with the following path and query parameters:
 | **resourceName** | *Required* The distributed resource name |  | String
 |=======================================================================
 
-#### Query Parameters (15 parameters):
+#### Query Parameters (16 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -70,6 +70,7 @@ with the following path and query parameters:
 | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
 | **defaultResourceConfig** (advanced) | The cluster wide default resource configuration. |  | Properties
 | **defaultResourceOptions** (advanced) | The local default resource options. |  | Properties
+| **ephemeral** (advanced) | Sets whether the local member should join groups as an ephemeral member rather than as a PersistentMember. If set to true the local member receives an auto-generated ID, so the locally configured ID is ignored. | false | boolean
 | **readConsistency** (advanced) | The read consistency level. |  | ReadConsistency
 | **resourceConfigs** (advanced) | Cluster wide resources configuration. |  | Map
 | **resourceOptions** (advanced) | Local resources configurations |  | Map

http://git-wip-us.apache.org/repos/asf/camel/blob/57eb512d/components/camel-atomix/src/main/docs/atomix-set-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/docs/atomix-set-component.adoc b/components/camel-atomix/src/main/docs/atomix-set-component.adoc
index 913a7e7..7f5eb11 100644
--- a/components/camel-atomix/src/main/docs/atomix-set-component.adoc
+++ b/components/camel-atomix/src/main/docs/atomix-set-component.adoc
@@ -54,7 +54,7 @@ with the following path and query parameters:
 | **resourceName** | *Required* The distributed resource name |  | String
 |=======================================================================
 
-#### Query Parameters (16 parameters):
+#### Query Parameters (17 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -71,6 +71,7 @@ with the following path and query parameters:
 | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
 | **defaultResourceConfig** (advanced) | The cluster wide default resource configuration. |  | Properties
 | **defaultResourceOptions** (advanced) | The local default resource options. |  | Properties
+| **ephemeral** (advanced) | Sets whether the local member should join groups as an ephemeral member rather than as a PersistentMember. If set to true the local member receives an auto-generated ID, so the locally configured ID is ignored. | false | boolean
 | **readConsistency** (advanced) | The read consistency level. |  | ReadConsistency
 | **resourceConfigs** (advanced) | Cluster wide resources configuration. |  | Map
 | **resourceOptions** (advanced) | Local resources configurations |  | Map

http://git-wip-us.apache.org/repos/asf/camel/blob/57eb512d/components/camel-atomix/src/main/docs/atomix-value-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/docs/atomix-value-component.adoc b/components/camel-atomix/src/main/docs/atomix-value-component.adoc
index c35f4f9..8437d89 100644
--- a/components/camel-atomix/src/main/docs/atomix-value-component.adoc
+++ b/components/camel-atomix/src/main/docs/atomix-value-component.adoc
@@ -54,7 +54,7 @@ with the following path and query parameters:
 | **resourceName** | *Required* The distributed resource name |  | String
 |=======================================================================
 
-#### Query Parameters (16 parameters):
+#### Query Parameters (17 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -71,6 +71,7 @@ with the following path and query parameters:
 | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
 | **defaultResourceConfig** (advanced) | The cluster wide default resource configuration. |  | Properties
 | **defaultResourceOptions** (advanced) | The local default resource options. |  | Properties
+| **ephemeral** (advanced) | Sets whether the local member should join groups as an ephemeral member rather than as a PersistentMember. If set to true the local member receives an auto-generated ID, so the locally configured ID is ignored. | false | boolean
 | **readConsistency** (advanced) | The read consistency level. |  | ReadConsistency
 | **resourceConfigs** (advanced) | Cluster wide resources configuration. |  | Map
 | **resourceOptions** (advanced) | Local resources configurations |  | Map

http://git-wip-us.apache.org/repos/asf/camel/blob/57eb512d/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfiguration.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfiguration.java
index 2163686..2e1e979 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfiguration.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfiguration.java
@@ -51,6 +51,9 @@ public class AtomixConfiguration<T extends Atomix> implements Cloneable {
     private Map<String, Properties> resourceConfigs;
     @UriParam(label = "advanced", prefix = "resource.options")
     private Map<String, Properties> resourceOptions;
+    @UriParam(label = "advanced", defaultValue = "false")
+    private boolean ephemeral;
+
 
     protected AtomixConfiguration() {
     }
@@ -221,4 +224,18 @@ public class AtomixConfiguration<T extends Atomix> implements Cloneable {
 
         return properties;
     }
+
+    public boolean isEphemeral() {
+        return ephemeral;
+    }
+
+    /**
+     * Sets whether the local member should join groups as an ephemeral member rather than as a PersistentMember.
+     *
+     * If set to true the local member receives an auto-generated ID, so the
+     * locally configured ID is ignored.
+     */
+    public void setEphemeral(boolean ephemeral) {
+        this.ephemeral = ephemeral;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/57eb512d/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java
index cc135ca..6194f81 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterClientService.java
@@ -92,6 +92,14 @@ public final class AtomixClusterClientService extends AbstractCamelClusterServic
         configuration.setConfigurationUri(configurationUri);
     }
 
+    public boolean isEphemeral() {
+        return configuration.isEphemeral();
+    }
+
+    public void setEphemeral(boolean ephemeral) {
+        configuration.setEphemeral(ephemeral);
+    }
+
     // *********************************************
     // Lifecycle
     // *********************************************

http://git-wip-us.apache.org/repos/asf/camel/blob/57eb512d/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
index a7901ce..3faf79e 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
@@ -122,6 +122,14 @@ public final class AtomixClusterService extends AbstractCamelClusterService<Atom
         configuration.setConfigurationUri(configurationUri);
     }
 
+    public boolean isEphemeral() {
+        return configuration.isEphemeral();
+    }
+
+    public void setEphemeral(boolean ephemeral) {
+        configuration.setEphemeral(ephemeral);
+    }
+
     // *********************************************
     // Lifecycle
     // *********************************************

http://git-wip-us.apache.org/repos/asf/camel/blob/57eb512d/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
index 68e4429..4f02a85 100644
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
+++ b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterView.java
@@ -61,7 +61,7 @@ final class AtomixClusterView extends AbstractCamelClusterView {
             return Optional.empty();
         }
 
-        return Optional.of(asCamelClusterMember(leader));
+        return Optional.of(new AtomixClusterMember(leader));
     }
 
     @Override
@@ -76,14 +76,10 @@ final class AtomixClusterView extends AbstractCamelClusterView {
         }
 
         return this.group.members().stream()
-            .map(this::asCamelClusterMember)
+            .map(AtomixClusterMember::new)
             .collect(Collectors.toList());
     }
 
-    private AtomixClusterMember asCamelClusterMember(GroupMember member) {
-        return new AtomixClusterMember(group, member);
-    }
-
     @SuppressWarnings("unchecked")
     @Override
     protected void doStart() throws Exception {
@@ -96,17 +92,17 @@ final class AtomixClusterView extends AbstractCamelClusterView {
                 new DistributedGroup.Options(configuration.getResourceOptions(getNamespace()))
             ).get();
 
-            LOGGER.debug("Join group {}", getNamespace());
-            localMember.join();
-
             LOGGER.debug("Listen election events");
-            group.election().onElection(term -> fireLeadershipChangedEvent(asCamelClusterMember(term.leader())));
+            group.election().onElection(term -> fireLeadershipChangedEvent(new AtomixClusterMember(term.leader())));
 
             LOGGER.debug("Listen join events");
-            group.onJoin(member -> fireMemberAddedEvent(asCamelClusterMember(member)));
+            group.onJoin(member -> fireMemberAddedEvent(new AtomixClusterMember(member)));
 
             LOGGER.debug("Listen leave events");
-            group.onLeave(member -> fireMemberRemovedEvent(asCamelClusterMember(member)));
+            group.onLeave(member -> fireMemberRemovedEvent(new AtomixClusterMember(member)));
+
+            LOGGER.debug("Join group {}", getNamespace());
+            localMember.join();
         }
     }
 
@@ -152,12 +148,13 @@ final class AtomixClusterView extends AbstractCamelClusterView {
         AtomixLocalMember join() throws ExecutionException, InterruptedException {
             if (member == null && group != null) {
                 String id = getClusterService().getId();
-                if (ObjectHelper.isNotEmpty(id)) {
-                    LOGGER.debug("Joining group: {}, with id: {}", group, id);
-                    member = group.join(id).join();
-                } else {
+                if (ObjectHelper.isEmpty(id) || configuration.isEphemeral()) {
                     LOGGER.debug("Joining group: {} ", group);
                     member = group.join().join();
+                    LOGGER.debug("Group {} joined with id {}", group, member.id());
+                } else {
+                    LOGGER.debug("Joining group: {}, with id: {}", group, id);
+                    member = group.join(id).join();
                 }
             }
 
@@ -183,11 +180,9 @@ final class AtomixClusterView extends AbstractCamelClusterView {
     }
 
     class AtomixClusterMember implements CamelClusterMember {
-        private final DistributedGroup group;
         private final GroupMember member;
 
-        AtomixClusterMember(DistributedGroup group, GroupMember member) {
-            this.group = group;
+        AtomixClusterMember(GroupMember member) {
             this.member = member;
         }
 
@@ -198,6 +193,13 @@ final class AtomixClusterView extends AbstractCamelClusterView {
 
         @Override
         public boolean isMaster() {
+            if (group == null) {
+                return false;
+            }
+            if (member == null) {
+                return false;
+            }
+
             return member.equals(group.election().term().leader());
         }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/57eb512d/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTest.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTest.java
index 371eb58..be12571 100644
--- a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTest.java
+++ b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTest.java
@@ -16,107 +16,19 @@
  */
 package org.apache.camel.component.atomix.ha;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
-import io.atomix.AtomixReplica;
 import io.atomix.catalyst.transport.Address;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.atomix.client.AtomixFactory;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
-import org.apache.camel.test.AvailablePortFinder;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.camel.ha.CamelClusterService;
 
-public final class AtomixClientRoutePolicyTest {
-    private static final Address ADDRESS = new Address("127.0.0.1", AvailablePortFinder.getNextAvailable());
-    private static final Logger LOGGER = LoggerFactory.getLogger(AtomixClientRoutePolicyTest.class);
-    private static final List<String> CLIENTS = IntStream.range(0, 3).mapToObj(Integer::toString).collect(Collectors.toList());
-    private static final List<String> RESULTS = new ArrayList<>();
-    private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(CLIENTS.size() * 2);
-    private static final CountDownLatch LATCH = new CountDownLatch(CLIENTS.size());
+public final class AtomixClientRoutePolicyTest  extends AtomixClientRoutePolicyTestSupport {
+    @Override
+    protected CamelClusterService createClusterService(String id, Address bootstrapNode) {
+        AtomixClusterClientService service = new AtomixClusterClientService();
+        service.setId("node-" + id);
+        service.setNodes(Collections.singletonList(bootstrapNode));
+        service.setEphemeral(false);
 
-    // ************************************
-    // Test
-    // ************************************
-
-    @Test
-    public void test() throws Exception {
-        AtomixReplica boot = null;
-
-        try {
-            boot = AtomixFactory.replica(ADDRESS);
-
-            for (String id : CLIENTS) {
-                SCHEDULER.submit(() -> run(id));
-            }
-
-            LATCH.await(1, TimeUnit.MINUTES);
-            SCHEDULER.shutdownNow();
-
-            Assert.assertEquals(CLIENTS.size(), RESULTS.size());
-            Assert.assertTrue(RESULTS.containsAll(CLIENTS));
-        } finally {
-            if (boot != null) {
-                boot.shutdown();
-            }
-        }
-    }
-
-    // ************************************
-    // Run a Camel node
-    // ************************************
-
-    private static void run(String id) {
-        try {
-            CountDownLatch contextLatch = new CountDownLatch(1);
-
-            AtomixClusterClientService service = new AtomixClusterClientService();
-            service.setId("node-" + id);
-            service.setNodes(Collections.singletonList(ADDRESS));
-
-            DefaultCamelContext context = new DefaultCamelContext();
-            context.disableJMX();
-            context.setName("context-" + id);
-            context.addService(service);
-            context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
-            context.addRoutes(new RouteBuilder() {
-                @Override
-                public void configure() throws Exception {
-                    from("timer:atomix?delay=1s&period=1s&repeatCount=1")
-                        .routeId("route-" + id)
-                        .process(e -> {
-                            LOGGER.debug("Node {} done", id);
-                            RESULTS.add(id);
-                            // Shutdown the context later on to give a chance to
-                            // other members to catch-up
-                            SCHEDULER.schedule(contextLatch::countDown, 2 + ThreadLocalRandom.current().nextInt(3), TimeUnit.SECONDS);
-                        });
-                }
-            });
-
-            // Start the context after some random time so the startup order
-            // changes for each test.
-            Thread.sleep(ThreadLocalRandom.current().nextInt(500));
-            context.start();
-
-            contextLatch.await();
-            context.stop();
-
-            LATCH.countDown();
-        } catch (Exception e) {
-            LOGGER.warn("", e);
-        }
+        return service;
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/57eb512d/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTestSupport.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTestSupport.java
new file mode 100644
index 0000000..bf425df
--- /dev/null
+++ b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixClientRoutePolicyTestSupport.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.ha;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import io.atomix.AtomixReplica;
+import io.atomix.catalyst.transport.Address;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.atomix.client.AtomixFactory;
+import org.apache.camel.ha.CamelClusterService;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.ha.ClusteredRoutePolicyFactory;
+import org.apache.camel.test.AvailablePortFinder;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AtomixClientRoutePolicyTestSupport {
+    private static final Logger LOGGER = LoggerFactory.getLogger(AtomixClientRoutePolicyTestSupport.class);
+
+    private final Address address = new Address("127.0.0.1", AvailablePortFinder.getNextAvailable());
+    private final List<String> clients = IntStream.range(0, 3).mapToObj(Integer::toString).collect(Collectors.toList());
+    private final List<String> results = new ArrayList<>();
+    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(clients.size() * 2);
+    private final CountDownLatch latch = new CountDownLatch(clients.size());
+
+    // ************************************
+    // Test
+    // ************************************
+
+    @Test
+    public void test() throws Exception {
+        AtomixReplica boot = null;
+
+        try {
+            boot = AtomixFactory.replica(address);
+
+            for (String id : clients) {
+                scheduler.submit(() -> run(id));
+            }
+
+            latch.await(1, TimeUnit.MINUTES);
+            scheduler.shutdownNow();
+
+            Assert.assertEquals(clients.size(), results.size());
+            Assert.assertTrue(results.containsAll(clients));
+        } finally {
+            if (boot != null) {
+                boot.shutdown();
+            }
+        }
+    }
+
+    // ************************************
+    // Run a Camel node
+    // ************************************
+
+    private void run(String id) {
+        try {
+            CountDownLatch contextLatch = new CountDownLatch(1);
+
+            DefaultCamelContext context = new DefaultCamelContext();
+            context.disableJMX();
+            context.setName("context-" + id);
+            context.addService(createClusterService(id, address));
+            context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace("my-ns"));
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("timer:atomix?delay=1s&period=1s&repeatCount=1")
+                        .routeId("route-" + id)
+                        .process(e -> {
+                            LOGGER.debug("Node {} done", id);
+                            results.add(id);
+                            // Shutdown the context later on to give a chance to
+                            // other members to catch-up
+                            scheduler.schedule(contextLatch::countDown, 2 + ThreadLocalRandom.current().nextInt(3), TimeUnit.SECONDS);
+                        });
+                }
+            });
+
+            // Start the context after some random time so the startup order
+            // changes for each test.
+            Thread.sleep(ThreadLocalRandom.current().nextInt(500));
+            context.start();
+
+            contextLatch.await();
+            context.stop();
+
+            latch.countDown();
+        } catch (Exception e) {
+            LOGGER.warn("", e);
+        }
+    }
+
+    protected abstract CamelClusterService createClusterService(String id, Address bootstrapNode);
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/57eb512d/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixEphemeralClientRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixEphemeralClientRoutePolicyTest.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixEphemeralClientRoutePolicyTest.java
new file mode 100644
index 0000000..2abf562
--- /dev/null
+++ b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixEphemeralClientRoutePolicyTest.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.ha;
+
+import java.util.Collections;
+
+import io.atomix.catalyst.transport.Address;
+import org.apache.camel.ha.CamelClusterService;
+
+public final class AtomixEphemeralClientRoutePolicyTest extends AtomixClientRoutePolicyTestSupport {
+    @Override
+    protected CamelClusterService createClusterService(String id, Address bootstrapNode) {
+        AtomixClusterClientService service = new AtomixClusterClientService();
+        service.setId("node-" + id);
+        service.setNodes(Collections.singletonList(bootstrapNode));
+        service.setEphemeral(true);
+
+        return service;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/57eb512d/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyTest.java b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyTest.java
index c70372a..8e23d5d 100644
--- a/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyTest.java
+++ b/components/camel-atomix/src/test/java/org/apache/camel/component/atomix/ha/AtomixRoutePolicyTest.java
@@ -38,16 +38,17 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public final class AtomixRoutePolicyTest {
-    private static final List<Address> ADDRESSES = Arrays.asList(
+    private static final Logger LOGGER = LoggerFactory.getLogger(AtomixRoutePolicyTest.class);
+
+    private final List<Address> addresses = Arrays.asList(
         new Address("127.0.0.1", AvailablePortFinder.getNextAvailable()),
         new Address("127.0.0.1", AvailablePortFinder.getNextAvailable()),
         new Address("127.0.0.1", AvailablePortFinder.getNextAvailable())
     );
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(AtomixRoutePolicyTest.class);
-    private static final Set<Address> RESULTS = new HashSet<>();
-    private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(ADDRESSES.size() * 2);
-    private static final CountDownLatch LATCH = new CountDownLatch(ADDRESSES.size());
+    private final Set<Address> results = new HashSet<>();
+    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(addresses.size() * 2);
+    private final CountDownLatch latch = new CountDownLatch(addresses.size());
 
     // ************************************
     // Test
@@ -55,22 +56,22 @@ public final class AtomixRoutePolicyTest {
 
     @Test
     public void test() throws Exception {
-        for (Address address: ADDRESSES) {
-            SCHEDULER.submit(() -> run(address));
+        for (Address address: addresses) {
+            scheduler.submit(() -> run(address));
         }
 
-        LATCH.await(1, TimeUnit.MINUTES);
-        SCHEDULER.shutdownNow();
+        latch.await(1, TimeUnit.MINUTES);
+        scheduler.shutdownNow();
 
-        Assert.assertEquals(ADDRESSES.size(), RESULTS.size());
-        Assert.assertTrue(RESULTS.containsAll(ADDRESSES));
+        Assert.assertEquals(addresses.size(), results.size());
+        Assert.assertTrue(results.containsAll(addresses));
     }
 
     // ************************************
     // Run a Camel node
     // ************************************
 
-    private static void run(Address address) {
+    private void run(Address address) {
         try {
             CountDownLatch contextLatch = new CountDownLatch(1);
 
@@ -78,7 +79,7 @@ public final class AtomixRoutePolicyTest {
             service.setId("node-" + address.port());
             service.setStorageLevel(StorageLevel.MEMORY);
             service.setAddress(address);
-            service.setNodes(ADDRESSES);
+            service.setNodes(addresses);
 
             DefaultCamelContext context = new DefaultCamelContext();
             context.disableJMX();
@@ -92,10 +93,10 @@ public final class AtomixRoutePolicyTest {
                         .routeId("route-" + address.port())
                         .process(e -> {
                             LOGGER.debug("Node {} done", address);
-                            RESULTS.add(address);
+                            results.add(address);
                             // Shutdown the context later on to give a chance to
                             // other members to catch-up
-                            SCHEDULER.schedule(contextLatch::countDown, 2 + ThreadLocalRandom.current().nextInt(3), TimeUnit.SECONDS);
+                            scheduler.schedule(contextLatch::countDown, 2 + ThreadLocalRandom.current().nextInt(3), TimeUnit.SECONDS);
                         });
                 }
             });
@@ -108,7 +109,7 @@ public final class AtomixRoutePolicyTest {
             contextLatch.await();
             context.stop();
 
-            LATCH.countDown();
+            latch.countDown();
         } catch (Exception e) {
             LOGGER.warn("", e);
         }