You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2017/06/16 15:39:05 UTC
[04/13] camel git commit: CAMEL-11362: create a LeaderElectionservice
CAMEL-11362: create a LeaderElectionservice
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b6f1bdd8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b6f1bdd8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b6f1bdd8
Branch: refs/heads/master
Commit: b6f1bdd8807efaa3e085d410affd4497e24aeab3
Parents: 9ddff22
Author: lburgazzoli <lb...@gmail.com>
Authored: Wed May 31 18:12:47 2017 +0200
Committer: lburgazzoli <lb...@gmail.com>
Committed: Fri Jun 16 17:37:53 2017 +0200
----------------------------------------------------------------------
.../camel/ha/AbstractCamelClusterView.java | 121 ---------
.../java/org/apache/camel/ha/CamelCluster.java | 4 +-
.../apache/camel/ha/CamelClusterFactory.java | 27 ++
.../org/apache/camel/ha/CamelClusterHelper.java | 28 ++
.../org/apache/camel/ha/CamelClusterView.java | 5 +-
.../org/apache/camel/ha/LeaderRoutePolicy.java | 169 ------------
.../camel/impl/ha/AbstractCamelCluster.java | 115 +++++++++
.../camel/impl/ha/AbstractCamelClusterView.java | 137 ++++++++++
.../camel/impl/ha/ClusteredRoutePolicy.java | 256 +++++++++++++++++++
.../atomix/ha/AtomixClusterMember.java | 45 ----
10 files changed, 570 insertions(+), 337 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/b6f1bdd8/camel-core/src/main/java/org/apache/camel/ha/AbstractCamelClusterView.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/ha/AbstractCamelClusterView.java b/camel-core/src/main/java/org/apache/camel/ha/AbstractCamelClusterView.java
deleted file mode 100644
index 64cee58..0000000
--- a/camel-core/src/main/java/org/apache/camel/ha/AbstractCamelClusterView.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.ha;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.locks.StampedLock;
-import java.util.function.BiConsumer;
-import java.util.function.Predicate;
-
-public abstract class AbstractCamelClusterView implements CamelClusterView {
- private final CamelCluster cluster;
- private final String namespace;
- private final List<FilteringConsumer> consumers;
- private final StampedLock lock;
-
- protected AbstractCamelClusterView(CamelCluster cluster, String namespace) {
- this.cluster = cluster;
- this.namespace = namespace;
- this.consumers = new ArrayList<>();
- this.lock = new StampedLock();
- }
-
- @Override
- public CamelCluster getCluster() {
- return this.cluster;
- }
-
- @Override
- public String getNamespace() {
- return this.namespace;
- }
-
- @Override
- public void addEventListener(BiConsumer<Event, Object> consumer) {
- long stamp = lock.writeLock();
-
- try {
- consumers.add(new FilteringConsumer(e -> true, consumer));
- } finally {
- lock.unlockWrite(stamp);
- }
- }
-
- @Override
- public void addEventListener(Predicate<Event> predicate, BiConsumer<Event, Object> consumer) {
- long stamp = lock.writeLock();
-
- try {
- this.consumers.add(new FilteringConsumer(predicate, consumer));
- } finally {
- lock.unlockWrite(stamp);
- }
- }
-
- @Override
- public void removeEventListener(BiConsumer<Event, Object> consumer) {
- long stamp = lock.writeLock();
-
- try {
- consumers.removeIf(c -> c.getConsumer().equals(consumer));
- } finally {
- lock.unlockWrite(stamp);
- }
- }
-
- // **************************************
- // Events
- // **************************************
-
- protected void fireEvent(CamelClusterView.Event event, Object payload) {
- long stamp = lock.readLock();
-
- try {
- for (int i = 0; i < consumers.size(); i++) {
- consumers.get(0).accept(event, payload);
- }
- } finally {
- lock.unlockRead(stamp);
- }
- }
-
- // **************************************
- // Helpers
- // **************************************
-
- private final class FilteringConsumer implements BiConsumer<Event, Object> {
- private final Predicate<Event> predicate;
- private final BiConsumer<Event, Object> consumer;
-
- FilteringConsumer(Predicate<Event> predicate, BiConsumer<Event, Object> consumer) {
- this.predicate = predicate;
- this.consumer = consumer;
- }
-
- @Override
- public void accept(CamelClusterView.Event event, Object payload) {
- if (predicate.test(event)) {
- consumer.accept(event, payload);
- }
- }
-
- public BiConsumer<Event, Object> getConsumer() {
- return this.consumer;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/b6f1bdd8/camel-core/src/main/java/org/apache/camel/ha/CamelCluster.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/ha/CamelCluster.java b/camel-core/src/main/java/org/apache/camel/ha/CamelCluster.java
index 5da1e8c..d6c151c 100644
--- a/camel-core/src/main/java/org/apache/camel/ha/CamelCluster.java
+++ b/camel-core/src/main/java/org/apache/camel/ha/CamelCluster.java
@@ -16,9 +16,11 @@
*/
package org.apache.camel.ha;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Service;
import org.apache.camel.spi.HasId;
-public interface CamelCluster extends HasId {
+public interface CamelCluster extends Service, CamelContextAware, HasId {
/**
* Creates a view of the cluster bound to a namespace.
*
http://git-wip-us.apache.org/repos/asf/camel/blob/b6f1bdd8/camel-core/src/main/java/org/apache/camel/ha/CamelClusterFactory.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterFactory.java b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterFactory.java
new file mode 100644
index 0000000..be332b4
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterFactory.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.ha;
+
+import org.apache.camel.CamelContext;
+
+@FunctionalInterface
+public interface CamelClusterFactory {
+ /**
+ * Creates an instance of a cluster.
+ */
+ CamelCluster newInstance(CamelContext camelContext) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/b6f1bdd8/camel-core/src/main/java/org/apache/camel/ha/CamelClusterHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterHelper.java b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterHelper.java
new file mode 100644
index 0000000..c6f91ed
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterHelper.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.ha;
+
+import java.util.function.Predicate;
+
+public final class CamelClusterHelper {
+ private CamelClusterHelper() {
+ }
+
+ public static Predicate<CamelClusterView.Event> leadershipEventFilter() {
+ return e -> e == CamelClusterView.Event.LEADERSHIP_CHANGED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/b6f1bdd8/camel-core/src/main/java/org/apache/camel/ha/CamelClusterView.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterView.java b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterView.java
index 7af5a21..bda7f8b 100644
--- a/camel-core/src/main/java/org/apache/camel/ha/CamelClusterView.java
+++ b/camel-core/src/main/java/org/apache/camel/ha/CamelClusterView.java
@@ -20,10 +20,13 @@ import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Service;
+
/**
* Represents the View of the cluster at some given period of time.
*/
-public interface CamelClusterView {
+public interface CamelClusterView extends Service, CamelContextAware {
enum Event {
KEEP_ALIVE,
http://git-wip-us.apache.org/repos/asf/camel/blob/b6f1bdd8/camel-core/src/main/java/org/apache/camel/ha/LeaderRoutePolicy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/ha/LeaderRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/ha/LeaderRoutePolicy.java
deleted file mode 100644
index 40ae1e8..0000000
--- a/camel-core/src/main/java/org/apache/camel/ha/LeaderRoutePolicy.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.ha;
-
-import java.util.HashSet;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiConsumer;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.CamelContextAware;
-import org.apache.camel.Route;
-import org.apache.camel.api.management.ManagedAttribute;
-import org.apache.camel.api.management.ManagedResource;
-import org.apache.camel.support.RoutePolicySupport;
-import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.ReferenceCount;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@ManagedResource(description = "Route policy using ...")
-public class LeaderRoutePolicy extends RoutePolicySupport implements CamelContextAware {
- private static final Logger LOGGER = LoggerFactory.getLogger(LeaderRoutePolicy.class);
-
- private final AtomicBoolean leader;
- private final Set<Route> startedRoutes;
- private final Set<Route> stoppedRoutes;
- private final ReferenceCount refCount;
- private final CamelClusterView clusterView;
- private final CamelClusterMember clusterMember;
- private final BiConsumer<CamelClusterView.Event, Object> clusterEventConsumer;
- private CamelContext camelContext;
-
- public LeaderRoutePolicy(CamelClusterView clusterView, CamelClusterMember clusterMember) {
- this.clusterMember = clusterMember;
- this.clusterView = clusterView;
- this.clusterEventConsumer = this::onClusterEvent;
- this.stoppedRoutes = new HashSet<>();
- this.startedRoutes = new HashSet<>();
- this.leader = new AtomicBoolean(false);
-
- this.refCount = ReferenceCount.on(
- () -> clusterView.addEventListener(clusterEventConsumer),
- () -> clusterView.removeEventListener(clusterEventConsumer)
- );
- }
-
- @Override
- public CamelContext getCamelContext() {
- return camelContext;
- }
-
- @Override
- public void setCamelContext(CamelContext camelContext) {
- this.camelContext = camelContext;
- }
-
- @Override
- public synchronized void onInit(Route route) {
- super.onInit(route);
-
- LOGGER.info("Route managed by {}. Setting route {} AutoStartup flag to false.", getClass(), route.getId());
- route.getRouteContext().getRoute().setAutoStartup("false");
-
- stoppedRoutes.add(route);
-
- this.refCount.retain();
-
- startManagedRoutes();
- }
-
- @Override
- public synchronized void doShutdown() {
- this.refCount.release();
- }
-
- // ****************************************************
- // Management
- // ****************************************************
-
- @ManagedAttribute(description = "Is this route the master or a slave")
- public boolean isLeader() {
- return leader.get();
- }
-
- // ****************************************************
- // Route managements
- // ****************************************************
-
- private void onClusterEvent(CamelClusterView.Event event, Object payload) {
- if (event == CamelClusterView.Event.KEEP_ALIVE) {
- LOGGER.debug("Got KEEP_ALIVE from cluster '{}' with payload '{}'", clusterView.getCluster().getId(), Objects.toString(payload));
- }
- if (event == CamelClusterView.Event.LEADERSHIP_CHANGED) {
- boolean isLeader = ObjectHelper.equal(clusterMember.getId(), clusterView.getMaster().getId());
-
- if (isLeader && leader.compareAndSet(false, isLeader)) {
- LOGGER.info("Leadership taken");
- startManagedRoutes();
- } else if (!isLeader && leader.getAndSet(isLeader)) {
- LOGGER.info("Leadership lost");
- stopManagedRoutes();
- }
- }
- }
-
- private synchronized void startManagedRoutes() {
- if (isLeader()) {
- doStartManagedRoutes();
- } else {
- // If the leadership has been lost in the meanwhile, stop any
- // eventually started route
- doStopManagedRoutes();
- }
- }
-
- private synchronized void doStartManagedRoutes() {
- try {
- for (Route route : stoppedRoutes) {
- LOGGER.debug("Starting route {}", route.getId());
- startRoute(route);
- startedRoutes.add(route);
- }
-
- stoppedRoutes.removeAll(startedRoutes);
- } catch (Exception e) {
- handleException(e);
- }
- }
-
- private synchronized void stopManagedRoutes() {
- if (isLeader()) {
- // If became a leader in the meanwhile, start any eventually stopped
- // route
- doStartManagedRoutes();
- } else {
- doStopManagedRoutes();
- }
- }
-
- private synchronized void doStopManagedRoutes() {
- try {
- for (Route route : startedRoutes) {
- LOGGER.debug("Stopping route {}", route.getId());
- stopRoute(route);
- stoppedRoutes.add(route);
- }
-
- startedRoutes.removeAll(stoppedRoutes);
- } catch (Exception e) {
- handleException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/b6f1bdd8/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelCluster.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelCluster.java b/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelCluster.java
new file mode 100644
index 0000000..9c4691e
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelCluster.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.ha;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.StampedLock;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ha.CamelCluster;
+import org.apache.camel.ha.CamelClusterView;
+import org.apache.camel.support.ServiceSupport;
+
+public abstract class AbstractCamelCluster<T extends CamelClusterView> extends ServiceSupport implements CamelCluster {
+ private final String id;
+ private final Map<String, T> views;
+ private final StampedLock lock;
+ private CamelContext camelContext;
+
+ protected AbstractCamelCluster(String id) {
+ this(id, null);
+ }
+
+ protected AbstractCamelCluster(String id, CamelContext camelContext) {
+ this.id = id;
+ this.camelContext = camelContext;
+ this.views = new HashMap<>();
+ this.lock = new StampedLock();
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ @Override
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ long stamp = lock.readLock();
+
+ try {
+ for (T view : views.values()) {
+ view.start();
+ }
+ } finally {
+ lock.unlockRead(stamp);
+ }
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ long stamp = lock.readLock();
+
+ try {
+ for (T view : views.values()) {
+ view.stop();
+ }
+ } finally {
+ lock.unlockRead(stamp);
+ }
+ }
+
+ @Override
+ public CamelClusterView createView(String namespace) throws Exception {
+ long stamp = lock.writeLock();
+
+ try {
+ T view = views.get(namespace);
+
+ if (view == null) {
+ view = doCreateView(namespace);
+ view.setCamelContext(this.camelContext);
+
+ views.put(namespace, view);
+
+ if (AbstractCamelCluster.this.isRunAllowed()) {
+ view.start();
+ }
+ }
+
+ return view;
+ } finally {
+ lock.unlockWrite(stamp);
+ }
+ }
+
+ // **********************************
+ // Implementation
+ // **********************************
+
+ protected abstract T doCreateView(String namespace) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/b6f1bdd8/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java b/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java
new file mode 100644
index 0000000..1149a7f
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/ha/AbstractCamelClusterView.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.ha;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.StampedLock;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ha.CamelCluster;
+import org.apache.camel.ha.CamelClusterView;
+import org.apache.camel.support.ServiceSupport;
+
+public abstract class AbstractCamelClusterView extends ServiceSupport implements CamelClusterView {
+ private final CamelCluster cluster;
+ private final String namespace;
+ private final List<FilteringConsumer> consumers;
+ private final StampedLock lock;
+ private CamelContext camelContext;
+
+ protected AbstractCamelClusterView(CamelCluster cluster, String namespace) {
+ this.cluster = cluster;
+ this.namespace = namespace;
+ this.consumers = new ArrayList<>();
+ this.lock = new StampedLock();
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ @Override
+ public CamelContext getCamelContext() {
+ return null;
+ }
+
+ @Override
+ public CamelCluster getCluster() {
+ return this.cluster;
+ }
+
+ @Override
+ public String getNamespace() {
+ return this.namespace;
+ }
+
+ @Override
+ public void addEventListener(BiConsumer<Event, Object> consumer) {
+ long stamp = lock.writeLock();
+
+ try {
+ consumers.add(new FilteringConsumer(e -> true, consumer));
+ } finally {
+ lock.unlockWrite(stamp);
+ }
+ }
+
+ @Override
+ public void addEventListener(Predicate<Event> predicate, BiConsumer<Event, Object> consumer) {
+ long stamp = lock.writeLock();
+
+ try {
+ consumers.add(new FilteringConsumer(predicate, consumer));
+ } finally {
+ lock.unlockWrite(stamp);
+ }
+ }
+
+ @Override
+ public void removeEventListener(BiConsumer<Event, Object> consumer) {
+ long stamp = lock.writeLock();
+
+ try {
+ consumers.removeIf(c -> c.getConsumer().equals(consumer));
+ } finally {
+ lock.unlockWrite(stamp);
+ }
+ }
+
+ // **************************************
+ // Events
+ // **************************************
+
+ protected void fireEvent(CamelClusterView.Event event, Object payload) {
+ long stamp = lock.readLock();
+
+ try {
+ for (int i = 0; i < consumers.size(); i++) {
+ consumers.get(i).accept(event, payload);
+ }
+ } finally {
+ lock.unlockRead(stamp);
+ }
+ }
+
+ // **************************************
+ // Helpers
+ // **************************************
+
+ private final class FilteringConsumer implements BiConsumer<Event, Object> {
+ private final Predicate<Event> predicate;
+ private final BiConsumer<Event, Object> consumer;
+
+ FilteringConsumer(Predicate<Event> predicate, BiConsumer<Event, Object> consumer) {
+ this.predicate = predicate;
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void accept(CamelClusterView.Event event, Object payload) {
+ if (predicate.test(event)) {
+ consumer.accept(event, payload);
+ }
+ }
+
+ public BiConsumer<Event, Object> getConsumer() {
+ return this.consumer;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/b6f1bdd8/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java
new file mode 100644
index 0000000..4aa1fa2
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/ha/ClusteredRoutePolicy.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.ha;
+
+import java.util.EventObject;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Route;
+import org.apache.camel.StartupListener;
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.ha.CamelCluster;
+import org.apache.camel.ha.CamelClusterHelper;
+import org.apache.camel.ha.CamelClusterView;
+import org.apache.camel.management.event.CamelContextStartedEvent;
+import org.apache.camel.support.EventNotifierSupport;
+import org.apache.camel.support.RoutePolicySupport;
+import org.apache.camel.util.ReferenceCount;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ManagedResource(description = "Clustered Route policy using")
+public final class ClusteredRoutePolicy extends RoutePolicySupport implements CamelContextAware {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ClusteredRoutePolicy.class);
+
+ private final AtomicBoolean leader;
+ private final Set<Route> startedRoutes;
+ private final Set<Route> stoppedRoutes;
+ private final ReferenceCount refCount;
+ private final CamelClusterView clusterView;
+ private final BiConsumer<CamelClusterView.Event, Object> leadershipEventConsumer;
+ private final CamelContextStartupListener listener;
+ private final AtomicBoolean contextStarted;
+ private CamelContext camelContext;
+
+ public ClusteredRoutePolicy(CamelClusterView clusterView) {
+ this.clusterView = clusterView;
+ this.leadershipEventConsumer = this::onLeadershipEvent;
+
+ this.stoppedRoutes = new HashSet<>();
+ this.startedRoutes = new HashSet<>();
+ this.leader = new AtomicBoolean(false);
+ this.contextStarted = new AtomicBoolean(false);
+
+ try {
+ this.listener = new CamelContextStartupListener();
+ this.listener.start();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ // Cleanup the policy when all the routes it manages have been shut down
+ // so it can be shared among routes.
+ this.refCount = ReferenceCount.onRelease(() -> {
+ if (camelContext != null) {
+ camelContext.getManagementStrategy().removeEventNotifier(listener);
+ }
+
+ clusterView.removeEventListener(leadershipEventConsumer);
+ setLeader(false);
+ });
+ }
+
+ @Override
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ if (this.camelContext == camelContext) {
+ return;
+ }
+
+ if (this.camelContext != null && this.camelContext != camelContext) {
+ throw new IllegalStateException(
+ "CamelContext should not be changed: current=" + this.camelContext + ", new=" + camelContext
+ );
+ }
+
+ try {
+ this.camelContext = camelContext;
+ this.camelContext.addStartupListener(this.listener);
+ this.camelContext.getManagementStrategy().addEventNotifier(this.listener);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void onInit(Route route) {
+ super.onInit(route);
+
+ LOGGER.info("Route managed by {}. Setting route {} AutoStartup flag to false.", getClass(), route.getId());
+ route.getRouteContext().getRoute().setAutoStartup("false");
+
+ this.refCount.retain();
+ this.stoppedRoutes.add(route);
+
+ startManagedRoutes();
+ }
+
+ @Override
+ public void doShutdown() {
+ this.refCount.release();
+ }
+
+ // ****************************************************
+ // Management
+ // ****************************************************
+
+ @ManagedAttribute(description = "Is this route the master or a slave")
+ public boolean isLeader() {
+ return leader.get();
+ }
+
+ // ****************************************************
+ // Route managements
+ // ****************************************************
+
+ private synchronized void setLeader(boolean isLeader) {
+ if (isLeader && leader.compareAndSet(false, isLeader)) {
+ LOGGER.debug("Leadership taken");
+ startManagedRoutes();
+ } else if (!isLeader && leader.getAndSet(isLeader)) {
+ LOGGER.debug("Leadership lost");
+ stopManagedRoutes();
+ }
+ }
+
+ private void startManagedRoutes() {
+ if (isLeader()) {
+ doStartManagedRoutes();
+ } else {
+ // If the leadership has been lost in the meanwhile, stop any
+ // eventually started route
+ doStopManagedRoutes();
+ }
+ }
+
+ private void doStartManagedRoutes() {
+ try {
+ for (Route route : stoppedRoutes) {
+ LOGGER.debug("Starting route {}", route.getId());
+ camelContext.startRoute(route.getId());
+ startedRoutes.add(route);
+ }
+
+ stoppedRoutes.removeAll(startedRoutes);
+ } catch (Exception e) {
+ handleException(e);
+ }
+ }
+
+ private void stopManagedRoutes() {
+ if (isLeader()) {
+ // If became a leader in the meanwhile, start any eventually stopped
+ // route
+ doStartManagedRoutes();
+ } else {
+ doStopManagedRoutes();
+ }
+ }
+
+ private void doStopManagedRoutes() {
+ try {
+ for (Route route : startedRoutes) {
+ LOGGER.debug("Stopping route {}", route.getId());
+ stopRoute(route);
+ stoppedRoutes.add(route);
+ }
+
+ startedRoutes.removeAll(stoppedRoutes);
+ } catch (Exception e) {
+ handleException(e);
+ }
+ }
+
+ // ****************************************************
+ // Event handling
+ // ****************************************************
+
+ private void onLeadershipEvent(CamelClusterView.Event event, Object payload) {
+ setLeader(clusterView.getLocalMember().isMaster());
+ }
+
+ private class CamelContextStartupListener extends EventNotifierSupport implements StartupListener {
+ @Override
+ public void notify(EventObject event) throws Exception {
+ onCamelContextStarted();
+ }
+
+ @Override
+ public boolean isEnabled(EventObject event) {
+ return event instanceof CamelContextStartedEvent;
+ }
+
+ @Override
+ public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception {
+ if (alreadyStarted) {
+ // Invoke it only if the context was already started as this
+ // method is not invoked at last event as documented but after
+ // routes warm-up so this is useful for routes deployed after
+ // the camel context has been started-up. For standard routes
+ // configuration the notification of the camel context started
+ // is provided by EventNotifier.
+ //
+ // We should check why this callback is not invoked at latest
+ // stage, or maybe rename it as it is misleading and provide a
+ // better alternative for intercept camel events.
+ onCamelContextStarted();
+ }
+ }
+
+ private void onCamelContextStarted() {
+ // Start managing the routes only when the camel context is started
+ // so start/stop of managed routes do not clash with CamelContext
+ // startup
+ if (contextStarted.compareAndSet(false, true)) {
+ clusterView.addEventListener(CamelClusterHelper.leadershipEventFilter(), leadershipEventConsumer);
+ setLeader(clusterView.getLocalMember().isMaster());
+ }
+ }
+ }
+
+ // ****************************************************
+ // Static helpers
+ // ****************************************************
+
+ public static ClusteredRoutePolicy forNamespace(CamelCluster cluster, String namespace) throws Exception {
+ return new ClusteredRoutePolicy(cluster.createView(namespace));
+ }
+
+ public static ClusteredRoutePolicy forView(CamelClusterView view) throws Exception {
+ return new ClusteredRoutePolicy(view);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/b6f1bdd8/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterMember.java
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterMember.java b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterMember.java
deleted file mode 100644
index 6268931..0000000
--- a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterMember.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.atomix.ha;
-
-import io.atomix.group.DistributedGroup;
-import io.atomix.group.GroupMember;
-import org.apache.camel.ha.CamelClusterMember;
-
-class AtomixClusterMember<M extends GroupMember> implements CamelClusterMember {
- private final DistributedGroup group;
- private final M member;
-
- AtomixClusterMember(DistributedGroup group, M member) {
- this.group = group;
- this.member = member;
- }
-
- @Override
- public String getId() {
- return member.id();
- }
-
- @Override
- public boolean isMaster() {
- return group.election().term().leader().equals(member);
- }
-
- M getGroupMember() {
- return member;
- }
-}