You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2021/01/29 10:21:18 UTC

[camel] branch master updated: camel-infinispan: implement route policy using clustering facilities

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new a4f6087  camel-infinispan: implement route policy using clustering facilities
a4f6087 is described below

commit a4f6087b2a48b6dbe10621f45ad975239a374e03
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Wed Jan 27 18:15:52 2021 +0100

    camel-infinispan: implement route policy using clustering facilities
---
 .../camel-infinispan-common/pom.xml                |   4 +
 .../infinispan/InfinispanRoutePolicy.java          | 246 ----------------
 .../camel/component/infinispan/InfinispanUtil.java |  18 ++
 .../cluster/InfinispanClusterConfiguration.java    |  74 +++++
 .../cluster/InfinispanClusterService.java          |  23 ++
 .../infinispan/cluster/InfinispanClusterView.java  | 117 ++++++++
 .../InfinispanRoutePolicyTestSupport.java          |  58 ----
 .../camel-infinispan-embedded/pom.xml              |  14 +-
 .../embedded/InfinispanEmbeddedRoutePolicy.java    | 150 ----------
 .../InfinispanEmbeddedClusterConfiguration.java    |  60 ++++
 .../cluster/InfinispanEmbeddedClusterService.java  | 102 +++++++
 .../cluster/InfinispanEmbeddedClusterView.java     | 289 +++++++++++++++++++
 .../InfinispanEmbeddedRoutePolicyTest.java         |  98 -------
 .../AbstractInfinispanEmbeddedClusteredTest.java   |  73 +++++
 .../InfinispanEmbeddedClusteredMasterTest.java     |  59 ++++
 ...panEmbeddedClusteredRoutePolicyFactoryTest.java |  61 ++++
 ...InfinispanEmbeddedClusteredRoutePolicyTest.java |  61 ++++
 .../InfinispanEmbeddedClusteredTestSupport.java    |  38 +++
 .../InfinispanEmbeddedClusteredViewTest.java       |  72 +++++
 .../src/test/resources/log4j2.properties           |   4 +
 .../camel-infinispan/camel-infinispan/pom.xml      |  14 +-
 .../remote/InfinispanRemoteRoutePolicy.java        | 174 ------------
 .../InfinispanRemoteClusterConfiguration.java      | 130 +++++++++
 .../cluster/InfinispanRemoteClusterService.java    |  98 +++++++
 .../cluster/InfinispanRemoteClusterView.java       | 316 +++++++++++++++++++++
 .../remote/InfinispanRemoteRoutePolicyTest.java    | 124 --------
 .../AbstractInfinispanRemoteClusteredTest.java     |  84 ++++++
 .../InfinispanRemoteClusteredMasterTest.java       |  59 ++++
 ...ispanRemoteClusteredRoutePolicyFactoryTest.java |  61 ++++
 .../InfinispanRemoteClusteredRoutePolicyTest.java  |  61 ++++
 .../InfinispanRemoteClusteredTestSupport.java      |  53 ++++
 .../cluster/InfinispanRemoteClusteredViewTest.java |  81 ++++++
 .../src/test/resources/log4j2.properties           |   2 +
 33 files changed, 2020 insertions(+), 858 deletions(-)

diff --git a/components/camel-infinispan/camel-infinispan-common/pom.xml b/components/camel-infinispan/camel-infinispan-common/pom.xml
index 3989a98..1c857bf 100644
--- a/components/camel-infinispan/camel-infinispan-common/pom.xml
+++ b/components/camel-infinispan/camel-infinispan-common/pom.xml
@@ -42,6 +42,10 @@
             <artifactId>camel-support</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-cluster</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.infinispan</groupId>
             <artifactId>infinispan-core</artifactId>
             <version>${infinispan-version}</version>
diff --git a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanRoutePolicy.java b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanRoutePolicy.java
deleted file mode 100644
index 93f5fe5..0000000
--- a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanRoutePolicy.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.infinispan;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.CamelContextAware;
-import org.apache.camel.Route;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.Service;
-import org.apache.camel.api.management.ManagedAttribute;
-import org.apache.camel.support.RoutePolicySupport;
-import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.ReferenceCount;
-import org.apache.camel.util.StringHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class InfinispanRoutePolicy extends RoutePolicySupport implements CamelContextAware {
-    private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanRoutePolicy.class);
-
-    private final AtomicBoolean leader;
-    private final Set<Route> startedRoutes;
-    private final Set<Route> stoppeddRoutes;
-    private final ReferenceCount refCount;
-
-    private CamelContext camelContext;
-    private boolean shouldStopRoute;
-    private String lockMapName;
-    private String lockKey;
-    private String lockValue;
-    private long lifespan;
-    private TimeUnit lifespanTimeUnit;
-    private Service service;
-
-    protected InfinispanRoutePolicy(String lockKey, String lockValue) {
-        this.stoppeddRoutes = new HashSet<>();
-        this.startedRoutes = new HashSet<>();
-        this.leader = new AtomicBoolean();
-        this.shouldStopRoute = true;
-        this.lockKey = lockKey;
-        this.lockValue = lockValue;
-        this.lifespan = 30;
-        this.lifespanTimeUnit = TimeUnit.SECONDS;
-        this.service = null;
-        this.refCount = ReferenceCount.on(this::startService, this::stopService);
-    }
-
-    @Override
-    public CamelContext getCamelContext() {
-        return camelContext;
-    }
-
-    @Override
-    public void setCamelContext(CamelContext camelContext) {
-        this.camelContext = camelContext;
-    }
-
-    @Override
-    public synchronized void onInit(Route route) {
-        super.onInit(route);
-
-        LOGGER.info("Route managed by {}. Setting route {} AutoStartup flag to false.", getClass(), route.getId());
-        route.setAutoStartup(false);
-
-        stoppeddRoutes.add(route);
-
-        this.refCount.retain();
-
-        startManagedRoutes();
-    }
-
-    @Override
-    public synchronized void doShutdown() {
-        this.refCount.release();
-    }
-
-    protected abstract Service createService();
-
-    // ****************************************
-    // Helpers
-    // ****************************************
-
-    private void startService() {
-        // validate
-        StringHelper.notEmpty(lockMapName, "lockMapName", this);
-        StringHelper.notEmpty(lockKey, "lockKey", this);
-        StringHelper.notEmpty(lockValue, "lockValue", this);
-        ObjectHelper.notNull(camelContext, "camelContext", this);
-
-        try {
-            if (lifespanTimeUnit.convert(lifespan, TimeUnit.SECONDS) < 2) {
-                throw new IllegalArgumentException("Lock lifespan can not be less that 2 seconds");
-            }
-
-            this.service = createService();
-            this.service.start();
-        } catch (Exception e) {
-            throw new RuntimeCamelException(e);
-        }
-    }
-
-    private void stopService() {
-        leader.set(false);
-
-        try {
-            if (this.service != null) {
-                this.service.stop();
-            }
-        } catch (Exception e) {
-            throw new RuntimeCamelException(e);
-        }
-    }
-
-    protected void setLeader(boolean isLeader) {
-        if (isLeader && leader.compareAndSet(false, isLeader)) {
-            LOGGER.info("Leadership taken (map={}, key={}, val={})", lockMapName, lockKey, lockValue);
-            startManagedRoutes();
-        } else if (!isLeader && leader.getAndSet(isLeader)) {
-            LOGGER.info("Leadership lost (map={}, key={} val={})", lockMapName, lockKey, lockValue);
-            stopManagedRoutes();
-        }
-    }
-
-    private synchronized void startManagedRoutes() {
-        if (!isLeader()) {
-            return;
-        }
-
-        try {
-            for (Route route : stoppeddRoutes) {
-                LOGGER.debug("Starting route {}", route.getId());
-                startRoute(route);
-                startedRoutes.add(route);
-            }
-
-            stoppeddRoutes.removeAll(startedRoutes);
-        } catch (Exception e) {
-            handleException(e);
-        }
-    }
-
-    private synchronized void stopManagedRoutes() {
-        if (isLeader()) {
-            return;
-        }
-
-        try {
-            for (Route route : startedRoutes) {
-                LOGGER.debug("Stopping route {}", route.getId());
-                stopRoute(route);
-                stoppeddRoutes.add(route);
-            }
-
-            startedRoutes.removeAll(stoppeddRoutes);
-        } catch (Exception e) {
-            handleException(e);
-        }
-    }
-
-    // *************************************************************************
-    // Getter/Setters
-    // *************************************************************************
-
-    @ManagedAttribute(description = "Whether to stop route when starting up and failed to become master")
-    public boolean isShouldStopRoute() {
-        return shouldStopRoute;
-    }
-
-    public void setShouldStopRoute(boolean shouldStopRoute) {
-        this.shouldStopRoute = shouldStopRoute;
-    }
-
-    @ManagedAttribute(description = "The lock map name")
-    public String getLockMapName() {
-        return lockMapName;
-    }
-
-    public void setLockMapName(String lockMapName) {
-        this.lockMapName = lockMapName;
-    }
-
-    @ManagedAttribute(description = "The lock key")
-    public String getLockKey() {
-        return lockKey;
-    }
-
-    public void setLockKey(String lockKey) {
-        this.lockKey = lockKey;
-    }
-
-    @ManagedAttribute(description = "The lock value")
-    public String getLockValue() {
-        return lockValue;
-    }
-
-    public void setLockValue(String lockValue) {
-        this.lockValue = lockValue;
-    }
-
-    @ManagedAttribute(description = "The key lifespan for the lock")
-    public long getLifespan() {
-        return lifespan;
-    }
-
-    public void setLifespan(long lifespan) {
-        this.lifespan = lifespan;
-    }
-
-    public void setLifespan(long lifespan, TimeUnit lifespanTimeUnit) {
-        this.lifespan = lifespan;
-        this.lifespanTimeUnit = lifespanTimeUnit;
-    }
-
-    @ManagedAttribute(description = "The key lifespan time unit for the lock")
-    public TimeUnit getLifespanTimeUnit() {
-        return lifespanTimeUnit;
-    }
-
-    public void setLifespanTimeUnit(TimeUnit lifespanTimeUnit) {
-        this.lifespanTimeUnit = lifespanTimeUnit;
-    }
-
-    @ManagedAttribute(description = "Is this route the master or a slave")
-    public boolean isLeader() {
-        return leader.get();
-    }
-}
diff --git a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
index f9f0ebc..57edd34 100644
--- a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
+++ b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
@@ -23,6 +23,7 @@ import java.util.Properties;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.support.ResourceHelper;
@@ -66,4 +67,21 @@ public class InfinispanUtil {
                 source,
                 source.getClass().getSimpleName());
     }
