You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/11/09 21:31:30 UTC
camel git commit: CAMEL-10426 - Added
CuratorMultiMasterLeaderRoutePolicy
Repository: camel
Updated Branches:
refs/heads/master 33063a86c -> 34fdcb3ca
CAMEL-10426 - Added CuratorMultiMasterLeaderRoutePolicy
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/34fdcb3c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/34fdcb3c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/34fdcb3c
Branch: refs/heads/master
Commit: 34fdcb3ca67aebad9af2a05ee737f32f8d5f5982
Parents: 33063a8
Author: Paolo Antinori <pa...@redhat.com>
Authored: Wed Nov 9 15:07:32 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Nov 9 22:24:35 2016 +0100
----------------------------------------------------------------------
components/camel-zookeeper/pom.xml | 16 +
.../src/main/docs/zookeeper-component.adoc | 9 +-
.../CuratorMultiMasterLeaderElection.java | 171 +++++++
.../CuratorMultiMasterLeaderRoutePolicy.java | 191 ++++++++
...MultiMasterCuratorLeaderRoutePolicyTest.java | 460 +++++++++++++++++++
.../src/test/resources/log4j2.properties | 10 +-
6 files changed, 854 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/34fdcb3c/components/camel-zookeeper/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/pom.xml b/components/camel-zookeeper/pom.xml
index 2021d2c..08dc904 100644
--- a/components/camel-zookeeper/pom.xml
+++ b/components/camel-zookeeper/pom.xml
@@ -105,4 +105,20 @@
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <childDelegation>false</childDelegation>
+ <useFile>true</useFile>
+ <forkCount>1</forkCount>
+ <reuseForks>true</reuseForks>
+ <forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+
</project>
http://git-wip-us.apache.org/repos/asf/camel/blob/34fdcb3c/components/camel-zookeeper/src/main/docs/zookeeper-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/docs/zookeeper-component.adoc b/components/camel-zookeeper/src/main/docs/zookeeper-component.adoc
index e569ea2..5f8d5cf 100644
--- a/components/camel-zookeeper/src/main/docs/zookeeper-component.adoc
+++ b/components/camel-zookeeper/src/main/docs/zookeeper-component.adoc
@@ -269,18 +269,23 @@ from("direct:policy-controlled")
.to("mock:controlled");
----
-There are currently 2 policies defined in the component, with different SLAs:
+There are currently 3 policies defined in the component, with different SLAs:
* `ZooKeeperRoutePolicy`
* `CuratorLeaderRoutePolicy` (since *2.19*)
+* `CuratorMultiMasterLeaderRoutePolicy` (since *2.19*)
*ZooKeeperRoutePolicy* supports multiple active nodes, but its activation kicks in only after a Camel component and its corresponding Consumer have already been started;
this introduces, depending on your route definitions, the risk that your component can already start consuming events and producing `Exchange`s before the policy can establish
that the node should not be activated.
-
+
*CuratorLeaderRoutePolicy* supports only a single active node, but it's bound to a different `CamelContext` lifecycle method; this Policy kicks in before any route or consumer is started
thus you can be sure that no event is processed before the Policy takes its decision.
+*CuratorMultiMasterLeaderRoutePolicy* supports multiple active nodes, and it's bound to the same lifecycle method as `CuratorLeaderRoutePolicy`; this Policy kicks in before any route or consumer is started
+ thus you can be sure that no event is processed before the Policy takes its decision.
+
+
[[Zookeeper-SeeAlso]]
See Also
^^^^^^^^
http://git-wip-us.apache.org/repos/asf/camel/blob/34fdcb3c/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderElection.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderElection.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderElection.java
new file mode 100644
index 0000000..9c91b73
--- /dev/null
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderElection.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.policy;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.StatefulService;
+import org.apache.camel.impl.JavaUuidGenerator;
+import org.apache.camel.spi.UuidGenerator;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
+import org.apache.curator.framework.recipes.locks.Lease;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>CuratorMultiMasterLeaderElection</code> uses the leader election capabilities of a
+ * ZooKeeper cluster to control which nodes are enabled. It is typically used in
+ * fail-over scenarios controlling identical instances of an application across
+ * a cluster of Camel based servers. <p> The election is configured providing the number of instances that are required
+ * to be active..
+ * <p> All instances of the election must also be configured with the same path on the ZooKeeper
+ * cluster where the election will be carried out. It is good practice for this
+ * to indicate the application e.g. <tt>/someapplication/someroute/</tt> note
+ * that these nodes should exist before using the election. <p> See <a
+ * href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection">
+ * for more on how Leader election</a> is archived with ZooKeeper.
+ */
+public class CuratorMultiMasterLeaderElection implements ConnectionStateListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CuratorMultiMasterLeaderElection.class);
+
+ private final String candidateName;
+ private final List<ElectionWatcher> watchers = new ArrayList<ElectionWatcher>();
+ private final int desiredActiveNodes;
+ private AtomicBoolean activeNode = new AtomicBoolean(false);
+ private UuidGenerator uuidGenerator = new JavaUuidGenerator();
+ private InterProcessSemaphoreV2 leaderSelector;
+ private CuratorFramework client;
+ private Lease lease;
+
+ public CuratorMultiMasterLeaderElection(String uri, int desiredActiveNodes) {
+ this.candidateName = createCandidateName();
+ this.desiredActiveNodes = desiredActiveNodes;
+
+ String connectionString = uri.substring(1 + uri.indexOf(':')).split("/")[0];
+ String protocol = uri.substring(0, uri.indexOf(':'));
+ String path = uri.replace(protocol + ":" + connectionString, "");
+ client = CuratorFrameworkFactory.newClient(connectionString, new ExponentialBackoffRetry(1000, 3));
+ client.getConnectionStateListenable().addListener(this);
+ leaderSelector = new InterProcessSemaphoreV2(client, path, this.desiredActiveNodes);
+ client.start();
+
+
+ }
+
+ // stolen from org/apache/camel/processor/CamelInternalProcessor
+ public static boolean isCamelStopping(CamelContext context) {
+ if (context instanceof StatefulService) {
+ StatefulService ss = (StatefulService) context;
+ return ss.isStopping() || ss.isStopped();
+ }
+ return false;
+ }
+
+ public void shutdownClients() {
+ try {
+ leaderSelector.returnLease(lease);
+ } finally {
+ client.close();
+ }
+ }
+
+ /*
+ * Blocking method
+ */
+ public void requestResource() {
+ LOG.info("Requested to become active from {}", candidateName);
+ try {
+ lease = leaderSelector.acquire();
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to obtain access to become a leader node.");
+ }
+ LOG.info("{} is now active", candidateName);
+ activeNode.set(true);
+ notifyElectionWatchers();
+ }
+
+ public boolean isMaster() {
+ return activeNode.get();
+ }
+
+ private String createCandidateName() {
+ StringBuilder builder = new StringBuilder();
+ try {
+ /* UUID would be enough, also using hostname for human readability */
+ builder.append(InetAddress.getLocalHost().getCanonicalHostName());
+ } catch (UnknownHostException ex) {
+ LOG.warn("Failed to get the local hostname.", ex);
+ builder.append("unknown-host");
+ }
+ builder.append("-").append(uuidGenerator.generateUuid());
+ return builder.toString();
+ }
+
+ public String getCandidateName() {
+ return candidateName;
+ }
+
+ private void notifyElectionWatchers() {
+ for (ElectionWatcher watcher : watchers) {
+ try {
+ watcher.electionResultChanged();
+ } catch (Exception e) {
+ LOG.warn("Election watcher " + watcher + " of type " + watcher.getClass() + " threw an exception.", e);
+ }
+ }
+ }
+
+ public boolean addElectionWatcher(ElectionWatcher e) {
+ return watchers.add(e);
+ }
+
+ @Override
+ public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
+ switch (connectionState) {
+ case SUSPENDED:
+ case LOST:
+ LOG.info("Received {} state from connection. Giving up lock.", connectionState);
+
+ try {
+ leaderSelector.returnLease(lease);
+ } finally {
+ this.activeNode.set(false);
+ notifyElectionWatchers();
+ }
+
+ break;
+ default:
+ LOG.info("Connection state changed: {}", connectionState);
+ requestResource();
+
+ }
+ }
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/camel/blob/34fdcb3c/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderRoutePolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderRoutePolicy.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderRoutePolicy.java
new file mode 100644
index 0000000..b220ee4
--- /dev/null
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/CuratorMultiMasterLeaderRoutePolicy.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.policy;
+
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.camel.NonManagedService;
+import org.apache.camel.Route;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.support.RoutePolicySupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+/**
+ * <code>CuratorMultiMasterLeaderRoutePolicy</code> uses Apache Curator InterProcessSemaphoreV2 receipe to implement the behavior of having
+ * at multiple active instances of a route, controlled by a specific policy, running. It is typically used in
+ * fail-over scenarios controlling identical instances of a route across a cluster of Camel based servers.
+ * <p>
+ * The policy affects the normal startup lifecycle of CamelContext and Routes, automatically set autoStart property of
+ * routes controlled by this policy to false.
+ * After Curator receipe identifies the current Policy instance as the Leader between a set of clients that are
+ * competing for the role, it will start the route, and only at that moment the route will start its business.
+ * This specific behavior is designed to avoid scenarios where such a policy would kick in only after a route had
+ * already been started, with the risk, for consumers for example, that some source event might have already been
+ * consumed.
+ * <p>
+ * All instances of the policy must also be configured with the same path on the
+ * ZooKeeper cluster where the election will be carried out. It is good practice
+ * for this to indicate the application e.g. <tt>/someapplication/someroute/</tt> note
+ * that these nodes should exist before using the policy.
+ * <p>
+ * See <a href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection">
+ * for more on how Leader election</a> is archived with ZooKeeper.
+ */
+public class CuratorMultiMasterLeaderRoutePolicy extends RoutePolicySupport implements ElectionWatcher, NonManagedService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CuratorMultiMasterLeaderRoutePolicy.class);
+ private final String uri;
+ private final Lock lock = new ReentrantLock();
+ private final Set<Route> suspendedRoutes = new CopyOnWriteArraySet<Route>();
+ private final AtomicBoolean shouldProcessExchanges = new AtomicBoolean();
+ private volatile boolean shouldStopRoute = true;
+ private final int enabledCount;
+
+
+ private final Lock electionLock = new ReentrantLock();
+
+ private CuratorMultiMasterLeaderElection election;
+
+ public CuratorMultiMasterLeaderRoutePolicy(String uri, int enabledCount) {
+ this.uri = uri;
+ this.enabledCount = enabledCount;
+ }
+ public CuratorMultiMasterLeaderRoutePolicy(String uri) {
+ this(uri, 1);
+ }
+
+ @Override
+ public void onInit(Route route) {
+ ensureElectionIsCreated();
+ LOG.info("Route managed by {}. Setting route [{}] AutoStartup flag to false.", this.getClass(), route.getId());
+ route.getRouteContext().getRoute().setAutoStartup("false");
+
+
+ if (election.isMaster()) {
+ if (shouldStopRoute) {
+ startManagedRoute(route);
+ }
+ } else {
+ if (shouldStopRoute) {
+ stopManagedRoute(route);
+ }
+ }
+
+ }
+
+ private void ensureElectionIsCreated() {
+ if (election == null) {
+ electionLock.lock();
+ try {
+ if (election == null) { // re-test
+ election = new CuratorMultiMasterLeaderElection(uri, enabledCount);
+ election.addElectionWatcher(this);
+
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ electionLock.unlock();
+ }
+ }
+ }
+
+ private void startManagedRoute(Route route) {
+ try {
+ lock.lock();
+ if (suspendedRoutes.contains(route)) {
+ startRoute(route);
+ suspendedRoutes.remove(route);
+ }
+ } catch (Exception e) {
+ handleException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void stopManagedRoute(Route route) {
+ try {
+ lock.lock();
+ // check that we should still suspend once the lock is acquired
+ if (!suspendedRoutes.contains(route) && !shouldProcessExchanges.get()) {
+ stopRoute(route);
+ suspendedRoutes.add(route);
+ }
+ } catch (Exception e) {
+ handleException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void electionResultChanged() {
+ if (election.isMaster()) {
+ startAllStoppedRoutes();
+ }
+ }
+
+ private void startAllStoppedRoutes() {
+ try {
+ lock.lock();
+
+ if (!suspendedRoutes.isEmpty()) {
+ if (log.isDebugEnabled()) {
+ log.info("{} route(s) have been stopped previously by policy, restarting.", suspendedRoutes.size());
+ }
+ for (Route suspended : suspendedRoutes) {
+ DefaultCamelContext ctx = (DefaultCamelContext)suspended.getRouteContext().getCamelContext();
+ while (!ctx.isStarted()) {
+ log.info("Context {} is not started yet. Sleeping for a bit.", ctx.getName());
+ Thread.sleep(5000);
+ }
+ log.info("Starting route [{}] defined in context [{}].", suspended.getId(), ctx.getName());
+ startRoute(suspended);
+ }
+ suspendedRoutes.clear();
+ }
+
+ } catch (Exception e) {
+ handleException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ protected void doShutdown() throws Exception {
+ try {
+ electionLock.lock();
+ election.shutdownClients();
+ election = null;
+ } finally {
+ electionLock.unlock();
+ }
+ }
+
+ public CuratorMultiMasterLeaderElection getElection() {
+ return election;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/34fdcb3c/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/MultiMasterCuratorLeaderRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/MultiMasterCuratorLeaderRoutePolicyTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/MultiMasterCuratorLeaderRoutePolicyTest.java
new file mode 100644
index 0000000..a6c9946
--- /dev/null
+++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/MultiMasterCuratorLeaderRoutePolicyTest.java
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.policy;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+
+public class MultiMasterCuratorLeaderRoutePolicyTest extends ZooKeeperTestSupport {
+ public static final String ZNODE = "/multimaster";
+ public static final String BASE_ZNODE = "/someapp";
+ private static final Logger LOG = LoggerFactory.getLogger(MultiMasterCuratorLeaderRoutePolicyTest.class);
+
+
+    /**
+     * Creates the context used by the test support class with JMX disabled.
+     */
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        disableJMX();
+        return super.createCamelContext();
+    }
+
+
+    /**
+     * A route flagged as auto-startable but governed by the policy must not be
+     * started by the context itself; starting it is left to the policy.
+     */
+    @Test
+    public void ensureRoutesDoNotStartAutomatically() throws Exception {
+        DefaultCamelContext camel = new DefaultCamelContext();
+
+        camel.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                String electionUri = "zookeeper:localhost:" + getServerPort() + BASE_ZNODE + ZNODE + 2;
+                from("timer://foo?fixedRate=true&period=5")
+                    .routePolicy(new CuratorMultiMasterLeaderRoutePolicy(electionUri))
+                    .id("single_route")
+                    .autoStartup(true)
+                    .to("mock:controlled");
+            }
+        });
+        camel.start();
+        // the route is marked as autostartable, yet the context must leave it alone: the policy decides when it starts
+        assertThat(camel.getRouteStatus("single_route").isStarted(), is(false));
+        assertThat(camel.getRouteStatus("single_route").isStarting(), is(false));
+        try {
+            camel.shutdown();
+        } catch (Exception ignored) {
+            // concurrency can raise some InterruptedException but we don't really care in this scenario.
+        }
+    }
+
+    /**
+     * One lease, two contexts: the first becomes active, the second stays idle until
+     * the first is shut down and then takes over.
+     */
+    @Test
+    public void oneMasterOneSlaveScenarioContolledByPolicy() throws Exception {
+        final String path = "oneMasterOneSlaveScenarioContolledByPolicy";
+        final String firstDestination = "first" + System.currentTimeMillis();
+        final String secondDestination = "second" + System.currentTimeMillis();
+        final CountDownLatch waitForSecondRouteCompletedLatch = new CountDownLatch(1);
+        final int activeNodesDesired = 1;
+
+        MultiMasterZookeeperPolicyEnforcedContext first = createEnforcedContext(firstDestination, activeNodesDesired, path);
+        DefaultCamelContext controlledContext = (DefaultCamelContext) first.controlledContext;
+        // get reference to the Policy object to check if it's already a master
+        CuratorMultiMasterLeaderRoutePolicy routePolicy = (CuratorMultiMasterLeaderRoutePolicy) controlledContext.getRouteDefinition(firstDestination).getRoutePolicies().get(0);
+
+        assertWeHaveMasters(routePolicy);
+
+        LOG.info("Starting first CamelContext");
+        final MultiMasterZookeeperPolicyEnforcedContext[] arr = new MultiMasterZookeeperPolicyEnforcedContext[1];
+
+        new Thread() {
+            @Override
+            public void run() {
+                MultiMasterZookeeperPolicyEnforcedContext second = null;
+                try {
+                    LOG.info("Starting second CamelContext in a separate thread");
+                    second = createEnforcedContext(secondDestination, activeNodesDesired, path);
+                    arr[0] = second;
+                    second.sendMessageToEnforcedRoute("message for second", 0);
+                    waitForSecondRouteCompletedLatch.countDown();
+                } catch (Exception e) {
+                    LOG.error("Error in the thread controlling the second context", e);
+                    // only ends this helper thread; the test itself fails through the latch assertion below
+                    fail("Error in the thread controlling the second context: " + e.getMessage());
+                }
+            }
+        }.start();
+
+        first.sendMessageToEnforcedRoute("message for first", 1);
+
+        // a timeout means the helper thread failed: report it instead of hitting a null arr[0] later
+        assertTrue("The second context did not complete its checks in time",
+            waitForSecondRouteCompletedLatch.await(2, TimeUnit.MINUTES));
+
+        LOG.info("Explicitly shutting down the first camel context.");
+        first.shutdown();
+
+        MultiMasterZookeeperPolicyEnforcedContext second = arr[0];
+
+        DefaultCamelContext secondCamelContext = (DefaultCamelContext) second.controlledContext;
+        assertWeHaveMasters((CuratorMultiMasterLeaderRoutePolicy)secondCamelContext.getRouteDefinition(secondDestination).getRoutePolicies().get(0));
+
+        second.sendMessageToEnforcedRoute("message for slave", 1);
+        second.shutdown();
+    }
+
+
+ @Test
+ public void oneMasterOneSlaveAndFlippedAgainScenarioContolledByPolicy() throws Exception {
+ final String path = "oneMasterOneSlaveScenarioContolledByPolicy";
+ final String firstDestination = "first" + System.currentTimeMillis();
+ final String secondDestination = "second" + System.currentTimeMillis();
+ final CountDownLatch waitForSecondRouteCompletedLatch = new CountDownLatch(1);
+ final int activeNodeDesired = 1;
+
+ MultiMasterZookeeperPolicyEnforcedContext first = createEnforcedContext(firstDestination, activeNodeDesired, path);
+ DefaultCamelContext controlledContext = (DefaultCamelContext) first.controlledContext;
+ // get reference to the Policy object to check if it's already a master
+ CuratorMultiMasterLeaderRoutePolicy routePolicy = (CuratorMultiMasterLeaderRoutePolicy) controlledContext.getRouteDefinition(firstDestination).getRoutePolicies().get(0);
+
+ assertWeHaveMasters(routePolicy);
+
+ LOG.info("Starting first CamelContext");
+ final MultiMasterZookeeperPolicyEnforcedContext[] arr = new MultiMasterZookeeperPolicyEnforcedContext[1];
+
+ new Thread() {
+ @Override
+ public void run() {
+ MultiMasterZookeeperPolicyEnforcedContext slave = null;
+ try {
+ LOG.info("Starting second CamelContext in a separate thread");
+ slave = createEnforcedContext(secondDestination, activeNodeDesired, path);
+ arr[0] = slave;
+ slave.sendMessageToEnforcedRoute("message for second", 0);
+ waitForSecondRouteCompletedLatch.countDown();
+ } catch (Exception e) {
+ LOG.error("Error in the thread controlling the second context", e);
+ fail("Error in the thread controlling the second context: " + e.getMessage());
+ }
+
+
+ }
+ }.start();
+
+ first.sendMessageToEnforcedRoute("message for first", 1);
+
+ waitForSecondRouteCompletedLatch.await(2, TimeUnit.MINUTES);
+ MultiMasterZookeeperPolicyEnforcedContext second = arr[0];
+
+ LOG.info("Explicitly shutting down the first camel context.");
+ first.shutdown();
+
+ DefaultCamelContext secondCamelContext = (DefaultCamelContext) second.controlledContext;
+ assertWeHaveMasters((CuratorMultiMasterLeaderRoutePolicy)secondCamelContext.getRouteDefinition(secondDestination).getRoutePolicies().get(0));
+
+ CountDownLatch restartFirstLatch = new CountDownLatch(1);
+ LOG.info("Start back first context");
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ first.startup();
+ restartFirstLatch.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }.start();
+ restartFirstLatch.await();
+ second.sendMessageToEnforcedRoute("message for second", 1);
+ first.mock.reset();
+ first.sendMessageToEnforcedRoute("message for first", 0);
+ second.shutdown();
+ controlledContext = (DefaultCamelContext) first.controlledContext;
+ // get reference to the Policy object to check if it's already a master
+ routePolicy = (CuratorMultiMasterLeaderRoutePolicy) controlledContext.getRouteDefinition(firstDestination).getRoutePolicies().get(0);
+ log.info("Asserting route is up. context: [{}]", controlledContext.getName());
+ assertWeHaveMasters(routePolicy);
+ first.controlledContext.setTracing(true);
+ first.mock = controlledContext.getEndpoint("mock:controlled", MockEndpoint.class);
+ first.sendMessageToEnforcedRoute("message for first", 1);
+ first.shutdown();
+ }
+
+
+
+
+    /**
+     * One lease, three contexts: the first is active; once it is shut down exactly one
+     * of the two remaining contexts must take over while the other stays idle.
+     */
+    @Test
+    public void oneMasterTwoSlavesScenarioContolledByPolicy() throws Exception {
+        final String path = "oneMasterTwoSlavesScenarioContolledByPolicy";
+        final String master = "master" + System.currentTimeMillis();
+        final String secondDestination = "second" + System.currentTimeMillis();
+        final String thirdDestination = "third" + System.currentTimeMillis();
+        final CountDownLatch waitForNonActiveRoutesLatch = new CountDownLatch(2);
+        final int activeNodesDesired = 1;
+
+        LOG.info("Starting first CamelContext");
+        MultiMasterZookeeperPolicyEnforcedContext first = createEnforcedContext(master, activeNodesDesired, path);
+        DefaultCamelContext controlledContext = (DefaultCamelContext) first.controlledContext;
+        // get reference to the Policy object to check if it's already a master
+        CuratorMultiMasterLeaderRoutePolicy routePolicy = (CuratorMultiMasterLeaderRoutePolicy) controlledContext.getRouteDefinition(master).getRoutePolicies().get(0);
+
+        assertWeHaveMasters(routePolicy);
+
+        final MultiMasterZookeeperPolicyEnforcedContext[] arr = new MultiMasterZookeeperPolicyEnforcedContext[2];
+
+        new Thread() {
+            @Override
+            public void run() {
+                MultiMasterZookeeperPolicyEnforcedContext second = null;
+                try {
+                    LOG.info("Starting second CamelContext");
+                    second = createEnforcedContext(secondDestination, activeNodesDesired, path);
+                    arr[0] = second;
+                    second.sendMessageToEnforcedRoute("message for second", 0);
+                    waitForNonActiveRoutesLatch.countDown();
+                } catch (Exception e) {
+                    LOG.error("Error in the thread controlling the second context", e);
+                    // only ends this helper thread; the test itself fails through the latch assertion below
+                    fail("Error in the thread controlling the second context: " + e.getMessage());
+                }
+            }
+        }.start();
+
+        new Thread() {
+            @Override
+            public void run() {
+                MultiMasterZookeeperPolicyEnforcedContext third = null;
+                try {
+                    LOG.info("Starting third CamelContext");
+                    third = createEnforcedContext(thirdDestination, activeNodesDesired, path);
+                    arr[1] = third;
+                    third.sendMessageToEnforcedRoute("message for third", 0);
+                    waitForNonActiveRoutesLatch.countDown();
+                } catch (Exception e) {
+                    LOG.error("Error in the thread controlling the third context", e);
+                    // only ends this helper thread; the test itself fails through the latch assertion below
+                    fail("Error in the thread controlling the third context: " + e.getMessage());
+                }
+            }
+        }.start();
+
+        // Send messages to the master and the slave.
+        // The route is enabled in the master and gets through, but that sent to
+        // the slave context is rejected.
+        first.sendMessageToEnforcedRoute("message for master", 1);
+
+        // bounded wait: a failed helper thread never releases the latch and an untimed await would hang
+        assertTrue("The non active contexts did not complete their checks in time",
+            waitForNonActiveRoutesLatch.await(2, TimeUnit.MINUTES));
+        LOG.info("Explicitly shutting down the first camel context.");
+        // trigger failover by killing the master..
+        first.shutdown();
+        // let's find out who's active now:
+
+        CuratorMultiMasterLeaderRoutePolicy routePolicySecond = (CuratorMultiMasterLeaderRoutePolicy) arr[0].controlledContext.getRouteDefinition(secondDestination).getRoutePolicies().get(0);
+        CuratorMultiMasterLeaderRoutePolicy routePolicyThird = (CuratorMultiMasterLeaderRoutePolicy) arr[1].controlledContext.getRouteDefinition(thirdDestination).getRoutePolicies().get(0);
+
+        MultiMasterZookeeperPolicyEnforcedContext newMaster = null;
+        MultiMasterZookeeperPolicyEnforcedContext slave = null;
+
+        final int maxWait = 20;
+        for (int i = 0; i < maxWait; i++) {
+            if (routePolicySecond.getElection().isMaster()) {
+                newMaster = arr[0];
+                slave = arr[1];
+                LOG.info("[second] is the new master");
+                break;
+            } else if (routePolicyThird.getElection().isMaster()) {
+                newMaster = arr[1];
+                slave = arr[0];
+                LOG.info("[third] is the new master");
+                break;
+            } else {
+                Thread.sleep(2000);
+                LOG.info("waiting for a new master to be elected");
+            }
+        }
+        assertThat(newMaster, is(notNullValue()));
+
+        newMaster.sendMessageToEnforcedRoute("message for second", 1);
+        slave.sendMessageToEnforcedRoute("message for third", 0);
+        slave.shutdown();
+        newMaster.shutdown();
+    }
+
+
+ @Test
+ public void twoMasterOneSlavesScenarioContolledByPolicy() throws Exception {
+ final String path = "twoMasterOneSlavesScenarioContolledByPolicy";
+ final String firstDestination = "first" + System.currentTimeMillis();
+ final String secondDestination = "second" + System.currentTimeMillis();
+ final String thirdDestination = "third" + System.currentTimeMillis();
+ final CountDownLatch waitForThirdRouteCompletedLatch = new CountDownLatch(1);
+ final int activeNodeDesired = 2;
+
+ MultiMasterZookeeperPolicyEnforcedContext first = createEnforcedContext(firstDestination, activeNodeDesired, path);
+ DefaultCamelContext firstControlledContext = (DefaultCamelContext) first.controlledContext;
+ CuratorMultiMasterLeaderRoutePolicy firstRoutePolicy = (CuratorMultiMasterLeaderRoutePolicy) firstControlledContext.getRouteDefinition(firstDestination).getRoutePolicies().get(0);
+
+ MultiMasterZookeeperPolicyEnforcedContext second = createEnforcedContext(secondDestination, activeNodeDesired, path);
+ DefaultCamelContext secondControlledContext = (DefaultCamelContext) second.controlledContext;
+ CuratorMultiMasterLeaderRoutePolicy secondRoutePolicy = (CuratorMultiMasterLeaderRoutePolicy) secondControlledContext.getRouteDefinition(secondDestination).getRoutePolicies().get(0);
+
+ assertWeHaveMasters(firstRoutePolicy, secondRoutePolicy);
+
+ final MultiMasterZookeeperPolicyEnforcedContext[] arr = new MultiMasterZookeeperPolicyEnforcedContext[1];
+
+
+ new Thread() {
+ @Override
+ public void run() {
+ MultiMasterZookeeperPolicyEnforcedContext third = null;
+ try {
+ LOG.info("Starting third CamelContext");
+ third = createEnforcedContext(thirdDestination, activeNodeDesired, path);
+ arr[0] = third;
+ third.sendMessageToEnforcedRoute("message for third", 0);
+ waitForThirdRouteCompletedLatch.countDown();
+ } catch (Exception e) {
+ LOG.error("Error in the thread controlling the third context", e);
+ fail("Error in the thread controlling the third context: " + e.getMessage());
+ }
+
+
+ }
+ }.start();
+
+ first.sendMessageToEnforcedRoute("message for first", 1);
+ second.sendMessageToEnforcedRoute("message for second", 1);
+
+
+ waitForThirdRouteCompletedLatch.await();
+
+ LOG.info("Explicitly shutting down the first camel context.");
+ first.shutdown();
+
+
+ arr[0].sendMessageToEnforcedRoute("message for third", 1);
+ second.shutdown();
+ arr[0].shutdown();
+ }
+
+ /**
+  * Polls the given route policies until every one of them reports being master.
+  * <p>
+  * Up to 20 rounds are made, sleeping two seconds after each unsuccessful round; if no
+  * round finds all policies to be master the test is failed.
+  *
+  * @param routePolicies the policies that are all expected to become master
+  * @throws InterruptedException if the thread is interrupted while sleeping between rounds
+  */
+ void assertWeHaveMasters(CuratorMultiMasterLeaderRoutePolicy... routePolicies) throws InterruptedException {
+     final int attempts = 20;
+     for (int attempt = 0; attempt < attempts; attempt++) {
+         boolean allMasters = true;
+         for (CuratorMultiMasterLeaderRoutePolicy candidate : routePolicies) {
+             log.info("Policy: {}, master: {}", candidate, candidate.getElection().isMaster());
+             // non-short-circuit on purpose: every policy is queried in every round
+             allMasters &= candidate.getElection().isMaster();
+         }
+         if (allMasters) {
+             LOG.info("the number of required active routes is available");
+             return;
+         }
+         Thread.sleep(2000);
+         LOG.info("waiting routes to become leader and be activated.");
+     }
+     fail("The expected number of route never became master");
+ }
+
+
+ /**
+  * Test fixture holding one Camel context with a single {@link FailoverRoute}, plus the
+  * producer template and mock endpoint used to drive and observe that route.
+  */
+ private class MultiMasterZookeeperPolicyEnforcedContext {
+     CamelContext controlledContext;
+     ProducerTemplate template;
+     MockEndpoint mock;
+     String routename;
+     String path;
+
+     /**
+      * Creates and starts a context whose only route is a {@link FailoverRoute}.
+      *
+      * @param name               route id, also used as the name of the {@code vm:} endpoint
+      * @param activeNodesDesired value handed to the route policy
+      * @param path               last segment of the election path handed to the route
+      */
+     MultiMasterZookeeperPolicyEnforcedContext(String name, int activeNodesDesired, String path) throws Exception {
+         this.routename = name;
+         this.path = path;
+         this.controlledContext = new DefaultCamelContext();
+         this.template = this.controlledContext.createProducerTemplate();
+         this.mock = this.controlledContext.getEndpoint("mock:controlled", MockEndpoint.class);
+         this.controlledContext.addRoutes(new FailoverRoute(name, activeNodesDesired, path));
+         this.controlledContext.start();
+     }
+
+     /**
+      * Sends {@code message} to this context's {@code vm:} endpoint and asserts that the
+      * mock endpoint saw {@code expected} messages.
+      *
+      * @param message  body to send
+      * @param expected number of messages the mock endpoint should receive
+      */
+     public void sendMessageToEnforcedRoute(String message, int expected) throws InterruptedException {
+         final String endpointUri = "vm:" + routename;
+         mock.expectedMessageCount(expected);
+         try {
+             LOG.info("Sending message to: {}", endpointUri);
+             template.sendBody(endpointUri, ExchangePattern.InOut, message);
+         } catch (Exception e) {
+             // A send failure only matters when a delivery was expected; otherwise it is
+             // deliberately ignored.
+             if (expected > 0) {
+                 LOG.error(e.getMessage(), e);
+                 fail("Expected messages...");
+             }
+         }
+         mock.await(2, TimeUnit.SECONDS);
+         mock.assertIsSatisfied(2000);
+     }
+
+     /** Stops the controlled context, logging before and after at debug level. */
+     public void shutdown() throws Exception {
+         LogFactory.getLog(getClass()).debug("stopping");
+         controlledContext.stop();
+         LogFactory.getLog(getClass()).debug("stopped");
+     }
+
+     /** Starts the controlled context, logging before and after at debug level. */
+     public void startup() throws Exception {
+         LogFactory.getLog(getClass()).debug("starting");
+         controlledContext.start();
+         LogFactory.getLog(getClass()).debug("started");
+     }
+ }
+
+ private MultiMasterZookeeperPolicyEnforcedContext createEnforcedContext(String name, int activeNodesDesired, String path) throws Exception, InterruptedException {
+ MultiMasterZookeeperPolicyEnforcedContext context = new MultiMasterZookeeperPolicyEnforcedContext(name, activeNodesDesired, path);
+ delay(1000);
+ return context;
+ }
+
+ /**
+  * Route from {@code vm:<routename>} to {@code mock:controlled}, guarded by a
+  * {@link CuratorMultiMasterLeaderRoutePolicy}.
+  */
+ public class FailoverRoute extends RouteBuilder {
+
+     private final String path;
+     private final String routename;
+     private final int activeNodesDesired;
+
+     /**
+      * @param routename          route id and name of the {@code vm:} endpoint to consume from
+      * @param activeNodesDesired value handed to the route policy
+      * @param path               last segment of the election znode path
+      */
+     public FailoverRoute(String routename, int activeNodesDesired, String path) {
+         // Every context gets its own endpoint name: according to the original author,
+         // reusing one endpoint name in two contexts inside the same JVM means shutting
+         // down one context shuts the endpoint for both.
+         this.path = path;
+         this.routename = routename;
+         this.activeNodesDesired = activeNodesDesired;
+     }
+
+     @Override
+     public void configure() throws Exception {
+         final String electionUri = "zookeeper:localhost:" + getServerPort() + BASE_ZNODE + ZNODE + "/" + path;
+         final CuratorMultiMasterLeaderRoutePolicy leaderPolicy =
+             new CuratorMultiMasterLeaderRoutePolicy(electionUri, this.activeNodesDesired);
+         from("vm:" + routename).routePolicy(leaderPolicy).id(routename).to("mock:controlled");
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/34fdcb3c/components/camel-zookeeper/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/resources/log4j2.properties b/components/camel-zookeeper/src/test/resources/log4j2.properties
index 81447cc..7536ead 100644
--- a/components/camel-zookeeper/src/test/resources/log4j2.properties
+++ b/components/camel-zookeeper/src/test/resources/log4j2.properties
@@ -23,18 +23,26 @@ appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
appender.out.type = Console
appender.out.name = out
appender.out.layout.type = PatternLayout
+# appender.out.layout.pattern = %highlight{%d [%t] %-5level: %msg%n%throwable}{FATAL=red, ERROR=red, WARN=blue, INFO=black, DEBUG=grey, TRACE=blue}
appender.out.layout.pattern = [%t] %c{1} %-5p %m%n
+
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
logger.camel-zookeeper.name = org.apache.camel.component.zookeeper
logger.camel-zookeeper.level = INFO
+logger.camel-zookeeper-policy.name = org.apache.camel.component.zookeeper.policy
+logger.camel-zookeeper-policy.level = INFO
logger.camel-support.name = org.apache.camel.support
logger.camel-support.level = INFO
+logger.camel.name = org.apache.camel
+logger.camel.level = INFO
+
+
logger.springframework.name = org.springframework
logger.springframework.level = WARN
rootLogger.level = INFO
-# rootLogger.appenderRef.stdout.ref = out
+#rootLogger.appenderRef.stdout.ref = out
rootLogger.appenderRef.file.ref = file