You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2012/09/03 10:58:31 UTC
svn commit: r1380174 - in /camel/trunk/components/camel-zookeeper/src:
main/java/org/apache/camel/component/zookeeper/policy/
test/java/org/apache/camel/component/zookeeper/
test/java/org/apache/camel/component/zookeeper/policy/
Author: ningjiang
Date: Mon Sep 3 08:58:31 2012
New Revision: 1380174
URL: http://svn.apache.org/viewvc?rev=1380174&view=rev
Log:
CAMEL-5546 Applied the patch with thanks to Andrew
Added:
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ElectionWatcher.java
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperElection.java
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperElectionTest.java
Modified:
camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java
camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverRoutePolicyTest.java
Added: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ElectionWatcher.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ElectionWatcher.java?rev=1380174&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ElectionWatcher.java (added)
+++ camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ElectionWatcher.java Mon Sep 3 08:58:31 2012
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.policy;
+
+
+public interface ElectionWatcher {
+ /**
+ * This method is called when there is a potential change to the master.
+ * Implementations should call "isMaster" on their ZookeeperElection
+ * instance to re-validate their status.
+ */
+ void electionResultChanged();
+}
Added: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperElection.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperElection.java?rev=1380174&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperElection.java (added)
+++ camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperElection.java Mon Sep 3 08:58:31 2012
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.policy;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.zookeeper.SequenceComparator;
+import org.apache.camel.component.zookeeper.ZooKeeperEndpoint;
+import org.apache.camel.component.zookeeper.ZooKeeperMessage;
+import org.apache.camel.impl.JavaUuidGenerator;
+import org.apache.camel.spi.UuidGenerator;
+
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>ZooKeeperElection</code> uses the leader election capabilities of a
+ * ZooKeeper cluster to control which nodes are enabled. It is typically used in
+ * fail-over scenarios controlling identical instances of an application across
+ * a cluster of Camel based servers. <p> The election is configured with a 'top
+ * n' number of servers that should be marked as master, for a simple
+ * master/slave scenario this would be 1. Each instance will execute the
+ * election algorithm to obtain its position in the hierarchy of servers, if it
+ * is within the 'top n' servers then the node is enabled and isMaster() will
+ * return 'true'. If not it waits for a change in the leader hierarchy and then
+ * reruns this scenario to see if it is now in the top n. <p> All instances of
+ * the election must also be configured with the same path on the ZooKeeper
+ * cluster where the election will be carried out. It is good practice for this
+ * to indicate the application e.g. <tt>/someapplication/someroute/</tt> note
+ * that these nodes should exist before using the election. <p> See <a
+ * href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection">
+ * for more on how Leader election</a> is archived with ZooKeeper.
+ */
+public class ZooKeeperElection {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperElection.class);
+ private final ProducerTemplate producerTemplate;
+ private final CamelContext camelContext;
+ private final String uri;
+ private final String candidateName;
+ private final Lock lock = new ReentrantLock();
+ private final CountDownLatch electionComplete = new CountDownLatch(1);
+ private AtomicBoolean masterNode = new AtomicBoolean();
+ private volatile boolean isCandidateCreated;
+ private int enabledCount = 1;
+ private UuidGenerator uuidGenerator = new JavaUuidGenerator();
+ private final List<ElectionWatcher> watchers = new ArrayList<ElectionWatcher>();
+
+ public ZooKeeperElection(CamelContext camelContext, String uri, int enabledCount) {
+ this(camelContext.createProducerTemplate(), camelContext, uri, enabledCount);
+ }
+
+ public ZooKeeperElection(ProducerTemplate producerTemplate, CamelContext camelContext, String uri, int enabledCount) {
+ this.camelContext = camelContext;
+ this.producerTemplate = producerTemplate;
+ this.uri = uri;
+ this.enabledCount = enabledCount;
+ this.candidateName = createCandidateName();
+ }
+
+ public boolean isMaster() {
+ if (!isCandidateCreated) {
+ testAndCreateCandidateNode();
+ awaitElectionResults();
+
+ }
+ return masterNode.get();
+ }
+
+ private String createCandidateName() {
+ StringBuilder builder = new StringBuilder();
+ try {
+ /* UUID would be enough, also using hostname for human readability */
+ builder.append(InetAddress.getLocalHost().getCanonicalHostName());
+ } catch (UnknownHostException ex) {
+ LOG.warn("Failed to get the local hostname.", ex);
+ builder.append("unknown-host");
+ }
+ builder.append("-").append(uuidGenerator.generateUuid());
+ return builder.toString();
+ }
+
+ private void testAndCreateCandidateNode() {
+ try {
+ lock.lock();
+ if (!isCandidateCreated) {
+ createCandidateNode(camelContext);
+ isCandidateCreated = true;
+ }
+ } catch (Exception e) {
+ handleException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void awaitElectionResults() {
+ while (electionComplete.getCount() > 0) {
+ try {
+ LOG.debug("Awaiting election results...");
+ electionComplete.await();
+ } catch (InterruptedException e1) {
+ }
+ }
+ }
+
+ private ZooKeeperEndpoint createCandidateNode(CamelContext camelContext) {
+ LOG.info("Initializing ZookeeperElection with uri '{}'", uri);
+ ZooKeeperEndpoint zep = camelContext.getEndpoint(uri, ZooKeeperEndpoint.class);
+ zep.getConfiguration().setCreate(true);
+ String fullpath = createFullPathToCandidate(zep);
+ Exchange e = zep.createExchange();
+ e.setPattern(ExchangePattern.InOut);
+ e.getIn().setHeader(ZooKeeperMessage.ZOOKEEPER_NODE, fullpath);
+ e.getIn().setHeader(ZooKeeperMessage.ZOOKEEPER_CREATE_MODE, CreateMode.EPHEMERAL_SEQUENTIAL);
+ producerTemplate.send(zep, e);
+
+ if (e.isFailed()) {
+ LOG.error("Error setting up election node " + fullpath, e.getException());
+ } else {
+ LOG.info("Candidate node '{}' has been created", fullpath);
+ try {
+ if (zep != null) {
+ camelContext.addRoutes(new ElectoralMonitorRoute(zep));
+ }
+ } catch (Exception ex) {
+ LOG.error("Error configuring ZookeeperElection", ex);
+ }
+ }
+ return zep;
+
+ }
+
+ private String createFullPathToCandidate(ZooKeeperEndpoint zep) {
+ String fullpath = zep.getConfiguration().getPath();
+ if (!fullpath.endsWith("/")) {
+ fullpath += "/";
+ }
+ fullpath += candidateName;
+ return fullpath;
+ }
+
+ private void handleException(Exception e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+
+ private void notifyElectionWatchers() {
+ for (ElectionWatcher watcher : watchers) {
+ try {
+ watcher.electionResultChanged();
+ } catch (Exception e) {
+ LOG.warn("Election watcher " + watcher + " of type " + watcher.getClass() + " threw an exception.", e);
+ }
+ }
+ }
+
+ public boolean addElectionWatcher(ElectionWatcher e) {
+ return watchers.add(e);
+ }
+
+ public boolean removeElectionWatcher(ElectionWatcher o) {
+ return watchers.remove(o);
+ }
+
+ private class ElectoralMonitorRoute extends RouteBuilder {
+
+ private SequenceComparator comparator = new SequenceComparator();
+ private ZooKeeperEndpoint zep;
+
+ public ElectoralMonitorRoute(ZooKeeperEndpoint zep) {
+ this.zep = zep;
+ zep.getConfiguration().setListChildren(true);
+ zep.getConfiguration().setSendEmptyMessageOnDelete(true);
+ zep.getConfiguration().setRepeat(true);
+ }
+
+ @Override
+ public void configure() throws Exception {
+
+ /**
+ * TODO: this is cheap cheerful but suboptimal; it suffers from the
+ * 'herd effect' that on any change to the candidates list every
+ * policy instance will ask for the entire candidate list again.
+ * This is fine for small numbers of nodes (for scenarios like
+ * Master-Slave it is perfect) but could get noisy if large numbers
+ * of nodes were involved. <p> Better would be to find the position
+ * of this node in the list and watch the node in the position ahead
+ * node ahead of this and only request the candidate list when its
+ * status changes. This will require enhancing the consumer to allow
+ * custom operation lists.
+ */
+ from(zep).id("election-route-" + candidateName.substring(0, 8)).sort(body(), comparator).process(new Processor() {
+ @Override
+ public void process(Exchange e) throws Exception {
+ @SuppressWarnings("unchecked")
+ List<String> candidates = e.getIn().getMandatoryBody(List.class);
+
+ int location = Math.abs(Collections.binarySearch(candidates, candidateName));
+ /**
+ * check if the item at this location starts with this nodes
+ * candidate name
+ */
+ if (isOurCandidateAtLocationInCandidatesList(candidates, location)) {
+
+ masterNode.set(location <= enabledCount);
+ LOG.debug(
+ "This node is number '{}' on the candidate list, election is configured for the top '{}'. this node will be {}",
+ new Object[]{location, enabledCount, masterNode.get() ? "enabled" : "disabled"}
+ );
+ }
+ electionComplete.countDown();
+
+ notifyElectionWatchers();
+ }
+
+ private boolean isOurCandidateAtLocationInCandidatesList(List<String> candidates, int location) {
+ return location <= candidates.size() && candidates.get(location - 1).startsWith(candidateName);
+ }
+ });
+ }
+ }
+}
Modified: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java?rev=1380174&r1=1380173&r2=1380174&view=diff
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java (original)
+++ camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java Mon Sep 3 08:58:31 2012
@@ -16,32 +16,15 @@
*/
package org.apache.camel.component.zookeeper.policy;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Collections;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Processor;
-import org.apache.camel.ProducerTemplate;
import org.apache.camel.Route;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.zookeeper.SequenceComparator;
-import org.apache.camel.component.zookeeper.ZooKeeperEndpoint;
-import org.apache.camel.component.zookeeper.ZooKeeperMessage;
-import org.apache.camel.impl.JavaUuidGenerator;
import org.apache.camel.impl.RoutePolicySupport;
-import org.apache.camel.spi.UuidGenerator;
-
-import org.apache.zookeeper.CreateMode;
/**
* <code>ZooKeeperRoutePolicy</code> uses the leader election capabilities of a
@@ -65,81 +48,57 @@ import org.apache.zookeeper.CreateMode;
* See <a href="http://hadoop.apache.org/zookeeper/docs/current/recipes.html#sc_leaderElection">
* for more on how Leader election</a> is archived with ZooKeeper.
*/
-public class ZooKeeperRoutePolicy extends RoutePolicySupport {
+public class ZooKeeperRoutePolicy extends RoutePolicySupport implements ElectionWatcher {
private final String uri;
private final int enabledCount;
- private String candidateName;
private final Lock lock = new ReentrantLock();
- private final CountDownLatch electionComplete = new CountDownLatch(1);
private final Set<Route> suspendedRoutes = new CopyOnWriteArraySet<Route>();
private final AtomicBoolean shouldProcessExchanges = new AtomicBoolean();
- private ProducerTemplate template;
private volatile boolean shouldStopConsumer = true;
- private final UuidGenerator uuidGenerator = new JavaUuidGenerator();
- private volatile boolean isCandidateCreated;
+
+ private final Lock electionLock = new ReentrantLock();
+ private ZooKeeperElection election;
public ZooKeeperRoutePolicy(String uri, int enabledCount) {
this.uri = uri;
this.enabledCount = enabledCount;
- createCandidateName();
}
- private void createCandidateName() {
- /** UUID would be enough, also using hostname for human readability */
- StringBuilder b = new StringBuilder(fetchHostname());
- b.append("-").append(uuidGenerator.generateUuid());
- this.candidateName = b.toString();
- }
-
- private String fetchHostname() {
- try {
- return InetAddress.getLocalHost().getCanonicalHostName();
- } catch (UnknownHostException ex) {
- log.warn("Unable to determine the local hostname, using a default.", ex);
- return "default";
- }
+ public ZooKeeperRoutePolicy(ZooKeeperElection election) {
+ this.election = election;
+ this.uri = null;
+ this.enabledCount = -1;
}
@Override
public void onExchangeBegin(Route route, Exchange exchange) {
- testAndCreateCandidateNode(route);
+ ensureElectionIsCreated(route);
- awaitElectionResults();
- if (!shouldProcessExchanges.get()) {
+ if (election.isMaster()) {
if (shouldStopConsumer) {
- stopConsumer(route);
+ startConsumer(route);
}
-
- IllegalStateException e = new IllegalStateException("Zookeeper based route policy prohibits processing exchanges, stopping route and failing the exchange");
- exchange.setException(e);
-
} else {
if (shouldStopConsumer) {
- startConsumer(route);
+ stopConsumer(route);
}
- }
- }
- private void testAndCreateCandidateNode(Route route) {
- try {
- lock.lock();
- if (!isCandidateCreated) {
- createCandidateNode(route.getRouteContext().getCamelContext());
- isCandidateCreated = true;
- }
- } catch (Exception e) {
- handleException(e);
- } finally {
- lock.unlock();
+ IllegalStateException e = new IllegalStateException("Zookeeper based route policy prohibits processing exchanges, stopping route and failing the exchange");
+ exchange.setException(e);
}
}
- private void awaitElectionResults() {
- while (electionComplete.getCount() > 0) {
+ private void ensureElectionIsCreated(Route route) {
+ if (election == null) {
+ electionLock.lock();
try {
- electionComplete.await();
- } catch (InterruptedException e1) {
+ if (election == null) { // re-test
+ election = new ZooKeeperElection(route.getRouteContext().getCamelContext(), uri, enabledCount);
+ election.addElectionWatcher(this);
+ }
+ } finally {
+ electionLock.unlock();
}
}
}
@@ -173,6 +132,13 @@ public class ZooKeeperRoutePolicy extend
}
}
+ @Override
+ public void electionResultChanged() {
+ if (election.isMaster()) {
+ startAllStoppedConsumers();
+ }
+ }
+
private void startAllStoppedConsumers() {
try {
lock.lock();
@@ -200,99 +166,4 @@ public class ZooKeeperRoutePolicy extend
public void setShouldStopConsumer(boolean shouldStopConsumer) {
this.shouldStopConsumer = shouldStopConsumer;
}
-
- private ZooKeeperEndpoint createCandidateNode(CamelContext camelContext) {
- this.template = camelContext.createProducerTemplate();
- log.info("Initializing ZookeeperRoutePolicy with uri {}", uri);
-
- ZooKeeperEndpoint zep = camelContext.getEndpoint(uri, ZooKeeperEndpoint.class);
- zep.getConfiguration().setCreate(true);
- String fullpath = createFullPathToCandidate(zep);
- Exchange e = zep.createExchange();
- e.setPattern(ExchangePattern.InOut);
- e.getIn().setHeader(ZooKeeperMessage.ZOOKEEPER_NODE, fullpath);
- e.getIn().setHeader(ZooKeeperMessage.ZOOKEEPER_CREATE_MODE, CreateMode.EPHEMERAL_SEQUENTIAL);
- template.send(zep, e);
-
- if (e.isFailed()) {
- log.warn("Error setting up election node " + fullpath, e.getException());
- } else {
- log.info("Candidate node {} has been created", fullpath);
- try {
- camelContext.addRoutes(new ElectoralMonitorRoute(zep));
- } catch (Exception ex) {
- log.warn("Error configuring ZookeeperRoutePolicy. This exception is ignored.", ex);
- }
- }
- return zep;
-
- }
-
- private String createFullPathToCandidate(ZooKeeperEndpoint zep) {
- String fullpath = zep.getConfiguration().getPath();
- if (!fullpath.endsWith("/")) {
- fullpath += "/";
- }
- fullpath += candidateName;
- return fullpath;
- }
-
- private class ElectoralMonitorRoute extends RouteBuilder {
-
- private SequenceComparator comparator = new SequenceComparator();
-
- private ZooKeeperEndpoint zep;
-
- public ElectoralMonitorRoute(ZooKeeperEndpoint zep) {
- this.zep = zep;
- zep.getConfiguration().setListChildren(true);
- zep.getConfiguration().setRepeat(true);
- }
-
- @Override
- public void configure() throws Exception {
-
- /**
- * TODO: this is cheap cheerful but suboptimal; it suffers from the
- * 'herd effect' that on any change to the candidates list every
- * policy instance will ask for the entire candidate list again.
- * This is fine for small numbers of nodes (for scenarios
- * like Master-Slave it is perfect) but could get noisy if
- * large numbers of nodes were involved.
- * <p>
- * Better would be to find the position of this node in the list and
- * watch the node in the position ahead node ahead of this and only
- * request the candidate list when its status changes. This will
- * require enhancing the consumer to allow custom operation lists.
- */
- from(zep).sort(body(), comparator).process(new Processor() {
-
- public void process(Exchange e) throws Exception {
- @SuppressWarnings("unchecked")
- List<String> candidates = e.getIn().getMandatoryBody(List.class);
-
- int location = Math.abs(Collections.binarySearch(candidates, candidateName));
- /**
- * check if the item at this location starts with this nodes
- * candidate name
- */
- if (isOurCandidateAtLocationInCandidatesList(candidates, location)) {
-
- shouldProcessExchanges.set(location <= enabledCount);
- if (log.isDebugEnabled()) {
- log.debug("This node is number {} on the candidate list, route is configured for the top {}. Exchange processing will be {}",
- new Object[]{location, enabledCount, shouldProcessExchanges.get() ? "enabled" : "disabled"});
- }
- startAllStoppedConsumers();
- }
- electionComplete.countDown();
- }
-
- private boolean isOurCandidateAtLocationInCandidatesList(List<String> candidates, int location) {
- return location <= candidates.size() && candidates.get(location - 1).startsWith(candidateName);
- }
- });
- }
- }
-
}
Modified: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java?rev=1380174&r1=1380173&r2=1380174&view=diff
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java (original)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java Mon Sep 3 08:58:31 2012
@@ -206,6 +206,15 @@ public class ZooKeeperTestSupport extend
}
}
+ public void deleteAll(String node) throws Exception {
+ delay(200);
+ log.debug("Deleting {} and it's immediate children", node);
+ for (String child : zk.getChildren(node, false)) {
+ delete(node + "/" + child);
+ }
+ delete(node);
+ }
+
public void delete(String node) throws Exception {
delay(200);
log.debug("Deleting node " + node);
Modified: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverRoutePolicyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverRoutePolicyTest.java?rev=1380174&r1=1380173&r2=1380174&view=diff
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverRoutePolicyTest.java (original)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/FailoverRoutePolicyTest.java Mon Sep 3 08:58:31 2012
@@ -27,8 +27,11 @@ import org.apache.camel.component.zookee
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class FailoverRoutePolicyTest extends ZooKeeperTestSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(FailoverRoutePolicyTest.class);
protected CamelContext createCamelContext() throws Exception {
disableJMX();
@@ -76,6 +79,7 @@ public class FailoverRoutePolicyTest ext
template.sendBody("vm:" + routename, ExchangePattern.InOut, message);
} catch (Exception e) {
if (expected > 0) {
+ LOG.error(e.getMessage(), e);
fail("Expected messages...");
}
}
Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperElectionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperElectionTest.java?rev=1380174&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperElectionTest.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/policy/ZookeeperElectionTest.java Mon Sep 3 08:58:31 2012
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.policy;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZookeeperElectionTest extends ZooKeeperTestSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(ZookeeperElectionTest.class);
+
+ private static final String NODE_BASE_KEY = "/someapp";
+ private static final String NODE_PARTICULAR_KEY = "/someapp/somepolicy";
+ private static final String ELECTION_URI = "zookeeper:localhost:39913/someapp/somepolicy";
+
+ @Before
+ public void before() throws Exception {
+ // set up the parent used to control the election
+ client.createPersistent(NODE_BASE_KEY, "App node to contain policy election nodes...");
+ client.createPersistent(NODE_PARTICULAR_KEY, "Policy node used by route policy to control routes...");
+ }
+
+ @After
+ public void after() throws Exception {
+ client.deleteAll(NODE_PARTICULAR_KEY);
+ client.delete(NODE_BASE_KEY);
+ }
+
+ @Test
+ public void masterCanBeElected() throws Exception {
+ ZooKeeperElection candidate = new ZooKeeperElection(template, context, ELECTION_URI, 1);
+ assertTrue("The only election candidate was not elected as master.", candidate.isMaster());
+ }
+
+ @Test
+ public void masterAndSlave() throws Exception {
+ final DefaultCamelContext candidateOneContext = createNewContext();
+ final DefaultCamelContext candidateTwoContext = createNewContext();
+
+ ZooKeeperElection electionCandidate1 = createElectionCandidate(candidateOneContext, 1);
+ assertTrue("The first candidate was not elected.", electionCandidate1.isMaster());
+ ZooKeeperElection electionCandidate2 = createElectionCandidate(candidateTwoContext, 1);
+ assertFalse("The second candidate should not have been elected.", electionCandidate2.isMaster());
+ }
+
+ @Test
+ public void testMasterGoesAway() throws Exception {
+ final DefaultCamelContext candidateOneContext = createNewContext();
+ final DefaultCamelContext candidateTwoContext = createNewContext();
+
+ ZooKeeperElection electionCandidate1 = createElectionCandidate(candidateOneContext, 1);
+ assertTrue("The first candidate was not elected.", electionCandidate1.isMaster());
+ ZooKeeperElection electionCandidate2 = createElectionCandidate(candidateTwoContext, 1);
+ assertFalse("The second candidate should not have been elected.", electionCandidate2.isMaster());
+
+ LOG.debug("About to shutdown the first candidate.");
+
+ candidateOneContext.stop(); // the first candidate was killed.
+
+ delay(3000); // more than the timeout on the zeekeeper server.
+ assertTrue("The second candidate should have been elected.", electionCandidate2.isMaster());
+ }
+
+ @Test
+ public void testDualMaster() throws Exception {
+ final DefaultCamelContext candidateOneContext = createNewContext();
+ final DefaultCamelContext candidateTwoContext = createNewContext();
+
+ ZooKeeperElection electionCandidate1 = createElectionCandidate(candidateOneContext, 2);
+ assertTrue("The first candidate was not elected.", electionCandidate1.isMaster());
+ ZooKeeperElection electionCandidate2 = createElectionCandidate(candidateTwoContext, 2);
+ assertTrue("The second candidate should also be a master.", electionCandidate2.isMaster());
+ }
+
+ @Test
+ public void testWatchersAreNotified() throws Exception {
+ final DefaultCamelContext candidateOneContext = createNewContext();
+ final DefaultCamelContext candidateTwoContext = createNewContext();
+
+ final AtomicBoolean notified = new AtomicBoolean(false);
+ ElectionWatcher watcher = new ElectionWatcher() {
+ @Override public void electionResultChanged() { notified.set(true); }
+ };
+
+ ZooKeeperElection electionCandidate1 = createElectionCandidate(candidateOneContext, 2);
+ assertTrue("The first candidate was not elected.", electionCandidate1.isMaster());
+ electionCandidate1.addElectionWatcher(watcher);
+ ZooKeeperElection electionCandidate2 = createElectionCandidate(candidateTwoContext, 2);
+ electionCandidate2.isMaster();
+ assertTrue("The first candidate should have had it's watcher notified", notified.get());
+ }
+
+ private DefaultCamelContext createNewContext() throws Exception {
+ DefaultCamelContext controlledContext = new DefaultCamelContext();
+ controlledContext.start();
+ return controlledContext;
+ }
+
+ private ZooKeeperElection createElectionCandidate(final DefaultCamelContext context, int masterCount) {
+ return new ZooKeeperElection(context.createProducerTemplate(), context, ELECTION_URI, masterCount);
+ }
+}