You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2017/04/07 15:37:11 UTC

camel git commit: CAMEL-10287: Infinispan RoutePolicy to have one route being master, and others as slaves

Repository: camel
Updated Branches:
  refs/heads/master bef3d9224 -> dbd68347b


CAMEL-10287: Infinispan RoutePolicy to have one route being master, and others as slaves


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dbd68347
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dbd68347
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dbd68347

Branch: refs/heads/master
Commit: dbd68347b29ce7813262ed2a63fb31e36e94a034
Parents: bef3d92
Author: lburgazzoli <lb...@gmail.com>
Authored: Wed Apr 5 18:37:21 2017 +0200
Committer: lburgazzoli <lb...@gmail.com>
Committed: Fri Apr 7 17:36:32 2017 +0200

----------------------------------------------------------------------
 components/camel-infinispan/pom.xml             |  14 +-
 .../infinispan/InfinispanConfiguration.java     |  26 +
 .../component/infinispan/InfinispanManager.java |  44 +-
 .../component/infinispan/InfinispanUtil.java    |   2 +-
 .../policy/InfinispanRoutePolicy.java           | 522 +++++++++++++++++++
 .../InfinispanEmbeddedRoutePolicyTest.java      |  28 +
 .../policy/InfinispanRemoteRoutePolicyTest.java |  40 ++
 .../policy/InfinispanRoutePolicyTestBase.java   | 118 +++++
 .../src/test/resources/log4j.xml                |   2 +-
 9 files changed, 771 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/pom.xml b/components/camel-infinispan/pom.xml
