You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2017/04/07 15:37:11 UTC
camel git commit: CAMEL-10287: Infinispan RoutePolicy to have one
route being master, and others as slaves
Repository: camel
Updated Branches:
refs/heads/master bef3d9224 -> dbd68347b
CAMEL-10287: Infinispan RoutePolicy to have one route being master, and others as slaves
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dbd68347
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dbd68347
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dbd68347
Branch: refs/heads/master
Commit: dbd68347b29ce7813262ed2a63fb31e36e94a034
Parents: bef3d92
Author: lburgazzoli <lb...@gmail.com>
Authored: Wed Apr 5 18:37:21 2017 +0200
Committer: lburgazzoli <lb...@gmail.com>
Committed: Fri Apr 7 17:36:32 2017 +0200
----------------------------------------------------------------------
components/camel-infinispan/pom.xml | 14 +-
.../infinispan/InfinispanConfiguration.java | 26 +
.../component/infinispan/InfinispanManager.java | 44 +-
.../component/infinispan/InfinispanUtil.java | 2 +-
.../policy/InfinispanRoutePolicy.java | 522 +++++++++++++++++++
.../InfinispanEmbeddedRoutePolicyTest.java | 28 +
.../policy/InfinispanRemoteRoutePolicyTest.java | 40 ++
.../policy/InfinispanRoutePolicyTestBase.java | 118 +++++
.../src/test/resources/log4j.xml | 2 +-
9 files changed, 771 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/pom.xml b/components/camel-infinispan/pom.xml
index ddd590d..ea8751c 100644
--- a/components/camel-infinispan/pom.xml
+++ b/components/camel-infinispan/pom.xml
@@ -112,31 +112,25 @@
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
- <version>${log4j2-25-version}</version>
+ <version>${log4j2-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
- <version>${log4j2-25-version}</version>
+ <version>${log4j2-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
- <version>${log4j2-25-version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-1.2-api</artifactId>
- <version>${log4j2-25-version}</version>
+ <version>${log4j2-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-jcl</artifactId>
- <version>${log4j2-25-version}</version>
+ <version>${log4j2-version}</version>
<scope>test</scope>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
index 0ab5852..0835974 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanConfiguration.java
@@ -17,7 +17,9 @@
package org.apache.camel.component.infinispan;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import org.apache.camel.spi.Metadata;
@@ -60,6 +62,8 @@ public class InfinispanConfiguration {
private Flag[] flags;
@UriParam(label = "advanced")
private String configurationUri;
+ @UriParam(label = "advanced")
+ private Map<String, String> configurationProperties;
public String getCommand() {
@@ -223,4 +227,26 @@ public class InfinispanConfiguration {
public void setConfigurationUri(String configurationUri) {
this.configurationUri = configurationUri;
}
+
+ public Map<String, String> getConfigurationProperties() {
+ return configurationProperties;
+ }
+
+ /**
+ * Infinispan configuration properties.
+ */
+ public void setConfigurationProperties(Map<String, String> configurationProperties) {
+ this.configurationProperties = configurationProperties;
+ }
+
+ /**
+ * Add configuration
+ */
+ public void addConfigurationProperty(String key, String value) {
+ if (this.configurationProperties == null) {
+ this.configurationProperties = new HashMap<>();
+ }
+
+ this.configurationProperties.put(key, value);
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java
index eab846b..8b739bb 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanManager.java
@@ -17,6 +17,8 @@
package org.apache.camel.component.infinispan;
+import java.util.Properties;
+
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Service;
@@ -25,6 +27,7 @@ import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.commons.api.BasicCache;
import org.infinispan.commons.api.BasicCacheContainer;
+import org.infinispan.manager.DefaultCacheManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +39,11 @@ public class InfinispanManager implements Service {
private BasicCacheContainer cacheContainer;
private boolean isManagedCacheContainer;
+ public InfinispanManager() {
+ this.camelContext = null;
+ this.configuration = new InfinispanConfiguration();
+ this.configuration.setCacheContainer(new DefaultCacheManager(true));
+ }
public InfinispanManager(InfinispanConfiguration configuration) {
this(null, configuration);
@@ -50,24 +58,34 @@ public class InfinispanManager implements Service {
public void start() throws Exception {
cacheContainer = configuration.getCacheContainer();
if (cacheContainer == null) {
+ ConfigurationBuilder builder = new ConfigurationBuilder();
+ builder.classLoader(Thread.currentThread().getContextClassLoader());
+
+ Properties properties = new Properties();
+
String uri = configuration.getConfigurationUri();
if (uri != null && camelContext != null) {
uri = camelContext.resolvePropertyPlaceholders(uri);
}
- ConfigurationBuilder configurationBuilder = new ConfigurationBuilder()
- .classLoader(Thread.currentThread().getContextClassLoader());
-
if (uri != null) {
- configurationBuilder.withProperties(InfinispanUtil.loadProperties(camelContext, uri));
+ properties.putAll(InfinispanUtil.loadProperties(camelContext, uri));
+ }
+ if (configuration.getConfigurationProperties() != null) {
+ properties.putAll(configuration.getConfigurationProperties());
}
- cacheContainer = new RemoteCacheManager(
- configurationBuilder
- .addServers(configuration.getHost())
- .build(),
- true);
+ if (!properties.isEmpty()) {
+ builder.withProperties(properties);
+ }
+
+ if (configuration.getHost() != null) {
+ builder.addServers(configuration.getHost());
+ }
+
+
+ cacheContainer = new RemoteCacheManager(builder.build(), true);
isManagedCacheContainer = true;
}
}
@@ -91,22 +109,22 @@ public class InfinispanManager implements Service {
return InfinispanUtil.isRemote(cacheContainer);
}
- public BasicCache<Object, Object> getCache() {
+ public <K, V > BasicCache<K, V> getCache() {
return getCache(configuration.getCacheName());
}
- public BasicCache<Object, Object> getCache(Exchange exchange) {
+ public <K, V > BasicCache<K, V> getCache(Exchange exchange) {
return getCache(exchange.getIn().getHeader(InfinispanConstants.CACHE_NAME, String.class));
}
- public BasicCache<Object, Object> getCache(String cacheName) {
+ public <K, V > BasicCache<K, V> getCache(String cacheName) {
if (cacheName == null) {
cacheName = configuration.getCacheName();
}
LOGGER.trace("Cache[{}]", cacheName);
- BasicCache<Object, Object> cache = InfinispanUtil.getCache(cacheContainer, cacheName);
+ BasicCache<K, V> cache = InfinispanUtil.getCache(cacheContainer, cacheName);
if (configuration.hasFlags() && InfinispanUtil.isEmbedded(cache)) {
cache = new DecoratedCache(InfinispanUtil.asAdvanced(cache), configuration.getFlags());
}
http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
index f562bb2..4de6319 100644
--- a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
@@ -106,7 +106,7 @@ public final class InfinispanUtil {
return ObjectHelper.isEmpty(message.getHeader(header));
}
- public static BasicCache<Object, Object> getCache(BasicCacheContainer cacheContainer, String cacheName) {
+ public static <K, V> BasicCache<K, V> getCache(BasicCacheContainer cacheContainer, String cacheName) {
return ObjectHelper.isEmpty(cacheName) ? cacheContainer.getCache() : cacheContainer.getCache(cacheName);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java
new file mode 100644
index 0000000..7103edf
--- /dev/null
+++ b/components/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicy.java
@@ -0,0 +1,522 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.policy;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Route;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.Service;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.component.infinispan.InfinispanConfiguration;
+import org.apache.camel.component.infinispan.InfinispanManager;
+import org.apache.camel.component.infinispan.InfinispanUtil;
+import org.apache.camel.support.RoutePolicySupport;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.StringHelper;
+import org.infinispan.Cache;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryExpired;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
+import org.infinispan.client.hotrod.annotation.ClientListener;
+import org.infinispan.client.hotrod.event.ClientCacheEntryExpiredEvent;
+import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
+import org.infinispan.commons.api.BasicCache;
+import org.infinispan.commons.api.BasicCacheContainer;
+import org.infinispan.notifications.Listener;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
+import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ManagedResource(description = "Route policy using Infinispan as clustered lock")
+public class InfinispanRoutePolicy extends RoutePolicySupport implements CamelContextAware {
+ private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanRoutePolicy.class);
+
+ private final AtomicBoolean leader;
+ private final Set<Route> suspendedRoutes;
+ private final InfinispanManager manager;
+
+ private Route route;
+ private CamelContext camelContext;
+ private ScheduledExecutorService executorService;
+ private boolean shouldStopConsumer;
+ private String lockMapName;
+ private String lockKey;
+ private String lockValue;
+ private long lifespan;
+ private TimeUnit lifespanTimeUnit;
+ private ScheduledFuture<?> future;
+ private Service service;
+
+ public InfinispanRoutePolicy(InfinispanConfiguration configuration) {
+ this(new InfinispanManager(configuration), null, null);
+ }
+
+ public InfinispanRoutePolicy(InfinispanManager manager) {
+ this(manager, null, null);
+ }
+
+ public InfinispanRoutePolicy(InfinispanManager manager, String lockKey, String lockValue) {
+ this.manager = manager;
+ this.suspendedRoutes = new HashSet<>();
+ this.leader = new AtomicBoolean(false);
+ this.shouldStopConsumer = true;
+ this.lockKey = lockKey;
+ this.lockValue = lockValue;
+ this.lifespan = 30;
+ this.lifespanTimeUnit = TimeUnit.SECONDS;
+ this.service = null;
+ }
+
+ @Override
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ @Override
+ public void onInit(Route route) {
+ super.onInit(route);
+ this.route = route;
+ }
+
+ @Override
+ public void onStart(Route route) {
+ try {
+ startService();
+ } catch (Exception e) {
+ throw new RuntimeCamelException(e);
+ }
+
+ if (!leader.get() && shouldStopConsumer) {
+ stopConsumer(route);
+ }
+ }
+
+ @Override
+ public synchronized void onStop(Route route) {
+ try {
+ stopService();
+ } catch (Exception e) {
+ throw new RuntimeCamelException(e);
+ }
+
+ suspendedRoutes.remove(route);
+ }
+
+ @Override
+ public synchronized void onSuspend(Route route) {
+ try {
+ stopService();
+ } catch (Exception e) {
+ throw new RuntimeCamelException(e);
+ }
+
+ suspendedRoutes.remove(route);
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ // validate
+ StringHelper.notEmpty(lockMapName, "lockMapName", this);
+ StringHelper.notEmpty(lockKey, "lockKey", this);
+ StringHelper.notEmpty(lockValue, "lockValue", this);
+ ObjectHelper.notNull(camelContext, "camelContext", this);
+
+ if (this.lockValue == null) {
+ this.lockValue = camelContext.getUuidGenerator().generateUuid();
+ }
+
+ this.manager.start();
+ this.executorService = getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "InfinispanRoutePolicy");
+
+ if (lifespanTimeUnit.convert(lifespan, TimeUnit.SECONDS) < 2) {
+ throw new IllegalArgumentException("Lock lifespan can not be less that 2 seconds");
+ }
+
+ BasicCache<String, String> cache = manager.getCache(lockMapName);
+ if (manager.isCacheContainerEmbedded()) {
+ this.service = new EmbeddedCacheService(InfinispanUtil.asEmbedded(cache));
+ } else {
+ this.service = new RemoteCacheService(InfinispanUtil.asRemote(cache));
+ }
+
+ super.doStart();
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (future != null) {
+ future.cancel(true);
+ future = null;
+ }
+
+ if (this.service != null) {
+ this.service.stop();
+ }
+
+ getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService);
+
+ leader.set(false);
+ manager.stop();
+
+ super.doStop();
+ }
+
+ private void startService() throws Exception {
+ if (service == null) {
+ throw new IllegalStateException("An Infinispan CacheService should be configured");
+ }
+
+ service.start();
+ }
+
+ private void stopService() throws Exception {
+ leader.set(false);
+
+ if (this.service != null) {
+ this.service.stop();
+ }
+ }
+
+ // *************************************************************************
+ //
+ // *************************************************************************
+
+ protected void setLeader(boolean isLeader) {
+ if (isLeader && leader.compareAndSet(false, isLeader)) {
+ LOGGER.info("Leadership taken (map={}, key={}, val={})", lockMapName, lockKey, lockValue);
+
+ startAllStoppedConsumers();
+ } else if (!isLeader && leader.getAndSet(isLeader)) {
+ LOGGER.info("Leadership lost (map={}, key={} val={})", lockMapName, lockKey, lockValue);
+ }
+
+ if (!isLeader && this.route != null) {
+ stopConsumer(route);
+ }
+ }
+
+ private synchronized void startConsumer(Route route) {
+ try {
+ if (suspendedRoutes.contains(route)) {
+ startConsumer(route.getConsumer());
+ suspendedRoutes.remove(route);
+ }
+ } catch (Exception e) {
+ handleException(e);
+ }
+ }
+
+ private synchronized void stopConsumer(Route route) {
+ try {
+ if (!suspendedRoutes.contains(route)) {
+ LOGGER.debug("Stopping consumer for {} ({})", route.getId(), route.getConsumer());
+ stopConsumer(route.getConsumer());
+ suspendedRoutes.add(route);
+ }
+ } catch (Exception e) {
+ handleException(e);
+ }
+ }
+
+ private synchronized void startAllStoppedConsumers() {
+ try {
+ for (Route route : suspendedRoutes) {
+ LOGGER.debug("Starting consumer for {} ({})", route.getId(), route.getConsumer());
+ startConsumer(route.getConsumer());
+ }
+
+ suspendedRoutes.clear();
+ } catch (Exception e) {
+ handleException(e);
+ }
+ }
+
+ // *************************************************************************
+ // Getter/Setters
+ // *************************************************************************
+
+ @ManagedAttribute(description = "The route id")
+ public String getRouteId() {
+ if (route != null) {
+ return route.getId();
+ }
+ return null;
+ }
+
+ @ManagedAttribute(description = "The consumer endpoint", mask = true)
+ public String getEndpointUrl() {
+ if (route != null && route.getConsumer() != null && route.getConsumer().getEndpoint() != null) {
+ return route.getConsumer().getEndpoint().toString();
+ }
+ return null;
+ }
+
+ @ManagedAttribute(description = "Whether to stop consumer when starting up and failed to become master")
+ public boolean isShouldStopConsumer() {
+ return shouldStopConsumer;
+ }
+
+ public void setShouldStopConsumer(boolean shouldStopConsumer) {
+ this.shouldStopConsumer = shouldStopConsumer;
+ }
+
+ @ManagedAttribute(description = "The lock map name")
+ public String getLockMapName() {
+ return lockMapName;
+ }
+
+ public void setLockMapName(String lockMapName) {
+ this.lockMapName = lockMapName;
+ }
+
+ @ManagedAttribute(description = "The lock key")
+ public String getLockKey() {
+ return lockKey;
+ }
+
+ public void setLockKey(String lockKey) {
+ this.lockKey = lockKey;
+ }
+
+ @ManagedAttribute(description = "The lock value")
+ public String getLockValue() {
+ return lockValue;
+ }
+
+ public void setLockValue(String lockValue) {
+ this.lockValue = lockValue;
+ }
+
+
+ @ManagedAttribute(description = "The key lifespan for the lock")
+ public long getLifespan() {
+ return lifespan;
+ }
+
+ public void setLifespan(long lifespan) {
+ this.lifespan = lifespan;
+ }
+
+ public void setLifespan(long lifespan, TimeUnit lifespanTimeUnit) {
+ this.lifespan = lifespan;
+ this.lifespanTimeUnit = lifespanTimeUnit;
+ }
+
+ @ManagedAttribute(description = "The key lifespan time unit for the lock")
+ public TimeUnit getLifespanTimeUnit() {
+ return lifespanTimeUnit;
+ }
+
+ public void setLifespanTimeUnit(TimeUnit lifespanTimeUnit) {
+ this.lifespanTimeUnit = lifespanTimeUnit;
+ }
+
+ @ManagedAttribute(description = "Is this route the master or a slave")
+ public boolean isLeader() {
+ return leader.get();
+ }
+
+ // *************************************************************************
+ //
+ // *************************************************************************
+
+ @Listener(clustered = true, sync = false)
+ private final class EmbeddedCacheService extends ServiceSupport implements Runnable {
+ private Cache<String, String> cache;
+ private ScheduledFuture<?> future;
+
+ public EmbeddedCacheService(Cache<String, String> cache) {
+ this.cache = cache;
+ this.future = null;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ this.future = executorService.scheduleAtFixedRate(this::run, 0, lifespan / 2, lifespanTimeUnit);
+ this.cache.addListener(this);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ this.cache.removeListener(this);
+ this.cache.remove(lockKey, lockValue);
+
+ if (future != null) {
+ future.cancel(true);
+ future = null;
+ }
+ }
+
+ @Override
+ public void run() {
+ if (!isRunAllowed() || !InfinispanRoutePolicy.this.isRunAllowed()) {
+ return;
+ }
+
+ if (isLeader()) {
+ // I'm still the leader, so refresh the key so it does not expire.
+ if (!cache.replace(lockKey, lockValue, lockValue, lifespan, lifespanTimeUnit)) {
+ // Looks like I've lost the leadership.
+ setLeader(false);
+ }
+ }
+
+ if (!isLeader()) {
+ Object result = cache.putIfAbsent(lockKey, lockValue, lifespan, lifespanTimeUnit);
+ if (result == null) {
+ // Acquired the key so I'm the leader.
+ setLeader(true);
+ } else if (ObjectHelper.equal(lockValue, result) && !isLeader()) {
+ // Hey, I may have recovered from failure (or reboot was really
+ // fast) and my key was still there so yeah, I'm the leader again!
+ setLeader(true);
+ } else {
+ setLeader(false);
+ }
+ }
+ }
+
+ @CacheEntryRemoved
+ public void onCacheEntryRemoved(CacheEntryEvent<Object, Object> event) {
+ if (ObjectHelper.equal(lockKey, event.getKey())) {
+ run();
+ }
+ }
+ @CacheEntryExpired
+ public void onCacheEntryExpired(CacheEntryEvent<Object, Object> event) {
+ if (ObjectHelper.equal(lockKey, event.getKey())) {
+ run();
+ }
+ }
+ }
+
+ @ClientListener
+ private final class RemoteCacheService extends ServiceSupport implements Runnable {
+ private RemoteCache<String, String> cache;
+ private ScheduledFuture<?> future;
+ private Long version;
+
+ public RemoteCacheService(RemoteCache<String, String> cache) {
+ this.cache = cache;
+ this.future = null;
+ this.version = null;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ this.future = executorService.scheduleAtFixedRate(this::run, 0, lifespan / 2, lifespanTimeUnit);
+ this.cache.addClientListener(this);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ this.cache.removeClientListener(this);
+
+ if (this.version != null) {
+ this.cache.removeWithVersion(lockKey, this.version);
+ }
+
+ if (future != null) {
+ future.cancel(true);
+ future = null;
+ }
+ }
+
+ @Override
+ public void run() {
+ if (!isRunAllowed() || !InfinispanRoutePolicy.this.isRunAllowed()) {
+ return;
+ }
+
+ if (isLeader() && version != null) {
+ LOGGER.debug("Lock refresh key={} with version={}", lockKey, version);
+
+ // I'm still the leader, so refresh the key so it does not expire.
+ if (!cache.replaceWithVersion(lockKey, lockValue, version, (int)lifespanTimeUnit.toSeconds(lifespan))) {
+ // Looks like I've lost the leadership.
+ setLeader(false);
+ }
+ }
+
+ if (!isLeader()) {
+ Object result = cache.putIfAbsent(lockKey, lockValue, lifespan, lifespanTimeUnit);
+ if (result == null) {
+ // Acquired the key so I'm the leader.
+ setLeader(true);
+
+ // Get the version
+ version = cache.getWithMetadata(lockKey).getVersion();
+
+ LOGGER.debug("Lock acquired key={} with version={}", lockKey, version);
+ } else if (ObjectHelper.equal(lockValue, result) && !isLeader()) {
+ // Hey, I may have recovered from failure (or reboot was really
+ // fast) and my key was still there so yeah, I'm the leader again!
+ setLeader(true);
+
+ // Get the version
+ version = cache.getWithMetadata(lockKey).getVersion();
+
+ LOGGER.debug("Lock resumed key={} with version={}", lockKey, version);
+ } else {
+ setLeader(false);
+ }
+ }
+ }
+
+ @ClientCacheEntryRemoved
+ public void onCacheEntryRemoved(ClientCacheEntryRemovedEvent<Object> event) {
+ if (ObjectHelper.equal(lockKey, event.getKey())) {
+ run();
+ }
+ }
+
+ @ClientCacheEntryExpired
+ public void onCacheEntryExpired(ClientCacheEntryExpiredEvent<Object> event) {
+ if (ObjectHelper.equal(lockKey, event.getKey())) {
+ run();
+ }
+ }
+ }
+
+ // *************************************************************************
+ // Helpers
+ // *************************************************************************
+
+ public static InfinispanRoutePolicy withManager(BasicCacheContainer cacheContainer) {
+ InfinispanConfiguration conf = new InfinispanConfiguration();
+ conf.setCacheContainer(cacheContainer);
+
+ return new InfinispanRoutePolicy(conf);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanEmbeddedRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanEmbeddedRoutePolicyTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanEmbeddedRoutePolicyTest.java
new file mode 100644
index 0000000..b029638
--- /dev/null
+++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanEmbeddedRoutePolicyTest.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.policy;
+
+import org.infinispan.commons.api.BasicCacheContainer;
+import org.infinispan.manager.DefaultCacheManager;
+
+public class InfinispanEmbeddedRoutePolicyTest extends InfinispanRoutePolicyTestBase {
+
+ @Override
+ protected BasicCacheContainer createCacheManager() throws Exception {
+ return new DefaultCacheManager(true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRemoteRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRemoteRoutePolicyTest.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRemoteRoutePolicyTest.java
new file mode 100644
index 0000000..5d80d77
--- /dev/null
+++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRemoteRoutePolicyTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.policy;
+
+import java.util.Properties;
+
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
+import org.infinispan.commons.api.BasicCacheContainer;
+import org.junit.Ignore;
+
+@Ignore("Disabled as it requires a transactional cache")
+public class InfinispanRemoteRoutePolicyTest extends InfinispanRoutePolicyTestBase {
+
+ @Override
+ protected BasicCacheContainer createCacheManager() throws Exception {
+ Properties props = new Properties();
+ props.setProperty("infinispan.client.hotrod.server_list", "127.0.0.1:11222");
+ props.setProperty("infinispan.client.hotrod.force_return_values", "true");
+
+ return new RemoteCacheManager(
+ new ConfigurationBuilder().withProperties(props).build(),
+ true
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicyTestBase.java
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicyTestBase.java b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicyTestBase.java
new file mode 100644
index 0000000..2091ed6
--- /dev/null
+++ b/components/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/policy/InfinispanRoutePolicyTestBase.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.policy;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.infinispan.commons.api.BasicCacheContainer;
+import org.junit.Assert;
+import org.junit.Test;
+
+abstract class InfinispanRoutePolicyTestBase extends CamelTestSupport {
+ protected BasicCacheContainer cacheManager;
+ protected InfinispanRoutePolicy policy1;
+ protected InfinispanRoutePolicy policy2;
+
+ @Override
+ protected void doPreSetup() throws Exception {
+ this.cacheManager = createCacheManager();
+
+ this.policy1 = InfinispanRoutePolicy.withManager(cacheManager);
+ this.policy1.setLockMapName("camel-route-policy");
+ this.policy1.setLockKey("route-policy");
+ this.policy1.setLockValue("route1");
+
+ this.policy2 = InfinispanRoutePolicy.withManager(cacheManager);
+ this.policy2.setLockMapName("camel-route-policy");
+ this.policy2.setLockKey("route-policy");
+ this.policy2.setLockValue("route2");
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+
+ if (this.cacheManager != null) {
+ this.cacheManager.stop();
+ }
+ }
+
+ protected abstract BasicCacheContainer createCacheManager() throws Exception;
+
+ // *******************************************
+ //
+ // *******************************************
+
+ @Test
+ public void testLeadership()throws Exception {
+ context.startRoute("route1");
+ while(!policy1.isLeader()) {
+ Thread.sleep(250);
+ }
+
+ context.startRoute("route2");
+ Thread.sleep(500);
+
+ Assert.assertTrue(policy1.isLeader());
+ Assert.assertFalse(policy2.isLeader());
+
+ context.stopRoute("route1");
+ while(!policy2.isLeader()) {
+ Thread.sleep(250);
+ }
+
+ Assert.assertFalse(policy1.isLeader());
+ Assert.assertTrue(policy2.isLeader());
+
+ context.startRoute("route1");
+ Thread.sleep(500);
+
+ Assert.assertFalse(policy1.isLeader());
+ Assert.assertTrue(policy2.isLeader());
+
+ context.stopRoute("route2");
+ while(!policy1.isLeader()) {
+ Thread.sleep(250);
+ }
+
+ Assert.assertTrue(policy1.isLeader());
+ Assert.assertFalse(policy2.isLeader());
+ }
+
+ // *******************************************
+ //
+ // *******************************************
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:route1")
+ .routeId("route1")
+ .autoStartup(false)
+ .routePolicy(policy1)
+ .to("log:org.apache.camel.component.infinispan.policy.1?level=INFO&showAll=true");
+ from("direct:route2")
+ .routeId("route2")
+ .autoStartup(false)
+ .routePolicy(policy2)
+ .to("log:org.apache.camel.component.infinispan.policy.2?level=INFO&showAll=true");
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/dbd68347/components/camel-infinispan/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/components/camel-infinispan/src/test/resources/log4j.xml b/components/camel-infinispan/src/test/resources/log4j.xml
index 71a2b38..d041f18 100644
--- a/components/camel-infinispan/src/test/resources/log4j.xml
+++ b/components/camel-infinispan/src/test/resources/log4j.xml
@@ -64,7 +64,7 @@
<root>
<priority value="INFO" />
- <appender-ref ref="FILE" />
+ <appender-ref ref="CONSOLE" />
</root>
</log4j:configuration>