+
+    public static ScheduledExecutorService newSingleThreadScheduledExecutor(
+            CamelContextAware camelContextAware, Object source) {
+        return newSingleThreadScheduledExecutor(camelContextAware.getCamelContext(), source);
+    }
+
+    public static ScheduledExecutorService newSingleThreadScheduledExecutor(
+            CamelContext camelContext, Object source, String id) {
+        return camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor(
+                source,
+                source.getClass().getSimpleName() + "-" + id);
+    }
+
+    public static ScheduledExecutorService newSingleThreadScheduledExecutor(
+            CamelContextAware camelContextAware, Object source, String id) {
+        return newSingleThreadScheduledExecutor(camelContextAware.getCamelContext(), source, id);
+    }
 }
diff --git a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterConfiguration.java b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterConfiguration.java
new file mode 100644
index 0000000..617a95a
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterConfiguration.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.cluster;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.component.infinispan.InfinispanConfiguration;
+
+public abstract class InfinispanClusterConfiguration<C extends InfinispanConfiguration> implements Cloneable {
+    private final C configuration;
+    private long lifespan;
+    private TimeUnit lifespanTimeUnit;
+
+    protected InfinispanClusterConfiguration(C configuration) {
+        this.configuration = configuration;
+        this.lifespan = 30;
+        this.lifespanTimeUnit = TimeUnit.SECONDS;
+    }
+
+    // ***********************************************
+    // Properties
+    // ***********************************************
+
+    public long getLifespan() {
+        return lifespan;
+    }
+
+    public void setLifespan(long lifespan) {
+        this.lifespan = lifespan;
+    }
+
+    public TimeUnit getLifespanTimeUnit() {
+        return lifespanTimeUnit;
+    }
+
+    public void setLifespanTimeUnit(TimeUnit lifespanTimeUnit) {
+        this.lifespanTimeUnit = lifespanTimeUnit;
+    }
+
+    public void setConfigurationUri(String configurationUri) {
+        configuration.setConfigurationUri(configurationUri);
+    }
+
+    public C getConfiguration() {
+        return configuration;
+    }
+
+    // ***********************************************
+    //
+    // ***********************************************
+
+    @Override
+    public InfinispanClusterConfiguration clone() {
+        try {
+            return (InfinispanClusterConfiguration) super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterService.java b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterService.java
new file mode 100644
index 0000000..a6def76
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterService.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.cluster;
+
+import org.apache.camel.support.cluster.AbstractCamelClusterService;
+
+public abstract class InfinispanClusterService extends AbstractCamelClusterService<InfinispanClusterView> {
+    public static final String LEADER_KEY = "__camel_leader";
+}
diff --git a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterView.java b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterView.java
new file mode 100644
index 0000000..3a4b880
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterView.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.cluster;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.camel.cluster.CamelClusterMember;
+import org.apache.camel.cluster.CamelClusterService;
+import org.apache.camel.support.cluster.AbstractCamelClusterView;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class InfinispanClusterView extends AbstractCamelClusterView {
+    private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanClusterView.class);
+
+    protected InfinispanClusterView(CamelClusterService cluster, String namespace) {
+        super(cluster, namespace);
+    }
+
+    protected abstract boolean isLeader(String id);
+
+    // ***********************************************
+    //
+    // ***********************************************
+
+    protected final class LocalMember implements CamelClusterMember {
+        private final AtomicBoolean leader = new AtomicBoolean();
+        private final String id;
+
+        public LocalMember(String id) {
+            this.id = id;
+        }
+
+        public void setLeader(boolean master) {
+            if (master && this.leader.compareAndSet(false, true)) {
+                LOGGER.debug("Leadership taken for id: {}", id);
+
+                fireLeadershipChangedEvent(Optional.of(this));
+                return;
+            }
+            if (!master && this.leader.compareAndSet(true, false)) {
+                LOGGER.debug("Leadership lost for id: {}", id);
+
+                fireLeadershipChangedEvent(getLeader());
+                return;
+            }
+        }
+
+        @Override
+        public boolean isLeader() {
+            return leader.get();
+        }
+
+        @Override
+        public boolean isLocal() {
+            return true;
+        }
+
+        @Override
+        public String getId() {
+            return this.id;
+        }
+
+        @Override
+        public String toString() {
+            return "LocalMember{" + "leader=" + leader + '}';
+        }
+    }
+
+    protected final class ClusterMember implements CamelClusterMember {
+        private final String id;
+
+        public ClusterMember(String id) {
+            this.id = id;
+        }
+
+        @Override
+        public String getId() {
+            return id;
+        }
+
+        @Override
+        public boolean isLeader() {
+            return InfinispanClusterView.this.isLeader(id);
+        }
+
+        @Override
+        public boolean isLocal() {
+            if (id == null) {
+                return false;
+            }
+
+            return Objects.equals(id, getLocalMember().getId());
+        }
+
+        @Override
+        public String toString() {
+            return "ClusterMember{" + "id='" + id + '\'' + '}';
+        }
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan-common/src/test/java/org/apache/camel/component/infinispan/InfinispanRoutePolicyTestSupport.java b/components/camel-infinispan/camel-infinispan-common/src/test/java/org/apache/camel/component/infinispan/InfinispanRoutePolicyTestSupport.java
deleted file mode 100644
index ba5b6df..0000000
--- a/components/camel-infinispan/camel-infinispan-common/src/test/java/org/apache/camel/component/infinispan/InfinispanRoutePolicyTestSupport.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.infinispan;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.junit.jupiter.api.Test;
-
-import static org.awaitility.Awaitility.await;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public interface InfinispanRoutePolicyTestSupport {
-    InfinispanRoutePolicy createRoutePolicy(String lockValue);
-
-    @Test
-    default void testLeadership() throws Exception {
-        final InfinispanRoutePolicy policy1 = createRoutePolicy("route1");
-        final InfinispanRoutePolicy policy2 = createRoutePolicy("route2");
-
-        try (CamelContext context = new DefaultCamelContext()) {
-            context.start();
-
-            RouteBuilder.addRoutes(context, b -> b.from("direct:r1").routePolicy(policy1).to("mock:p1"));
-
-            await().atMost(10, TimeUnit.SECONDS).until(policy1::isLeader);
-
-            RouteBuilder.addRoutes(context, b -> b.from("direct:r2").routePolicy(policy2).to("mock:p2"));
-
-            assertTrue(policy1.isLeader());
-            assertFalse(policy2.isLeader());
-
-            policy1.shutdown();
-
-            await().atMost(10, TimeUnit.SECONDS).until(policy2::isLeader);
-
-            assertFalse(policy1.isLeader());
-            assertTrue(policy2.isLeader());
-        }
-    }
-}
diff --git a/components/camel-infinispan/camel-infinispan-embedded/pom.xml b/components/camel-infinispan/camel-infinispan-embedded/pom.xml
index 34ed98f..b8a7bc4 100644
--- a/components/camel-infinispan/camel-infinispan-embedded/pom.xml
+++ b/components/camel-infinispan/camel-infinispan-embedded/pom.xml
@@ -35,10 +35,6 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-support</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.camel</groupId>
             <artifactId>camel-infinispan-common</artifactId>
         </dependency>
         <dependency>
@@ -65,6 +61,11 @@
             <artifactId>camel-test-spring-junit5</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-master</artifactId>
+            <scope>test</scope>
+        </dependency>
 
         <!-- testing - infinispan -->
         <dependency>
@@ -114,6 +115,11 @@
             <artifactId>awaitility</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedRoutePolicy.java b/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedRoutePolicy.java
deleted file mode 100644
index 615a77c..0000000
--- a/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedRoutePolicy.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.infinispan.embedded;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-
-import org.apache.camel.Service;
-import org.apache.camel.api.management.ManagedResource;
-import org.apache.camel.component.infinispan.InfinispanRoutePolicy;
-import org.apache.camel.support.service.ServiceSupport;
-import org.apache.camel.util.ObjectHelper;
-import org.infinispan.Cache;
-import org.infinispan.notifications.Listener;
-import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
-import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
-import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@ManagedResource(description = "Route policy using Infinispan Embedded as clustered lock")
-public class InfinispanEmbeddedRoutePolicy extends InfinispanRoutePolicy {
-    private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanEmbeddedRoutePolicy.class);
-
-    private final InfinispanEmbeddedManager manager;
-
-    public InfinispanEmbeddedRoutePolicy(InfinispanEmbeddedConfiguration configuration) {
-        this(configuration, null, null);
-    }
-
-    public InfinispanEmbeddedRoutePolicy(InfinispanEmbeddedConfiguration configuration, String lockKey, String lockValue) {
-        super(lockKey, lockValue);
-
-        this.manager = new InfinispanEmbeddedManager(configuration);
-    }
-
-    @Override
-    protected Service createService() {
-        return new EmbeddedCacheService();
-    }
-
-    @Override
-    public void doStart() throws Exception {
-        super.doStart();
-        this.manager.start();
-    }
-
-    @Override
-    public void doStop() throws Exception {
-        super.doStop();
-        this.manager.stop();
-    }
-
-    // *************************************************************************
-    //
-    // *************************************************************************
-
-    @Listener(clustered = true, sync = false)
-    private final class EmbeddedCacheService extends ServiceSupport {
-        private Cache<String, String> cache;
-        private ScheduledExecutorService executorService;
-        private ScheduledFuture<?> future;
-
-        EmbeddedCacheService() {
-        }
-
-        @SuppressWarnings("unchecked")
-        @Override
-        protected void doStart() throws Exception {
-            this.executorService = getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
-                    getClass().getSimpleName());
-            this.cache = manager.getCache(getLockMapName(), Cache.class);
-            this.cache.addListener(this);
-            this.future = executorService.scheduleAtFixedRate(this::run, 0, getLifespan() / 2, getLifespanTimeUnit());
-        }
-
-        @Override
-        protected void doStop() throws Exception {
-            if (cache != null) {
-                this.cache.removeListener(this);
-                this.cache.remove(getLockKey(), getLockValue());
-            }
-
-            if (future != null) {
-                future.cancel(true);
-                future = null;
-            }
-
-            getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService);
-        }
-
-        private void run() {
-            if (!isRunAllowed()) {
-                return;
-            }
-
-            if (isLeader()) {
-                // I'm still the leader, so refresh the key so it does not expire.
-                if (!cache.replace(getLockKey(), getLockValue(), getLockValue(), getLifespan(), getLifespanTimeUnit())) {
-                    // Looks like I've lost the leadership.
-                    setLeader(false);
-                }
-            }
-
-            if (!isLeader()) {
-                Object result = cache.putIfAbsent(getLockKey(), getLockValue(), getLifespan(), getLifespanTimeUnit());
-                if (result == null) {
-                    // Acquired the key so I'm the leader.
-                    setLeader(true);
-                } else if (ObjectHelper.equal(getLockValue(), result) && !isLeader()) {
-                    // Hey, I may have recovered from failure (or reboot was really
-                    // fast) and my key was still there so yeah, I'm the leader again!
-                    setLeader(true);
-                } else {
-                    setLeader(false);
-                }
-            }
-        }
-
-        @CacheEntryRemoved
-        public void onCacheEntryRemoved(CacheEntryEvent<Object, Object> event) {
-            LOGGER.debug("onCacheEntryExpired lock-key={}, event-key={}", getLockKey(), event.getKey());
-            if (ObjectHelper.equal(getLockKey(), event.getKey())) {
-                run();
-            }
-        }
-
-        @CacheEntryExpired
-        public void onCacheEntryExpired(CacheEntryEvent<Object, Object> event) {
-            LOGGER.debug("onCacheEntryExpired lock-key={}, event-key={}", getLockKey(), event.getKey());
-            if (ObjectHelper.equal(getLockKey(), event.getKey())) {
-                run();
-            }
-        }
-    }
-}
diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterConfiguration.java b/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterConfiguration.java
new file mode 100644
index 0000000..45a698c
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterConfiguration.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.embedded.cluster;
+
+import org.apache.camel.component.infinispan.cluster.InfinispanClusterConfiguration;
+import org.apache.camel.component.infinispan.embedded.InfinispanEmbeddedConfiguration;
+import org.infinispan.configuration.cache.Configuration;
+import org.infinispan.manager.EmbeddedCacheManager;
+
+public class InfinispanEmbeddedClusterConfiguration
+        extends InfinispanClusterConfiguration<InfinispanEmbeddedConfiguration>
+        implements Cloneable {
+
+    public InfinispanEmbeddedClusterConfiguration() {
+        super(new InfinispanEmbeddedConfiguration());
+    }
+
+    // ***********************************************
+    // Properties
+    // ***********************************************
+
+    public EmbeddedCacheManager getCacheContainer() {
+        return getConfiguration().getCacheContainer();
+    }
+
+    public void setCacheContainer(EmbeddedCacheManager cacheContainer) {
+        getConfiguration().setCacheContainer(cacheContainer);
+    }
+
+    public Configuration getCacheContainerConfiguration() {
+        return getConfiguration().getCacheContainerConfiguration();
+    }
+
+    public void setCacheContainerConfiguration(Configuration cacheContainerConfiguration) {
+        getConfiguration().setCacheContainerConfiguration(cacheContainerConfiguration);
+    }
+
+    // ***********************************************
+    //
+    // ***********************************************
+
+    @Override
+    public InfinispanEmbeddedClusterConfiguration clone() {
+        return (InfinispanEmbeddedClusterConfiguration) super.clone();
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterService.java b/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterService.java
new file mode 100644
index 0000000..d471d20
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterService.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.embedded.cluster;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.component.infinispan.cluster.InfinispanClusterService;
+import org.apache.camel.component.infinispan.cluster.InfinispanClusterView;
+import org.apache.camel.util.ObjectHelper;
+import org.infinispan.configuration.cache.Configuration;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InfinispanEmbeddedClusterService extends InfinispanClusterService {
+    private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanEmbeddedClusterService.class);
+
+    private InfinispanEmbeddedClusterConfiguration configuration;
+
+    public InfinispanEmbeddedClusterService() {
+        this.configuration = new InfinispanEmbeddedClusterConfiguration();
+    }
+
+    public InfinispanEmbeddedClusterService(InfinispanEmbeddedClusterConfiguration configuration) {
+        this.configuration = configuration.clone();
+    }
+
+    // *********************************************
+    // Properties
+    // *********************************************
+
+    public InfinispanEmbeddedClusterConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(InfinispanEmbeddedClusterConfiguration configuration) {
+        this.configuration = configuration.clone();
+    }
+
+    public void setConfigurationUri(String configurationUri) {
+        configuration.setConfigurationUri(configurationUri);
+    }
+
+    public EmbeddedCacheManager getCacheContainer() {
+        return configuration.getCacheContainer();
+    }
+
+    public void setCacheContainer(EmbeddedCacheManager cacheContainer) {
+        configuration.setCacheContainer(cacheContainer);
+    }
+
+    public Configuration getCacheContainerConfiguration() {
+        return configuration.getCacheContainerConfiguration();
+    }
+
+    public void setCacheContainerConfiguration(Configuration cacheContainerConfiguration) {
+        configuration.setCacheContainerConfiguration(cacheContainerConfiguration);
+    }
+
+    public long getLifespan() {
+        return configuration.getLifespan();
+    }
+
+    public void setLifespan(long lifespan) {
+        configuration.setLifespan(lifespan);
+    }
+
+    public TimeUnit getLifespanTimeUnit() {
+        return configuration.getLifespanTimeUnit();
+    }
+
+    public void setLifespanTimeUnit(TimeUnit lifespanTimeUnit) {
+        configuration.setLifespanTimeUnit(lifespanTimeUnit);
+    }
+
+    // *********************************************
+    // Impl
+    // *********************************************
+
+    @Override
+    protected InfinispanClusterView createView(String namespace) throws Exception {
+        // Validate parameters
+        ObjectHelper.notNull(getCamelContext(), "Camel Context");
+        ObjectHelper.notNull(getId(), "Cluster ID");
+
+        return new InfinispanEmbeddedClusterView(this, configuration, namespace);
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterView.java b/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterView.java
new file mode 100644
index 0000000..2d09cf9
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterView.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.embedded.cluster;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import org.apache.camel.cluster.CamelClusterMember;
+import org.apache.camel.component.infinispan.InfinispanUtil;
+import org.apache.camel.component.infinispan.cluster.InfinispanClusterService;
+import org.apache.camel.component.infinispan.cluster.InfinispanClusterView;
+import org.apache.camel.component.infinispan.embedded.InfinispanEmbeddedManager;
+import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.support.service.ServiceSupport;
+import org.infinispan.Cache;
+import org.infinispan.notifications.Listener;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
+import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.util.function.Predicates.negate;
+
+public class InfinispanEmbeddedClusterView extends InfinispanClusterView {
+    private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanEmbeddedClusterService.class);
+
+    private final InfinispanEmbeddedClusterConfiguration configuration;
+    private final InfinispanEmbeddedManager manager;
+    private final LocalMember localMember;
+    private final LeadershipService leadership;
+
+    private Cache<String, String> cache;
+
+    protected InfinispanEmbeddedClusterView(
+                                            InfinispanEmbeddedClusterService cluster,
+                                            InfinispanEmbeddedClusterConfiguration configuration,
+                                            String namespace) {
+        super(cluster, namespace);
+
+        this.configuration = configuration;
+        this.manager = new InfinispanEmbeddedManager(this.configuration.getConfiguration());
+        this.leadership = new LeadershipService();
+        this.localMember = new LocalMember(cluster.getId());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void doStart() throws Exception {
+        super.doStart();
+
+        ServiceHelper.startService(manager);
+
+        this.cache = manager.getCache(getNamespace(), Cache.class);
+
+        ServiceHelper.startService(leadership);
+    }
+
+    @Override
+    public void doStop() throws Exception {
+        super.doStop();
+
+        ServiceHelper.stopService(leadership);
+        ServiceHelper.stopService(manager);
+
+        this.cache = null;
+    }
+
+    @Override
+    public CamelClusterMember getLocalMember() {
+        return this.localMember;
+    }
+
+    @Override
+    public List<CamelClusterMember> getMembers() {
+        return this.cache != null
+                ? cache.keySet().stream()
+                        .filter(negate(InfinispanClusterService.LEADER_KEY::equals))
+                        .map(ClusterMember::new)
+                        .collect(Collectors.toList())
+                : Collections.emptyList();
+    }
+
+    @Override
+    public Optional<CamelClusterMember> getLeader() {
+        if (this.cache == null) {
+            return Optional.empty();
+        }
+
+        String id = cache.get(InfinispanClusterService.LEADER_KEY);
+        if (id == null) {
+            return Optional.empty();
+        }
+
+        return Optional.of(new ClusterMember(id));
+    }
+
+    @Override
+    protected boolean isLeader(String id) {
+        if (this.cache == null) {
+            return false;
+        }
+        if (id == null) {
+            return false;
+        }
+
+        final String key = InfinispanClusterService.LEADER_KEY;
+        final String val = this.cache.get(key);
+
+        return Objects.equals(id, val);
+    }
+
+    // *****************************************
+    //
+    // Service
+    //
+    // *****************************************
+
+    @Listener(clustered = true, sync = false)
+    private final class LeadershipService extends ServiceSupport {
+        private final AtomicBoolean running;
+        private ScheduledExecutorService executorService;
+
+        LeadershipService() {
+            this.running = new AtomicBoolean(false);
+        }
+
+        @Override
+        protected void doStart() throws Exception {
+            super.doStart();
+
+            this.running.set(true);
+            this.executorService = InfinispanUtil.newSingleThreadScheduledExecutor(
+                    getCamelContext(),
+                    this,
+                    getLocalMember().getId());
+
+            // register the local member to the inventory
+            cache.put(
+                    getLocalMember().getId(),
+                    "false",
+                    configuration.getLifespan(),
+                    configuration.getLifespanTimeUnit());
+
+            cache.addListener(this);
+
+            executorService.scheduleAtFixedRate(
+                    this::run,
+                    0,
+                    configuration.getLifespan() / 2,
+                    configuration.getLifespanTimeUnit());
+        }
+
+        @Override
+        protected void doStop() throws Exception {
+            super.doStop();
+
+            this.running.set(false);
+
+            if (cache != null) {
+                cache.removeListener(this);
+            }
+
+            getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService);
+
+            if (cache != null) {
+                cache.remove(InfinispanClusterService.LEADER_KEY, getClusterService().getId());
+
+                LOGGER.info("Removing local member, key={}", getLocalMember().getId());
+                cache.remove(getLocalMember().getId());
+            }
+        }
+
+        private boolean isLeader() {
+            return getLocalMember().isLeader();
+        }
+
+        private void setLeader(boolean leader) {
+            ((LocalMember) getLocalMember()).setLeader(leader);
+        }
+
+        private synchronized void run() {
+            if (!running.get()) {
+                return;
+            }
+
+            final String leaderKey = InfinispanClusterService.LEADER_KEY;
+            final String localId = getLocalMember().getId();
+
+            if (isLeader()) {
+                LOGGER.debug("Lock refresh key={}, id{}", leaderKey, localId);
+
+                // I'm still the leader, so refresh the key so it does not expire.
+                if (!cache.replace(
+                        InfinispanClusterService.LEADER_KEY,
+                        getClusterService().getId(),
+                        getClusterService().getId(),
+                        configuration.getLifespan(),
+                        configuration.getLifespanTimeUnit())) {
+
+                    LOGGER.debug("Failed to refresh the lock key={}, id={}", leaderKey, localId);
+
+                    // Looks like I've lost the leadership.
+                    setLeader(false);
+                }
+            }
+            if (!isLeader()) {
+                LOGGER.debug("Try to acquire lock key={}, id={}", leaderKey, localId);
+
+                Object result = cache.putIfAbsent(
+                        InfinispanClusterService.LEADER_KEY,
+                        getClusterService().getId(),
+                        configuration.getLifespan(),
+                        configuration.getLifespanTimeUnit());
+                if (result == null) {
+                    LOGGER.debug("Lock acquired key={}, id={}", leaderKey, localId);
+                    // Acquired the key so I'm the leader.
+                    setLeader(true);
+                } else if (Objects.equals(getClusterService().getId(), result) && !isLeader()) {
+                    LOGGER.debug("Lock resumed key={}, id={}", leaderKey, localId);
+                    // Hey, I may have recovered from failure (or reboot was really
+                    // fast) and my key was still there so yeah, I'm the leader again!
+                    setLeader(true);
+                } else {
+                    LOGGER.debug("Failed to acquire the lock key={}, id={}", leaderKey, localId);
+                    setLeader(false);
+                }
+            }
+
+            // refresh local membership
+            cache.put(
+                    getLocalMember().getId(),
+                    isLeader() ? "true" : "false",
+                    configuration.getLifespan(),
+                    configuration.getLifespanTimeUnit());
+        }
+
+        @CacheEntryRemoved
+        public void onCacheEntryRemoved(CacheEntryEvent<Object, Object> event) {
+            if (!running.get()) {
+                return;
+            }
+
+            LOGGER.debug("onCacheEntryRemoved id={}, lock-key={}, event-key={}",
+                    getLocalMember().getId(),
+                    InfinispanClusterService.LEADER_KEY,
+                    event.getKey());
+
+            if (Objects.equals(InfinispanClusterService.LEADER_KEY, event.getKey())) {
+                executorService.execute(this::run);
+            }
+        }
+
+        @CacheEntryExpired
+        public void onCacheEntryExpired(CacheEntryEvent<Object, Object> event) {
+            if (!running.get()) {
+                return;
+            }
+
+            LOGGER.debug("onCacheEntryExpired id={}, lock-key={}, event-key={}",
+                    getLocalMember().getId(),
+                    InfinispanClusterService.LEADER_KEY,
+                    event.getKey());
+
+            if (Objects.equals(InfinispanClusterService.LEADER_KEY, event.getKey())) {
+                executorService.execute(this::run);
+            }
+        }
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedRoutePolicyTest.java b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedRoutePolicyTest.java
deleted file mode 100644
index 9c02b7a..0000000
--- a/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedRoutePolicyTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.infinispan.embedded;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.infinispan.InfinispanRoutePolicy;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.infinispan.commons.api.CacheContainerAdmin;
-import org.infinispan.configuration.cache.CacheMode;
-import org.infinispan.configuration.cache.ConfigurationBuilder;
-import org.infinispan.manager.DefaultCacheManager;
-import org.infinispan.manager.EmbeddedCacheManager;
-import org.junit.jupiter.api.Test;
-
-import static org.awaitility.Awaitility.await;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class InfinispanEmbeddedRoutePolicyTest {
-    public static final String CACHE_NAME = "_route_policy";
-
-    @Test
-    public void testLeadership() throws Exception {
-        try (EmbeddedCacheManager cacheContainer = createCacheContainer()) {
-
-            cacheContainer.start();
-
-            final InfinispanRoutePolicy policy1 = createRoutePolicy(cacheContainer, "route1");
-            final InfinispanRoutePolicy policy2 = createRoutePolicy(cacheContainer, "route2");
-
-            try (CamelContext context = new DefaultCamelContext()) {
-                context.start();
-
-                RouteBuilder.addRoutes(context, b -> b.from("direct:r1").routePolicy(policy1).to("mock:p1"));
-
-                await().atMost(10, TimeUnit.SECONDS).until(policy1::isLeader);
-
-                RouteBuilder.addRoutes(context, b -> b.from("direct:r2").routePolicy(policy2).to("mock:p2"));
-
-                assertTrue(policy1.isLeader());
-                assertFalse(policy2.isLeader());
-
-                policy1.shutdown();
-
-                await().atMost(10, TimeUnit.SECONDS).until(policy2::isLeader);
-
-                assertFalse(policy1.isLeader());
-                assertTrue(policy2.isLeader());
-            }
-        }
-    }
-
-    // *****************************
-    //
-    // *****************************
-
-    private static EmbeddedCacheManager createCacheContainer() {
-        DefaultCacheManager cacheContainer = new DefaultCacheManager();
-        cacheContainer.administration()
-                .withFlags(CacheContainerAdmin.AdminFlag.VOLATILE)
-                .getOrCreateCache(
-                        CACHE_NAME,
-                        new ConfigurationBuilder()
-                                .clustering().cacheMode(CacheMode.LOCAL)
-                                .build());
-
-        return cacheContainer;
-    }
-
-    private static InfinispanRoutePolicy createRoutePolicy(EmbeddedCacheManager cacheContainer, String lockValue) {
-        InfinispanEmbeddedConfiguration configuration = new InfinispanEmbeddedConfiguration();
-        configuration.setCacheContainer(cacheContainer);
-
-        InfinispanEmbeddedRoutePolicy policy = new InfinispanEmbeddedRoutePolicy(configuration);
-        policy.setLockMapName(CACHE_NAME);
-        policy.setLockKey("lock-key");
-        policy.setLockValue(lockValue);
-
-        return policy;
-    }
-}
diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/AbstractInfinispanEmbeddedClusteredTest.java b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/AbstractInfinispanEmbeddedClusteredTest.java
new file mode 100644
index 0000000..4b1aaff
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/AbstractInfinispanEmbeddedClusteredTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.embedded.cluster;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.infinispan.manager.DefaultCacheManager;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+abstract class AbstractInfinispanEmbeddedClusteredTest {
+    @Timeout(value = 1, unit = TimeUnit.MINUTES)
+    @Test
+    public void test() throws Exception {
+        final Logger logger = LoggerFactory.getLogger(getClass());
+        final List<String> clients = IntStream.range(0, 3).mapToObj(Integer::toString).collect(Collectors.toList());
+        final List<String> results = new ArrayList<>();
+        final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(clients.size() * 2);
+        final CountDownLatch latch = new CountDownLatch(clients.size());
+        final String viewName = "myView";
+
+        try (DefaultCacheManager cacheContainer = new DefaultCacheManager()) {
+            InfinispanEmbeddedClusteredTestSupport.createCache(cacheContainer, viewName);
+
+            for (String id : clients) {
+                scheduler.submit(() -> {
+                    try {
+                        run(cacheContainer, viewName, id);
+                        logger.debug("Node {} is shutting down", id);
+                        results.add(id);
+                    } catch (Exception e) {
+                        logger.warn("", e);
+                    } finally {
+                        latch.countDown();
+                    }
+                });
+            }
+
+            latch.await();
+            scheduler.shutdownNow();
+
+            assertThat(results).hasSameSizeAs(clients);
+            assertThat(results).containsAll(clients);
+        }
+    }
+
+    protected abstract void run(DefaultCacheManager cacheContainer, String namespace, String id) throws Exception;
+}
diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredMasterTest.java b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredMasterTest.java
new file mode 100644
index 0000000..37372d4
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredMasterTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.embedded.cluster;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.infinispan.manager.DefaultCacheManager;
+
+public class InfinispanEmbeddedClusteredMasterTest extends AbstractInfinispanEmbeddedClusteredTest {
+    @Override
+    protected void run(DefaultCacheManager cacheContainer, String namespace, String id) throws Exception {
+        int events = ThreadLocalRandom.current().nextInt(2, 6);
+        CountDownLatch contextLatch = new CountDownLatch(events);
+
+        //Set up a single node cluster.
+        InfinispanEmbeddedClusterService clusterService = new InfinispanEmbeddedClusterService();
+        clusterService.setCacheContainer(cacheContainer);
+        clusterService.setId("node-" + id);
+
+        try (DefaultCamelContext context = new DefaultCamelContext()) {
+            context.disableJMX();
+            context.setName("context-" + id);
+            context.addService(clusterService);
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    fromF("master:%s:timer:%s?delay=1000&period=1000", namespace, id)
+                            .routeId("route-" + id)
+                            .log("From ${routeId}")
+                            .process(e -> contextLatch.countDown());
+                }
+            });
+
+            // Start the context after some random time so the startup order
+            // changes for each test.
+            Thread.sleep(ThreadLocalRandom.current().nextInt(500));
+            context.start();
+
+            contextLatch.await();
+        }
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredRoutePolicyFactoryTest.java b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredRoutePolicyFactoryTest.java
new file mode 100644
index 0000000..08d611d
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredRoutePolicyFactoryTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.embedded.cluster;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.cluster.ClusteredRoutePolicyFactory;
+import org.infinispan.manager.DefaultCacheManager;
+
+public class InfinispanEmbeddedClusteredRoutePolicyFactoryTest extends AbstractInfinispanEmbeddedClusteredTest {
+    @Override
+    protected void run(DefaultCacheManager cacheContainer, String namespace, String id) throws Exception {
+        int events = ThreadLocalRandom.current().nextInt(2, 6);
+        CountDownLatch contextLatch = new CountDownLatch(events);
+
+        //Set up a single node cluster.
+        InfinispanEmbeddedClusterService clusterService = new InfinispanEmbeddedClusterService();
+        clusterService.setCacheContainer(cacheContainer);
+        clusterService.setId("node-" + id);
+
+        try (DefaultCamelContext context = new DefaultCamelContext()) {
+            context.disableJMX();
+            context.setName("context-" + id);
+            context.addService(clusterService);
+            context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace(namespace));
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    fromF("timer:%s?delay=1000&period=1000", id)
+                            .routeId("route-" + id)
+                            .log("From ${routeId}")
+                            .process(e -> contextLatch.countDown());
+                }
+            });
+
+            // Start the context after some random time so the startup order
+            // changes for each test.
+            Thread.sleep(ThreadLocalRandom.current().nextInt(500));
+            context.start();
+
+            contextLatch.await();
+        }
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredRoutePolicyTest.java b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredRoutePolicyTest.java
new file mode 100644
index 0000000..010c0ce
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredRoutePolicyTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.embedded.cluster;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.cluster.ClusteredRoutePolicy;
+import org.infinispan.manager.DefaultCacheManager;
+
+public class InfinispanEmbeddedClusteredRoutePolicyTest extends AbstractInfinispanEmbeddedClusteredTest {
+    @Override
+    protected void run(DefaultCacheManager cacheContainer, String namespace, String id) throws Exception {
+        int events = ThreadLocalRandom.current().nextInt(2, 6);
+        CountDownLatch contextLatch = new CountDownLatch(events);
+
+        //Set up a single node cluster.
+        InfinispanEmbeddedClusterService clusterService = new InfinispanEmbeddedClusterService();
+        clusterService.setCacheContainer(cacheContainer);
+        clusterService.setId("node-" + id);
+
+        try (DefaultCamelContext context = new DefaultCamelContext()) {
+            context.disableJMX();
+            context.setName("context-" + id);
+            context.addService(clusterService);
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    fromF("timer:%s?delay=1000&period=1000", id)
+                            .routeId("route-" + id)
+                            .routePolicy(ClusteredRoutePolicy.forNamespace(namespace))
+                            .log("From ${routeId}")
+                            .process(e -> contextLatch.countDown());
+                }
+            });
+
+            // Start the context after some random time so the startup order
+            // changes for each test.
+            Thread.sleep(ThreadLocalRandom.current().nextInt(500));
+            context.start();
+
+            contextLatch.await();
+        }
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredTestSupport.java b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredTestSupport.java
new file mode 100644
index 0000000..bc8dbde
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredTestSupport.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.embedded.cluster;
+
+import org.infinispan.commons.api.CacheContainerAdmin;
+import org.infinispan.configuration.cache.CacheMode;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.manager.DefaultCacheManager;
+
+public final class InfinispanEmbeddedClusteredTestSupport {
+    private InfinispanEmbeddedClusteredTestSupport() {
+    }
+
+    public static void createCache(DefaultCacheManager cacheContainer, String cacheName) {
+        cacheContainer.administration()
+                .withFlags(CacheContainerAdmin.AdminFlag.VOLATILE)
+                .getOrCreateCache(
+                        cacheName,
+                        new ConfigurationBuilder()
+                                .clustering()
+                                .cacheMode(CacheMode.LOCAL)
+                                .build());
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredViewTest.java b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredViewTest.java
new file mode 100644
index 0000000..d15efc6
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredViewTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.embedded.cluster;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.cluster.CamelClusterMember;
+import org.apache.camel.cluster.CamelClusterView;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.infinispan.manager.DefaultCacheManager;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.component.infinispan.embedded.cluster.InfinispanEmbeddedClusteredTestSupport.createCache;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+public class InfinispanEmbeddedClusteredViewTest {
+    @Test
+    public void getLeaderTest() throws Exception {
+        final String viewName = "myView";
+
+        try (DefaultCacheManager cacheContainer = new DefaultCacheManager()) {
+            createCache(cacheContainer, viewName);
+
+            //Set up a single node cluster.
+            InfinispanEmbeddedClusterService clusterService = new InfinispanEmbeddedClusterService();
+            clusterService.setCacheContainer(cacheContainer);
+            clusterService.setId("node");
+
+            //Set up context with single locked route.
+            try (DefaultCamelContext context = new DefaultCamelContext()) {
+                context.disableJMX();
+                context.addService(clusterService);
+
+                context.addRoutes(new RouteBuilder() {
+                    @Override
+                    public void configure() {
+                        fromF("master:%s:timer:infinispan?repeatCount=1", viewName)
+                                .routeId("route1")
+                                .stop();
+                    }
+                });
+
+                context.start();
+
+                CamelClusterView view = clusterService.getView(viewName);
+
+                await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+                    assertThat(view.getLeader())
+                            .get()
+                            .satisfies(CamelClusterMember::isLeader)
+                            .satisfies(CamelClusterMember::isLocal);
+                });
+            }
+        }
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/test/resources/log4j2.properties b/components/camel-infinispan/camel-infinispan-embedded/src/test/resources/log4j2.properties
index 1301f96..b740e62 100644
--- a/components/camel-infinispan/camel-infinispan-embedded/src/test/resources/log4j2.properties
+++ b/components/camel-infinispan/camel-infinispan-embedded/src/test/resources/log4j2.properties
@@ -35,6 +35,10 @@ logger.infinispan-camel.level = INFO
 logger.infinispan-camel-remote.name = org.apache.camel.component.infinispan.remote
 logger.infinispan-camel-remote.level = INFO
 logger.infinispan-camel-embedded.name = org.apache.camel.component.infinispan.embedded
+logger.infinispan-camel-embedded.level = INFO
+logger.infinispan-camel-remote-cluster.name = org.apache.camel.component.infinispan.embedded.cluster
+logger.infinispan-camel-remote-cluster.level = DEBUG
+
 logger.infinispan-test-infra-container.name = container.infinispan
 logger.infinispan-test-infra-container.level = INFO
 logger.infinispan-camel-test.name = org.apache.camel.test.junit5
diff --git a/components/camel-infinispan/camel-infinispan/pom.xml b/components/camel-infinispan/camel-infinispan/pom.xml
index 7db9c45..4b7b32b 100644
--- a/components/camel-infinispan/camel-infinispan/pom.xml
+++ b/components/camel-infinispan/camel-infinispan/pom.xml
@@ -35,10 +35,6 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-support</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.camel</groupId>
             <artifactId>camel-infinispan-common</artifactId>
         </dependency>
         <dependency>
@@ -70,6 +66,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
+            <artifactId>camel-master</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
             <artifactId>camel-infinispan-common</artifactId>
             <version>${project.version}</version>
             <type>test-jar</type>
@@ -144,6 +145,11 @@
             <artifactId>awaitility</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
 
     </dependencies>
 
diff --git a/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteRoutePolicy.java b/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteRoutePolicy.java
deleted file mode 100644
index 71774e5..0000000
--- a/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteRoutePolicy.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.infinispan.remote;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-
-import org.apache.camel.Service;
-import org.apache.camel.api.management.ManagedResource;
-import org.apache.camel.component.infinispan.InfinispanRoutePolicy;
-import org.apache.camel.component.infinispan.InfinispanUtil;
-import org.apache.camel.support.service.ServiceSupport;
-import org.apache.camel.util.ObjectHelper;
-import org.infinispan.client.hotrod.Flag;
-import org.infinispan.client.hotrod.RemoteCache;
-import org.infinispan.client.hotrod.annotation.ClientCacheEntryExpired;
-import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
-import org.infinispan.client.hotrod.annotation.ClientListener;
-import org.infinispan.client.hotrod.event.ClientCacheEntryExpiredEvent;
-import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@ManagedResource(description = "Route policy using Infinispan Remote as clustered lock")
-public class InfinispanRemoteRoutePolicy extends InfinispanRoutePolicy {
-    private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanRemoteRoutePolicy.class);
-
-    private final InfinispanRemoteManager manager;
-
-    public InfinispanRemoteRoutePolicy(InfinispanRemoteConfiguration configuration) {
-        this(configuration, null, null);
-    }
-
-    public InfinispanRemoteRoutePolicy(InfinispanRemoteConfiguration configuration, String lockKey, String lockValue) {
-        super(lockKey, lockValue);
-
-        this.manager = new InfinispanRemoteManager(configuration);
-    }
-
-    @Override
-    protected Service createService() {
-        return new RemoteCacheService();
-    }
-
-    @Override
-    public void doStart() throws Exception {
-        super.doStart();
-        this.manager.start();
-    }
-
-    @Override
-    public void doStop() throws Exception {
-        super.doStop();
-        this.manager.stop();
-    }
-
-    // *************************************************************************
-    //
-    // *************************************************************************
-
-    @ClientListener
-    private final class RemoteCacheService extends ServiceSupport {
-        private final int lifespan;
-
-        private RemoteCache<String, String> cache;
-        private ScheduledExecutorService executorService;
-        private ScheduledFuture<?> future;
-        private Long version;
-
-        RemoteCacheService() {
-            this.lifespan = (int) getLifespanTimeUnit().toSeconds(getLifespan());
-        }
-
-        @SuppressWarnings("unchecked")
-        @Override
-        protected void doStart() throws Exception {
-            this.executorService = InfinispanUtil.newSingleThreadScheduledExecutor(getCamelContext(), this);
-            this.cache = manager.getCache(getLockMapName(), RemoteCache.class);
-            this.cache.addClientListener(this);
-            this.future = executorService.scheduleAtFixedRate(this::run, 0, getLifespan() / 2, getLifespanTimeUnit());
-        }
-
-        @Override
-        protected void doStop() throws Exception {
-            if (cache != null) {
-                this.cache.removeClientListener(this);
-
-                if (this.version != null) {
-                    this.cache.removeWithVersion(getLockKey(), this.version);
-                }
-            }
-
-            if (future != null) {
-                future.cancel(true);
-                future = null;
-            }
-
-            getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService);
-        }
-
-        private void run() {
-            if (!isRunAllowed()) {
-                return;
-            }
-
-            if (isLeader() && version != null) {
-                LOGGER.debug("Lock refresh key={} with version={}", getLockKey(), version);
-
-                // I'm still the leader, so refresh the key so it does not expire.
-                if (!cache.replaceWithVersion(getLockKey(), getLockValue(), version, lifespan)) {
-                    setLeader(false);
-                } else {
-                    version = cache.getWithMetadata(getLockKey()).getVersion();
-                    LOGGER.debug("Lock refreshed key={} with new version={}", getLockKey(), version);
-                }
-            }
-
-            if (!isLeader()) {
-                Object result = cache.withFlags(Flag.FORCE_RETURN_VALUE).putIfAbsent(getLockKey(), getLockValue(),
-                        getLifespan(), getLifespanTimeUnit());
-                if (result == null) {
-                    // Acquired the key so I'm the leader.
-                    setLeader(true);
-
-                    // Get the version
-                    version = cache.getWithMetadata(getLockKey()).getVersion();
-
-                    LOGGER.debug("Lock acquired key={} with version={}", getLockKey(), version);
-                } else if (ObjectHelper.equal(getLockValue(), result) && !isLeader()) {
-                    // Hey, I may have recovered from failure (or reboot was really
-                    // fast) and my key was still there so yeah, I'm the leader again!
-                    setLeader(true);
-
-                    // Get the version
-                    version = cache.getWithMetadata(getLockKey()).getVersion();
-
-                    LOGGER.debug("Lock resumed key={} with version={}", getLockKey(), version);
-                } else {
-                    setLeader(false);
-                }
-            }
-        }
-
-        @ClientCacheEntryRemoved
-        public void onCacheEntryRemoved(ClientCacheEntryRemovedEvent<String> event) {
-            LOGGER.debug("onCacheEntryRemoved lock-key={}, event-key={}", getLockKey(), event.getKey());
-            if (ObjectHelper.equal(getLockKey(), event.getKey())) {
-                run();
-            }
-        }
-
-        @ClientCacheEntryExpired
-        public void onCacheEntryExpired(ClientCacheEntryExpiredEvent<String> event) {
-            LOGGER.debug("onCacheEntryExpired lock-key={}, event-key={}", getLockKey(), event.getKey());
-            if (ObjectHelper.equal(getLockKey(), event.getKey())) {
-                run();
-            }
-        }
-    }
-}
diff --git a/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterConfiguration.java b/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterConfiguration.java
new file mode 100644
index 0000000..54682a3
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterConfiguration.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.remote.cluster;
+
+import java.util.Map;
+
+import org.apache.camel.component.infinispan.cluster.InfinispanClusterConfiguration;
+import org.apache.camel.component.infinispan.remote.InfinispanRemoteConfiguration;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.configuration.Configuration;
+
+public class InfinispanRemoteClusterConfiguration
+        extends InfinispanClusterConfiguration<InfinispanRemoteConfiguration>
+        implements Cloneable {
+
+    public InfinispanRemoteClusterConfiguration() {
+        super(new InfinispanRemoteConfiguration());
+    }
+
+    // ***********************************************
+    // Properties
+    // ***********************************************
+
+    public String getHosts() {
+        return getConfiguration().getHosts();
+    }
+
+    public void setHosts(String hosts) {
+        getConfiguration().setHosts(hosts);
+    }
+
+    public boolean isSecure() {
+        return getConfiguration().isSecure();
+    }
+
+    public void setSecure(boolean secure) {
+        getConfiguration().setSecure(secure);
+    }
+
+    public String getUsername() {
+        return getConfiguration().getUsername();
+    }
+
+    public void setUsername(String username) {
+        getConfiguration().setUsername(username);
+    }
+
+    public String getPassword() {
+        return getConfiguration().getPassword();
+    }
+
+    public void setPassword(String password) {
+        getConfiguration().setPassword(password);
+    }
+
+    public String getSaslMechanism() {
+        return getConfiguration().getSaslMechanism();
+    }
+
+    public void setSaslMechanism(String saslMechanism) {
+        getConfiguration().setSaslMechanism(saslMechanism);
+    }
+
+    public String getSecurityRealm() {
+        return getConfiguration().getSecurityRealm();
+    }
+
+    public void setSecurityRealm(String securityRealm) {
+        getConfiguration().setSecurityRealm(securityRealm);
+    }
+
+    public String getSecurityServerName() {
+        return getConfiguration().getSecurityServerName();
+    }
+
+    public void setSecurityServerName(String securityServerName) {
+        getConfiguration().setSecurityServerName(securityServerName);
+    }
+
+    public Map<String, String> getConfigurationProperties() {
+        return getConfiguration().getConfigurationProperties();
+    }
+
+    public void setConfigurationProperties(Map<String, String> configurationProperties) {
+        getConfiguration().setConfigurationProperties(configurationProperties);
+    }
+
+    public void addConfigurationProperty(String key, String value) {
+        getConfiguration().addConfigurationProperty(key, value);
+    }
+
+    public RemoteCacheManager getCacheContainer() {
+        return getConfiguration().getCacheContainer();
+    }
+
+    public void setCacheContainer(RemoteCacheManager cacheContainer) {
+        getConfiguration().setCacheContainer(cacheContainer);
+    }
+
+    public Configuration getCacheContainerConfiguration() {
+        return getConfiguration().getCacheContainerConfiguration();
+    }
+
+    public void setCacheContainerConfiguration(Configuration cacheContainerConfiguration) {
+        getConfiguration().setCacheContainerConfiguration(cacheContainerConfiguration);
+    }
+
+    // ***********************************************
+    //
+    // ***********************************************
+
+    @Override
+    public InfinispanRemoteClusterConfiguration clone() {
+        return (InfinispanRemoteClusterConfiguration) super.clone();
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterService.java b/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterService.java
new file mode 100644
index 0000000..bbfd461
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterService.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.remote.cluster;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.component.infinispan.cluster.InfinispanClusterService;
+import org.apache.camel.component.infinispan.cluster.InfinispanClusterView;
+import org.apache.camel.util.ObjectHelper;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.configuration.Configuration;
+
+public class InfinispanRemoteClusterService extends InfinispanClusterService {
+    private InfinispanRemoteClusterConfiguration configuration;
+
+    public InfinispanRemoteClusterService() {
+        this.configuration = new InfinispanRemoteClusterConfiguration();
+    }
+
+    public InfinispanRemoteClusterService(InfinispanRemoteClusterConfiguration configuration) {
+        this.configuration = configuration.clone();
+    }
+
+    // *********************************************
+    // Properties
+    // *********************************************
+
+    public InfinispanRemoteClusterConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(InfinispanRemoteClusterConfiguration configuration) {
+        this.configuration = configuration.clone();
+    }
+
+    public void setConfigurationUri(String configurationUri) {
+        configuration.setConfigurationUri(configurationUri);
+    }
+
+    public RemoteCacheManager getCacheContainer() {
+        return configuration.getCacheContainer();
+    }
+
+    public void setCacheContainer(RemoteCacheManager cacheContainer) {
+        configuration.setCacheContainer(cacheContainer);
+    }
+
+    public Configuration getCacheContainerConfiguration() {
+        return configuration.getCacheContainerConfiguration();
+    }
+
+    public void setCacheContainerConfiguration(Configuration cacheContainerConfiguration) {
+        configuration.setCacheContainerConfiguration(cacheContainerConfiguration);
+    }
+
+    public long getLifespan() {
+        return configuration.getLifespan();
+    }
+
+    public void setLifespan(long lifespan) {
+        configuration.setLifespan(lifespan);
+    }
+
+    public TimeUnit getLifespanTimeUnit() {
+        return configuration.getLifespanTimeUnit();
+    }
+
+    public void setLifespanTimeUnit(TimeUnit lifespanTimeUnit) {
+        configuration.setLifespanTimeUnit(lifespanTimeUnit);
+    }
+
+    // *********************************************
+    // Impl
+    // *********************************************
+
+    @Override
+    protected InfinispanClusterView createView(String namespace) throws Exception {
+        // Validate parameters
+        ObjectHelper.notNull(getCamelContext(), "Camel Context");
+        ObjectHelper.notNull(getId(), "Cluster ID");
+
+        return new InfinispanRemoteClusterView(this, configuration, namespace);
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterView.java b/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterView.java
new file mode 100644
index 0000000..a0dd216
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterView.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.remote.cluster;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import org.apache.camel.cluster.CamelClusterMember;
+import org.apache.camel.component.infinispan.InfinispanUtil;
+import org.apache.camel.component.infinispan.cluster.InfinispanClusterService;
+import org.apache.camel.component.infinispan.cluster.InfinispanClusterView;
+import org.apache.camel.component.infinispan.remote.InfinispanRemoteManager;
+import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.support.service.ServiceSupport;
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryExpired;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
+import org.infinispan.client.hotrod.annotation.ClientListener;
+import org.infinispan.client.hotrod.event.ClientCacheEntryExpiredEvent;
+import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.util.function.Predicates.negate;
+
+public class InfinispanRemoteClusterView extends InfinispanClusterView {
+    private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanRemoteClusterService.class);
+
+    private final InfinispanRemoteClusterConfiguration configuration;
+    private final InfinispanRemoteManager manager;
+    private final LocalMember localMember;
+    private final LeadershipService leadership;
+
+    private RemoteCache<String, String> cache;
+
+    protected InfinispanRemoteClusterView(
+                                          InfinispanRemoteClusterService cluster,
+                                          InfinispanRemoteClusterConfiguration configuration,
+                                          String namespace) {
+        super(cluster, namespace);
+
+        this.configuration = configuration;
+        this.manager = new InfinispanRemoteManager(this.configuration.getConfiguration());
+        this.leadership = new LeadershipService();
+        this.localMember = new LocalMember(cluster.getId());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void doStart() throws Exception {
+        super.doStart();
+
+        ServiceHelper.startService(manager);
+
+        this.cache = manager.getCache(getNamespace(), RemoteCache.class);
+
+        ServiceHelper.startService(leadership);
+    }
+
+    @Override
+    public void doStop() throws Exception {
+        super.doStop();
+
+        LOGGER.info("shutdown service: {}", getClusterService().getId());
+
+        ServiceHelper.stopService(leadership);
+        ServiceHelper.stopService(manager);
+
+        this.cache = null;
+    }
+
+    @Override
+    public CamelClusterMember getLocalMember() {
+        return this.localMember;
+    }
+
+    @Override
+    public List<CamelClusterMember> getMembers() {
+        return this.cache != null
+                ? cache.keySet().stream()
+                        .filter(negate(InfinispanClusterService.LEADER_KEY::equals))
+                        .map(ClusterMember::new)
+                        .collect(Collectors.toList())
+                : Collections.emptyList();
+    }
+
+    @Override
+    public Optional<CamelClusterMember> getLeader() {
+        if (this.cache == null) {
+            return Optional.empty();
+        }
+
+        String id = cache.get(InfinispanClusterService.LEADER_KEY);
+        if (id == null) {
+            return Optional.empty();
+        }
+
+        return Optional.of(new ClusterMember(id));
+    }
+
+    @Override
+    protected boolean isLeader(String id) {
+        if (this.cache == null) {
+            return false;
+        }
+        if (id == null) {
+            return false;
+        }
+
+        final String key = InfinispanClusterService.LEADER_KEY;
+        final String val = this.cache.get(key);
+
+        return Objects.equals(id, val);
+    }
+
+    // *****************************************
+    //
+    // Service
+    //
+    // *****************************************
+
+    @ClientListener
+    private final class LeadershipService extends ServiceSupport {
+        private final int lifespan;
+        private final AtomicBoolean running;
+
+        private ScheduledExecutorService executorService;
+        private Long version;
+
+        LeadershipService() {
+            this.lifespan = (int) configuration.getLifespanTimeUnit().toSeconds(configuration.getLifespan());
+            this.running = new AtomicBoolean(false);
+        }
+
+        @Override
+        protected void doStart() throws Exception {
+            super.doStart();
+
+            this.running.set(true);
+            this.executorService = InfinispanUtil.newSingleThreadScheduledExecutor(
+                    getCamelContext(),
+                    this,
+                    getLocalMember().getId());
+
+            // register the local member to the inventory
+            cache.put(
+                    getLocalMember().getId(),
+                    "false",
+                    configuration.getLifespan(),
+                    configuration.getLifespanTimeUnit());
+
+            cache.addClientListener(this);
+
+            executorService.scheduleAtFixedRate(
+                    this::run,
+                    0,
+                    configuration.getLifespan() / 2,
+                    configuration.getLifespanTimeUnit());
+        }
+
+        @Override
+        protected void doStop() throws Exception {
+            super.doStop();
+
+            this.running.set(false);
+
+            if (cache != null) {
+                cache.removeClientListener(this);
+            }
+
+            getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService);
+
+            if (cache != null) {
+                if (this.version != null) {
+                    cache.removeWithVersion(InfinispanClusterService.LEADER_KEY, this.version);
+                }
+
+                LOGGER.info("Removing local member, key={}", getLocalMember().getId());
+                cache.remove(getLocalMember().getId());
+            }
+
+            this.version = null;
+        }
+
+        private boolean isLeader() {
+            return getLocalMember().isLeader();
+        }
+
+        private void setLeader(boolean leader) {
+            ((LocalMember) getLocalMember()).setLeader(leader);
+        }
+
+        private synchronized void run() {
+            if (!running.get()) {
+                return;
+            }
+
+            final String leaderKey = InfinispanClusterService.LEADER_KEY;
+            final String localId = getLocalMember().getId();
+
+            if (isLeader() && version != null) {
+                LOGGER.debug("Lock refresh key={}, id{} with version={}", leaderKey, localId, version);
+
+                // I'm still the leader, so refresh the key so it does not expire.
+                if (!cache.replaceWithVersion(
+                        leaderKey,
+                        getClusterService().getId(),
+                        version,
+                        lifespan)) {
+
+                    LOGGER.debug("Failed to refresh the lock key={}, id={}, version={}", leaderKey, localId, version);
+
+                    setLeader(false);
+                } else {
+                    version = cache.getWithMetadata(leaderKey).getVersion();
+
+                    LOGGER.debug("Lock refreshed key={}, ud={}, with new version={}", leaderKey, localId, version);
+                }
+            }
+
+            if (!isLeader()) {
+                LOGGER.debug("Try to acquire lock key={}, id={}", leaderKey, localId);
+
+                Object result = cache.withFlags(Flag.FORCE_RETURN_VALUE)
+                        .putIfAbsent(
+                                leaderKey,
+                                localId,
+                                configuration.getLifespan(),
+                                configuration.getLifespanTimeUnit());
+
+                if (result == null) {
+                    // Acquired the key so I'm the leader.
+                    setLeader(true);
+
+                    // Get the version
+                    version = cache.getWithMetadata(leaderKey).getVersion();
+
+                    LOGGER.debug("Lock acquired key={}, id={}, with version={}", leaderKey, localId, version);
+
+                } else if (Objects.equals(getClusterService().getId(), result) && !isLeader()) {
+                    // Hey, I may have recovered from failure (or reboot was really
+                    // fast) and my key was still there so yeah, I'm the leader again!
+                    setLeader(true);
+
+                    // Get the version
+                    version = cache.getWithMetadata(leaderKey).getVersion();
+
+                    LOGGER.debug("Lock resumed key={}, id={} with version={}", leaderKey, localId, version);
+                } else {
+                    LOGGER.debug("Failed to acquire the lock key={}, id={}", leaderKey, localId);
+
+                    setLeader(false);
+                }
+            }
+
+            // refresh local membership
+            cache.put(
+                    getLocalMember().getId(),
+                    isLeader() ? "true" : "false",
+                    configuration.getLifespan(),
+                    configuration.getLifespanTimeUnit());
+        }
+
+        @ClientCacheEntryRemoved
+        public void onCacheEntryRemoved(ClientCacheEntryRemovedEvent<String> event) {
+            if (!running.get()) {
+                return;
+            }
+
+            LOGGER.debug("onCacheEntryRemoved id={}, lock-key={}, event-key={}",
+                    getLocalMember().getId(),
+                    InfinispanClusterService.LEADER_KEY,
+                    event.getKey());
+
+            if (Objects.equals(InfinispanClusterService.LEADER_KEY, event.getKey())) {
+                executorService.execute(this::run);
+            }
+        }
+
+        @ClientCacheEntryExpired
+        public void onCacheEntryExpired(ClientCacheEntryExpiredEvent<String> event) {
+            if (!running.get()) {
+                return;
+            }
+
+            LOGGER.debug("onCacheEntryExpired id={}, lock-key={}, event-key={}",
+                    getLocalMember().getId(),
+                    InfinispanClusterService.LEADER_KEY,
+                    event.getKey());
+
+            if (Objects.equals(InfinispanClusterService.LEADER_KEY, event.getKey())) {
+                executorService.execute(this::run);
+            }
+        }
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteRoutePolicyTest.java b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteRoutePolicyTest.java
deleted file mode 100644
index f09887d..0000000
--- a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteRoutePolicyTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.infinispan.remote;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.infinispan.InfinispanRoutePolicy;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.test.infra.infinispan.services.InfinispanService;
-import org.apache.camel.test.infra.infinispan.services.InfinispanServiceFactory;
-import org.infinispan.client.hotrod.RemoteCacheManager;
-import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
-import org.infinispan.configuration.cache.CacheMode;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
-
-import static org.awaitility.Awaitility.await;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class InfinispanRemoteRoutePolicyTest {
-    public static final String CACHE_NAME = "_route_policy";
-
-    @RegisterExtension
-    public static InfinispanService service = InfinispanServiceFactory.createService();
-
-    @Test
-    public void testLeadership() throws Exception {
-        try (RemoteCacheManager cacheContainer = createCacheContainer(service)) {
-
-            cacheContainer.start();
-
-            final InfinispanRoutePolicy policy1 = createRoutePolicy(cacheContainer, "route1");
-            final InfinispanRoutePolicy policy2 = createRoutePolicy(cacheContainer, "route2");
-
-            try (CamelContext context = new DefaultCamelContext()) {
-                context.start();
-
-                RouteBuilder.addRoutes(context, b -> b.from("direct:r1").routePolicy(policy1).to("mock:p1"));
-
-                await().atMost(10, TimeUnit.SECONDS).until(policy1::isLeader);
-
-                RouteBuilder.addRoutes(context, b -> b.from("direct:r2").routePolicy(policy2).to("mock:p2"));
-
-                assertTrue(policy1.isLeader());
-                assertFalse(policy2.isLeader());
-
-                policy1.shutdown();
-
-                await().atMost(10, TimeUnit.SECONDS).until(policy2::isLeader);
-
-                assertFalse(policy1.isLeader());
-                assertTrue(policy2.isLeader());
-            }
-        }
-    }
-
-    // *****************************
-    //
-    // *****************************
-
-    private static RemoteCacheManager createCacheContainer(InfinispanService service) {
-        ConfigurationBuilder clientBuilder = new ConfigurationBuilder();
-
-        // for default tests, we force return value for all the
-        // operations
-        clientBuilder
-                .forceReturnValues(true);
-
-        // add server from the test infra service
-        clientBuilder
-                .addServer()
-                .host(service.host())
-                .port(service.port());
-
-        // add security info
-        clientBuilder
-                .security()
-                .authentication()
-                .username(service.username())
-                .password(service.password())
-                .serverName("infinispan")
-                .saslMechanism("DIGEST-MD5")
-                .realm("default");
-
-        RemoteCacheManager cacheContainer = new RemoteCacheManager(clientBuilder.build());
-        cacheContainer.administration()
-                .getOrCreateCache(
-                        CACHE_NAME,
-                        new org.infinispan.configuration.cache.ConfigurationBuilder()
-                                .clustering()
-                                .cacheMode(CacheMode.DIST_SYNC).build());
-
-        return cacheContainer;
-    }
-
-    private static InfinispanRoutePolicy createRoutePolicy(RemoteCacheManager cacheContainer, String lockValue) {
-        InfinispanRemoteConfiguration configuration = new InfinispanRemoteConfiguration();
-        configuration.setCacheContainer(cacheContainer);
-
-        InfinispanRemoteRoutePolicy policy = new InfinispanRemoteRoutePolicy(configuration);
-        policy.setLockMapName(CACHE_NAME);
-        policy.setLockKey("lock-key");
-        policy.setLockValue(lockValue);
-
-        return policy;
-    }
-}
diff --git a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/AbstractInfinispanRemoteClusteredTest.java b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/AbstractInfinispanRemoteClusteredTest.java
new file mode 100644
index 0000000..aca2ea8
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/AbstractInfinispanRemoteClusteredTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.remote.cluster;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.camel.test.infra.infinispan.services.InfinispanService;
+import org.apache.camel.test.infra.infinispan.services.InfinispanServiceFactory;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.configuration.Configuration;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.infinispan.remote.cluster.InfinispanRemoteClusteredTestSupport.createCache;
+import static org.apache.camel.component.infinispan.remote.cluster.InfinispanRemoteClusteredTestSupport.createConfiguration;
+import static org.assertj.core.api.Assertions.assertThat;
+
+abstract class AbstractInfinispanRemoteClusteredTest {
+    @RegisterExtension
+    public static InfinispanService service = InfinispanServiceFactory.createService();
+
+    @Timeout(value = 1, unit = TimeUnit.MINUTES)
+    @Test
+    public void test() throws Exception {
+        final Logger logger = LoggerFactory.getLogger(getClass());
+        final List<String> clients = IntStream.range(0, 3).mapToObj(Integer::toString).collect(Collectors.toList());
+        final List<String> results = new ArrayList<>();
+        final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(clients.size() * 2);
+        final CountDownLatch latch = new CountDownLatch(clients.size());
+        final String viewName = "myView";
+
+        Configuration configuration = createConfiguration(service);
+
+        try (RemoteCacheManager cacheContainer = new RemoteCacheManager(configuration)) {
+            createCache(cacheContainer, viewName);
+
+            for (String id : clients) {
+                scheduler.submit(() -> {
+                    try {
+                        run(cacheContainer, viewName, id);
+                        logger.debug("Node {} is shutting down", id);
+                        results.add(id);
+                    } catch (Exception e) {
+                        logger.warn("", e);
+                    } finally {
+                        latch.countDown();
+                    }
+                });
+            }
+
+            latch.await();
+            scheduler.shutdownNow();
+
+            assertThat(results).hasSameSizeAs(clients);
+            assertThat(results).containsAll(clients);
+        }
+    }
+
+    protected abstract void run(RemoteCacheManager cacheContainer, String namespace, String id) throws Exception;
+}
diff --git a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredMasterTest.java b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredMasterTest.java
new file mode 100644
index 0000000..73621cc
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredMasterTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.remote.cluster;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+
+public class InfinispanRemoteClusteredMasterTest extends AbstractInfinispanRemoteClusteredTest {
+    @Override
+    protected void run(RemoteCacheManager cacheContainer, String namespace, String id) throws Exception {
+        final int events = ThreadLocalRandom.current().nextInt(2, 6);
+        final CountDownLatch contextLatch = new CountDownLatch(events);
+
+        //Set up a single node cluster.
+        InfinispanRemoteClusterService clusterService = new InfinispanRemoteClusterService();
+        clusterService.setCacheContainer(cacheContainer);
+        clusterService.setId("node-" + id);
+
+        try (DefaultCamelContext context = new DefaultCamelContext()) {
+            context.disableJMX();
+            context.setName("context-" + id);
+            context.addService(clusterService);
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    fromF("master:%s:timer:%s?delay=1000&period=1000&repeatCount=%d", namespace, id, events)
+                            .routeId("route-" + id)
+                            .log("From id=${routeId} counter=${header.CamelTimerCounter}")
+                            .process(e -> contextLatch.countDown());
+                }
+            });
+
+            // Start the context after some random time so the startup order
+            // changes for each test.
+            Thread.sleep(ThreadLocalRandom.current().nextInt(500));
+            context.start();
+
+            contextLatch.await();
+        }
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredRoutePolicyFactoryTest.java b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredRoutePolicyFactoryTest.java
new file mode 100644
index 0000000..bbbc1d1
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredRoutePolicyFactoryTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.remote.cluster;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.cluster.ClusteredRoutePolicyFactory;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+
+public class InfinispanRemoteClusteredRoutePolicyFactoryTest extends AbstractInfinispanRemoteClusteredTest {
+    @Override
+    protected void run(RemoteCacheManager cacheContainer, String namespace, String id) throws Exception {
+        final int events = ThreadLocalRandom.current().nextInt(2, 6);
+        final CountDownLatch contextLatch = new CountDownLatch(events);
+
+        //Set up a single node cluster.
+        InfinispanRemoteClusterService clusterService = new InfinispanRemoteClusterService();
+        clusterService.setCacheContainer(cacheContainer);
+        clusterService.setId("node-" + id);
+
+        try (DefaultCamelContext context = new DefaultCamelContext()) {
+            context.disableJMX();
+            context.setName("context-" + id);
+            context.addService(clusterService);
+            context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace(namespace));
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    fromF("timer:%s?delay=1000&period=1000&repeatCount=%d", id, events)
+                            .routeId("route-" + id)
+                            .log("From id=${routeId} counter=${header.CamelTimerCounter}")
+                            .process(e -> contextLatch.countDown());
+                }
+            });
+
+            // Start the context after some random time so the startup order
+            // changes for each test.
+            Thread.sleep(ThreadLocalRandom.current().nextInt(500));
+            context.start();
+
+            contextLatch.await();
+        }
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredRoutePolicyTest.java b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredRoutePolicyTest.java
new file mode 100644
index 0000000..5dcafca
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredRoutePolicyTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.remote.cluster;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.cluster.ClusteredRoutePolicy;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+
+public class InfinispanRemoteClusteredRoutePolicyTest extends AbstractInfinispanRemoteClusteredTest {
+    @Override
+    protected void run(RemoteCacheManager cacheContainer, String namespace, String id) throws Exception {
+        final int events = ThreadLocalRandom.current().nextInt(2, 6);
+        final CountDownLatch contextLatch = new CountDownLatch(events);
+
+        //Set up a single node cluster.
+        InfinispanRemoteClusterService clusterService = new InfinispanRemoteClusterService();
+        clusterService.setCacheContainer(cacheContainer);
+        clusterService.setId("node-" + id);
+
+        try (DefaultCamelContext context = new DefaultCamelContext()) {
+            context.disableJMX();
+            context.setName("context-" + id);
+            context.addService(clusterService);
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    fromF("timer:%s?delay=1000&period=1000&repeatCount=%d", id, events)
+                            .routeId("route-" + id)
+                            .routePolicy(ClusteredRoutePolicy.forNamespace(namespace))
+                            .log("From id=${routeId} counter=${header.CamelTimerCounter}")
+                            .process(e -> contextLatch.countDown());
+                }
+            });
+
+            // Start the context after some random time so the startup order
+            // changes for each test.
+            Thread.sleep(ThreadLocalRandom.current().nextInt(500));
+            context.start();
+
+            contextLatch.await();
+        }
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredTestSupport.java b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredTestSupport.java
new file mode 100644
index 0000000..a78ab82
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredTestSupport.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.remote.cluster;
+
+import org.apache.camel.test.infra.infinispan.services.InfinispanService;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.configuration.Configuration;
+import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
+import org.infinispan.configuration.cache.CacheMode;
+
+public final class InfinispanRemoteClusteredTestSupport {
+    private InfinispanRemoteClusteredTestSupport() {
+    }
+
+    public static Configuration createConfiguration(InfinispanService service) {
+        return new ConfigurationBuilder()
+                .addServer()
+                .host(service.host())
+                .port(service.port())
+                .security()
+                .authentication()
+                .username(service.username())
+                .password(service.password())
+                .serverName("infinispan")
+                .saslMechanism("DIGEST-MD5")
+                .realm("default")
+                .build();
+
+    }
+
+    public static void createCache(RemoteCacheManager cacheContainer, String cacheName) {
+        cacheContainer.administration()
+                .getOrCreateCache(
+                        cacheName,
+                        new org.infinispan.configuration.cache.ConfigurationBuilder()
+                                .clustering()
+                                .cacheMode(CacheMode.DIST_SYNC).build());
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredViewTest.java b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredViewTest.java
new file mode 100644
index 0000000..dd76a64
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredViewTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.remote.cluster;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.cluster.CamelClusterMember;
+import org.apache.camel.cluster.CamelClusterView;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.infra.infinispan.services.InfinispanService;
+import org.apache.camel.test.infra.infinispan.services.InfinispanServiceFactory;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.configuration.Configuration;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import static org.apache.camel.component.infinispan.remote.cluster.InfinispanRemoteClusteredTestSupport.createCache;
+import static org.apache.camel.component.infinispan.remote.cluster.InfinispanRemoteClusteredTestSupport.createConfiguration;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+public class InfinispanRemoteClusteredViewTest {
+    @RegisterExtension
+    public static InfinispanService service = InfinispanServiceFactory.createService();
+
+    @Test
+    public void getLeaderTest() throws Exception {
+        final String viewName = "myView";
+
+        Configuration configuration = createConfiguration(service);
+
+        try (RemoteCacheManager cacheContainer = new RemoteCacheManager(configuration)) {
+            createCache(cacheContainer, viewName);
+
+            InfinispanRemoteClusterService clusterService = new InfinispanRemoteClusterService();
+            clusterService.setCacheContainer(cacheContainer);
+            clusterService.setId("node");
+
+            //Set up context with single locked route.
+            try (DefaultCamelContext context = new DefaultCamelContext()) {
+                context.disableJMX();
+                context.addService(clusterService);
+
+                context.addRoutes(new RouteBuilder() {
+                    @Override
+                    public void configure() {
+                        fromF("master:%s:timer:infinispan?repeatCount=1", viewName)
+                                .routeId("route1")
+                                .stop();
+                    }
+                });
+
+                context.start();
+
+                CamelClusterView view = clusterService.getView(viewName);
+
+                await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+                    assertThat(view.getLeader())
+                            .get()
+                            .satisfies(CamelClusterMember::isLeader)
+                            .satisfies(CamelClusterMember::isLocal);
+                });
+            }
+        }
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan/src/test/resources/log4j2.properties b/components/camel-infinispan/camel-infinispan/src/test/resources/log4j2.properties
index 3847970..5a3ff87 100644
--- a/components/camel-infinispan/camel-infinispan/src/test/resources/log4j2.properties
+++ b/components/camel-infinispan/camel-infinispan/src/test/resources/log4j2.properties
@@ -34,6 +34,8 @@ logger.infinispan-camel.name = org.apache.camel.component.infinispan
 logger.infinispan-camel.level = INFO
 logger.infinispan-camel-remote.name = org.apache.camel.component.infinispan.remote
 logger.infinispan-camel-remote.level = INFO
+logger.infinispan-camel-remote-cluster.name = org.apache.camel.component.infinispan.remote.cluster
+logger.infinispan-camel-remote-cluster.level = DEBUG
 logger.infinispan-camel-embedded.name = org.apache.camel.component.infinispan.embedded
 logger.infinispan-test-infra-container.name = container.infinispan
 logger.infinispan-test-infra-container.level = WARN