index ddd590d..ea8751c 100644
--- a/components/camel-infinispan/pom.xml
+++ b/components/camel-infinispan/pom.xml
@@ -112,31 +112,25 @@
         <dependency>
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-api</artifactId>
-            <version>${log4j2-25-version}</version>
+            <version>${log4j2-version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-core</artifactId>
-            <version>${log4j2-25-version}</version>
+            <version>${log4j2-version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-slf4j-impl</artifactId>
-            <version>${log4j2-25-version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-1.2-api</artifactId>
-            <version>${log4j2-25-version}</version>
+            <version>${log4j2-version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-jcl</artifactId>
-            <version>${log4j2-25-version}</version>
+            <version>${log4j2-version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
index 0ab5852..0835974 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
@@ -17,7 +17,9 @@
 package org.apache.camel.component.infinispan;
 
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.camel.spi.Metadata;
@@ -60,6 +62,8 @@ public class InfinispanConfiguration {
     private Flag[] flags;
     @UriParam(label = "advanced")
     private String configurationUri;
+    @UriParam(label = "advanced")
+    private Map<String, String> configurationProperties;
 
 
     public String getCommand() {
@@ -223,4 +227,26 @@ public class InfinispanConfiguration {
     public void setConfigurationUri(String configurationUri) {
         this.configurationUri = configurationUri;
     }
+
+    public Map<String, String> getConfigurationProperties() {
+        return configurationProperties;
+    }
+
+    /**
+     * Infinispan configuration properties.
+     */
+    public void setConfigurationProperties(Map<String, String> configurationProperties) {
+        this.configurationProperties = configurationProperties;
+    }
+
+    /**
+     * Adds a single Infinispan configuration property, creating the backing map on first use.
+     */
+    public void addConfigurationProperty(String key, String value) {
+        if (this.configurationProperties == null) {
+            this.configurationProperties = new HashMap<>();
+        }
+
+        this.configurationProperties.put(key, value);
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java
index eab846b..8b739bb 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.infinispan;
 
 
+import java.util.Properties;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Service;
@@ -25,6 +27,7 @@ import org.infinispan.client.hotrod.RemoteCacheManager;
 import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
 import org.infinispan.commons.api.BasicCache;
 import org.infinispan.commons.api.BasicCacheContainer;
+import org.infinispan.manager.DefaultCacheManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,6 +39,11 @@ public class InfinispanManager implements Service {
     private BasicCacheContainer cacheContainer;
     private boolean isManagedCacheContainer;
 
+    public InfinispanManager() {
+        this.camelContext = null;
+        this.configuration = new InfinispanConfiguration();
+        this.configuration.setCacheContainer(new DefaultCacheManager(true));
+    }
 
     public InfinispanManager(InfinispanConfiguration configuration) {
         this(null, configuration);
@@ -50,24 +58,34 @@ public class InfinispanManager implements Service {
     public void start() throws Exception {
         cacheContainer = configuration.getCacheContainer();
         if (cacheContainer == null) {
+            ConfigurationBuilder builder = new ConfigurationBuilder();
+            builder.classLoader(Thread.currentThread().getContextClassLoader());
+
+            Properties properties = new Properties();
+
             String uri = configuration.getConfigurationUri();
             if (uri != null && camelContext != null) {
                 uri = camelContext.resolvePropertyPlaceholders(uri);
             }
 
-            ConfigurationBuilder configurationBuilder = new ConfigurationBuilder()
-                .classLoader(Thread.currentThread().getContextClassLoader());
-
             if (uri != null) {
-                configurationBuilder.withProperties(InfinispanUtil.loadProperties(camelContext, uri));
+                properties.putAll(InfinispanUtil.loadProperties(camelContext, uri));
+            }
+            if (configuration.getConfigurationProperties() != null) {
+                properties.putAll(configuration.getConfigurationProperties());
             }
 
-            cacheContainer = new RemoteCacheManager(
-                configurationBuilder
-                    .addServers(configuration.getHost())
-                    .build(),
-                true);
+            if (!properties.isEmpty()) {
+                builder.withProperties(properties);
+            }
+
+            if (configuration.getHost() != null) {
+                builder.addServers(configuration.getHost());
+            }
+
+
 
+            cacheContainer = new RemoteCacheManager(builder.build(), true);
             isManagedCacheContainer = true;
         }
     }
@@ -91,22 +109,22 @@ public class InfinispanManager implements Service {
         return InfinispanUtil.isRemote(cacheContainer);
     }
 
-    public BasicCache<Object, Object> getCache() {
+    public <K, V > BasicCache<K, V> getCache() {
         return getCache(configuration.getCacheName());
     }
 
-    public BasicCache<Object, Object> getCache(Exchange exchange) {
+    public <K, V > BasicCache<K, V> getCache(Exchange exchange) {
         return getCache(exchange.getIn().getHeader(InfinispanConstants.CACHE_NAME, String.class));
     }
 
-    public BasicCache<Object, Object> getCache(String cacheName) {
+    public <K, V > BasicCache<K, V> getCache(String cacheName) {
         if (cacheName == null) {
             cacheName = configuration.getCacheName();
         }
 
         LOGGER.trace("Cache[{}]", cacheName);
 
-        BasicCache<Object, Object> cache = InfinispanUtil.getCache(cacheContainer, cacheName);
+        BasicCache<K, V> cache = InfinispanUtil.getCache(cacheContainer, cacheName);
         if (configuration.hasFlags() && InfinispanUtil.isEmbedded(cache)) {
             cache = new DecoratedCache(InfinispanUtil.asAdvanced(cache), configuration.getFlags());
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
index f562bb2..4de6319 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
@@ -106,7 +106,7 @@ public final class InfinispanUtil {
         return ObjectHelper.isEmpty(message.getHeader(header));
     }
 
-    public static BasicCache<Object, Object> getCache(BasicCacheContainer cacheContainer, String cacheName) {
+    public static <K, V> BasicCache<K, V> getCache(BasicCacheContainer cacheContainer, String cacheName) {
         return ObjectHelper.isEmpty(cacheName) ? cacheContainer.getCache() : cacheContainer.getCache(cacheName);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java
new file mode 100644
index 0000000..7103edf
--- /dev/null
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java
@@ -0,0 +1,522 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.policy;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Route;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.Service;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.component.infinispan.InfinispanConfiguration;
+import org.apache.camel.component.infinispan.InfinispanManager;
+import org.apache.camel.component.infinispan.InfinispanUtil;
+import org.apache.camel.support.RoutePolicySupport;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.StringHelper;
+import org.infinispan.Cache;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryExpired;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
+import org.infinispan.client.hotrod.annotation.ClientListener;
+import org.infinispan.client.hotrod.event.ClientCacheEntryExpiredEvent;
+import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
+import org.infinispan.commons.api.BasicCache;
+import org.infinispan.commons.api.BasicCacheContainer;
+import org.infinispan.notifications.Listener;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
+import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ManagedResource(description = "Route policy using Infinispan as clustered lock")
+public class InfinispanRoutePolicy extends RoutePolicySupport implements CamelContextAware {
+    private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanRoutePolicy.class);
+
+    private final AtomicBoolean leader;
+    private final Set<Route> suspendedRoutes;
+    private final InfinispanManager manager;
+
+    private Route route;
+    private CamelContext camelContext;
+    private ScheduledExecutorService executorService;
+    private boolean shouldStopConsumer;
+    private String lockMapName;
+    private String lockKey;
+    private String lockValue;
+    private long lifespan;
+    private TimeUnit lifespanTimeUnit;
+    private ScheduledFuture<?> future;
+    private Service service;
+
+    public InfinispanRoutePolicy(InfinispanConfiguration configuration) {
+        this(new InfinispanManager(configuration), null, null);
+    }
+
+    public InfinispanRoutePolicy(InfinispanManager manager) {
+        this(manager, null, null);
+    }
+
+    public InfinispanRoutePolicy(InfinispanManager manager, String lockKey, String lockValue) {
+        this.manager = manager;
+        this.suspendedRoutes =  new HashSet<>();
+        this.leader = new AtomicBoolean(false);
+        this.shouldStopConsumer = true;
+        this.lockKey = lockKey;
+        this.lockValue = lockValue;
+        this.lifespan = 30;
+        this.lifespanTimeUnit = TimeUnit.SECONDS;
+        this.service = null;
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public void onInit(Route route) {
+        super.onInit(route);
+        this.route = route;
+    }
+
+    @Override
+    public void onStart(Route route) {
+        try {
+            startService();
+        } catch (Exception e) {
+            throw new RuntimeCamelException(e);
+        }
+
+        if (!leader.get() && shouldStopConsumer) {
+            stopConsumer(route);
+        }
+    }
+
+    @Override
+    public synchronized void onStop(Route route) {
+        try {
+            stopService();
+        } catch (Exception e) {
+            throw new RuntimeCamelException(e);
+        }
+
+        suspendedRoutes.remove(route);
+    }
+
+    @Override
+    public synchronized void onSuspend(Route route) {
+        try {
+            stopService();
+        } catch (Exception e) {
+            throw new RuntimeCamelException(e);
+        }
+
+        suspendedRoutes.remove(route);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        // validate before starting anything so that a bad setup does not leave a manager or executor behind
+        StringHelper.notEmpty(lockMapName, "lockMapName", this);
+        StringHelper.notEmpty(lockKey, "lockKey", this);
+        ObjectHelper.notNull(camelContext, "camelContext", this);
+        // the lifespan is expressed in lifespanTimeUnit, so normalize it to seconds before checking it
+        if (lifespanTimeUnit.toSeconds(lifespan) < 2) {
+            throw new IllegalArgumentException("Lock lifespan can not be less that 2 seconds");
+        }
+
+        // the lock value is optional and defaults to a generated unique id
+        if (ObjectHelper.isEmpty(lockValue)) {
+            this.lockValue = camelContext.getUuidGenerator().generateUuid();
+        }
+
+        this.manager.start();
+        this.executorService = getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "InfinispanRoutePolicy");
+
+        BasicCache<String, String> cache = manager.getCache(lockMapName);
+        if (manager.isCacheContainerEmbedded()) {
+            this.service = new EmbeddedCacheService(InfinispanUtil.asEmbedded(cache));
+        } else {
+            this.service = new RemoteCacheService(InfinispanUtil.asRemote(cache));
+        }
+
+        super.doStart();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (future != null) {
+            future.cancel(true);
+            future = null;
+        }
+
+        if (this.service != null) {
+            this.service.stop();
+        }
+
+        getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService);
+
+        leader.set(false);
+        manager.stop();
+
+        super.doStop();
+    }
+
+    private void startService() throws Exception {
+        if (service == null) {
+            throw new IllegalStateException("An Infinispan CacheService should be configured");
+        }
+
+        service.start();
+    }
+
+    private void stopService() throws Exception {
+        leader.set(false);
+
+        if (this.service != null) {
+            this.service.stop();
+        }
+    }
+
+    // *************************************************************************
+    //
+    // *************************************************************************
+
+    protected void setLeader(boolean isLeader) {
+        if (isLeader && leader.compareAndSet(false, isLeader)) {
+            LOGGER.info("Leadership taken (map={}, key={}, val={})", lockMapName, lockKey, lockValue);
+
+            startAllStoppedConsumers();
+        } else if (!isLeader && leader.getAndSet(isLeader)) {
+            LOGGER.info("Leadership lost (map={}, key={} val={})", lockMapName, lockKey, lockValue);
+        }
+
+        if (!isLeader && this.route != null) {
+            stopConsumer(route);
+        }
+    }
+
+    private synchronized void startConsumer(Route route) {
+        try {
+            if (suspendedRoutes.contains(route)) {
+                startConsumer(route.getConsumer());
+                suspendedRoutes.remove(route);
+            }
+        } catch (Exception e) {
+            handleException(e);
+        }
+    }
+
+    private synchronized void stopConsumer(Route route) {
+        try {
+            if (!suspendedRoutes.contains(route)) {
+                LOGGER.debug("Stopping consumer for {} ({})", route.getId(), route.getConsumer());
+                stopConsumer(route.getConsumer());
+                suspendedRoutes.add(route);
+            }
+        } catch (Exception e) {
+            handleException(e);
+        }
+    }
+
+    private synchronized void startAllStoppedConsumers() {
+        try {
+            for (Route route : suspendedRoutes) {
+                LOGGER.debug("Starting consumer for {} ({})", route.getId(), route.getConsumer());
+                startConsumer(route.getConsumer());
+            }
+
+            suspendedRoutes.clear();
+        } catch (Exception e) {
+            handleException(e);
+        }
+    }
+
+    // *************************************************************************
+    // Getter/Setters
+    // *************************************************************************
+
+    @ManagedAttribute(description = "The route id")
+    public String getRouteId() {
+        if (route != null) {
+            return route.getId();
+        }
+        return null;
+    }
+
+    @ManagedAttribute(description = "The consumer endpoint", mask = true)
+    public String getEndpointUrl() {
+        if (route != null && route.getConsumer() != null && route.getConsumer().getEndpoint() != null) {
+            return route.getConsumer().getEndpoint().toString();
+        }
+        return null;
+    }
+
+    @ManagedAttribute(description = "Whether to stop consumer when starting up and failed to become master")
+    public boolean isShouldStopConsumer() {
+        return shouldStopConsumer;
+    }
+
+    public void setShouldStopConsumer(boolean shouldStopConsumer) {
+        this.shouldStopConsumer = shouldStopConsumer;
+    }
+
+    @ManagedAttribute(description = "The lock map name")
+    public String getLockMapName() {
+        return lockMapName;
+    }
+
+    public void setLockMapName(String lockMapName) {
+        this.lockMapName = lockMapName;
+    }
+
+    @ManagedAttribute(description = "The lock key")
+    public String getLockKey() {
+        return lockKey;
+    }
+
+    public void setLockKey(String lockKey) {
+        this.lockKey = lockKey;
+    }
+
+    @ManagedAttribute(description = "The lock value")
+    public String getLockValue() {
+        return lockValue;
+    }
+
+    public void setLockValue(String lockValue) {
+        this.lockValue = lockValue;
+    }
+
+
+    @ManagedAttribute(description = "The key lifespan for the lock")
+    public long getLifespan() {
+        return lifespan;
+    }
+
+    public void setLifespan(long lifespan) {
+        this.lifespan = lifespan;
+    }
+
+    public void setLifespan(long lifespan, TimeUnit lifespanTimeUnit) {
+        this.lifespan = lifespan;
+        this.lifespanTimeUnit = lifespanTimeUnit;
+    }
+
+    @ManagedAttribute(description = "The key lifespan time unit for the lock")
+    public TimeUnit getLifespanTimeUnit() {
+        return lifespanTimeUnit;
+    }
+
+    public void setLifespanTimeUnit(TimeUnit lifespanTimeUnit) {
+        this.lifespanTimeUnit = lifespanTimeUnit;
+    }
+
+    @ManagedAttribute(description = "Is this route the master or a slave")
+    public boolean isLeader() {
+        return leader.get();
+    }
+
+    // *************************************************************************
+    //
+    // *************************************************************************
+
+    @Listener(clustered = true, sync = false)
+    private final class EmbeddedCacheService extends ServiceSupport implements Runnable {
+        private final Cache<String, String> cache;
+        private ScheduledFuture<?> future;
+
+        public EmbeddedCacheService(Cache<String, String> cache) {
+            this.cache = cache;
+        }
+
+        @Override
+        protected void doStart() throws Exception {
+            // Refresh at half the lifespan; the period is computed in millis so that
+            // a coarse unit (e.g. 1 minute) does not truncate to an invalid 0 period.
+            long period = lifespanTimeUnit.toMillis(lifespan) / 2;
+            this.future = executorService.scheduleAtFixedRate(this::run, 0, period, TimeUnit.MILLISECONDS);
+            this.cache.addListener(this);
+        }
+
+        @Override
+        protected void doStop() throws Exception {
+            // Cancel the refresh task first so that it is stopped even if
+            // releasing the lock on the cache fails.
+            if (future != null) {
+                future.cancel(true);
+                future = null;
+            }
+
+            this.cache.removeListener(this);
+            this.cache.remove(lockKey, lockValue);
+        }
+
+        @Override
+        public void run() {
+            if (!isRunAllowed() || !InfinispanRoutePolicy.this.isRunAllowed()) {
+                return;
+            }
+
+            if (isLeader()) {
+                // I'm still the leader, so refresh the key so it does not expire.
+                if (!cache.replace(lockKey, lockValue, lockValue, lifespan, lifespanTimeUnit)) {
+                    // Looks like I've lost the leadership.
+                    setLeader(false);
+                }
+            }
+
+            if (!isLeader()) {
+                // null: the key has been acquired so I'm the leader; my own value: I may
+                // have recovered from a failure (or the reboot was really fast) while my
+                // key was still there; anything else: the key is held by someone else.
+                Object result = cache.putIfAbsent(lockKey, lockValue, lifespan, lifespanTimeUnit);
+                setLeader(result == null || ObjectHelper.equal(lockValue, result));
+            }
+        }
+
+        // Re-evaluate the leadership as soon as the lock key is removed or expires.
+        @CacheEntryRemoved
+        public void onCacheEntryRemoved(CacheEntryEvent<Object, Object> event) {
+            if (ObjectHelper.equal(lockKey, event.getKey())) {
+                run();
+            }
+        }
+
+        @CacheEntryExpired
+        public void onCacheEntryExpired(CacheEntryEvent<Object, Object> event) {
+            if (ObjectHelper.equal(lockKey, event.getKey())) {
+                run();
+            }
+        }
+    }
+
+    @ClientListener
+    private final class RemoteCacheService extends ServiceSupport implements Runnable {
+        private final RemoteCache<String, String> cache;
+        private ScheduledFuture<?> future;
+        private volatile Long version;
+
+        public RemoteCacheService(RemoteCache<String, String> cache) {
+            this.cache = cache;
+        }
+
+        @Override
+        protected void doStart() throws Exception {
+            // Refresh at half the lifespan, in millis so that a coarse unit does not truncate the period to 0.
+            this.future = executorService.scheduleAtFixedRate(this::run, 0, lifespanTimeUnit.toMillis(lifespan) / 2, TimeUnit.MILLISECONDS);
+            this.cache.addClientListener(this);
+        }
+
+        @Override
+        protected void doStop() throws Exception {
+            // Cancel the refresh task first so that it is stopped even if releasing the lock fails.
+            if (future != null) {
+                future.cancel(true);
+                future = null;
+            }
+
+            this.cache.removeClientListener(this);
+            Long owned = this.version;
+            if (owned != null) {
+                this.cache.removeWithVersion(lockKey, owned);
+                this.version = null;
+            }
+        }
+
+        @Override
+        public void run() {
+            if (!isRunAllowed() || !InfinispanRoutePolicy.this.isRunAllowed()) {
+                return;
+            }
+
+            Long owned = version;
+            if (isLeader() && owned != null) {
+                LOGGER.debug("Lock refresh key={} with version={}", lockKey, owned);
+
+                // I'm still the leader, so refresh the key so it does not expire; a successful write
+                // changes the entry version, so it has to be read again for the next refresh.
+                boolean refreshed = cache.replaceWithVersion(lockKey, lockValue, owned, (int)lifespanTimeUnit.toSeconds(lifespan));
+                version = refreshed ? currentVersion() : null;
+                if (version == null) {
+                    // Looks like I've lost the leadership.
+                    setLeader(false);
+                }
+            }
+
+            if (!isLeader()) {
+                // Hot Rod returns the previous value only on demand; without it a key held by another node looks like a successful put.
+                Object result = cache.withFlags(org.infinispan.client.hotrod.Flag.FORCE_RETURN_VALUE).putIfAbsent(lockKey, lockValue, lifespan, lifespanTimeUnit);
+                // null: key acquired; my own value: I may have recovered from a failure (or a really fast reboot) with my key still there.
+                owned = (result == null || ObjectHelper.equal(lockValue, result)) ? currentVersion() : null;
+                // Publish the version before the leadership so that a leader always has a version to refresh with.
+                version = owned;
+                setLeader(owned != null);
+                if (owned != null) {
+                    LOGGER.debug(result == null ? "Lock acquired key={} with version={}" : "Lock resumed key={} with version={}", lockKey, owned);
+                }
+            }
+        }
+
+        // Reads the current version of the lock entry, null if the entry is gone.
+        private Long currentVersion() {
+            org.infinispan.client.hotrod.MetadataValue<String> entry = cache.getWithMetadata(lockKey);
+            return entry != null ? Long.valueOf(entry.getVersion()) : null;
+        }
+
+        @ClientCacheEntryRemoved
+        public void onCacheEntryRemoved(ClientCacheEntryRemovedEvent<Object> event) {
+            if (ObjectHelper.equal(lockKey, event.getKey())) {
+                run();
+            }
+        }
+
+        @ClientCacheEntryExpired
+        public void onCacheEntryExpired(ClientCacheEntryExpiredEvent<Object> event) {
+            if (ObjectHelper.equal(lockKey, event.getKey())) {
+                run();
+            }
+        }
+    }
+
+    // *************************************************************************
+    // Helpers
+    // *************************************************************************
+
+    public static InfinispanRoutePolicy withManager(BasicCacheContainer cacheContainer) {
+        InfinispanConfiguration conf = new InfinispanConfiguration();
+        conf.setCacheContainer(cacheContainer);
+
+        return new InfinispanRoutePolicy(conf);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanEmbeddedRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanEmbeddedRoutePolicyTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanEmbeddedRoutePolicyTest.java
new file mode 100644
index 0000000..b029638
--- /dev/null
+++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanEmbeddedRoutePolicyTest.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.policy;
+
+import org.infinispan.commons.api.BasicCacheContainer;
+import org.infinispan.manager.DefaultCacheManager;
+
+public class InfinispanEmbeddedRoutePolicyTest extends InfinispanRoutePolicyTestBase {
+    /** Embedded variant: backs the inherited scenario with an in-process DefaultCacheManager ('true' is its start flag). */
+    @Override
+    protected BasicCacheContainer createCacheManager() throws Exception {
+        return new DefaultCacheManager(true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRemoteRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRemoteRoutePolicyTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRemoteRoutePolicyTest.java
new file mode 100644
index 0000000..5d80d77
--- /dev/null
+++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRemoteRoutePolicyTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.policy;
+
+import java.util.Properties;
+
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
+import org.infinispan.commons.api.BasicCacheContainer;
+import org.junit.Ignore;
+
+@Ignore("Disabled as it requires a transactional cache")
+public class InfinispanRemoteRoutePolicyTest extends InfinispanRoutePolicyTestBase {
+    /** Remote variant: backs the inherited scenario with a Hot Rod client pointed at 127.0.0.1:11222. */
+    @Override
+    protected BasicCacheContainer createCacheManager() throws Exception {
+        Properties props = new Properties();
+        props.setProperty("infinispan.client.hotrod.server_list", "127.0.0.1:11222");
+        props.setProperty("infinispan.client.hotrod.force_return_values", "true");
+        // NOTE(review): the second constructor argument is presumably the "start" flag - confirm against the Hot Rod client API.
+        return new RemoteCacheManager(
+            new ConfigurationBuilder().withProperties(props).build(),
+            true
+        );
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicyTestBase.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicyTestBase.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicyTestBase.java
new file mode 100644
index 0000000..2091ed6
--- /dev/null
+++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicyTestBase.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.policy;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.infinispan.commons.api.BasicCacheContainer;
+import org.junit.Assert;
+import org.junit.Test;
+
+abstract class InfinispanRoutePolicyTestBase extends CamelTestSupport {
+    protected BasicCacheContainer cacheManager;
+    protected InfinispanRoutePolicy policy1;
+    protected InfinispanRoutePolicy policy2;
+
+    @Override
+    protected void doPreSetup() throws Exception {
+        this.cacheManager = createCacheManager();
+
+        this.policy1 = InfinispanRoutePolicy.withManager(cacheManager);
+        this.policy1.setLockMapName("camel-route-policy");
+        this.policy1.setLockKey("route-policy");
+        this.policy1.setLockValue("route1");
+
+        this.policy2 = InfinispanRoutePolicy.withManager(cacheManager);
+        this.policy2.setLockMapName("camel-route-policy");
+        this.policy2.setLockKey("route-policy");
+        this.policy2.setLockValue("route2");
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+
+        if (this.cacheManager != null) {
+            this.cacheManager.stop();
+        }
+    }
+
+    protected abstract BasicCacheContainer createCacheManager() throws Exception;
+
+    // *******************************************
+    //
+    // *******************************************
+
+    @Test
+    public void testLeadership()throws Exception {
+        context.startRoute("route1");
+        while(!policy1.isLeader()) {
+            Thread.sleep(250);
+        }
+
+        context.startRoute("route2");
+        Thread.sleep(500);
+
+        Assert.assertTrue(policy1.isLeader());
+        Assert.assertFalse(policy2.isLeader());
+
+        context.stopRoute("route1");
+        while(!policy2.isLeader()) {
+            Thread.sleep(250);
+        }
+
+        Assert.assertFalse(policy1.isLeader());
+        Assert.assertTrue(policy2.isLeader());
+
+        context.startRoute("route1");
+        Thread.sleep(500);
+
+        Assert.assertFalse(policy1.isLeader());
+        Assert.assertTrue(policy2.isLeader());
+
+        context.stopRoute("route2");
+        while(!policy1.isLeader()) {
+            Thread.sleep(250);
+        }
+
+        Assert.assertTrue(policy1.isLeader());
+        Assert.assertFalse(policy2.isLeader());
+    }
+
+    // *******************************************
+    //
+    // *******************************************
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:route1")
+                    .routeId("route1")
+                    .autoStartup(false)
+                    .routePolicy(policy1)
+                    .to("log:org.apache.camel.component.infinispan.policy.1?level=INFO&showAll=true");
+                from("direct:route2")
+                    .routeId("route2")
+                    .autoStartup(false)
+                    .routePolicy(policy2)
+                    .to("log:org.apache.camel.component.infinispan.policy.2?level=INFO&showAll=true");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/resources/log4j.xml b/components/camel-infinispan/src/test/resources/log4j.xml
index 71a2b38..d041f18 100644
--- a/components/camel-infinispan/src/test/resources/log4j.xml
+++ b/components/camel-infinispan/src/test/resources/log4j.xml
@@ -64,7 +64,7 @@
 
     <root>
         <priority value="INFO" />
-        <appender-ref ref="FILE" />
+        <appender-ref ref="CONSOLE" />
     </root>
 
 </log4j:configuration>