You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ha...@apache.org on 2011/08/25 00:31:39 UTC
svn commit: r1161306 [2/3] - in /camel/trunk: components/
components/camel-zookeeper/ components/camel-zookeeper/src/
components/camel-zookeeper/src/main/
components/camel-zookeeper/src/main/java/
components/camel-zookeeper/src/main/java/org/ component...
Added: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/GetChildrenOperation.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/GetChildrenOperation.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/GetChildrenOperation.java (added)
+++ camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/GetChildrenOperation.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.operations;
+
+import java.util.List;
+
+import static java.lang.String.format;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * <code>GetChildrenOperation</code> is a basic ZooKeeper operation used to
+ * retrieve the list of children belonging to a given ZooKeeper node.
+ */
+public class GetChildrenOperation extends ZooKeeperOperation<List<String>> {
+
+ public GetChildrenOperation(ZooKeeper connection, String node) {
+ super(connection, node);
+ }
+
+ public OperationResult<List<String>> getResult() {
+ try {
+ Stat statistics = new Stat();
+
+ List<String> children = connection.getChildren(node, true, statistics);
+ if (LOG.isDebugEnabled()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(format("Received children from '%s' path with statistics '%s'", node, statistics));
+ } else {
+ LOG.debug(format("Received children from '%s' path ", node));
+ }
+ }
+ return new OperationResult<List<String>>(children, statistics);
+ } catch (Exception e) {
+ return new OperationResult<List<String>>(e);
+ }
+ }
+}
Added: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/GetDataOperation.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/GetDataOperation.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/GetDataOperation.java (added)
+++ camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/GetDataOperation.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.operations;
+
+import static java.lang.String.format;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * <code>GetDataOperation</code> is a basic operation to immediately retrieve
+ * the data associated with a given ZooKeeper node.
+ */
+public class GetDataOperation extends ZooKeeperOperation<byte[]> {
+
+ public GetDataOperation(ZooKeeper connection, String node) {
+ super(connection, node);
+ }
+
+ public OperationResult<byte[]> getResult() {
+ try {
+ Stat statistics = new Stat();
+
+ if (LOG.isDebugEnabled()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(format("Received data from '%s' path with statistics '%s'", node, statistics));
+ } else {
+ LOG.debug(format("Received data from '%s' path ", node));
+ }
+ }
+ return new OperationResult<byte[]>(connection.getData(node, true, statistics), statistics);
+ } catch (Exception e) {
+ return new OperationResult<byte[]>(e);
+ }
+ }
+
+}
Added: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/OperationResult.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/OperationResult.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/OperationResult.java (added)
+++ camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/OperationResult.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.operations;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * <code>OperationResult</code> is used to ecapsulate the results of executing a
+ * {@link ZooKeeperOperation}
+ */
+public class OperationResult<ResultType> {
+ private Stat statistics;
+ private ResultType result;
+ private Exception exception;
+ private boolean ok;
+
+ public OperationResult(ResultType result, Stat statistics) {
+ this(result, statistics, true);
+ }
+
+ public OperationResult(ResultType result, Stat statistics, boolean ok) {
+ this.result = result;
+ this.statistics = statistics;
+ this.ok = ok;
+ }
+
+ public OperationResult(Exception exception) {
+ this.exception = exception;
+ ok = false;
+ }
+
+ public Exception getException() {
+ return exception;
+ }
+
+ public Stat getStatistics() {
+ return statistics;
+ }
+
+ public ResultType getResult() {
+ return result;
+ }
+
+ public boolean isOk() {
+ return ok;
+ }
+
+ public boolean failedDueTo(Code... codes) {
+ if (exception instanceof KeeperException) {
+
+ for (Code code : codes) {
+ if (code.equals(((KeeperException)exception).code())) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+}
Added: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/SetDataOperation.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/SetDataOperation.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/SetDataOperation.java (added)
+++ camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/SetDataOperation.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.operations;
+
+import static java.lang.String.format;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * <code>SetDataOperation</code> sets the content of a ZooKeeper node. An optional version
+ * may be specified that the node must currently have for the operation to succeed.
+ * @see {@link ZooKeeper#setData(String, byte[], int)}
+ */
+public class SetDataOperation extends ZooKeeperOperation<byte[]> {
+
+ private byte[] data;
+
+ private int version = -1;
+
+ public SetDataOperation(ZooKeeper connection, String node, byte[] data) {
+ super(connection, node);
+ this.data = data;
+ }
+
+ public OperationResult<byte[]> getResult() {
+ try {
+ Stat statistics = connection.setData(node, data, version);
+ if (LOG.isDebugEnabled()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(format("Set data of node '%s' with '%d' bytes of data, retrieved statistics '%s' ",
+ node, data != null ? data.length : 0, statistics));
+ } else {
+ LOG.debug(format("Set data of node '%s' with '%d' bytes of data", node, data != null ? data.length : 0));
+ }
+ }
+ return new OperationResult<byte[]>(data, statistics);
+ } catch (Exception e) {
+ return new OperationResult<byte[]>(e);
+ }
+ }
+
+ public void setVersion(int version) {
+ this.version = version;
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public ZooKeeperOperation createCopy() throws Exception {
+ SetDataOperation copy = (SetDataOperation) super.createCopy();
+ copy.version = -1; // set the version to -1 for 'any version'
+ return copy;
+ }
+
+}
Added: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/ZooKeeperOperation.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/ZooKeeperOperation.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/ZooKeeperOperation.java (added)
+++ camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/operations/ZooKeeperOperation.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.operations;
+
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * <code>ZooKeeperOperation</code> is the base class for wrapping various
+ * ZooKeeper API instructions and callbacks into callable and composable operation
+ * objects.
+ */
+@SuppressWarnings("rawtypes")
+public abstract class ZooKeeperOperation<ResultType> {
+
+ protected static final transient Log LOG = LogFactory.getLog(ZooKeeperOperation.class);
+
+ protected static final Class[] CONSTRUCTOR_ARGS = {ZooKeeper.class, String.class};
+
+ protected String node;
+
+ protected ZooKeeper connection;
+
+ protected Set<Thread> waitingThreads = new CopyOnWriteArraySet<Thread>();
+
+ protected OperationResult<ResultType> result;
+
+ private boolean producesExchange;
+
+ private boolean cancelled;
+
+ public ZooKeeperOperation(ZooKeeper connection, String node) {
+ this(connection, node, true);
+ }
+
+ public ZooKeeperOperation(ZooKeeper connection, String node, boolean producesExchange) {
+ this.connection = connection;
+ this.node = node;
+ this.producesExchange = producesExchange;
+ }
+
+ /**
+ * Gets the result of this zookeeper operation, i.e. some data and the
+ * associated node stats
+ *
+ * @return
+ */
+ public abstract OperationResult<ResultType> getResult();
+
+ public OperationResult<ResultType> get() throws InterruptedException, ExecutionException {
+ waitingThreads.add(Thread.currentThread());
+ result = getResult();
+ return result;
+ }
+
+ public OperationResult<ResultType> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return get(); // TODO ; perhaps set a timer here ....
+ }
+
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ if (mayInterruptIfRunning) {
+ for (Thread waiting : waitingThreads) {
+ waiting.interrupt();
+ }
+ cancelled = true;
+ }
+ return mayInterruptIfRunning;
+ }
+
+ public boolean isCancelled() {
+ return cancelled;
+ }
+
+ public boolean isDone() {
+ return result != null;
+ }
+
+ public String getNode() {
+ return node;
+ }
+
+ public boolean shouldProduceExchange() {
+ return producesExchange;
+ }
+
+
+ // TODO: slightly different to a clone as it uses the constructor
+ public ZooKeeperOperation createCopy() throws Exception {
+ return getClass().getConstructor(CONSTRUCTOR_ARGS).newInstance(new Object[] {connection, node});
+ }
+}
Added: camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java (added)
+++ camel/trunk/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/policy/ZooKeeperRoutePolicy.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.policy;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static java.lang.String.format;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.Route;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.zookeeper.SequenceComparator;
+import org.apache.camel.component.zookeeper.ZooKeeperEndpoint;
+import org.apache.camel.component.zookeeper.ZooKeeperMessage;
+import org.apache.camel.impl.JavaUuidGenerator;
+import org.apache.camel.impl.RoutePolicySupport;
+import org.apache.camel.spi.UuidGenerator;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.zookeeper.CreateMode;
+
+/**
+ * <code>ZooKeeperRoutePolicy</code> uses the leader election capabilities of a
+ * ZooKeeper cluster to control how routes are enabled. It is typically used in
+ * fail-over scenarios controlling identical instances of a route across a
+ * cluster of Camel based servers.
+ * <p>
+ * The policy is configured with a 'top n' number of routes that should be
+ * allowed to start, for a master/slave scenario this would be 1. Each instance
+ * of the policy will execute the election algorithm to obtain its position in
+ * the hierarchy of servers, if it is within the 'top n' servers then the policy
+ * is enabled and exchanges can be processed by the route. If not it waits for a
+ * change in the leader hierarchy and then reruns this scenario to see if it is
+ * now in the top n.
+ * <p>
+ * All instances of the policy must also be configured with the same path on the
+ * ZooKeeper cluster where the election will be carried out. It is good practice
+ * for this to indicate the application e.g. /someapplication/someroute/ note
+ * that these nodes should exist before using the policy.
+ * <p>
+ * See @link{ http://hadoop.apache
+ * .org/zookeeper/docs/current/recipes.html#sc_leaderElection} for more on how
+ * Leader election is achieved with ZooKeeper.
+ *
+ */
+public class ZooKeeperRoutePolicy extends RoutePolicySupport {
+
+ private String uri;
+
+ private int enabledCount;
+
+ private String candidateName;
+
+ private final Lock lock = new ReentrantLock();
+
+ private final CountDownLatch electionComplete = new CountDownLatch(1);
+
+ private Set<Route> suspendedRoutes = new CopyOnWriteArraySet<Route>();
+
+ private AtomicBoolean shouldProcessExchanges = new AtomicBoolean();
+
+ private ProducerTemplate template;
+
+ private boolean shouldStopConsumer = true;
+
+ private UuidGenerator uuidGenerator = new JavaUuidGenerator();
+
+ private boolean isCandidateCreated;
+
+ public ZooKeeperRoutePolicy(String uri, int enabledCount) throws Exception {
+ this.uri = uri;
+ this.enabledCount = enabledCount;
+ createCandidateName();
+ }
+
+ private void createCandidateName() throws Exception {
+ /** UUID would be enough, also using hostname for human readability */
+ StringBuilder b = new StringBuilder(InetAddress.getLocalHost().getCanonicalHostName());
+ b.append("-").append(uuidGenerator.generateUuid());
+ this.candidateName = b.toString();
+ }
+
+ @Override
+ public void onExchangeBegin(Route route, Exchange exchange) {
+ testAndCreateCandidateNode(route);
+
+ awaitElectionResults();
+ if (!shouldProcessExchanges.get()) {
+ if (shouldStopConsumer) {
+ stopConsumer(route);
+ }
+
+ IllegalStateException e = new IllegalStateException("Zookeeper based route policy prohibits processing exchanges, stopping route and failing the exchange");
+ exchange.setException(e);
+
+ } else {
+ if (shouldStopConsumer) {
+ startConsumer(route);
+ }
+ }
+ }
+
+ private void testAndCreateCandidateNode(Route route) {
+ try {
+ lock.lock();
+ if (!isCandidateCreated) {
+ createCandidateNode(route.getRouteContext().getCamelContext());
+ isCandidateCreated = true;
+ }
+ } catch (Exception e) {
+ handleException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void awaitElectionResults() {
+ while (electionComplete.getCount() > 0) {
+ try {
+ electionComplete.await();
+ } catch (InterruptedException e1) {
+ }
+ }
+ }
+
+ private void startConsumer(Route route) {
+ try {
+ lock.lock();
+ if (suspendedRoutes.contains(route)) {
+ startConsumer(route.getConsumer());
+ suspendedRoutes.remove(route);
+ }
+ } catch (Exception e) {
+ handleException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void stopConsumer(Route route) {
+ try {
+ lock.lock();
+ // check that we should still suspend once the lock is acquired
+ if (!suspendedRoutes.contains(route) && !shouldProcessExchanges.get()) {
+ stopConsumer(route.getConsumer());
+ suspendedRoutes.add(route);
+ }
+ } catch (Exception e) {
+ handleException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void startAllStoppedConsumers() {
+ try {
+ lock.lock();
+ if (!suspendedRoutes.isEmpty()) {
+ if (log.isDebugEnabled()) {
+ log.debug(format("'%d' have been stopped previously by poilcy, restarting.", suspendedRoutes.size()));
+ }
+ for (Route suspended : suspendedRoutes) {
+ startConsumer(suspended.getConsumer());
+ }
+ suspendedRoutes.clear();
+ }
+
+ } catch (Exception e) {
+ handleException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean isShouldStopConsumer() {
+ return shouldStopConsumer;
+ }
+
+ public void setShouldStopConsumer(boolean shouldStopConsumer) {
+ this.shouldStopConsumer = shouldStopConsumer;
+ }
+
+ private ZooKeeperEndpoint createCandidateNode(CamelContext camelContext) {
+ this.template = camelContext.createProducerTemplate();
+ if (log.isInfoEnabled()) {
+ log.info(format("Initializing ZookeeperRoutePolicy with uri '%s'", uri));
+ }
+ ZooKeeperEndpoint zep = (ZooKeeperEndpoint)camelContext.getEndpoint(uri);
+ zep.getConfiguration().setCreate(true);
+ String fullpath = createFullPathToCandidate(zep);
+ Exchange e = zep.createExchange();
+ e.setPattern(ExchangePattern.InOut);
+ e.getIn().setHeader(ZooKeeperMessage.ZOOKEEPER_NODE, fullpath);
+ e.getIn().setHeader(ZooKeeperMessage.ZOOKEEPER_CREATE_MODE, CreateMode.EPHEMERAL_SEQUENTIAL);
+ template.send(zep, e);
+
+ if (e.isFailed()) {
+ log.error("Error setting up election node " + fullpath, e.getException());
+ } else {
+ if (log.isInfoEnabled()) {
+ log.info(format("Candidate node '%s' has been created", fullpath));
+ }
+ try {
+ if (zep != null) {
+ camelContext.addRoutes(new ElectoralMonitorRoute(zep));
+ }
+ } catch (Exception ex) {
+ log.error("Error configuring ZookeeperRoutePolicy", ex);
+ }
+ }
+ return zep;
+
+ }
+
+ private String createFullPathToCandidate(ZooKeeperEndpoint zep) {
+ String fullpath = zep.getConfiguration().getPath();
+ if (!fullpath.endsWith("/")) {
+ fullpath += "/";
+ }
+ fullpath += candidateName;
+ return fullpath;
+ }
+
+ private class ElectoralMonitorRoute extends RouteBuilder {
+
+ private SequenceComparator comparator = new SequenceComparator();
+
+ private ZooKeeperEndpoint zep;
+
+ public ElectoralMonitorRoute(ZooKeeperEndpoint zep) {
+ this.zep = zep;
+ zep.getConfiguration().setListChildren(true);
+ zep.getConfiguration().setRepeat(true);
+ }
+
+ @Override
+ public void configure() throws Exception {
+
+ /**
+ * TODO: this is cheap cheerful but suboptimal; it suffers from the
+ * 'herd effect' that on any change to the candidates list every
+ * policy instance will ask for the entire candidate list again.
+ * This is fine for small numbers of nodes (for scenarios
+ * like Master-Slave it is perfect) but could get noisy if
+ * large numbers of nodes were involved.
+ * <p>
+ * Better would be to find the position of this node in the list and
+ * watch the node in the position ahead node ahead of this and only
+ * request the candidate list when its status changes. This will
+ * require enhancing the consumer to allow custom operation lists.
+ */
+ from(zep).sort(body(), comparator).process(new Processor() {
+
+ @SuppressWarnings("unchecked")
+ public void process(Exchange e) throws Exception {
+ List<String> candidates = (List<String>)ExchangeHelper.getMandatoryInBody(e);
+
+ int location = Math.abs(Collections.binarySearch(candidates, candidateName));
+ /**
+ * check if the item at this location starts with this nodes
+ * candidate name
+ */
+ if (isOurCandidateAtLocationInCandidatesList(candidates, location)) {
+
+ shouldProcessExchanges.set(location <= enabledCount);
+ if (log.isDebugEnabled()) {
+ log.debug(format("This node is number '%d' on the candidate list, route is configured for the top '%d'. Exchange processing will be %s", location,
+ enabledCount, shouldProcessExchanges.get() ? "enabled" : "disabled"));
+ }
+ startAllStoppedConsumers();
+ }
+ electionComplete.countDown();
+ }
+
+ private boolean isOurCandidateAtLocationInCandidatesList(List<String> candidates, int location) {
+ return location <= candidates.size() && candidates.get(location - 1).startsWith(candidateName);
+ }
+ });
+ }
+ }
+
+}
Added: camel/trunk/components/camel-zookeeper/src/main/resources/META-INF/services/org/apache/camel/component/zoo
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/resources/META-INF/services/org/apache/camel/component/zoo?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/resources/META-INF/services/org/apache/camel/component/zoo (added)
+++ camel/trunk/components/camel-zookeeper/src/main/resources/META-INF/services/org/apache/camel/component/zoo Wed Aug 24 22:31:37 2011
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+class=org.apache.camel.component.zookeeper.ZooKeeperComponent
Added: camel/trunk/components/camel-zookeeper/src/main/resources/META-INF/services/org/apache/camel/component/zookeeper
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/main/resources/META-INF/services/org/apache/camel/component/zookeeper?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/main/resources/META-INF/services/org/apache/camel/component/zookeeper (added)
+++ camel/trunk/components/camel-zookeeper/src/main/resources/META-INF/services/org/apache/camel/component/zookeeper Wed Aug 24 22:31:37 2011
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+class=org.apache.camel.component.zookeeper.ZooKeeperComponent
Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeChildrenTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeChildrenTest.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeChildrenTest.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeChildrenTest.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Message;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.zookeeper.NaturalSortComparator.Order;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Test;
+
+@SuppressWarnings("unchecked")
+public class ConsumeChildrenTest extends ZooKeeperTestSupport {
+
+ @Override
+ protected RouteBuilder[] createRouteBuilders() throws Exception {
+ return new RouteBuilder[] {new RouteBuilder() {
+ public void configure() throws Exception {
+ from("zoo://localhost:39913/grimm?repeat=true&listChildren=true").sort(body(), new NaturalSortComparator(Order.Descending)).to("mock:zookeeper-data");
+ }
+ }};
+ }
+
+ @Test
+ public void shouldAwaitCreationAndGetDataNotification() throws Exception {
+
+ MockEndpoint mock = getMockEndpoint("mock:zookeeper-data");
+ mock.expectedMessageCount(5);
+
+ client.createPersistent("/grimm", "parent");
+ client.create("/grimm/hansel", "child");
+ client.create("/grimm/gretel", "child");
+ client.delete("/grimm/hansel");
+ client.delete("/grimm/gretel");
+
+ mock.await(5, TimeUnit.SECONDS);
+ mock.assertIsSatisfied();
+
+ validateExchangesContainListings(mock, createChildListing(), createChildListing("hansel"), createChildListing("hansel", "gretel"), createChildListing("gretel"),
+ createChildListing());
+
+ }
+
+ private void validateExchangesContainListings(MockEndpoint mock, List<String>... expected) throws InvalidPayloadException {
+ int index = 0;
+ for (Exchange received : mock.getReceivedExchanges()) {
+ List<String> actual = ExchangeHelper.getMandatoryInBody(received, List.class);
+ assertEquals(expected[index++], actual);
+ validateChildrenCountChangesEachTime(mock);
+ }
+ }
+
+ protected void validateChildrenCountChangesEachTime(MockEndpoint mock) {
+ int lastChildCount = -1;
+ List<Exchange> received = mock.getReceivedExchanges();
+ for (int x = 0; x < received.size(); x++) {
+ Message zkm = mock.getReceivedExchanges().get(x).getIn();
+ int childCount = ((Stat)zkm.getHeader(ZooKeeperMessage.ZOOKEEPER_STATISTICS)).getNumChildren();
+ assertNotSame("Num of children did not change", lastChildCount, childCount);
+ lastChildCount = childCount;
+ }
+ }
+}
Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeDataTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeDataTest.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeDataTest.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ConsumeDataTest.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.junit.Test;
+
+public class ConsumeDataTest extends ZooKeeperTestSupport {
+
+ @Override
+ protected RouteBuilder[] createRouteBuilders() throws Exception {
+ return new RouteBuilder[] {new RouteBuilder() {
+ public void configure() throws Exception {
+ from("zoo://localhost:39913/camel?repeat=true").to("mock:zookeeper-data");
+ }
+ }};
+ }
+
+ @Test
+ public void shouldAwaitCreationAndGetDataNotification() throws Exception {
+
+ MockEndpoint mock = getMockEndpoint("mock:zookeeper-data");
+ mock.expectedMinimumMessageCount(10);
+
+ createCamelNode();
+ updateNode(10);
+
+ mock.await(5, TimeUnit.SECONDS);
+ mock.assertIsSatisfied();
+
+ validateExchangesReceivedInOrderWithIncreasingVersion(mock);
+ }
+
+ @Test
+ public void deletionOfAwaitedNodeCausesNoFailure() throws Exception {
+
+ MockEndpoint mock = getMockEndpoint("mock:zookeeper-data");
+ mock.expectedMessageCount(11);
+ createCamelNode();
+
+ delay(200);
+
+ // by now we are back waiting for a change so delete the node
+ client.delete("/camel");
+
+ // recreate and update a number of times.
+ createCamelNode();
+ updateNode(10);
+
+ mock.await(5, TimeUnit.SECONDS);
+ mock.assertIsSatisfied();
+ }
+
+ private void updateNode(int times) throws InterruptedException, Exception {
+ for (int x = 1; x < times; x++) {
+ delay(200);
+ client.setData("/camel", testPayload + "_" + x, -1);
+ }
+ }
+
+ private void createCamelNode() throws InterruptedException, Exception {
+ try {
+ delay(1000);
+ client.create("/camel", testPayload + "_0");
+ } catch (NodeExistsException e) {
+ }
+ }
+}
Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/NaturalSortComparatorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/NaturalSortComparatorTest.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/NaturalSortComparatorTest.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/NaturalSortComparatorTest.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.camel.component.zookeeper.NaturalSortComparator.Order;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class NaturalSortComparatorTest {
+
+ @Test
+ public void testSortOrder() throws Exception {
+
+ List<String> sorted = Arrays
+ .asList(new String[] {"0", "1", "3", "4.0", "11", "30", "55", "225", "333", "camel-2.1.0", "camel-2.1.1", "camel-2.1.1-SNAPSHOT", "camel-2.2.0"});
+
+ List<String> unsorted = new ArrayList<String>(sorted);
+ Collections.shuffle(unsorted);
+ Collections.sort(unsorted, new NaturalSortComparator());
+ compareLists(sorted, unsorted);
+
+ Collections.shuffle(unsorted);
+ Collections.sort(unsorted, new NaturalSortComparator(Order.Descending));
+ Collections.reverse(sorted);
+ compareLists(sorted, unsorted);
+ }
+
+ private void compareLists(List<String> sorted, List<String> unsorted) {
+ for (int x = 0; x < unsorted.size(); x++) {
+ assertEquals(sorted.get(x), unsorted.get(x));
+ }
+ }
+}
Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperEndpointTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperEndpointTest.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperEndpointTest.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperEndpointTest.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper;
+
+import java.util.ArrayList;
+import java.util.Set;
+
+import javax.management.Attribute;
+import javax.management.ObjectName;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport.TestZookeeperClient;
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport.TestZookeeperServer;
+import org.apache.camel.management.JmxInstrumentationUsingDefaultsTest;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.springframework.jmx.support.JmxUtils;
+
+@SuppressWarnings("all")
+public class ZooKeeperEndpointTest extends JmxInstrumentationUsingDefaultsTest {
+
+ private static int teardownAfter;
+
+ @Override
+ protected void setUp() throws Exception {
+ if (teardownAfter == 0) {
+ ZooKeeperTestSupport.setupTestServer();
+ }
+ super.setUp();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ if (++teardownAfter == 3) {
+ ZooKeeperTestSupport.shutdownServer();
+ }
+ }
+
+ public synchronized void testEnpointConfigurationCanBeSetViaJMX() throws Exception {
+ Set s = mbsc.queryNames(new ObjectName(domainName + ":type=endpoints,*"), null);
+ assertEquals("Could not find endpoints: " + s, 2, s.size());
+ ObjectName zepName = new ArrayList<ObjectName>(s).get(0);
+
+ verifyManagedAttribute(zepName, "Path", "/someotherpath");
+ verifyManagedAttribute(zepName, "Create", true);
+ verifyManagedAttribute(zepName, "Repeat", true);
+ verifyManagedAttribute(zepName, "ListChildren", true);
+ verifyManagedAttribute(zepName, "AwaitExistence", true);
+ verifyManagedAttribute(zepName, "Timeout", 12345);
+ verifyManagedAttribute(zepName, "Backoff", 12345L);
+
+ mbsc.invoke(zepName, "clearServers", null, JmxUtils.getMethodSignature(ZooKeeperEndpoint.class.getMethod("clearServers", null)));
+ mbsc.invoke(zepName, "addServer", new Object[] {"someserver:12345"},
+ JmxUtils.getMethodSignature(ZooKeeperEndpoint.class.getMethod("addServer", new Class[] {String.class})));
+
+ }
+
+ private void verifyManagedAttribute(ObjectName zepName, String attributeName, String attributeValue) throws Exception {
+ mbsc.setAttribute(zepName, new Attribute(attributeName, attributeValue));
+ assertEquals(attributeValue, mbsc.getAttribute(zepName, attributeName));
+ }
+
+ private void verifyManagedAttribute(ObjectName zepName, String attributeName, Integer attributeValue) throws Exception {
+ mbsc.setAttribute(zepName, new Attribute(attributeName, attributeValue));
+ assertEquals(attributeValue, mbsc.getAttribute(zepName, attributeName));
+ }
+
+ private void verifyManagedAttribute(ObjectName zepName, String attributeName, Boolean attributeValue) throws Exception {
+ mbsc.setAttribute(zepName, new Attribute(attributeName, attributeValue));
+ assertEquals(attributeValue, mbsc.getAttribute(zepName, attributeName));
+ }
+
+ private void verifyManagedAttribute(ObjectName zepName, String attributeName, Long attributeValue) throws Exception {
+ mbsc.setAttribute(zepName, new Attribute(attributeName, attributeValue));
+ assertEquals(attributeValue, mbsc.getAttribute(zepName, attributeName));
+ }
+
+ @Override
+ public void testCounters() throws Exception {
+ }
+
+ @Override
+ public void testMBeansRegistered() throws Exception {
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("zoo://localhost:39913/node").to("mock:test");
+ }
+ };
+ }
+}
Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperTestSupport.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,337 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.FileUtil;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class ZooKeeperTestSupport extends CamelTestSupport {
+
+ protected static TestZookeeperServer server;
+
+ protected static TestZookeeperClient client;
+
+ private static final Logger LOG = Logger.getLogger(ZooKeeperTestSupport.class);
+
+ protected String testPayload = "This is a test";
+
+ protected byte[] testPayloadBytes = testPayload.getBytes();
+
+ @BeforeClass
+ public static void setupTestServer() throws Exception {
+ LOG.info("Starting Zookeeper Test Infrastructure");
+ server = new TestZookeeperServer(getServerPort(), clearServerData());
+ waitForServerUp("localhost:" + getServerPort(), 1000);
+ client = new TestZookeeperClient(getServerPort(), getTestClientSessionTimeout());
+ LOG.info("Started Zookeeper Test Infrastructure on port " + getServerPort());
+ }
+
+ public ZooKeeper getConnection() {
+ return client.getConnection();
+ }
+
+ @AfterClass
+ public static void shutdownServer() throws Exception {
+ LOG.info("Stopping Zookeeper Test Infrastructure");
+ client.shutdown();
+ server.shutdown();
+ waitForServerDown("localhost:" + getServerPort(), 1000);
+ LOG.info("Stopped Zookeeper Test Infrastructure");
+ }
+
+ protected static int getServerPort() {
+ return 39913;
+ }
+
+ protected static int getTestClientSessionTimeout() {
+ return 100000;
+ }
+
+ protected static boolean clearServerData() {
+ return true;
+ }
+
+ public static class TestZookeeperServer {
+ private NIOServerCnxn.Factory connectionFactory;
+ private ZooKeeperServer zkServer;
+
+ public TestZookeeperServer(int clientPort, boolean clearServerData) throws Exception {
+
+ if (clearServerData) {
+ File working = new File("./target/zookeeper");
+ deleteDir(working);
+ if (working.exists()) {
+ throw new Exception("Could not delete Test Zookeeper Server working dir ./target/zookeeper");
+ }
+ }
+ zkServer = new ZooKeeperServer();
+ File dataDir = new File("./target/zookeeper/log");
+ File snapDir = new File("./target/zookeeper/data");
+ FileTxnSnapLog ftxn = new FileTxnSnapLog(dataDir, snapDir);
+ zkServer.setTxnLogFactory(ftxn);
+ zkServer.setTickTime(1000);
+ connectionFactory = new NIOServerCnxn.Factory(new InetSocketAddress("localhost", clientPort), 0);
+ connectionFactory.startup(zkServer);
+ }
+
+ public void shutdown() throws Exception {
+ connectionFactory.shutdown();
+ connectionFactory.join();
+ while (zkServer.isRunning()) {
+ zkServer.shutdown();
+ Thread.sleep(100);
+ }
+ }
+ }
+
+ public static class TestZookeeperClient implements Watcher {
+
+ public static int x;
+
+ private final Logger log = Logger.getLogger(getClass());
+
+ private ZooKeeper zk;
+
+ private CountDownLatch connected = new CountDownLatch(1);
+
+
+ public TestZookeeperClient(int port, int timeout) throws Exception {
+ zk = new ZooKeeper("localhost:" + port, timeout, this);
+ connected.await();
+ }
+
+ public ZooKeeper getConnection() {
+ return zk;
+ }
+
+ public void shutdown() throws Exception {
+ zk.close();
+ }
+
+ public byte[] waitForNodeChange(String node) throws Exception {
+ Stat stat = new Stat();
+ return zk.getData(node, this, stat);
+ }
+
+ public void create(String node, String data) throws Exception {
+ log.debug(String.format("Creating node '%s' with data '%s' ", node, data));
+ create(node, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+
+ }
+
+ public void createPersistent(String node, String data) throws Exception {
+ log.debug(String.format("Creating node '%s' with data '%s' ", node, data));
+ create(node, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+
+ public void create(String znode, String data, List<ACL> access, CreateMode mode) throws Exception {
+ delay(200);
+ String created = zk.create(znode, data != null ? data.getBytes() : null, access, mode);
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Created znode named '%s'", created));
+ }
+ }
+
+ public Stat setData(String node, String data, int version) throws Exception {
+ log.debug(String.format("TestClient Updating data of node %s to %s", node, data));
+ return zk.setData(node, data.getBytes(), version);
+ }
+
+ public byte[] getData(String znode) throws Exception {
+ return zk.getData(znode, false, new Stat());
+ }
+
+ public void process(WatchedEvent event) {
+
+ if (event.getState() == KeeperState.SyncConnected) {
+ log.info("TestClient connected");
+ connected.countDown();
+ } else {
+ if (event.getState() == KeeperState.Disconnected) {
+ log.info("TestClient connected ?" + zk.getState());
+ }
+ }
+ }
+
+ public void delete(String node) throws Exception {
+ delay(200);
+ log.debug("Deleting node " + node);
+ zk.delete(node, -1);
+ }
+ }
+
+ // Wait methods are taken directly from the Zookeeper tests. A tests jar
+ // would be nice! Another good reason the keeper folks should move to maven.
+ private static boolean waitForServerUp(String hp, long timeout) {
+ long start = System.currentTimeMillis();
+ while (true) {
+ try {
+ // if there are multiple hostports, just take the first one
+ hp = hp.split(",")[0];
+ String result = send4LetterWord(hp, "stat");
+ if (result.startsWith("Zookeeper version:")) {
+ return true;
+ }
+ } catch (IOException e) {
+ LOG.info("server " + hp + " not up " + e);
+ }
+
+ if (System.currentTimeMillis() > start + timeout) {
+ break;
+ }
+ try {
+ Thread.sleep(250);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ return false;
+ }
+
+ private static String send4LetterWord(String hp, String cmd) throws IOException {
+ String split[] = hp.split(":");
+ String host = split[0];
+ int port;
+ try {
+ port = Integer.parseInt(split[1]);
+ } catch (RuntimeException e) {
+ throw new RuntimeException("Problem parsing " + hp + e.toString());
+ }
+
+ Socket sock = new Socket(host, port);
+ BufferedReader reader = null;
+ try {
+ OutputStream outstream = sock.getOutputStream();
+ outstream.write(cmd.getBytes());
+ outstream.flush();
+
+ reader = new BufferedReader(new InputStreamReader(sock.getInputStream()));
+ StringBuffer sb = new StringBuffer();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ sb.append(line + "\n");
+ }
+ return sb.toString();
+ } finally {
+ sock.close();
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+
+ private static boolean waitForServerDown(String hp, long timeout) {
+ long start = System.currentTimeMillis();
+ while (true) {
+ try {
+ send4LetterWord(hp, "stat");
+ } catch (IOException e) {
+ return true;
+ }
+
+ if (System.currentTimeMillis() > start + timeout) {
+ break;
+ }
+ try {
+ Thread.sleep(250);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ return false;
+ }
+
+ public static void deleteDir(File f) {
+ LinkedList<File> deleteStack = new LinkedList<File>();
+ deleteStack.addLast(f);
+ deleteDir(deleteStack);
+ }
+
+ private static void deleteDir(Deque<File> deleteStack) {
+ File f = deleteStack.pollLast();
+ if (f != null) {
+ if (f.isDirectory()) {
+ for (File child : f.listFiles()) {
+ deleteStack.addLast(child);
+ }
+ }
+ deleteDir(deleteStack);
+ FileUtil.deleteFile(f);
+ }
+ }
+
+ public static void delay(int wait) throws InterruptedException {
+ Thread.sleep(wait);
+ }
+
+ protected List<String> createChildListing(String... children) {
+ return Arrays.asList(children);
+ }
+
+ protected void validateExchangesReceivedInOrderWithIncreasingVersion(MockEndpoint mock) {
+ int lastVersion = -1;
+ List<Exchange> received = mock.getReceivedExchanges();
+ for (int x = 0; x < received.size(); x++) {
+ Message zkm = mock.getReceivedExchanges().get(x).getIn();
+ int version = ZooKeeperMessage.getStatistics(zkm).getVersion();
+ assertTrue("Version did not increase", lastVersion < version);
+ lastVersion = version;
+ }
+ }
+
+ protected void verifyAccessControlList(String node, List<ACL> expected) throws Exception {
+ getConnection().getACL(node, new Stat());
+ }
+
+ protected void verifyNodeContainsData(String node, byte[] expected) throws Exception {
+ if (expected == null) {
+ assertNull(client.getData(node));
+ } else {
+ assertEquals(new String(expected), new String(client.getData(node)));
+ }
+ }
+}
Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperUtilsTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperUtilsTest.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperUtilsTest.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperUtilsTest.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper;
+
+
+
+import org.apache.camel.Message;
+import org.apache.camel.impl.DefaultMessage;
+import org.apache.zookeeper.CreateMode;
+import org.junit.Test;
+
+import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getCreateMode;
+import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getCreateModeFromString;
+
+import static org.junit.Assert.assertEquals;
+
+public class ZooKeeperUtilsTest {
+
+ @Test
+ public void testCreatModeExtraction() {
+ assertEquals(CreateMode.EPHEMERAL, getCreateModeFromString("EPHEMERAL", CreateMode.EPHEMERAL));
+ assertEquals(CreateMode.EPHEMERAL_SEQUENTIAL, getCreateModeFromString("EPHEMERAL_SEQUENTIAL", CreateMode.EPHEMERAL));
+ assertEquals(CreateMode.PERSISTENT, getCreateModeFromString("PERSISTENT", CreateMode.EPHEMERAL));
+ assertEquals(CreateMode.PERSISTENT_SEQUENTIAL, getCreateModeFromString("PERSISTENT_SEQUENTIAL", CreateMode.EPHEMERAL));
+ assertEquals(CreateMode.EPHEMERAL, getCreateModeFromString("DOESNOTEXIST", CreateMode.EPHEMERAL));
+ }
+
+ @Test
+ public void testCreatModeExtractionFromMessageHeader() {
+ assertEquals(CreateMode.EPHEMERAL, testModeInMessage("EPHEMERAL", CreateMode.EPHEMERAL));
+ assertEquals(CreateMode.EPHEMERAL_SEQUENTIAL, testModeInMessage("EPHEMERAL_SEQUENTIAL", CreateMode.EPHEMERAL));
+ assertEquals(CreateMode.PERSISTENT, testModeInMessage("PERSISTENT", CreateMode.EPHEMERAL));
+ assertEquals(CreateMode.PERSISTENT_SEQUENTIAL, testModeInMessage("PERSISTENT_SEQUENTIAL", CreateMode.EPHEMERAL));
+ assertEquals(CreateMode.EPHEMERAL, testModeInMessage("DOESNOTEXIST", CreateMode.EPHEMERAL));
+ }
+
+ private CreateMode testModeInMessage(String mode, CreateMode defaultMode) {
+ Message m = new DefaultMessage();
+ m.setHeader(ZooKeeperMessage.ZOOKEEPER_CREATE_MODE, mode);
+ return getCreateMode(m, defaultMode);
+ }
+}
Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperConnectionManagerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperConnectionManagerTest.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperConnectionManagerTest.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperConnectionManagerTest.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.junit.Test;
+
+public class ZookeeperConnectionManagerTest extends ZooKeeperTestSupport {
+
+ @Test
+ public void shouldWaitForConnection() {
+ ZooKeeperConfiguration config = new ZooKeeperConfiguration();
+ config.addZookeeperServer("localhost:39913");
+
+ ZooKeeperComponent component = new ZooKeeperComponent(config);
+ component.setConfiguration(config);
+ component.setCamelContext(context);
+
+ ZooKeeperEndpoint zep = new ZooKeeperEndpoint("zoo:someserver/this/is/a/path", component, config);
+
+ ZooKeeperConnectionManager zkcm = new ZooKeeperConnectionManager(zep);
+ ZooKeeper zk = zkcm.getConnection();
+ zk.getState();
+ assertEquals(States.CONNECTED, zk.getState());
+ }
+}
Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.zookeeper.operations.GetChildrenOperation;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
+
+import org.junit.Test;
+
+import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_CREATE_MODE;
+import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_NODE;
+
+
+public class ZookeeperProducerTest extends ZooKeeperTestSupport {
+
+ private String zookeeperUri;
+
+ private String testPayload = "TestPayload";
+
+ @Override
+ protected RouteBuilder[] createRouteBuilders() throws Exception {
+ return new RouteBuilder[] {new RouteBuilder() {
+ public void configure() throws Exception {
+ zookeeperUri = "zoo://localhost:39913/node?create=true";
+ from("direct:roundtrip").to(zookeeperUri).to("mock:producer-out");
+ from(zookeeperUri).to("mock:consumed-from-node");
+ }
+ }, new RouteBuilder() {
+ public void configure() throws Exception {
+ from("direct:no-create-fails-set").to("zoo://localhost:39913/doesnotexist");
+ }
+ }, new RouteBuilder() {
+ public void configure() throws Exception {
+ from("direct:node-from-header").to("zoo://localhost:39913/notset?create=true");
+ from("zoo://localhost:39913/set?create=true").to("mock:consumed-from-set-node");
+ }
+ }, new RouteBuilder() {
+ public void configure() throws Exception {
+ from("direct:create-mode").to("zoo://localhost:39913/persistent?create=true&createMode=PERSISTENT").to("mock:create-mode");
+ }
+ }};
+ }
+
+ @Test
+ public void testRoundtripOfDataToAndFromZnode() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:consumed-from-node");
+ MockEndpoint pipeline = getMockEndpoint("mock:producer-out");
+ mock.expectedMessageCount(1);
+ pipeline.expectedMessageCount(1);
+
+ Exchange e = createExchangeWithBody(testPayload);
+ e.setPattern(ExchangePattern.InOut);
+ template.send("direct:roundtrip", e);
+
+ mock.await(2, TimeUnit.SECONDS);
+ mock.assertIsSatisfied();
+ pipeline.assertIsSatisfied();
+ }
+
+ @Test
+ public void testAsyncRoundtripOfDataToAndFromZnode() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:consumed-from-node");
+ mock.expectedMessageCount(1);
+
+ Exchange e = createExchangeWithBody(testPayload);
+ template.send("direct:roundtrip", e);
+
+ mock.await(2, TimeUnit.SECONDS);
+ mock.assertIsSatisfied();
+ }
+
+ @Test
+ public void setUsingNodeFromHeader() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:consumed-from-set-node");
+ mock.expectedMessageCount(1);
+
+ Exchange e = createExchangeWithBody(testPayload);
+ e.setPattern(ExchangePattern.InOut);
+ template.sendBodyAndHeader("direct:node-from-header", e, ZOOKEEPER_NODE, "/set");
+
+ mock.await(5, TimeUnit.SECONDS);
+ mock.assertIsSatisfied();
+ }
+
+ @Test
+ public void setUsingCreateModeFromHeader() throws Exception {
+
+ client.createPersistent("/modes-test", "parent for modes");
+ for (CreateMode mode : CreateMode.values()) {
+ Exchange exchange = createExchangeWithBody(testPayload);
+ exchange.getIn().setHeader(ZOOKEEPER_CREATE_MODE, mode);
+ exchange.getIn().setHeader(ZOOKEEPER_NODE, "/modes-test/" + mode);
+ exchange.setPattern(ExchangePattern.InOut);
+ template.send("direct:node-from-header", exchange);
+ }
+ GetChildrenOperation listing = new GetChildrenOperation(getConnection(), "/modes-test");
+ assertEquals(CreateMode.values().length, listing.get().getResult().size());
+ }
+
+ @Test
+ public void createWithOtherCreateMode() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:create-mode");
+ mock.expectedMessageCount(1);
+
+ Exchange e = createExchangeWithBody(testPayload);
+ e.setPattern(ExchangePattern.InOut);
+
+ template.send("direct:create-mode", e);
+ mock.await(5, TimeUnit.SECONDS);
+
+ Stat s = mock.getReceivedExchanges().get(0).getIn().getHeader(ZooKeeperMessage.ZOOKEEPER_STATISTICS, Stat.class);
+ assertEquals(s.getEphemeralOwner(), 0);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void setAndGetListing() throws Exception {
+
+ client.createPersistent("/set-listing", "parent for set and list test");
+
+ Exchange exchange = createExchangeWithBody(testPayload);
+ exchange.getIn().setHeader(ZOOKEEPER_NODE, "/set-listing/firstborn");
+ exchange.setPattern(ExchangePattern.InOut);
+ template.send("zoo://localhost:39913/set-listing?create=true&listChildren=true", exchange);
+ List<String> children = ExchangeHelper.getMandatoryOutBody(exchange, List.class);
+ assertEquals(1, children.size());
+ assertEquals("firstborn", children.get(0));
+ }
+
+ @Test
+ public void testZookeeperMessage() throws Exception {
+
+ MockEndpoint mock = getMockEndpoint("mock:consumed-from-node");
+ mock.expectedMessageCount(1);
+
+ Exchange exchange = createExchangeWithBody(testPayload);
+ template.send("direct:roundtrip", exchange);
+ mock.await();
+ mock.assertIsSatisfied();
+
+ Message received = mock.getReceivedExchanges().get(0).getIn();
+ assertEquals("/node", ZooKeeperMessage.getPath(received));
+ assertNotNull(ZooKeeperMessage.getStatistics(received));
+ }
+}
Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/AnyOfOperationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/AnyOfOperationTest.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/AnyOfOperationTest.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/AnyOfOperationTest.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.operations;
+
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Test;
+
+public class AnyOfOperationTest extends ZooKeeperTestSupport {
+
+
+ @Test
+ public void testExistsOrWaitsWhenNodeExists() throws Exception {
+ String node = "/cogito";
+ client.create(node, "ergo sum");
+ AnyOfOperations operation = getExistsOrWaitOperation(node);
+ assertEquals(node, operation.get().getResult());
+ }
+
+
+ @Test
+ public void testExistsOrWaitsWhenNodeDoesNotExist() throws Exception {
+ String node = "/chapter-one";
+ AnyOfOperations operation = getExistsOrWaitOperation(node);
+ Thread.sleep(1000);
+ client.create(node, "I am born");
+ assertEquals(node, operation.get().getResult());
+ }
+
+ private AnyOfOperations getExistsOrWaitOperation(String node) {
+ ZooKeeper connection = getConnection();
+ AnyOfOperations operation = new AnyOfOperations(node, new ExistsOperation(connection, node),
+ new ExistenceChangedOperation(connection, node));
+ return operation;
+ }
+}
Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/ChildrenChangedOperationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/ChildrenChangedOperationTest.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/ChildrenChangedOperationTest.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/ChildrenChangedOperationTest.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.operations;
+
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Test;
+
+public class ChildrenChangedOperationTest extends ZooKeeperTestSupport {
+
+ @Test
+ public void getsListingWhenNodeIsCreated() throws Exception {
+ String path = "/parent";
+ client.createPersistent(path, null);
+
+ ZooKeeper connection = getConnection();
+ ChildrenChangedOperation future = new ChildrenChangedOperation(connection, path);
+ connection.getChildren(path, future, null);
+
+ client.createPersistent(path + "/child1", null);
+ assertEquals(createChildListing("child1"), future.get().getResult());
+ }
+
+ @Test
+ public void getsNotifiedWhenNodeIsDeleted() throws Exception {
+
+ String path = "/parent2";
+ client.createPersistent(path, null);
+ client.createPersistent(path + "/child1", null);
+
+ ZooKeeper connection = getConnection();
+ ChildrenChangedOperation future = new ChildrenChangedOperation(connection, path);
+ connection.getChildren(path, future, null);
+
+ client.delete(path + "/child1");
+ assertEquals(createChildListing(), future.get().getResult());
+ }
+
+ @Test
+ public void getsNoListingWhenOnlyChangeIsRequired() throws Exception {
+ String path = "/parent3";
+ client.createPersistent(path, null);
+
+ ZooKeeper connection = getConnection();
+ ChildrenChangedOperation future = new ChildrenChangedOperation(connection, path, false);
+ connection.getChildren(path, future, null);
+
+ client.createPersistent(path + "/child3", null);
+ assertEquals(null, future.get());
+ }
+}
Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/CreateOperationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/CreateOperationTest.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/CreateOperationTest.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/CreateOperationTest.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.operations;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooDefs.Perms;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CreateOperationTest extends ZooKeeperTestSupport {
+
+ private ZooKeeper connection;
+
+ @Before
+ public void setupConnection() {
+ connection = getConnection();
+ }
+
+ @Test
+ public void createBasic() throws Exception {
+
+ CreateOperation create = new CreateOperation(connection, "/one");
+
+ OperationResult<String> result = create.get();
+ assertEquals("/one", result.getResult());
+
+ verifyNodeContainsData("/one", null);
+ }
+
+ @Test
+ public void createBasicWithData() throws Exception {
+ CreateOperation create = new CreateOperation(connection, "/two");
+ create.setData(testPayload.getBytes());
+
+ OperationResult<String> result = create.get();
+
+ assertEquals("/two", result.getResult());
+ verifyNodeContainsData("/two", testPayloadBytes);
+ }
+
+ @Test
+ public void createSequencedNodeToTestCreateMode() throws Exception {
+ CreateOperation create = new CreateOperation(connection, "/three");
+ create.setData(testPayload.getBytes());
+ create.setCreateMode(CreateMode.EPHEMERAL_SEQUENTIAL);
+
+ OperationResult<String> result = create.get();
+ assertEquals("/three0000000002", result.getResult());
+
+ verifyNodeContainsData("/three0000000002", testPayloadBytes);
+ }
+
+ @Test
+ public void createNodeWithSpecificAccess() throws Exception {
+ CreateOperation create = new CreateOperation(connection, "/four");
+ create.setData(testPayload.getBytes());
+ List<ACL> perms = Collections.singletonList(new ACL(Perms.CREATE, Ids.ANYONE_ID_UNSAFE));
+ create.setPermissions(perms);
+
+ OperationResult<String> result = create.get();
+ assertEquals("/four", result.getResult());
+
+ verifyAccessControlList("/four", perms);
+ }
+}
Added: camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/DataChangedOperationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/DataChangedOperationTest.java?rev=1161306&view=auto
==============================================================================
--- camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/DataChangedOperationTest.java (added)
+++ camel/trunk/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/operations/DataChangedOperationTest.java Wed Aug 24 22:31:37 2011
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper.operations;
+
+import org.apache.camel.component.zookeeper.ZooKeeperTestSupport;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Test;
+
+public class DataChangedOperationTest extends ZooKeeperTestSupport {
+
+ @Test
+ public void getsDataWhenNodeChanges() throws Exception {
+ client.create("/datachanged", "this won't hurt a bit");
+ ZooKeeper connection = getConnection();
+
+ DataChangedOperation future = new DataChangedOperation(connection, "/datachanged", true);
+ connection.getData("/datachanged", future, null);
+
+ client.setData("/datachanged", "Really trust us", -1);
+ assertArrayEquals("Really trust us".getBytes(), future.get().getResult());
+ }
+
+ @Test
+ public void getsNotifiedWhenNodeIsDeleted() throws Exception {
+
+ client.create("/existedButWasDeleted", "this won't hurt a bit");
+ ZooKeeper connection = getConnection();
+
+ DataChangedOperation future = new DataChangedOperation(connection, "/existedButWasDeleted", true);
+ connection.getData("/existedButWasDeleted", future, null);
+
+ client.delete("/existedButWasDeleted");
+ assertEquals(null, future.get().getResult());
+ }
+}