You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2021/01/29 10:21:18 UTC
[camel] branch master updated: camel-infinispan: implement route
policy using clustering facilities
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new a4f6087 camel-infinispan: implement route policy using clustering facilities
a4f6087 is described below
commit a4f6087b2a48b6dbe10621f45ad975239a374e03
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Wed Jan 27 18:15:52 2021 +0100
camel-infinispan: implement route policy using clustering facilities
---
.../camel-infinispan-common/pom.xml | 4 +
.../infinispan/InfinispanRoutePolicy.java | 246 ----------------
.../camel/component/infinispan/InfinispanUtil.java | 18 ++
.../cluster/InfinispanClusterConfiguration.java | 74 +++++
.../cluster/InfinispanClusterService.java | 23 ++
.../infinispan/cluster/InfinispanClusterView.java | 117 ++++++++
.../InfinispanRoutePolicyTestSupport.java | 58 ----
.../camel-infinispan-embedded/pom.xml | 14 +-
.../embedded/InfinispanEmbeddedRoutePolicy.java | 150 ----------
.../InfinispanEmbeddedClusterConfiguration.java | 60 ++++
.../cluster/InfinispanEmbeddedClusterService.java | 102 +++++++
.../cluster/InfinispanEmbeddedClusterView.java | 289 +++++++++++++++++++
.../InfinispanEmbeddedRoutePolicyTest.java | 98 -------
.../AbstractInfinispanEmbeddedClusteredTest.java | 73 +++++
.../InfinispanEmbeddedClusteredMasterTest.java | 59 ++++
...panEmbeddedClusteredRoutePolicyFactoryTest.java | 61 ++++
...InfinispanEmbeddedClusteredRoutePolicyTest.java | 61 ++++
.../InfinispanEmbeddedClusteredTestSupport.java | 38 +++
.../InfinispanEmbeddedClusteredViewTest.java | 72 +++++
.../src/test/resources/log4j2.properties | 4 +
.../camel-infinispan/camel-infinispan/pom.xml | 14 +-
.../remote/InfinispanRemoteRoutePolicy.java | 174 ------------
.../InfinispanRemoteClusterConfiguration.java | 130 +++++++++
.../cluster/InfinispanRemoteClusterService.java | 98 +++++++
.../cluster/InfinispanRemoteClusterView.java | 316 +++++++++++++++++++++
.../remote/InfinispanRemoteRoutePolicyTest.java | 124 --------
.../AbstractInfinispanRemoteClusteredTest.java | 84 ++++++
.../InfinispanRemoteClusteredMasterTest.java | 59 ++++
...ispanRemoteClusteredRoutePolicyFactoryTest.java | 61 ++++
.../InfinispanRemoteClusteredRoutePolicyTest.java | 61 ++++
.../InfinispanRemoteClusteredTestSupport.java | 53 ++++
.../cluster/InfinispanRemoteClusteredViewTest.java | 81 ++++++
.../src/test/resources/log4j2.properties | 2 +
33 files changed, 2020 insertions(+), 858 deletions(-)
diff --git a/components/camel-infinispan/camel-infinispan-common/pom.xml b/components/camel-infinispan/camel-infinispan-common/pom.xml
index 3989a98..1c857bf 100644
--- a/components/camel-infinispan/camel-infinispan-common/pom.xml
+++ b/components/camel-infinispan/camel-infinispan-common/pom.xml
@@ -42,6 +42,10 @@
<artifactId>camel-support</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-cluster</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-core</artifactId>
<version>${infinispan-version}</version>
diff --git a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanRoutePolicy.java b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanRoutePolicy.java
deleted file mode 100644
index 93f5fe5..0000000
--- a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanRoutePolicy.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.infinispan;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.CamelContextAware;
-import org.apache.camel.Route;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.Service;
-import org.apache.camel.api.management.ManagedAttribute;
-import org.apache.camel.support.RoutePolicySupport;
-import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.ReferenceCount;
-import org.apache.camel.util.StringHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class InfinispanRoutePolicy extends RoutePolicySupport implements CamelContextAware {
- private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanRoutePolicy.class);
-
- private final AtomicBoolean leader;
- private final Set<Route> startedRoutes;
- private final Set<Route> stoppeddRoutes;
- private final ReferenceCount refCount;
-
- private CamelContext camelContext;
- private boolean shouldStopRoute;
- private String lockMapName;
- private String lockKey;
- private String lockValue;
- private long lifespan;
- private TimeUnit lifespanTimeUnit;
- private Service service;
-
- protected InfinispanRoutePolicy(String lockKey, String lockValue) {
- this.stoppeddRoutes = new HashSet<>();
- this.startedRoutes = new HashSet<>();
- this.leader = new AtomicBoolean();
- this.shouldStopRoute = true;
- this.lockKey = lockKey;
- this.lockValue = lockValue;
- this.lifespan = 30;
- this.lifespanTimeUnit = TimeUnit.SECONDS;
- this.service = null;
- this.refCount = ReferenceCount.on(this::startService, this::stopService);
- }
-
- @Override
- public CamelContext getCamelContext() {
- return camelContext;
- }
-
- @Override
- public void setCamelContext(CamelContext camelContext) {
- this.camelContext = camelContext;
- }
-
- @Override
- public synchronized void onInit(Route route) {
- super.onInit(route);
-
- LOGGER.info("Route managed by {}. Setting route {} AutoStartup flag to false.", getClass(), route.getId());
- route.setAutoStartup(false);
-
- stoppeddRoutes.add(route);
-
- this.refCount.retain();
-
- startManagedRoutes();
- }
-
- @Override
- public synchronized void doShutdown() {
- this.refCount.release();
- }
-
- protected abstract Service createService();
-
- // ****************************************
- // Helpers
- // ****************************************
-
- private void startService() {
- // validate
- StringHelper.notEmpty(lockMapName, "lockMapName", this);
- StringHelper.notEmpty(lockKey, "lockKey", this);
- StringHelper.notEmpty(lockValue, "lockValue", this);
- ObjectHelper.notNull(camelContext, "camelContext", this);
-
- try {
- if (lifespanTimeUnit.convert(lifespan, TimeUnit.SECONDS) < 2) {
- throw new IllegalArgumentException("Lock lifespan can not be less that 2 seconds");
- }
-
- this.service = createService();
- this.service.start();
- } catch (Exception e) {
- throw new RuntimeCamelException(e);
- }
- }
-
- private void stopService() {
- leader.set(false);
-
- try {
- if (this.service != null) {
- this.service.stop();
- }
- } catch (Exception e) {
- throw new RuntimeCamelException(e);
- }
- }
-
- protected void setLeader(boolean isLeader) {
- if (isLeader && leader.compareAndSet(false, isLeader)) {
- LOGGER.info("Leadership taken (map={}, key={}, val={})", lockMapName, lockKey, lockValue);
- startManagedRoutes();
- } else if (!isLeader && leader.getAndSet(isLeader)) {
- LOGGER.info("Leadership lost (map={}, key={} val={})", lockMapName, lockKey, lockValue);
- stopManagedRoutes();
- }
- }
-
- private synchronized void startManagedRoutes() {
- if (!isLeader()) {
- return;
- }
-
- try {
- for (Route route : stoppeddRoutes) {
- LOGGER.debug("Starting route {}", route.getId());
- startRoute(route);
- startedRoutes.add(route);
- }
-
- stoppeddRoutes.removeAll(startedRoutes);
- } catch (Exception e) {
- handleException(e);
- }
- }
-
- private synchronized void stopManagedRoutes() {
- if (isLeader()) {
- return;
- }
-
- try {
- for (Route route : startedRoutes) {
- LOGGER.debug("Stopping route {}", route.getId());
- stopRoute(route);
- stoppeddRoutes.add(route);
- }
-
- startedRoutes.removeAll(stoppeddRoutes);
- } catch (Exception e) {
- handleException(e);
- }
- }
-
- // *************************************************************************
- // Getter/Setters
- // *************************************************************************
-
- @ManagedAttribute(description = "Whether to stop route when starting up and failed to become master")
- public boolean isShouldStopRoute() {
- return shouldStopRoute;
- }
-
- public void setShouldStopRoute(boolean shouldStopRoute) {
- this.shouldStopRoute = shouldStopRoute;
- }
-
- @ManagedAttribute(description = "The lock map name")
- public String getLockMapName() {
- return lockMapName;
- }
-
- public void setLockMapName(String lockMapName) {
- this.lockMapName = lockMapName;
- }
-
- @ManagedAttribute(description = "The lock key")
- public String getLockKey() {
- return lockKey;
- }
-
- public void setLockKey(String lockKey) {
- this.lockKey = lockKey;
- }
-
- @ManagedAttribute(description = "The lock value")
- public String getLockValue() {
- return lockValue;
- }
-
- public void setLockValue(String lockValue) {
- this.lockValue = lockValue;
- }
-
- @ManagedAttribute(description = "The key lifespan for the lock")
- public long getLifespan() {
- return lifespan;
- }
-
- public void setLifespan(long lifespan) {
- this.lifespan = lifespan;
- }
-
- public void setLifespan(long lifespan, TimeUnit lifespanTimeUnit) {
- this.lifespan = lifespan;
- this.lifespanTimeUnit = lifespanTimeUnit;
- }
-
- @ManagedAttribute(description = "The key lifespan time unit for the lock")
- public TimeUnit getLifespanTimeUnit() {
- return lifespanTimeUnit;
- }
-
- public void setLifespanTimeUnit(TimeUnit lifespanTimeUnit) {
- this.lifespanTimeUnit = lifespanTimeUnit;
- }
-
- @ManagedAttribute(description = "Is this route the master or a slave")
- public boolean isLeader() {
- return leader.get();
- }
-}
diff --git a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
index f9f0ebc..57edd34 100644
--- a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
+++ b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanUtil.java
@@ -23,6 +23,7 @@ import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.support.ResourceHelper;
@@ -66,4 +67,21 @@ public class InfinispanUtil {
source,
source.getClass().getSimpleName());
}
+
+ public static ScheduledExecutorService newSingleThreadScheduledExecutor(
+ CamelContextAware camelContextAware, Object source) {
+ return newSingleThreadScheduledExecutor(camelContextAware.getCamelContext(), source);
+ }
+
+ public static ScheduledExecutorService newSingleThreadScheduledExecutor(
+ CamelContext camelContext, Object source, String id) {
+ return camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor(
+ source,
+ source.getClass().getSimpleName() + "-" + id);
+ }
+
+ public static ScheduledExecutorService newSingleThreadScheduledExecutor(
+ CamelContextAware camelContextAware, Object source, String id) {
+ return newSingleThreadScheduledExecutor(camelContextAware.getCamelContext(), source, id);
+ }
}
diff --git a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterConfiguration.java b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterConfiguration.java
new file mode 100644
index 0000000..617a95a
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterConfiguration.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.cluster;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.component.infinispan.InfinispanConfiguration;
+
+public abstract class InfinispanClusterConfiguration<C extends InfinispanConfiguration> implements Cloneable {
+ private final C configuration;
+ private long lifespan;
+ private TimeUnit lifespanTimeUnit;
+
+ protected InfinispanClusterConfiguration(C configuration) {
+ this.configuration = configuration;
+ this.lifespan = 30;
+ this.lifespanTimeUnit = TimeUnit.SECONDS;
+ }
+
+ // ***********************************************
+ // Properties
+ // ***********************************************
+
+ public long getLifespan() {
+ return lifespan;
+ }
+
+ public void setLifespan(long lifespan) {
+ this.lifespan = lifespan;
+ }
+
+ public TimeUnit getLifespanTimeUnit() {
+ return lifespanTimeUnit;
+ }
+
+ public void setLifespanTimeUnit(TimeUnit lifespanTimeUnit) {
+ this.lifespanTimeUnit = lifespanTimeUnit;
+ }
+
+ public void setConfigurationUri(String configurationUri) {
+ configuration.setConfigurationUri(configurationUri);
+ }
+
+ public C getConfiguration() {
+ return configuration;
+ }
+
+ // ***********************************************
+ //
+ // ***********************************************
+
+ @Override
+ public InfinispanClusterConfiguration clone() {
+ try {
+ return (InfinispanClusterConfiguration) super.clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterService.java b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterService.java
new file mode 100644
index 0000000..a6def76
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterService.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.cluster;
+
+import org.apache.camel.support.cluster.AbstractCamelClusterService;
+
+public abstract class InfinispanClusterService extends AbstractCamelClusterService<InfinispanClusterView> {
+ public static final String LEADER_KEY = "__camel_leader";
+}
diff --git a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterView.java b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterView.java
new file mode 100644
index 0000000..3a4b880
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/cluster/InfinispanClusterView.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.cluster;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.camel.cluster.CamelClusterMember;
+import org.apache.camel.cluster.CamelClusterService;
+import org.apache.camel.support.cluster.AbstractCamelClusterView;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class InfinispanClusterView extends AbstractCamelClusterView {
+ private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanClusterView.class);
+
+ protected InfinispanClusterView(CamelClusterService cluster, String namespace) {
+ super(cluster, namespace);
+ }
+
+ protected abstract boolean isLeader(String id);
+
+ // ***********************************************
+ //
+ // ***********************************************
+
+ protected final class LocalMember implements CamelClusterMember {
+ private final AtomicBoolean leader = new AtomicBoolean();
+ private final String id;
+
+ public LocalMember(String id) {
+ this.id = id;
+ }
+
+ public void setLeader(boolean master) {
+ if (master && this.leader.compareAndSet(false, true)) {
+ LOGGER.debug("Leadership taken for id: {}", id);
+
+ fireLeadershipChangedEvent(Optional.of(this));
+ return;
+ }
+ if (!master && this.leader.compareAndSet(true, false)) {
+ LOGGER.debug("Leadership lost for id: {}", id);
+
+ fireLeadershipChangedEvent(getLeader());
+ return;
+ }
+ }
+
+ @Override
+ public boolean isLeader() {
+ return leader.get();
+ }
+
+ @Override
+ public boolean isLocal() {
+ return true;
+ }
+
+ @Override
+ public String getId() {
+ return this.id;
+ }
+
+ @Override
+ public String toString() {
+ return "LocalMember{" + "leader=" + leader + '}';
+ }
+ }
+
+ protected final class ClusterMember implements CamelClusterMember {
+ private final String id;
+
+ public ClusterMember(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public boolean isLeader() {
+ return InfinispanClusterView.this.isLeader(id);
+ }
+
+ @Override
+ public boolean isLocal() {
+ if (id == null) {
+ return false;
+ }
+
+ return Objects.equals(id, getLocalMember().getId());
+ }
+
+ @Override
+ public String toString() {
+ return "ClusterMember{" + "id='" + id + '\'' + '}';
+ }
+ }
+}
diff --git a/components/camel-infinispan/camel-infinispan-common/src/test/java/org/apache/camel/component/infinispan/InfinispanRoutePolicyTestSupport.java b/components/camel-infinispan/camel-infinispan-common/src/test/java/org/apache/camel/component/infinispan/InfinispanRoutePolicyTestSupport.java
deleted file mode 100644
index ba5b6df..0000000
--- a/components/camel-infinispan/camel-infinispan-common/src/test/java/org/apache/camel/component/infinispan/InfinispanRoutePolicyTestSupport.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.infinispan;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.junit.jupiter.api.Test;
-
-import static org.awaitility.Awaitility.await;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public interface InfinispanRoutePolicyTestSupport {
- InfinispanRoutePolicy createRoutePolicy(String lockValue);
-
- @Test
- default void testLeadership() throws Exception {
- final InfinispanRoutePolicy policy1 = createRoutePolicy("route1");
- final InfinispanRoutePolicy policy2 = createRoutePolicy("route2");
-
- try (CamelContext context = new DefaultCamelContext()) {
- context.start();
-
- RouteBuilder.addRoutes(context, b -> b.from("direct:r1").routePolicy(policy1).to("mock:p1"));
-
- await().atMost(10, TimeUnit.SECONDS).until(policy1::isLeader);
-
- RouteBuilder.addRoutes(context, b -> b.from("direct:r2").routePolicy(policy2).to("mock:p2"));
-
- assertTrue(policy1.isLeader());
- assertFalse(policy2.isLeader());
-
- policy1.shutdown();
-
- await().atMost(10, TimeUnit.SECONDS).until(policy2::isLeader);
-
- assertFalse(policy1.isLeader());
- assertTrue(policy2.isLeader());
- }
- }
-}
diff --git a/components/camel-infinispan/camel-infinispan-embedded/pom.xml b/components/camel-infinispan/camel-infinispan-embedded/pom.xml
index 34ed98f..b8a7bc4 100644
--- a/components/camel-infinispan/camel-infinispan-embedded/pom.xml
+++ b/components/camel-infinispan/camel-infinispan-embedded/pom.xml
@@ -35,10 +35,6 @@
<dependencies>
<dependency>
<groupId>org.apache.camel</groupId>
- <artifactId>camel-support</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
<artifactId>camel-infinispan-common</artifactId>
</dependency>
<dependency>
@@ -65,6 +61,11 @@
<artifactId>camel-test-spring-junit5</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-master</artifactId>
+ <scope>test</scope>
+ </dependency>
<!-- testing - infinispan -->
<dependency>
@@ -114,6 +115,11 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedRoutePolicy.java b/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedRoutePolicy.java
deleted file mode 100644
index 615a77c..0000000
--- a/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedRoutePolicy.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.infinispan.embedded;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-
-import org.apache.camel.Service;
-import org.apache.camel.api.management.ManagedResource;
-import org.apache.camel.component.infinispan.InfinispanRoutePolicy;
-import org.apache.camel.support.service.ServiceSupport;
-import org.apache.camel.util.ObjectHelper;
-import org.infinispan.Cache;
-import org.infinispan.notifications.Listener;
-import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
-import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
-import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@ManagedResource(description = "Route policy using Infinispan Embedded as clustered lock")
-public class InfinispanEmbeddedRoutePolicy extends InfinispanRoutePolicy {
- private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanEmbeddedRoutePolicy.class);
-
- private final InfinispanEmbeddedManager manager;
-
- public InfinispanEmbeddedRoutePolicy(InfinispanEmbeddedConfiguration configuration) {
- this(configuration, null, null);
- }
-
- public InfinispanEmbeddedRoutePolicy(InfinispanEmbeddedConfiguration configuration, String lockKey, String lockValue) {
- super(lockKey, lockValue);
-
- this.manager = new InfinispanEmbeddedManager(configuration);
- }
-
- @Override
- protected Service createService() {
- return new EmbeddedCacheService();
- }
-
- @Override
- public void doStart() throws Exception {
- super.doStart();
- this.manager.start();
- }
-
- @Override
- public void doStop() throws Exception {
- super.doStop();
- this.manager.stop();
- }
-
- // *************************************************************************
- //
- // *************************************************************************
-
- @Listener(clustered = true, sync = false)
- private final class EmbeddedCacheService extends ServiceSupport {
- private Cache<String, String> cache;
- private ScheduledExecutorService executorService;
- private ScheduledFuture<?> future;
-
- EmbeddedCacheService() {
- }
-
- @SuppressWarnings("unchecked")
- @Override
- protected void doStart() throws Exception {
- this.executorService = getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
- getClass().getSimpleName());
- this.cache = manager.getCache(getLockMapName(), Cache.class);
- this.cache.addListener(this);
- this.future = executorService.scheduleAtFixedRate(this::run, 0, getLifespan() / 2, getLifespanTimeUnit());
- }
-
- @Override
- protected void doStop() throws Exception {
- if (cache != null) {
- this.cache.removeListener(this);
- this.cache.remove(getLockKey(), getLockValue());
- }
-
- if (future != null) {
- future.cancel(true);
- future = null;
- }
-
- getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService);
- }
-
- private void run() {
- if (!isRunAllowed()) {
- return;
- }
-
- if (isLeader()) {
- // I'm still the leader, so refresh the key so it does not expire.
- if (!cache.replace(getLockKey(), getLockValue(), getLockValue(), getLifespan(), getLifespanTimeUnit())) {
- // Looks like I've lost the leadership.
- setLeader(false);
- }
- }
-
- if (!isLeader()) {
- Object result = cache.putIfAbsent(getLockKey(), getLockValue(), getLifespan(), getLifespanTimeUnit());
- if (result == null) {
- // Acquired the key so I'm the leader.
- setLeader(true);
- } else if (ObjectHelper.equal(getLockValue(), result) && !isLeader()) {
- // Hey, I may have recovered from failure (or reboot was really
- // fast) and my key was still there so yeah, I'm the leader again!
- setLeader(true);
- } else {
- setLeader(false);
- }
- }
- }
-
- @CacheEntryRemoved
- public void onCacheEntryRemoved(CacheEntryEvent<Object, Object> event) {
- LOGGER.debug("onCacheEntryExpired lock-key={}, event-key={}", getLockKey(), event.getKey());
- if (ObjectHelper.equal(getLockKey(), event.getKey())) {
- run();
- }
- }
-
- @CacheEntryExpired
- public void onCacheEntryExpired(CacheEntryEvent<Object, Object> event) {
- LOGGER.debug("onCacheEntryExpired lock-key={}, event-key={}", getLockKey(), event.getKey());
- if (ObjectHelper.equal(getLockKey(), event.getKey())) {
- run();
- }
- }
- }
-}
diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterConfiguration.java b/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterConfiguration.java
new file mode 100644
index 0000000..45a698c
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterConfiguration.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.embedded.cluster;
+
+import org.apache.camel.component.infinispan.cluster.InfinispanClusterConfiguration;
+import org.apache.camel.component.infinispan.embedded.InfinispanEmbeddedConfiguration;
+import org.infinispan.configuration.cache.Configuration;
+import org.infinispan.manager.EmbeddedCacheManager;
+
+/**
+ * Cluster configuration for the embedded Infinispan flavour.
+ * <p>
+ * Wraps an {@link InfinispanEmbeddedConfiguration} (created by the no-arg constructor and handed to
+ * the parent class) and exposes its cache-container related settings as plain bean properties.
+ */
+public class InfinispanEmbeddedClusterConfiguration
+ extends InfinispanClusterConfiguration<InfinispanEmbeddedConfiguration>
+ implements Cloneable {
+
+ /**
+ * Creates a configuration backed by a fresh {@link InfinispanEmbeddedConfiguration}.
+ */
+ public InfinispanEmbeddedClusterConfiguration() {
+ super(new InfinispanEmbeddedConfiguration());
+ }
+
+ // ***********************************************
+ // Properties
+ // ***********************************************
+
+ /**
+ * @return the cache container held by the wrapped embedded configuration
+ */
+ public EmbeddedCacheManager getCacheContainer() {
+ return getConfiguration().getCacheContainer();
+ }
+
+ /**
+ * @param cacheContainer the cache container to store on the wrapped embedded configuration
+ */
+ public void setCacheContainer(EmbeddedCacheManager cacheContainer) {
+ getConfiguration().setCacheContainer(cacheContainer);
+ }
+
+ /**
+ * @return the cache container configuration held by the wrapped embedded configuration
+ */
+ public Configuration getCacheContainerConfiguration() {
+ return getConfiguration().getCacheContainerConfiguration();
+ }
+
+ /**
+ * @param cacheContainerConfiguration the cache container configuration to store on the wrapped
+ * embedded configuration
+ */
+ public void setCacheContainerConfiguration(Configuration cacheContainerConfiguration) {
+ getConfiguration().setCacheContainerConfiguration(cacheContainerConfiguration);
+ }
+
+ // ***********************************************
+ // Cloning
+ // ***********************************************
+
+ /**
+ * Returns the result of the parent {@code clone()} narrowed to this type.
+ * <p>
+ * NOTE(review): whether the wrapped {@link InfinispanEmbeddedConfiguration} is copied or shared
+ * between the original and the clone is decided by the parent class, which is not visible here.
+ */
+ @Override
+ public InfinispanEmbeddedClusterConfiguration clone() {
+ return (InfinispanEmbeddedClusterConfiguration) super.clone();
+ }
+}
diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterService.java b/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterService.java
new file mode 100644
index 0000000..d471d20
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterService.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.embedded.cluster;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.component.infinispan.cluster.InfinispanClusterService;
+import org.apache.camel.component.infinispan.cluster.InfinispanClusterView;
+import org.apache.camel.util.ObjectHelper;
+import org.infinispan.configuration.cache.Configuration;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cluster service that creates {@link InfinispanEmbeddedClusterView} instances.
+ * <p>
+ * The service owns an {@link InfinispanEmbeddedClusterConfiguration}; the property accessors below
+ * simply delegate to it so the service can be configured as a flat bean.
+ */
+public class InfinispanEmbeddedClusterService extends InfinispanClusterService {
+ private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanEmbeddedClusterService.class);
+
+ private InfinispanEmbeddedClusterConfiguration configuration;
+
+ /**
+ * Creates a service with a default, empty configuration.
+ */
+ public InfinispanEmbeddedClusterService() {
+ this.configuration = new InfinispanEmbeddedClusterConfiguration();
+ }
+
+ /**
+ * Creates a service using a clone of the given configuration.
+ *
+ * @param configuration the configuration to clone; must not be {@code null} since it is
+ * dereferenced immediately
+ */
+ public InfinispanEmbeddedClusterService(InfinispanEmbeddedClusterConfiguration configuration) {
+ this.configuration = configuration.clone();
+ }
+
+ // *********************************************
+ // Properties
+ // *********************************************
+
+ /**
+ * @return the configuration instance held by this service (not a copy)
+ */
+ public InfinispanEmbeddedClusterConfiguration getConfiguration() {
+ return configuration;
+ }
+
+ /**
+ * Replaces the held configuration with a clone of the given one.
+ *
+ * @param configuration the configuration to clone; must not be {@code null}
+ */
+ public void setConfiguration(InfinispanEmbeddedClusterConfiguration configuration) {
+ this.configuration = configuration.clone();
+ }
+
+ public void setConfigurationUri(String configurationUri) {
+ configuration.setConfigurationUri(configurationUri);
+ }
+
+ public EmbeddedCacheManager getCacheContainer() {
+ return configuration.getCacheContainer();
+ }
+
+ public void setCacheContainer(EmbeddedCacheManager cacheContainer) {
+ configuration.setCacheContainer(cacheContainer);
+ }
+
+ public Configuration getCacheContainerConfiguration() {
+ return configuration.getCacheContainerConfiguration();
+ }
+
+ public void setCacheContainerConfiguration(Configuration cacheContainerConfiguration) {
+ configuration.setCacheContainerConfiguration(cacheContainerConfiguration);
+ }
+
+ /**
+ * @return the lifespan value from the configuration, expressed in {@link #getLifespanTimeUnit()}
+ */
+ public long getLifespan() {
+ return configuration.getLifespan();
+ }
+
+ public void setLifespan(long lifespan) {
+ configuration.setLifespan(lifespan);
+ }
+
+ public TimeUnit getLifespanTimeUnit() {
+ return configuration.getLifespanTimeUnit();
+ }
+
+ public void setLifespanTimeUnit(TimeUnit lifespanTimeUnit) {
+ configuration.setLifespanTimeUnit(lifespanTimeUnit);
+ }
+
+ // *********************************************
+ // Impl
+ // *********************************************
+
+ /**
+ * Creates the view for the given namespace.
+ * <p>
+ * The Camel context and the cluster id are checked with {@code ObjectHelper.notNull} first
+ * (presumably failing fast with an exception when either is missing - confirm against
+ * {@code ObjectHelper}), then a new {@link InfinispanEmbeddedClusterView} is built from this
+ * service, its configuration and the namespace.
+ *
+ * @param namespace the namespace of the view to create
+ * @return a new, not yet started view
+ */
+ @Override
+ protected InfinispanClusterView createView(String namespace) throws Exception {
+ // Validate parameters
+ ObjectHelper.notNull(getCamelContext(), "Camel Context");
+ ObjectHelper.notNull(getId(), "Cluster ID");
+
+ return new InfinispanEmbeddedClusterView(this, configuration, namespace);
+ }
+}
diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterView.java b/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterView.java
new file mode 100644
index 0000000..2d09cf9
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan-embedded/src/main/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusterView.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.embedded.cluster;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import org.apache.camel.cluster.CamelClusterMember;
+import org.apache.camel.component.infinispan.InfinispanUtil;
+import org.apache.camel.component.infinispan.cluster.InfinispanClusterService;
+import org.apache.camel.component.infinispan.cluster.InfinispanClusterView;
+import org.apache.camel.component.infinispan.embedded.InfinispanEmbeddedManager;
+import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.support.service.ServiceSupport;
+import org.infinispan.Cache;
+import org.infinispan.notifications.Listener;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
+import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.util.function.Predicates.negate;
+
+/**
+ * Cluster view backed by an embedded Infinispan cache.
+ * <p>
+ * The cache is looked up by the view namespace. The entry stored under
+ * {@link InfinispanClusterService#LEADER_KEY} holds the id of the current leader; every other key
+ * in the cache is treated as the id of a cluster member. All entries are written with the
+ * configured lifespan and are refreshed periodically by the internal {@code LeadershipService}.
+ */
+public class InfinispanEmbeddedClusterView extends InfinispanClusterView {
+    // Log under this class, not under the cluster service.
+    private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanEmbeddedClusterView.class);
+
+    private final InfinispanEmbeddedClusterConfiguration configuration;
+    private final InfinispanEmbeddedManager manager;
+    private final LocalMember localMember;
+    private final LeadershipService leadership;
+
+    // Only set between doStart() and doStop(); accessors treat null as "view not running".
+    private Cache<String, String> cache;
+
+    /**
+     * @param cluster       the owning cluster service; its id becomes the id of the local member
+     * @param configuration the cluster configuration (lifespan settings and embedded configuration)
+     * @param namespace     the view namespace, also used as the name of the backing cache
+     */
+    protected InfinispanEmbeddedClusterView(
+                                            InfinispanEmbeddedClusterService cluster,
+                                            InfinispanEmbeddedClusterConfiguration configuration,
+                                            String namespace) {
+        super(cluster, namespace);
+
+        this.configuration = configuration;
+        this.manager = new InfinispanEmbeddedManager(this.configuration.getConfiguration());
+        this.leadership = new LeadershipService();
+        this.localMember = new LocalMember(cluster.getId());
+    }
+
+    /**
+     * Starts the cache manager, resolves the cache named after the namespace and then starts the
+     * leadership service, which needs the cache to be available.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void doStart() throws Exception {
+        super.doStart();
+
+        ServiceHelper.startService(manager);
+
+        this.cache = manager.getCache(getNamespace(), Cache.class);
+
+        ServiceHelper.startService(leadership);
+    }
+
+    /**
+     * Stops the leadership service before the cache manager (the former still uses the cache while
+     * stopping) and finally drops the cache reference.
+     */
+    @Override
+    public void doStop() throws Exception {
+        super.doStop();
+
+        ServiceHelper.stopService(leadership);
+        ServiceHelper.stopService(manager);
+
+        this.cache = null;
+    }
+
+    @Override
+    public CamelClusterMember getLocalMember() {
+        return this.localMember;
+    }
+
+    /**
+     * @return one member per cache key other than the leader key, or an empty list when the view
+     *         is not started
+     */
+    @Override
+    public List<CamelClusterMember> getMembers() {
+        return this.cache != null
+                ? cache.keySet().stream()
+                        .filter(negate(InfinispanClusterService.LEADER_KEY::equals))
+                        .map(ClusterMember::new)
+                        .collect(Collectors.toList())
+                : Collections.emptyList();
+    }
+
+    /**
+     * @return the member whose id is stored under the leader key, or empty when the view is not
+     *         started or no leader entry exists
+     */
+    @Override
+    public Optional<CamelClusterMember> getLeader() {
+        if (this.cache == null) {
+            return Optional.empty();
+        }
+
+        String id = cache.get(InfinispanClusterService.LEADER_KEY);
+        if (id == null) {
+            return Optional.empty();
+        }
+
+        return Optional.of(new ClusterMember(id));
+    }
+
+    /**
+     * @param  id the member id to check, may be {@code null}
+     * @return    {@code true} only when the view is started and the leader entry equals the id
+     */
+    @Override
+    protected boolean isLeader(String id) {
+        if (this.cache == null) {
+            return false;
+        }
+        if (id == null) {
+            return false;
+        }
+
+        final String key = InfinispanClusterService.LEADER_KEY;
+        final String val = this.cache.get(key);
+
+        return Objects.equals(id, val);
+    }
+
+    // *****************************************
+    //
+    // Service
+    //
+    // *****************************************
+
+    /**
+     * Periodically tries to acquire or refresh the leader entry and refreshes the local member
+     * entry. It is also registered as a cache listener so that a removed or expired leader entry
+     * triggers an immediate election attempt.
+     */
+    @Listener(clustered = true, sync = false)
+    private final class LeadershipService extends ServiceSupport {
+        private final AtomicBoolean running;
+        private ScheduledExecutorService executorService;
+
+        LeadershipService() {
+            this.running = new AtomicBoolean(false);
+        }
+
+        @Override
+        protected void doStart() throws Exception {
+            super.doStart();
+
+            this.running.set(true);
+            this.executorService = InfinispanUtil.newSingleThreadScheduledExecutor(
+                    getCamelContext(),
+                    this,
+                    getLocalMember().getId());
+
+            // register the local member to the inventory
+            cache.put(
+                    getLocalMember().getId(),
+                    "false",
+                    configuration.getLifespan(),
+                    configuration.getLifespanTimeUnit());
+
+            cache.addListener(this);
+
+            // scheduleAtFixedRate rejects a non-positive period with IllegalArgumentException and
+            // the integer division yields 0 for a lifespan of 1, so never go below one time unit.
+            final long period = Math.max(1L, configuration.getLifespan() / 2);
+
+            executorService.scheduleAtFixedRate(
+                    this::run,
+                    0,
+                    period,
+                    configuration.getLifespanTimeUnit());
+        }
+
+        @Override
+        protected void doStop() throws Exception {
+            super.doStop();
+
+            this.running.set(false);
+
+            if (cache != null) {
+                cache.removeListener(this);
+            }
+
+            getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService);
+
+            if (cache != null) {
+                // conditional remove: only drops the leader entry if it still points to this node
+                cache.remove(InfinispanClusterService.LEADER_KEY, getClusterService().getId());
+
+                LOGGER.info("Removing local member, key={}", getLocalMember().getId());
+                cache.remove(getLocalMember().getId());
+            }
+        }
+
+        private boolean isLeader() {
+            return getLocalMember().isLeader();
+        }
+
+        private void setLeader(boolean leader) {
+            ((LocalMember) getLocalMember()).setLeader(leader);
+        }
+
+        /**
+         * Entry point for both the periodic task and the listener-triggered executions.
+         */
+        private synchronized void run() {
+            if (!running.get()) {
+                return;
+            }
+
+            try {
+                refreshLeadership();
+            } catch (Exception e) {
+                // An exception escaping a task scheduled with scheduleAtFixedRate suppresses every
+                // subsequent execution, which would silently stop the election and the membership
+                // refresh. Log and let the next tick retry instead.
+                LOGGER.warn("Failed to refresh leadership, id={}", getLocalMember().getId(), e);
+            }
+        }
+
+        /**
+         * Refreshes the leader entry when this node is the leader, otherwise tries to acquire it,
+         * then rewrites the local member entry so it does not expire.
+         */
+        private void refreshLeadership() {
+            final String leaderKey = InfinispanClusterService.LEADER_KEY;
+            final String localId = getLocalMember().getId();
+
+            if (isLeader()) {
+                LOGGER.debug("Lock refresh key={}, id={}", leaderKey, localId);
+
+                // I'm still the leader, so refresh the key so it does not expire.
+                if (!cache.replace(
+                        InfinispanClusterService.LEADER_KEY,
+                        getClusterService().getId(),
+                        getClusterService().getId(),
+                        configuration.getLifespan(),
+                        configuration.getLifespanTimeUnit())) {
+
+                    LOGGER.debug("Failed to refresh the lock key={}, id={}", leaderKey, localId);
+
+                    // Looks like I've lost the leadership.
+                    setLeader(false);
+                }
+            }
+            if (!isLeader()) {
+                LOGGER.debug("Try to acquire lock key={}, id={}", leaderKey, localId);
+
+                Object result = cache.putIfAbsent(
+                        InfinispanClusterService.LEADER_KEY,
+                        getClusterService().getId(),
+                        configuration.getLifespan(),
+                        configuration.getLifespanTimeUnit());
+                if (result == null) {
+                    LOGGER.debug("Lock acquired key={}, id={}", leaderKey, localId);
+                    // Acquired the key so I'm the leader.
+                    setLeader(true);
+                } else if (Objects.equals(getClusterService().getId(), result) && !isLeader()) {
+                    LOGGER.debug("Lock resumed key={}, id={}", leaderKey, localId);
+                    // Hey, I may have recovered from failure (or reboot was really
+                    // fast) and my key was still there so yeah, I'm the leader again!
+                    setLeader(true);
+                } else {
+                    LOGGER.debug("Failed to acquire the lock key={}, id={}", leaderKey, localId);
+                    setLeader(false);
+                }
+            }
+
+            // refresh local membership
+            cache.put(
+                    getLocalMember().getId(),
+                    isLeader() ? "true" : "false",
+                    configuration.getLifespan(),
+                    configuration.getLifespanTimeUnit());
+        }
+
+        /**
+         * Schedules an election attempt when the removed entry is the leader entry.
+         */
+        @CacheEntryRemoved
+        public void onCacheEntryRemoved(CacheEntryEvent<Object, Object> event) {
+            if (!running.get()) {
+                return;
+            }
+
+            LOGGER.debug("onCacheEntryRemoved id={}, lock-key={}, event-key={}",
+                    getLocalMember().getId(),
+                    InfinispanClusterService.LEADER_KEY,
+                    event.getKey());
+
+            if (Objects.equals(InfinispanClusterService.LEADER_KEY, event.getKey())) {
+                // hand over to the single worker thread rather than running on the caller's thread
+                executorService.execute(this::run);
+            }
+        }
+
+        /**
+         * Schedules an election attempt when the expired entry is the leader entry.
+         */
+        @CacheEntryExpired
+        public void onCacheEntryExpired(CacheEntryEvent<Object, Object> event) {
+            if (!running.get()) {
+                return;
+            }
+
+            LOGGER.debug("onCacheEntryExpired id={}, lock-key={}, event-key={}",
+                    getLocalMember().getId(),
+                    InfinispanClusterService.LEADER_KEY,
+                    event.getKey());
+
+            if (Objects.equals(InfinispanClusterService.LEADER_KEY, event.getKey())) {
+                // hand over to the single worker thread rather than running on the caller's thread
+                executorService.execute(this::run);
+            }
+        }
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedRoutePolicyTest.java b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedRoutePolicyTest.java
deleted file mode 100644
index 9c02b7a..0000000
--- a/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/InfinispanEmbeddedRoutePolicyTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.infinispan.embedded;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.infinispan.InfinispanRoutePolicy;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.infinispan.commons.api.CacheContainerAdmin;
-import org.infinispan.configuration.cache.CacheMode;
-import org.infinispan.configuration.cache.ConfigurationBuilder;
-import org.infinispan.manager.DefaultCacheManager;
-import org.infinispan.manager.EmbeddedCacheManager;
-import org.junit.jupiter.api.Test;
-
-import static org.awaitility.Awaitility.await;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class InfinispanEmbeddedRoutePolicyTest {
- public static final String CACHE_NAME = "_route_policy";
-
- @Test
- public void testLeadership() throws Exception {
- try (EmbeddedCacheManager cacheContainer = createCacheContainer()) {
-
- cacheContainer.start();
-
- final InfinispanRoutePolicy policy1 = createRoutePolicy(cacheContainer, "route1");
- final InfinispanRoutePolicy policy2 = createRoutePolicy(cacheContainer, "route2");
-
- try (CamelContext context = new DefaultCamelContext()) {
- context.start();
-
- RouteBuilder.addRoutes(context, b -> b.from("direct:r1").routePolicy(policy1).to("mock:p1"));
-
- await().atMost(10, TimeUnit.SECONDS).until(policy1::isLeader);
-
- RouteBuilder.addRoutes(context, b -> b.from("direct:r2").routePolicy(policy2).to("mock:p2"));
-
- assertTrue(policy1.isLeader());
- assertFalse(policy2.isLeader());
-
- policy1.shutdown();
-
- await().atMost(10, TimeUnit.SECONDS).until(policy2::isLeader);
-
- assertFalse(policy1.isLeader());
- assertTrue(policy2.isLeader());
- }
- }
- }
-
- // *****************************
- //
- // *****************************
-
- private static EmbeddedCacheManager createCacheContainer() {
- DefaultCacheManager cacheContainer = new DefaultCacheManager();
- cacheContainer.administration()
- .withFlags(CacheContainerAdmin.AdminFlag.VOLATILE)
- .getOrCreateCache(
- CACHE_NAME,
- new ConfigurationBuilder()
- .clustering().cacheMode(CacheMode.LOCAL)
- .build());
-
- return cacheContainer;
- }
-
- private static InfinispanRoutePolicy createRoutePolicy(EmbeddedCacheManager cacheContainer, String lockValue) {
- InfinispanEmbeddedConfiguration configuration = new InfinispanEmbeddedConfiguration();
- configuration.setCacheContainer(cacheContainer);
-
- InfinispanEmbeddedRoutePolicy policy = new InfinispanEmbeddedRoutePolicy(configuration);
- policy.setLockMapName(CACHE_NAME);
- policy.setLockKey("lock-key");
- policy.setLockValue(lockValue);
-
- return policy;
- }
-}
diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/AbstractInfinispanEmbeddedClusteredTest.java b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/AbstractInfinispanEmbeddedClusteredTest.java
new file mode 100644
index 0000000..4b1aaff
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/AbstractInfinispanEmbeddedClusteredTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.embedded.cluster;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.infinispan.manager.DefaultCacheManager;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Base class for the embedded clustered tests.
+ * <p>
+ * Runs three simulated nodes concurrently against one shared cache container and expects every
+ * node to finish its {@link #run(DefaultCacheManager, String, String)} without an exception.
+ */
+abstract class AbstractInfinispanEmbeddedClusteredTest {
+    @Timeout(value = 1, unit = TimeUnit.MINUTES)
+    @Test
+    public void test() throws Exception {
+        final Logger logger = LoggerFactory.getLogger(getClass());
+        final List<String> clients = IntStream.range(0, 3).mapToObj(Integer::toString).collect(Collectors.toList());
+        final List<String> results = new ArrayList<>();
+        final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(clients.size() * 2);
+        final CountDownLatch latch = new CountDownLatch(clients.size());
+        final String viewName = "myView";
+
+        try (DefaultCacheManager cacheContainer = new DefaultCacheManager()) {
+            InfinispanEmbeddedClusteredTestSupport.createCache(cacheContainer, viewName);
+
+            try {
+                for (String id : clients) {
+                    scheduler.submit(() -> {
+                        try {
+                            run(cacheContainer, viewName, id);
+                            logger.debug("Node {} is shutting down", id);
+                            // results is a plain ArrayList shared by all the pool threads, so
+                            // concurrent appends must be serialized or an element may be lost.
+                            synchronized (results) {
+                                results.add(id);
+                            }
+                        } catch (Exception e) {
+                            logger.warn("", e);
+                        } finally {
+                            latch.countDown();
+                        }
+                    });
+                }
+
+                // countDown()/await() also makes the workers' writes visible to this thread
+                latch.await();
+            } finally {
+                // always release the pool, even when the wait is interrupted
+                scheduler.shutdownNow();
+            }
+
+            assertThat(results).hasSameSizeAs(clients);
+            assertThat(results).containsAll(clients);
+        }
+    }
+
+    /**
+     * Runs a single simulated node until it has completed its work.
+     *
+     * @param cacheContainer the cache container shared by all the nodes of the test
+     * @param namespace      the cluster view namespace (a cache with this name has been created)
+     * @param id             the identifier of the node
+     */
+    protected abstract void run(DefaultCacheManager cacheContainer, String namespace, String id) throws Exception;
+}
diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredMasterTest.java b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredMasterTest.java
new file mode 100644
index 0000000..37372d4
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredMasterTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.embedded.cluster;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.infinispan.manager.DefaultCacheManager;
+
+/**
+ * Clustered test variant in which each node consumes from a {@code master:<namespace>:timer:<id>}
+ * endpoint.
+ */
+public class InfinispanEmbeddedClusteredMasterTest extends AbstractInfinispanEmbeddedClusteredTest {
+ /**
+ * Starts a Camel context for one node and blocks until its route has processed a randomly
+ * chosen number of exchanges (between 2 and 5), then closes the context.
+ */
+ @Override
+ protected void run(DefaultCacheManager cacheContainer, String namespace, String id) throws Exception {
+ int events = ThreadLocalRandom.current().nextInt(2, 6);
+ CountDownLatch contextLatch = new CountDownLatch(events);
+
+ // Configure the cluster service of this node on top of the cache container given by the caller.
+ InfinispanEmbeddedClusterService clusterService = new InfinispanEmbeddedClusterService();
+ clusterService.setCacheContainer(cacheContainer);
+ clusterService.setId("node-" + id);
+
+ try (DefaultCamelContext context = new DefaultCamelContext()) {
+ context.disableJMX();
+ context.setName("context-" + id);
+ context.addService(clusterService);
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ fromF("master:%s:timer:%s?delay=1000&period=1000", namespace, id)
+ .routeId("route-" + id)
+ .log("From ${routeId}")
+ .process(e -> contextLatch.countDown());
+ }
+ });
+
+ // Start the context after some random time so the startup order
+ // changes for each test.
+ Thread.sleep(ThreadLocalRandom.current().nextInt(500));
+ context.start();
+
+ // Wait until the route of this node has counted the latch down to zero.
+ contextLatch.await();
+ }
+ }
+}
diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredRoutePolicyFactoryTest.java b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredRoutePolicyFactoryTest.java
new file mode 100644
index 0000000..08d611d
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredRoutePolicyFactoryTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.embedded.cluster;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.cluster.ClusteredRoutePolicyFactory;
+import org.infinispan.manager.DefaultCacheManager;
+
+/**
+ * Clustered test variant in which a {@link ClusteredRoutePolicyFactory} for the namespace is
+ * registered on the Camel context of each node.
+ */
+public class InfinispanEmbeddedClusteredRoutePolicyFactoryTest extends AbstractInfinispanEmbeddedClusteredTest {
+ /**
+ * Starts a Camel context for one node and blocks until its timer route has processed a
+ * randomly chosen number of exchanges (between 2 and 5), then closes the context.
+ */
+ @Override
+ protected void run(DefaultCacheManager cacheContainer, String namespace, String id) throws Exception {
+ int events = ThreadLocalRandom.current().nextInt(2, 6);
+ CountDownLatch contextLatch = new CountDownLatch(events);
+
+ // Configure the cluster service of this node on top of the cache container given by the caller.
+ InfinispanEmbeddedClusterService clusterService = new InfinispanEmbeddedClusterService();
+ clusterService.setCacheContainer(cacheContainer);
+ clusterService.setId("node-" + id);
+
+ try (DefaultCamelContext context = new DefaultCamelContext()) {
+ context.disableJMX();
+ context.setName("context-" + id);
+ context.addService(clusterService);
+ context.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace(namespace));
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ fromF("timer:%s?delay=1000&period=1000", id)
+ .routeId("route-" + id)
+ .log("From ${routeId}")
+ .process(e -> contextLatch.countDown());
+ }
+ });
+
+ // Start the context after some random time so the startup order
+ // changes for each test.
+ Thread.sleep(ThreadLocalRandom.current().nextInt(500));
+ context.start();
+
+ // Wait until the route of this node has counted the latch down to zero.
+ contextLatch.await();
+ }
+ }
+}
diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredRoutePolicyTest.java b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredRoutePolicyTest.java
new file mode 100644
index 0000000..010c0ce
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredRoutePolicyTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.embedded.cluster;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.cluster.ClusteredRoutePolicy;
+import org.infinispan.manager.DefaultCacheManager;
+
+/**
+ * Clustered test variant in which a {@link ClusteredRoutePolicy} for the namespace is attached
+ * directly to the route of each node.
+ */
+public class InfinispanEmbeddedClusteredRoutePolicyTest extends AbstractInfinispanEmbeddedClusteredTest {
+ /**
+ * Starts a Camel context for one node and blocks until its timer route has processed a
+ * randomly chosen number of exchanges (between 2 and 5), then closes the context.
+ */
+ @Override
+ protected void run(DefaultCacheManager cacheContainer, String namespace, String id) throws Exception {
+ int events = ThreadLocalRandom.current().nextInt(2, 6);
+ CountDownLatch contextLatch = new CountDownLatch(events);
+
+ // Configure the cluster service of this node on top of the cache container given by the caller.
+ InfinispanEmbeddedClusterService clusterService = new InfinispanEmbeddedClusterService();
+ clusterService.setCacheContainer(cacheContainer);
+ clusterService.setId("node-" + id);
+
+ try (DefaultCamelContext context = new DefaultCamelContext()) {
+ context.disableJMX();
+ context.setName("context-" + id);
+ context.addService(clusterService);
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ fromF("timer:%s?delay=1000&period=1000", id)
+ .routeId("route-" + id)
+ .routePolicy(ClusteredRoutePolicy.forNamespace(namespace))
+ .log("From ${routeId}")
+ .process(e -> contextLatch.countDown());
+ }
+ });
+
+ // Start the context after some random time so the startup order
+ // changes for each test.
+ Thread.sleep(ThreadLocalRandom.current().nextInt(500));
+ context.start();
+
+ // Wait until the route of this node has counted the latch down to zero.
+ contextLatch.await();
+ }
+ }
+}
diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredTestSupport.java b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredTestSupport.java
new file mode 100644
index 0000000..bc8dbde
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredTestSupport.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.embedded.cluster;
+
+import org.infinispan.commons.api.CacheContainerAdmin;
+import org.infinispan.configuration.cache.CacheMode;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.manager.DefaultCacheManager;
+
+/**
+ * Static helpers shared by the embedded Infinispan cluster tests.
+ */
+public final class InfinispanEmbeddedClusteredTestSupport {
+    private InfinispanEmbeddedClusteredTestSupport() {
+        // static helpers only, never instantiated
+    }
+
+    /**
+     * Gets or creates the cache {@code cacheName} on the given container through its administration API, using
+     * the {@code VOLATILE} admin flag and a configuration whose cache mode is {@link CacheMode#LOCAL}.
+     *
+     * @param cacheContainer the cache manager on which the cache is created
+     * @param cacheName      the name of the cache to get or create
+     */
+    public static void createCache(DefaultCacheManager cacheContainer, String cacheName) {
+        final ConfigurationBuilder localCache = new ConfigurationBuilder();
+        localCache.clustering().cacheMode(CacheMode.LOCAL);
+
+        cacheContainer.administration()
+                .withFlags(CacheContainerAdmin.AdminFlag.VOLATILE)
+                .getOrCreateCache(cacheName, localCache.build());
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredViewTest.java b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredViewTest.java
new file mode 100644
index 0000000..d15efc6
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan-embedded/src/test/java/org/apache/camel/component/infinispan/embedded/cluster/InfinispanEmbeddedClusteredViewTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.embedded.cluster;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.cluster.CamelClusterMember;
+import org.apache.camel.cluster.CamelClusterView;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.infinispan.manager.DefaultCacheManager;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.component.infinispan.embedded.cluster.InfinispanEmbeddedClusteredTestSupport.createCache;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Verifies that a single-node cluster backed by an embedded cache manager eventually reports a leader for a
+ * view, and that this leader is flagged both as leader and as the local member.
+ */
+public class InfinispanEmbeddedClusteredViewTest {
+    @Test
+    public void getLeaderTest() throws Exception {
+        final String viewName = "myView";
+
+        try (DefaultCacheManager cacheContainer = new DefaultCacheManager()) {
+            createCache(cacheContainer, viewName);
+
+            //Set up a single node cluster.
+            InfinispanEmbeddedClusterService clusterService = new InfinispanEmbeddedClusterService();
+            clusterService.setCacheContainer(cacheContainer);
+            clusterService.setId("node");
+
+            //Set up context with single locked route.
+            try (DefaultCamelContext context = new DefaultCamelContext()) {
+                context.disableJMX();
+                context.addService(clusterService);
+
+                context.addRoutes(new RouteBuilder() {
+                    @Override
+                    public void configure() {
+                        fromF("master:%s:timer:infinispan?repeatCount=1", viewName)
+                                .routeId("route1")
+                                .stop();
+                    }
+                });
+
+                context.start();
+
+                CamelClusterView view = clusterService.getView(viewName);
+
+                // matches(Predicate) is required here: satisfies(...) would bind the method reference
+                // to a Consumer and silently discard the boolean, so nothing would be asserted.
+                await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+                    assertThat(view.getLeader())
+                            .get()
+                            .matches(CamelClusterMember::isLeader, "is the leader")
+                            .matches(CamelClusterMember::isLocal, "is the local member");
+                });
+            }
+        }
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan-embedded/src/test/resources/log4j2.properties b/components/camel-infinispan/camel-infinispan-embedded/src/test/resources/log4j2.properties
index 1301f96..b740e62 100644
--- a/components/camel-infinispan/camel-infinispan-embedded/src/test/resources/log4j2.properties
+++ b/components/camel-infinispan/camel-infinispan-embedded/src/test/resources/log4j2.properties
@@ -35,6 +35,10 @@ logger.infinispan-camel.level = INFO
logger.infinispan-camel-remote.name = org.apache.camel.component.infinispan.remote
logger.infinispan-camel-remote.level = INFO
logger.infinispan-camel-embedded.name = org.apache.camel.component.infinispan.embedded
+logger.infinispan-camel-embedded.level = INFO
+logger.infinispan-camel-remote-cluster.name = org.apache.camel.component.infinispan.embedded.cluster
+logger.infinispan-camel-remote-cluster.level = DEBUG
+
logger.infinispan-test-infra-container.name = container.infinispan
logger.infinispan-test-infra-container.level = INFO
logger.infinispan-camel-test.name = org.apache.camel.test.junit5
diff --git a/components/camel-infinispan/camel-infinispan/pom.xml b/components/camel-infinispan/camel-infinispan/pom.xml
index 7db9c45..4b7b32b 100644
--- a/components/camel-infinispan/camel-infinispan/pom.xml
+++ b/components/camel-infinispan/camel-infinispan/pom.xml
@@ -35,10 +35,6 @@
<dependencies>
<dependency>
<groupId>org.apache.camel</groupId>
- <artifactId>camel-support</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
<artifactId>camel-infinispan-common</artifactId>
</dependency>
<dependency>
@@ -70,6 +66,11 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-master</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-infinispan-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
@@ -144,6 +145,11 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
diff --git a/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteRoutePolicy.java b/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteRoutePolicy.java
deleted file mode 100644
index 71774e5..0000000
--- a/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteRoutePolicy.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.infinispan.remote;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-
-import org.apache.camel.Service;
-import org.apache.camel.api.management.ManagedResource;
-import org.apache.camel.component.infinispan.InfinispanRoutePolicy;
-import org.apache.camel.component.infinispan.InfinispanUtil;
-import org.apache.camel.support.service.ServiceSupport;
-import org.apache.camel.util.ObjectHelper;
-import org.infinispan.client.hotrod.Flag;
-import org.infinispan.client.hotrod.RemoteCache;
-import org.infinispan.client.hotrod.annotation.ClientCacheEntryExpired;
-import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
-import org.infinispan.client.hotrod.annotation.ClientListener;
-import org.infinispan.client.hotrod.event.ClientCacheEntryExpiredEvent;
-import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@ManagedResource(description = "Route policy using Infinispan Remote as clustered lock")
-public class InfinispanRemoteRoutePolicy extends InfinispanRoutePolicy {
- private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanRemoteRoutePolicy.class);
-
- private final InfinispanRemoteManager manager;
-
- public InfinispanRemoteRoutePolicy(InfinispanRemoteConfiguration configuration) {
- this(configuration, null, null);
- }
-
- public InfinispanRemoteRoutePolicy(InfinispanRemoteConfiguration configuration, String lockKey, String lockValue) {
- super(lockKey, lockValue);
-
- this.manager = new InfinispanRemoteManager(configuration);
- }
-
- @Override
- protected Service createService() {
- return new RemoteCacheService();
- }
-
- @Override
- public void doStart() throws Exception {
- super.doStart();
- this.manager.start();
- }
-
- @Override
- public void doStop() throws Exception {
- super.doStop();
- this.manager.stop();
- }
-
- // *************************************************************************
- //
- // *************************************************************************
-
- @ClientListener
- private final class RemoteCacheService extends ServiceSupport {
- private final int lifespan;
-
- private RemoteCache<String, String> cache;
- private ScheduledExecutorService executorService;
- private ScheduledFuture<?> future;
- private Long version;
-
- RemoteCacheService() {
- this.lifespan = (int) getLifespanTimeUnit().toSeconds(getLifespan());
- }
-
- @SuppressWarnings("unchecked")
- @Override
- protected void doStart() throws Exception {
- this.executorService = InfinispanUtil.newSingleThreadScheduledExecutor(getCamelContext(), this);
- this.cache = manager.getCache(getLockMapName(), RemoteCache.class);
- this.cache.addClientListener(this);
- this.future = executorService.scheduleAtFixedRate(this::run, 0, getLifespan() / 2, getLifespanTimeUnit());
- }
-
- @Override
- protected void doStop() throws Exception {
- if (cache != null) {
- this.cache.removeClientListener(this);
-
- if (this.version != null) {
- this.cache.removeWithVersion(getLockKey(), this.version);
- }
- }
-
- if (future != null) {
- future.cancel(true);
- future = null;
- }
-
- getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService);
- }
-
- private void run() {
- if (!isRunAllowed()) {
- return;
- }
-
- if (isLeader() && version != null) {
- LOGGER.debug("Lock refresh key={} with version={}", getLockKey(), version);
-
- // I'm still the leader, so refresh the key so it does not expire.
- if (!cache.replaceWithVersion(getLockKey(), getLockValue(), version, lifespan)) {
- setLeader(false);
- } else {
- version = cache.getWithMetadata(getLockKey()).getVersion();
- LOGGER.debug("Lock refreshed key={} with new version={}", getLockKey(), version);
- }
- }
-
- if (!isLeader()) {
- Object result = cache.withFlags(Flag.FORCE_RETURN_VALUE).putIfAbsent(getLockKey(), getLockValue(),
- getLifespan(), getLifespanTimeUnit());
- if (result == null) {
- // Acquired the key so I'm the leader.
- setLeader(true);
-
- // Get the version
- version = cache.getWithMetadata(getLockKey()).getVersion();
-
- LOGGER.debug("Lock acquired key={} with version={}", getLockKey(), version);
- } else if (ObjectHelper.equal(getLockValue(), result) && !isLeader()) {
- // Hey, I may have recovered from failure (or reboot was really
- // fast) and my key was still there so yeah, I'm the leader again!
- setLeader(true);
-
- // Get the version
- version = cache.getWithMetadata(getLockKey()).getVersion();
-
- LOGGER.debug("Lock resumed key={} with version={}", getLockKey(), version);
- } else {
- setLeader(false);
- }
- }
- }
-
- @ClientCacheEntryRemoved
- public void onCacheEntryRemoved(ClientCacheEntryRemovedEvent<String> event) {
- LOGGER.debug("onCacheEntryRemoved lock-key={}, event-key={}", getLockKey(), event.getKey());
- if (ObjectHelper.equal(getLockKey(), event.getKey())) {
- run();
- }
- }
-
- @ClientCacheEntryExpired
- public void onCacheEntryExpired(ClientCacheEntryExpiredEvent<String> event) {
- LOGGER.debug("onCacheEntryExpired lock-key={}, event-key={}", getLockKey(), event.getKey());
- if (ObjectHelper.equal(getLockKey(), event.getKey())) {
- run();
- }
- }
- }
-}
diff --git a/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterConfiguration.java b/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterConfiguration.java
new file mode 100644
index 0000000..54682a3
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterConfiguration.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.remote.cluster;
+
+import java.util.Map;
+
+import org.apache.camel.component.infinispan.cluster.InfinispanClusterConfiguration;
+import org.apache.camel.component.infinispan.remote.InfinispanRemoteConfiguration;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.configuration.Configuration;
+
+/**
+ * Cluster configuration for the remote Infinispan cluster service.
+ * <p>
+ * It wraps an {@link InfinispanRemoteConfiguration} (created in the constructor and handed to the superclass)
+ * and exposes a subset of its properties; every accessor below simply forwards to the instance returned by
+ * {@code getConfiguration()}.
+ */
+public class InfinispanRemoteClusterConfiguration
+ extends InfinispanClusterConfiguration<InfinispanRemoteConfiguration>
+ implements Cloneable {
+
+ /**
+ * Creates a configuration backed by a new, default {@link InfinispanRemoteConfiguration}.
+ */
+ public InfinispanRemoteClusterConfiguration() {
+ super(new InfinispanRemoteConfiguration());
+ }
+
+ // ***********************************************
+ // Properties (delegated to the wrapped InfinispanRemoteConfiguration)
+ // ***********************************************
+
+ public String getHosts() {
+ return getConfiguration().getHosts();
+ }
+
+ public void setHosts(String hosts) {
+ getConfiguration().setHosts(hosts);
+ }
+
+ public boolean isSecure() {
+ return getConfiguration().isSecure();
+ }
+
+ public void setSecure(boolean secure) {
+ getConfiguration().setSecure(secure);
+ }
+
+ public String getUsername() {
+ return getConfiguration().getUsername();
+ }
+
+ public void setUsername(String username) {
+ getConfiguration().setUsername(username);
+ }
+
+ public String getPassword() {
+ return getConfiguration().getPassword();
+ }
+
+ public void setPassword(String password) {
+ getConfiguration().setPassword(password);
+ }
+
+ public String getSaslMechanism() {
+ return getConfiguration().getSaslMechanism();
+ }
+
+ public void setSaslMechanism(String saslMechanism) {
+ getConfiguration().setSaslMechanism(saslMechanism);
+ }
+
+ public String getSecurityRealm() {
+ return getConfiguration().getSecurityRealm();
+ }
+
+ public void setSecurityRealm(String securityRealm) {
+ getConfiguration().setSecurityRealm(securityRealm);
+ }
+
+ public String getSecurityServerName() {
+ return getConfiguration().getSecurityServerName();
+ }
+
+ public void setSecurityServerName(String securityServerName) {
+ getConfiguration().setSecurityServerName(securityServerName);
+ }
+
+ public Map<String, String> getConfigurationProperties() {
+ return getConfiguration().getConfigurationProperties();
+ }
+
+ public void setConfigurationProperties(Map<String, String> configurationProperties) {
+ getConfiguration().setConfigurationProperties(configurationProperties);
+ }
+
+ // Adds a single key/value pair to the wrapped configuration's properties.
+ public void addConfigurationProperty(String key, String value) {
+ getConfiguration().addConfigurationProperty(key, value);
+ }
+
+ public RemoteCacheManager getCacheContainer() {
+ return getConfiguration().getCacheContainer();
+ }
+
+ public void setCacheContainer(RemoteCacheManager cacheContainer) {
+ getConfiguration().setCacheContainer(cacheContainer);
+ }
+
+ public Configuration getCacheContainerConfiguration() {
+ return getConfiguration().getCacheContainerConfiguration();
+ }
+
+ public void setCacheContainerConfiguration(Configuration cacheContainerConfiguration) {
+ getConfiguration().setCacheContainerConfiguration(cacheContainerConfiguration);
+ }
+
+ // ***********************************************
+ // Cloning
+ // ***********************************************
+
+ /**
+ * Returns the superclass' clone narrowed to this type.
+ * <p>
+ * NOTE(review): whether the wrapped {@link InfinispanRemoteConfiguration} is copied or shared between the
+ * original and the clone is decided by the superclass' {@code clone()}, which is not visible here - confirm.
+ */
+ @Override
+ public InfinispanRemoteClusterConfiguration clone() {
+ return (InfinispanRemoteClusterConfiguration) super.clone();
+ }
+}
diff --git a/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterService.java b/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterService.java
new file mode 100644
index 0000000..bbfd461
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterService.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.remote.cluster;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.component.infinispan.cluster.InfinispanClusterService;
+import org.apache.camel.component.infinispan.cluster.InfinispanClusterView;
+import org.apache.camel.util.ObjectHelper;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.configuration.Configuration;
+
+/**
+ * Cluster service whose views are {@link InfinispanRemoteClusterView} instances configured through an
+ * {@link InfinispanRemoteClusterConfiguration}.
+ */
+public class InfinispanRemoteClusterService extends InfinispanClusterService {
+    private InfinispanRemoteClusterConfiguration configuration;
+
+    /**
+     * Creates a service with a new, default configuration.
+     */
+    public InfinispanRemoteClusterService() {
+        this.configuration = new InfinispanRemoteClusterConfiguration();
+    }
+
+    /**
+     * Creates a service that works on a clone of the given configuration.
+     *
+     * @param configuration the configuration to clone
+     */
+    public InfinispanRemoteClusterService(InfinispanRemoteClusterConfiguration configuration) {
+        this.configuration = configuration.clone();
+    }
+
+    // *********************************************
+    // Properties
+    // *********************************************
+
+    /**
+     * Returns the configuration instance held by this service (not a copy).
+     */
+    public InfinispanRemoteClusterConfiguration getConfiguration() {
+        return this.configuration;
+    }
+
+    /**
+     * Replaces the current configuration with a clone of the given one.
+     */
+    public void setConfiguration(InfinispanRemoteClusterConfiguration configuration) {
+        this.configuration = configuration.clone();
+    }
+
+    public void setConfigurationUri(String configurationUri) {
+        this.configuration.setConfigurationUri(configurationUri);
+    }
+
+    public RemoteCacheManager getCacheContainer() {
+        return this.configuration.getCacheContainer();
+    }
+
+    public void setCacheContainer(RemoteCacheManager cacheContainer) {
+        this.configuration.setCacheContainer(cacheContainer);
+    }
+
+    public Configuration getCacheContainerConfiguration() {
+        return this.configuration.getCacheContainerConfiguration();
+    }
+
+    public void setCacheContainerConfiguration(Configuration cacheContainerConfiguration) {
+        this.configuration.setCacheContainerConfiguration(cacheContainerConfiguration);
+    }
+
+    public long getLifespan() {
+        return this.configuration.getLifespan();
+    }
+
+    public void setLifespan(long lifespan) {
+        this.configuration.setLifespan(lifespan);
+    }
+
+    public TimeUnit getLifespanTimeUnit() {
+        return this.configuration.getLifespanTimeUnit();
+    }
+
+    public void setLifespanTimeUnit(TimeUnit lifespanTimeUnit) {
+        this.configuration.setLifespanTimeUnit(lifespanTimeUnit);
+    }
+
+    // *********************************************
+    // Impl
+    // *********************************************
+
+    /**
+     * Builds the view for the given namespace after checking that both the Camel context and the cluster id
+     * have been set on this service.
+     *
+     * @param  namespace the namespace of the view to create
+     * @return           a new {@link InfinispanRemoteClusterView} bound to this service and its configuration
+     */
+    @Override
+    protected InfinispanClusterView createView(String namespace) throws Exception {
+        // Preconditions: the service must be attached to a context and have an id.
+        ObjectHelper.notNull(getCamelContext(), "Camel Context");
+        ObjectHelper.notNull(getId(), "Cluster ID");
+
+        final InfinispanClusterView view = new InfinispanRemoteClusterView(this, this.configuration, namespace);
+        return view;
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterView.java b/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterView.java
new file mode 100644
index 0000000..a0dd216
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan/src/main/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusterView.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.remote.cluster;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import org.apache.camel.cluster.CamelClusterMember;
+import org.apache.camel.component.infinispan.InfinispanUtil;
+import org.apache.camel.component.infinispan.cluster.InfinispanClusterService;
+import org.apache.camel.component.infinispan.cluster.InfinispanClusterView;
+import org.apache.camel.component.infinispan.remote.InfinispanRemoteManager;
+import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.support.service.ServiceSupport;
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryExpired;
+import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
+import org.infinispan.client.hotrod.annotation.ClientListener;
+import org.infinispan.client.hotrod.event.ClientCacheEntryExpiredEvent;
+import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.util.function.Predicates.negate;
+
+/**
+ * Cluster view backed by a remote (Hot Rod) Infinispan cache named after the view namespace.
+ * <p>
+ * Every member writes an entry keyed by its own id into the cache, and the leader is the id stored under
+ * {@link InfinispanClusterService#LEADER_KEY}. Both kinds of entries are written with the configured lifespan
+ * and are refreshed periodically by the inner {@code LeadershipService}.
+ */
+public class InfinispanRemoteClusterView extends InfinispanClusterView {
+    // Log under the view's own category (was mistakenly the cluster service's).
+    private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanRemoteClusterView.class);
+
+    private final InfinispanRemoteClusterConfiguration configuration;
+    private final InfinispanRemoteManager manager;
+    private final LocalMember localMember;
+    private final LeadershipService leadership;
+
+    // Assigned in doStart() and reset to null in doStop(); null means the view is not running.
+    private RemoteCache<String, String> cache;
+
+    /**
+     * @param cluster       the owning cluster service; its id becomes the id of the local member
+     * @param configuration the cluster configuration, also used to build the remote cache manager
+     * @param namespace     the view namespace, used as the name of the backing cache
+     */
+    protected InfinispanRemoteClusterView(
+                                          InfinispanRemoteClusterService cluster,
+                                          InfinispanRemoteClusterConfiguration configuration,
+                                          String namespace) {
+        super(cluster, namespace);
+
+        this.configuration = configuration;
+        this.manager = new InfinispanRemoteManager(this.configuration.getConfiguration());
+        this.leadership = new LeadershipService();
+        this.localMember = new LocalMember(cluster.getId());
+    }
+
+    /**
+     * Starts the remote manager, resolves the cache named after the namespace and then starts the
+     * leadership service, which needs the cache to be available.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void doStart() throws Exception {
+        super.doStart();
+
+        ServiceHelper.startService(manager);
+
+        this.cache = manager.getCache(getNamespace(), RemoteCache.class);
+
+        ServiceHelper.startService(leadership);
+    }
+
+    /**
+     * Stops the leadership service before the manager (the former still uses the cache while stopping)
+     * and finally drops the cache reference.
+     */
+    @Override
+    public void doStop() throws Exception {
+        super.doStop();
+
+        LOGGER.info("shutdown service: {}", getClusterService().getId());
+
+        ServiceHelper.stopService(leadership);
+        ServiceHelper.stopService(manager);
+
+        this.cache = null;
+    }
+
+    @Override
+    public CamelClusterMember getLocalMember() {
+        return this.localMember;
+    }
+
+    /**
+     * @return one member per cache key other than the leader key, or an empty list if the view is not started
+     */
+    @Override
+    public List<CamelClusterMember> getMembers() {
+        return this.cache != null
+                ? cache.keySet().stream()
+                        .filter(negate(InfinispanClusterService.LEADER_KEY::equals))
+                        .map(ClusterMember::new)
+                        .collect(Collectors.toList())
+                : Collections.emptyList();
+    }
+
+    /**
+     * @return the member whose id is stored under the leader key, or empty if the view is not started or
+     *         no leader entry exists
+     */
+    @Override
+    public Optional<CamelClusterMember> getLeader() {
+        if (this.cache == null) {
+            return Optional.empty();
+        }
+
+        String id = cache.get(InfinispanClusterService.LEADER_KEY);
+        if (id == null) {
+            return Optional.empty();
+        }
+
+        return Optional.of(new ClusterMember(id));
+    }
+
+    /**
+     * @param  id the member id to check, may be {@code null}
+     * @return    {@code true} only if the view is started and {@code id} equals the value stored under the
+     *            leader key
+     */
+    @Override
+    protected boolean isLeader(String id) {
+        if (this.cache == null) {
+            return false;
+        }
+        if (id == null) {
+            return false;
+        }
+
+        final String key = InfinispanClusterService.LEADER_KEY;
+        final String val = this.cache.get(key);
+
+        return Objects.equals(id, val);
+    }
+
+    // *****************************************
+    //
+    // Service
+    //
+    // *****************************************
+
+    /**
+     * Periodically tries to acquire or refresh the leader entry and refreshes the local member entry. It is
+     * also registered as a client listener so that removal or expiration of the leader entry triggers an
+     * immediate attempt.
+     */
+    @ClientListener
+    private final class LeadershipService extends ServiceSupport {
+        // Configured lifespan converted to seconds, as expected by replaceWithVersion.
+        private final int lifespan;
+        private final AtomicBoolean running;
+
+        private ScheduledExecutorService executorService;
+        // Version of the leader entry last written by this member; null while not holding the lock.
+        private Long version;
+
+        LeadershipService() {
+            this.lifespan = (int) configuration.getLifespanTimeUnit().toSeconds(configuration.getLifespan());
+            this.running = new AtomicBoolean(false);
+        }
+
+        @Override
+        protected void doStart() throws Exception {
+            super.doStart();
+
+            this.running.set(true);
+            this.executorService = InfinispanUtil.newSingleThreadScheduledExecutor(
+                    getCamelContext(),
+                    this,
+                    getLocalMember().getId());
+
+            // register the local member to the inventory
+            cache.put(
+                    getLocalMember().getId(),
+                    "false",
+                    configuration.getLifespan(),
+                    configuration.getLifespanTimeUnit());
+
+            cache.addClientListener(this);
+
+            // Run at half the lifespan so the entries are refreshed before they expire.
+            executorService.scheduleAtFixedRate(
+                    this::runSafely,
+                    0,
+                    configuration.getLifespan() / 2,
+                    configuration.getLifespanTimeUnit());
+        }
+
+        @Override
+        protected void doStop() throws Exception {
+            super.doStop();
+
+            this.running.set(false);
+
+            if (cache != null) {
+                cache.removeClientListener(this);
+            }
+
+            getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService);
+
+            if (cache != null) {
+                // Release the leader entry, but only if it is still the version written by this member.
+                if (this.version != null) {
+                    cache.removeWithVersion(InfinispanClusterService.LEADER_KEY, this.version);
+                }
+
+                LOGGER.info("Removing local member, key={}", getLocalMember().getId());
+                cache.remove(getLocalMember().getId());
+            }
+
+            this.version = null;
+        }
+
+        private boolean isLeader() {
+            return getLocalMember().isLeader();
+        }
+
+        private void setLeader(boolean leader) {
+            ((LocalMember) getLocalMember()).setLeader(leader);
+        }
+
+        /**
+         * Runs one leadership round and logs, instead of propagating, any runtime failure.
+         * <p>
+         * A task scheduled with {@code scheduleAtFixedRate} is never executed again once it throws, so a single
+         * failed cache operation would otherwise stop the lock and membership refresh for good.
+         */
+        private void runSafely() {
+            try {
+                run();
+            } catch (RuntimeException e) {
+                LOGGER.warn("Leadership check failed, key={}, id={}",
+                        InfinispanClusterService.LEADER_KEY,
+                        getLocalMember().getId(),
+                        e);
+            }
+        }
+
+        /**
+         * One leadership round: refresh the leader entry if this member holds it, otherwise try to acquire
+         * it, then refresh the local member entry.
+         */
+        private synchronized void run() {
+            if (!running.get()) {
+                return;
+            }
+
+            final String leaderKey = InfinispanClusterService.LEADER_KEY;
+            final String localId = getLocalMember().getId();
+
+            if (isLeader() && version != null) {
+                LOGGER.debug("Lock refresh key={}, id={} with version={}", leaderKey, localId, version);
+
+                // I'm still the leader, so refresh the key so it does not expire.
+                if (!cache.replaceWithVersion(
+                        leaderKey,
+                        getClusterService().getId(),
+                        version,
+                        lifespan)) {
+
+                    LOGGER.debug("Failed to refresh the lock key={}, id={}, version={}", leaderKey, localId, version);
+
+                    setLeader(false);
+                } else {
+                    version = cache.getWithMetadata(leaderKey).getVersion();
+
+                    LOGGER.debug("Lock refreshed key={}, id={}, with new version={}", leaderKey, localId, version);
+                }
+            }
+
+            if (!isLeader()) {
+                LOGGER.debug("Try to acquire lock key={}, id={}", leaderKey, localId);
+
+                // FORCE_RETURN_VALUE makes putIfAbsent return the previous value, which is inspected below.
+                Object result = cache.withFlags(Flag.FORCE_RETURN_VALUE)
+                        .putIfAbsent(
+                                leaderKey,
+                                localId,
+                                configuration.getLifespan(),
+                                configuration.getLifespanTimeUnit());
+
+                if (result == null) {
+                    // Acquired the key so I'm the leader.
+                    setLeader(true);
+
+                    // Get the version
+                    version = cache.getWithMetadata(leaderKey).getVersion();
+
+                    LOGGER.debug("Lock acquired key={}, id={}, with version={}", leaderKey, localId, version);
+
+                } else if (Objects.equals(getClusterService().getId(), result) && !isLeader()) {
+                    // Hey, I may have recovered from failure (or reboot was really
+                    // fast) and my key was still there so yeah, I'm the leader again!
+                    setLeader(true);
+
+                    // Get the version
+                    version = cache.getWithMetadata(leaderKey).getVersion();
+
+                    LOGGER.debug("Lock resumed key={}, id={} with version={}", leaderKey, localId, version);
+                } else {
+                    LOGGER.debug("Failed to acquire the lock key={}, id={}", leaderKey, localId);
+
+                    setLeader(false);
+                }
+            }
+
+            // refresh local membership
+            cache.put(
+                    getLocalMember().getId(),
+                    isLeader() ? "true" : "false",
+                    configuration.getLifespan(),
+                    configuration.getLifespanTimeUnit());
+        }
+
+        /**
+         * Triggers a leadership round on the service's executor when the leader entry has been removed.
+         */
+        @ClientCacheEntryRemoved
+        public void onCacheEntryRemoved(ClientCacheEntryRemovedEvent<String> event) {
+            if (!running.get()) {
+                return;
+            }
+
+            LOGGER.debug("onCacheEntryRemoved id={}, lock-key={}, event-key={}",
+                    getLocalMember().getId(),
+                    InfinispanClusterService.LEADER_KEY,
+                    event.getKey());
+
+            if (Objects.equals(InfinispanClusterService.LEADER_KEY, event.getKey())) {
+                executorService.execute(this::runSafely);
+            }
+        }
+
+        /**
+         * Triggers a leadership round on the service's executor when the leader entry has expired.
+         */
+        @ClientCacheEntryExpired
+        public void onCacheEntryExpired(ClientCacheEntryExpiredEvent<String> event) {
+            if (!running.get()) {
+                return;
+            }
+
+            LOGGER.debug("onCacheEntryExpired id={}, lock-key={}, event-key={}",
+                    getLocalMember().getId(),
+                    InfinispanClusterService.LEADER_KEY,
+                    event.getKey());
+
+            if (Objects.equals(InfinispanClusterService.LEADER_KEY, event.getKey())) {
+                executorService.execute(this::runSafely);
+            }
+        }
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteRoutePolicyTest.java b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteRoutePolicyTest.java
deleted file mode 100644
index f09887d..0000000
--- a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteRoutePolicyTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.infinispan.remote;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.infinispan.InfinispanRoutePolicy;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.test.infra.infinispan.services.InfinispanService;
-import org.apache.camel.test.infra.infinispan.services.InfinispanServiceFactory;
-import org.infinispan.client.hotrod.RemoteCacheManager;
-import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
-import org.infinispan.configuration.cache.CacheMode;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
-
-import static org.awaitility.Awaitility.await;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class InfinispanRemoteRoutePolicyTest {
- public static final String CACHE_NAME = "_route_policy";
-
- @RegisterExtension
- public static InfinispanService service = InfinispanServiceFactory.createService();
-
- @Test
- public void testLeadership() throws Exception {
- try (RemoteCacheManager cacheContainer = createCacheContainer(service)) {
-
- cacheContainer.start();
-
- final InfinispanRoutePolicy policy1 = createRoutePolicy(cacheContainer, "route1");
- final InfinispanRoutePolicy policy2 = createRoutePolicy(cacheContainer, "route2");
-
- try (CamelContext context = new DefaultCamelContext()) {
- context.start();
-
- RouteBuilder.addRoutes(context, b -> b.from("direct:r1").routePolicy(policy1).to("mock:p1"));
-
- await().atMost(10, TimeUnit.SECONDS).until(policy1::isLeader);
-
- RouteBuilder.addRoutes(context, b -> b.from("direct:r2").routePolicy(policy2).to("mock:p2"));
-
- assertTrue(policy1.isLeader());
- assertFalse(policy2.isLeader());
-
- policy1.shutdown();
-
- await().atMost(10, TimeUnit.SECONDS).until(policy2::isLeader);
-
- assertFalse(policy1.isLeader());
- assertTrue(policy2.isLeader());
- }
- }
- }
-
- // *****************************
- //
- // *****************************
-
- private static RemoteCacheManager createCacheContainer(InfinispanService service) {
- ConfigurationBuilder clientBuilder = new ConfigurationBuilder();
-
- // for default tests, we force return value for all the
- // operations
- clientBuilder
- .forceReturnValues(true);
-
- // add server from the test infra service
- clientBuilder
- .addServer()
- .host(service.host())
- .port(service.port());
-
- // add security info
- clientBuilder
- .security()
- .authentication()
- .username(service.username())
- .password(service.password())
- .serverName("infinispan")
- .saslMechanism("DIGEST-MD5")
- .realm("default");
-
- RemoteCacheManager cacheContainer = new RemoteCacheManager(clientBuilder.build());
- cacheContainer.administration()
- .getOrCreateCache(
- CACHE_NAME,
- new org.infinispan.configuration.cache.ConfigurationBuilder()
- .clustering()
- .cacheMode(CacheMode.DIST_SYNC).build());
-
- return cacheContainer;
- }
-
- private static InfinispanRoutePolicy createRoutePolicy(RemoteCacheManager cacheContainer, String lockValue) {
- InfinispanRemoteConfiguration configuration = new InfinispanRemoteConfiguration();
- configuration.setCacheContainer(cacheContainer);
-
- InfinispanRemoteRoutePolicy policy = new InfinispanRemoteRoutePolicy(configuration);
- policy.setLockMapName(CACHE_NAME);
- policy.setLockKey("lock-key");
- policy.setLockValue(lockValue);
-
- return policy;
- }
-}
diff --git a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/AbstractInfinispanRemoteClusteredTest.java b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/AbstractInfinispanRemoteClusteredTest.java
new file mode 100644
index 0000000..aca2ea8
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/AbstractInfinispanRemoteClusteredTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.remote.cluster;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.camel.test.infra.infinispan.services.InfinispanService;
+import org.apache.camel.test.infra.infinispan.services.InfinispanServiceFactory;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.configuration.Configuration;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.camel.component.infinispan.remote.cluster.InfinispanRemoteClusteredTestSupport.createCache;
+import static org.apache.camel.component.infinispan.remote.cluster.InfinispanRemoteClusteredTestSupport.createConfiguration;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Base class for the remote clustered tests.
+ * <p>
+ * {@link #test()} starts three concurrent "nodes" that share one
+ * {@link RemoteCacheManager}; each node executes the subclass-provided
+ * {@link #run(RemoteCacheManager, String, String)} and the test passes once every
+ * node has completed it without throwing.
+ */
+abstract class AbstractInfinispanRemoteClusteredTest {
+    @RegisterExtension
+    public static InfinispanService service = InfinispanServiceFactory.createService();
+
+    /**
+     * Runs {@link #run(RemoteCacheManager, String, String)} once per node id on a
+     * thread pool and asserts that every node finished successfully.
+     *
+     * @throws Exception if the cache container cannot be set up or the wait is interrupted
+     */
+    @Timeout(value = 1, unit = TimeUnit.MINUTES)
+    @Test
+    public void test() throws Exception {
+        final Logger logger = LoggerFactory.getLogger(getClass());
+        final List<String> clients = IntStream.range(0, 3).mapToObj(Integer::toString).collect(Collectors.toList());
+        final List<String> results = new ArrayList<>();
+        final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(clients.size() * 2);
+        final CountDownLatch latch = new CountDownLatch(clients.size());
+        final String viewName = "myView";
+
+        Configuration configuration = createConfiguration(service);
+
+        try (RemoteCacheManager cacheContainer = new RemoteCacheManager(configuration)) {
+            try {
+                createCache(cacheContainer, viewName);
+
+                for (String id : clients) {
+                    scheduler.submit(() -> {
+                        try {
+                            run(cacheContainer, viewName, id);
+                            logger.debug("Node {} is shutting down", id);
+
+                            // ArrayList is not thread-safe and the nodes finish concurrently,
+                            // so the append must be guarded.
+                            synchronized (results) {
+                                results.add(id);
+                            }
+                        } catch (Exception e) {
+                            logger.warn("Node {} failed", id, e);
+                        } finally {
+                            latch.countDown();
+                        }
+                    });
+                }
+
+                // countDown() happens-before await() returns, so the reads of
+                // 'results' below need no additional locking.
+                latch.await();
+            } finally {
+                // Always release the worker threads, even when setup or the wait fails.
+                scheduler.shutdownNow();
+            }
+
+            assertThat(results).hasSameSizeAs(clients);
+            assertThat(results).containsAll(clients);
+        }
+    }
+
+    /**
+     * Executes the scenario for a single node.
+     *
+     * @param  cacheContainer the cache container shared by all the nodes
+     * @param  namespace      the name of the cluster view (and backing cache) to join
+     * @param  id             the identifier of the node
+     * @throws Exception      if the scenario fails
+     */
+    protected abstract void run(RemoteCacheManager cacheContainer, String namespace, String id) throws Exception;
+}
diff --git a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredMasterTest.java b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredMasterTest.java
new file mode 100644
index 0000000..73621cc
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredMasterTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.remote.cluster;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+
+/**
+ * Clustered scenario in which each node consumes from a timer wrapped by the
+ * {@code master} component for the shared namespace.
+ */
+public class InfinispanRemoteClusteredMasterTest extends AbstractInfinispanRemoteClusteredTest {
+    /**
+     * Starts a Camel context for the given node and blocks until its route has
+     * processed a randomly chosen number (2 to 5) of timer events.
+     */
+    @Override
+    protected void run(RemoteCacheManager cacheContainer, String namespace, String id) throws Exception {
+        final ThreadLocalRandom random = ThreadLocalRandom.current();
+        final int expectedEvents = random.nextInt(2, 6);
+        final CountDownLatch pendingEvents = new CountDownLatch(expectedEvents);
+
+        // Cluster service representing this node, backed by the shared cache container.
+        final InfinispanRemoteClusterService node = new InfinispanRemoteClusterService();
+        node.setCacheContainer(cacheContainer);
+        node.setId("node-" + id);
+
+        try (DefaultCamelContext camel = new DefaultCamelContext()) {
+            camel.disableJMX();
+            camel.setName("context-" + id);
+            camel.addService(node);
+            camel.addRoutes(masterRoute(namespace, id, expectedEvents, pendingEvents));
+
+            // A random pause before starting shuffles the startup order
+            // of the nodes from one execution to the next.
+            Thread.sleep(random.nextInt(500));
+            camel.start();
+
+            pendingEvents.await();
+        }
+    }
+
+    /** Builds the master-wrapped timer route that counts down the latch on every event. */
+    private static RouteBuilder masterRoute(
+            String namespace, String id, int expectedEvents, CountDownLatch pendingEvents) {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                fromF("master:%s:timer:%s?delay=1000&period=1000&repeatCount=%d", namespace, id, expectedEvents)
+                        .routeId("route-" + id)
+                        .log("From id=${routeId} counter=${header.CamelTimerCounter}")
+                        .process(e -> pendingEvents.countDown());
+            }
+        };
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredRoutePolicyFactoryTest.java b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredRoutePolicyFactoryTest.java
new file mode 100644
index 0000000..bbbc1d1
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredRoutePolicyFactoryTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.remote.cluster;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.cluster.ClusteredRoutePolicyFactory;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+
+/**
+ * Clustered scenario in which a {@link ClusteredRoutePolicyFactory} for the shared
+ * namespace is registered on the Camel context of each node.
+ */
+public class InfinispanRemoteClusteredRoutePolicyFactoryTest extends AbstractInfinispanRemoteClusteredTest {
+    /**
+     * Starts a Camel context for the given node and blocks until its timer route
+     * has processed a randomly chosen number (2 to 5) of events.
+     */
+    @Override
+    protected void run(RemoteCacheManager cacheContainer, String namespace, String id) throws Exception {
+        final ThreadLocalRandom random = ThreadLocalRandom.current();
+        final int expectedEvents = random.nextInt(2, 6);
+        final CountDownLatch pendingEvents = new CountDownLatch(expectedEvents);
+
+        // Cluster service representing this node, backed by the shared cache container.
+        final InfinispanRemoteClusterService node = new InfinispanRemoteClusterService();
+        node.setCacheContainer(cacheContainer);
+        node.setId("node-" + id);
+
+        try (DefaultCamelContext camel = new DefaultCamelContext()) {
+            camel.disableJMX();
+            camel.setName("context-" + id);
+            camel.addService(node);
+            camel.addRoutePolicyFactory(ClusteredRoutePolicyFactory.forNamespace(namespace));
+            camel.addRoutes(timerRoute(id, expectedEvents, pendingEvents));
+
+            // A random pause before starting shuffles the startup order
+            // of the nodes from one execution to the next.
+            Thread.sleep(random.nextInt(500));
+            camel.start();
+
+            pendingEvents.await();
+        }
+    }
+
+    /** Builds the timer route that counts down the latch on every event. */
+    private static RouteBuilder timerRoute(String id, int expectedEvents, CountDownLatch pendingEvents) {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                fromF("timer:%s?delay=1000&period=1000&repeatCount=%d", id, expectedEvents)
+                        .routeId("route-" + id)
+                        .log("From id=${routeId} counter=${header.CamelTimerCounter}")
+                        .process(e -> pendingEvents.countDown());
+            }
+        };
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredRoutePolicyTest.java b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredRoutePolicyTest.java
new file mode 100644
index 0000000..5dcafca
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredRoutePolicyTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.remote.cluster;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.cluster.ClusteredRoutePolicy;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+
+/**
+ * Clustered scenario in which the timer route of each node is explicitly bound
+ * to a {@link ClusteredRoutePolicy} for the shared namespace.
+ */
+public class InfinispanRemoteClusteredRoutePolicyTest extends AbstractInfinispanRemoteClusteredTest {
+    /**
+     * Starts a Camel context for the given node and blocks until its timer route
+     * has processed a randomly chosen number (2 to 5) of events.
+     */
+    @Override
+    protected void run(RemoteCacheManager cacheContainer, String namespace, String id) throws Exception {
+        final ThreadLocalRandom random = ThreadLocalRandom.current();
+        final int expectedEvents = random.nextInt(2, 6);
+        final CountDownLatch pendingEvents = new CountDownLatch(expectedEvents);
+
+        // Cluster service representing this node, backed by the shared cache container.
+        final InfinispanRemoteClusterService node = new InfinispanRemoteClusterService();
+        node.setCacheContainer(cacheContainer);
+        node.setId("node-" + id);
+
+        try (DefaultCamelContext camel = new DefaultCamelContext()) {
+            camel.disableJMX();
+            camel.setName("context-" + id);
+            camel.addService(node);
+            camel.addRoutes(policedTimerRoute(namespace, id, expectedEvents, pendingEvents));
+
+            // A random pause before starting shuffles the startup order
+            // of the nodes from one execution to the next.
+            Thread.sleep(random.nextInt(500));
+            camel.start();
+
+            pendingEvents.await();
+        }
+    }
+
+    /** Builds the policy-guarded timer route that counts down the latch on every event. */
+    private static RouteBuilder policedTimerRoute(
+            String namespace, String id, int expectedEvents, CountDownLatch pendingEvents) {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                fromF("timer:%s?delay=1000&period=1000&repeatCount=%d", id, expectedEvents)
+                        .routeId("route-" + id)
+                        .routePolicy(ClusteredRoutePolicy.forNamespace(namespace))
+                        .log("From id=${routeId} counter=${header.CamelTimerCounter}")
+                        .process(e -> pendingEvents.countDown());
+            }
+        };
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredTestSupport.java b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredTestSupport.java
new file mode 100644
index 0000000..a78ab82
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredTestSupport.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.remote.cluster;
+
+import org.apache.camel.test.infra.infinispan.services.InfinispanService;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.configuration.Configuration;
+import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
+import org.infinispan.configuration.cache.CacheMode;
+
+/**
+ * Static helpers shared by the remote clustered tests.
+ */
+public final class InfinispanRemoteClusteredTestSupport {
+    private InfinispanRemoteClusteredTestSupport() {
+    }
+
+    /**
+     * Builds a Hot Rod client configuration that targets the given test service
+     * and authenticates with the credentials it exposes.
+     *
+     * @param  service the test-infra service providing host, port, username and password
+     * @return         the client configuration
+     */
+    public static Configuration createConfiguration(InfinispanService service) {
+        final ConfigurationBuilder builder = new ConfigurationBuilder();
+
+        // server coordinates taken from the test-infra service
+        builder.addServer()
+                .host(service.host())
+                .port(service.port());
+
+        // security info
+        builder.security()
+                .authentication()
+                .username(service.username())
+                .password(service.password())
+                .serverName("infinispan")
+                .saslMechanism("DIGEST-MD5")
+                .realm("default");
+
+        return builder.build();
+    }
+
+    /**
+     * Gets or creates the named cache on the server, configured with
+     * {@link CacheMode#DIST_SYNC}.
+     *
+     * @param cacheContainer the cache manager used to reach the server administration API
+     * @param cacheName      the name of the cache
+     */
+    public static void createCache(RemoteCacheManager cacheContainer, String cacheName) {
+        final org.infinispan.configuration.cache.Configuration cacheConfiguration
+                = new org.infinispan.configuration.cache.ConfigurationBuilder()
+                        .clustering()
+                        .cacheMode(CacheMode.DIST_SYNC)
+                        .build();
+
+        cacheContainer.administration().getOrCreateCache(cacheName, cacheConfiguration);
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredViewTest.java b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredViewTest.java
new file mode 100644
index 0000000..dd76a64
--- /dev/null
+++ b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/cluster/InfinispanRemoteClusteredViewTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.infinispan.remote.cluster;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.cluster.CamelClusterMember;
+import org.apache.camel.cluster.CamelClusterView;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.test.infra.infinispan.services.InfinispanService;
+import org.apache.camel.test.infra.infinispan.services.InfinispanServiceFactory;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.configuration.Configuration;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import static org.apache.camel.component.infinispan.remote.cluster.InfinispanRemoteClusteredTestSupport.createCache;
+import static org.apache.camel.component.infinispan.remote.cluster.InfinispanRemoteClusteredTestSupport.createConfiguration;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Verifies that, with a single node in the cluster, the view reports the local
+ * member as the leader.
+ */
+public class InfinispanRemoteClusteredViewTest {
+    @RegisterExtension
+    public static InfinispanService service = InfinispanServiceFactory.createService();
+
+    /**
+     * Starts one Camel context with a {@code master} route on the view and waits
+     * (up to five seconds) for the view to expose a leader that is both flagged
+     * as leader and local.
+     *
+     * @throws Exception if the cache container or the Camel context cannot be set up
+     */
+    @Test
+    public void getLeaderTest() throws Exception {
+        final String viewName = "myView";
+
+        Configuration configuration = createConfiguration(service);
+
+        try (RemoteCacheManager cacheContainer = new RemoteCacheManager(configuration)) {
+            createCache(cacheContainer, viewName);
+
+            InfinispanRemoteClusterService clusterService = new InfinispanRemoteClusterService();
+            clusterService.setCacheContainer(cacheContainer);
+            clusterService.setId("node");
+
+            //Set up context with single locked route.
+            try (DefaultCamelContext context = new DefaultCamelContext()) {
+                context.disableJMX();
+                context.addService(clusterService);
+
+                context.addRoutes(new RouteBuilder() {
+                    @Override
+                    public void configure() {
+                        fromF("master:%s:timer:infinispan?repeatCount=1", viewName)
+                                .routeId("route1")
+                                .stop();
+                    }
+                });
+
+                context.start();
+
+                CamelClusterView view = clusterService.getView(viewName);
+
+                await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+                    // matches(Predicate) is required here: satisfies(Consumer) would accept
+                    // the boolean-returning method references but silently discard their
+                    // result, so nothing would actually be asserted.
+                    assertThat(view.getLeader())
+                            .get()
+                            .matches(CamelClusterMember::isLeader, "is the leader")
+                            .matches(CamelClusterMember::isLocal, "is the local member");
+                });
+            }
+        }
+    }
+}
diff --git a/components/camel-infinispan/camel-infinispan/src/test/resources/log4j2.properties b/components/camel-infinispan/camel-infinispan/src/test/resources/log4j2.properties
index 3847970..5a3ff87 100644
--- a/components/camel-infinispan/camel-infinispan/src/test/resources/log4j2.properties
+++ b/components/camel-infinispan/camel-infinispan/src/test/resources/log4j2.properties
@@ -34,6 +34,8 @@ logger.infinispan-camel.name = org.apache.camel.component.infinispan
logger.infinispan-camel.level = INFO
logger.infinispan-camel-remote.name = org.apache.camel.component.infinispan.remote
logger.infinispan-camel-remote.level = INFO
+logger.infinispan-camel-remote-cluster.name = org.apache.camel.component.infinispan.remote.cluster
+logger.infinispan-camel-remote-cluster.level = DEBUG
logger.infinispan-camel-embedded.name = org.apache.camel.component.infinispan.embedded
logger.infinispan-test-infra-container.name = container.infinispan
logger.infinispan-test-infra-container.level = WARN