You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/27 07:37:43 UTC

[2/2] camel git commit: Renamed the producer name from Zookeeper to ZooKeeper

Renamed the producer name from Zookeeper to ZooKeeper


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9c076c3d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9c076c3d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9c076c3d

Branch: refs/heads/master
Commit: 9c076c3d573435a71bbbe560717930b7dc377a70
Parents: 3c3202e
Author: Tatsuya Hoshino <ta...@gmail.com>
Authored: Wed Apr 27 08:04:41 2016 +0900
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Apr 27 07:37:31 2016 +0200

----------------------------------------------------------------------
 .../component/zookeeper/ZooKeeperEndpoint.java  |   4 +-
 .../component/zookeeper/ZooKeeperProducer.java  | 266 +++++++++++++++++++
 .../component/zookeeper/ZookeeperProducer.java  | 266 -------------------
 .../zookeeper/ZooKeeperProducerTest.java        | 167 ++++++++++++
 .../zookeeper/ZookeeperProducerTest.java        | 167 ------------
 5 files changed, 435 insertions(+), 435 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9c076c3d/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java
index e85876d..fa646f6 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java
@@ -45,7 +45,7 @@ public class ZooKeeperEndpoint extends DefaultEndpoint {
     }
 
     public Producer createProducer() throws Exception {
-        return new ZookeeperProducer(this);
+        return new ZooKeeperProducer(this);
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
@@ -167,7 +167,7 @@ public class ZooKeeperEndpoint extends DefaultEndpoint {
     public void setSendEmptyMessageOnDelete(boolean sendEmptyMessageOnDelete) {
         getConfiguration().setSendEmptyMessageOnDelete(sendEmptyMessageOnDelete);
     }
-    
+
     @Override
     protected void doStop() throws Exception {
         if (connectionManager != null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/9c076c3d/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperProducer.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperProducer.java
new file mode 100644
index 0000000..e5428af
--- /dev/null
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperProducer.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper;
+
+import static java.lang.String.format;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.component.zookeeper.operations.CreateOperation;
+import org.apache.camel.component.zookeeper.operations.DeleteOperation;
+import org.apache.camel.component.zookeeper.operations.GetChildrenOperation;
+import org.apache.camel.component.zookeeper.operations.OperationResult;
+import org.apache.camel.component.zookeeper.operations.SetDataOperation;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ExchangeHelper;
+
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getAclListFromMessage;
+import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getCreateMode;
+import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getCreateModeFromString;
+import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getNodeFromMessage;
+import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getPayloadFromExchange;
+import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getVersionFromMessage;
+
+/**
+ * <code>ZooKeeperProducer</code> attempts to set the content of nodes in the
+ * {@link ZooKeeper} cluster with the payloads of the of the exchanges it
+ * receives.
+ */
+@SuppressWarnings("rawtypes")
+public class ZooKeeperProducer extends DefaultProducer {
+    public static final String ZK_OPERATION_WRITE  = "WRITE";
+    public static final String ZK_OPERATION_DELETE = "DELETE";
+
+    private final ZooKeeperConfiguration configuration;
+    private ZooKeeperConnectionManager zkm;
+    private ZooKeeper connection;
+
+    public ZooKeeperProducer(ZooKeeperEndpoint endpoint) {
+        super(endpoint);
+        this.configuration = endpoint.getConfiguration();
+        this.zkm = endpoint.getConnectionManager();
+    }
+
+    public void process(Exchange exchange) throws Exception {
+
+        ProductionContext context = new ProductionContext(connection, exchange);
+
+        String operation = exchange.getIn().getHeader(ZooKeeperMessage.ZOOKEEPER_OPERATION, String.class);
+        boolean isDelete = ZK_OPERATION_DELETE.equals(operation);
+
+        if (ExchangeHelper.isOutCapable(exchange)) {
+            if (isDelete) {
+                if (log.isDebugEnabled()) {
+                    log.debug(format("Deleting znode '%s', waiting for confirmation", context.node));
+                }
+
+                OperationResult result = synchronouslyDelete(context);
+                if (configuration.isListChildren()) {
+                    result = listChildren(context);
+                }
+                updateExchangeWithResult(context, result);
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug(format("Storing data to znode '%s', waiting for confirmation", context.node));
+                }
+
+                OperationResult result = synchronouslySetData(context);
+                if (configuration.isListChildren()) {
+                    result = listChildren(context);
+                }
+                updateExchangeWithResult(context, result);
+            }
+        } else {
+            if (isDelete) {
+                asynchronouslyDeleteNode(connection, context);
+            } else {
+                asynchronouslySetDataOnNode(connection, context);
+            }
+        }
+
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        connection = zkm.getConnection();
+        if (log.isTraceEnabled()) {
+            log.trace(String.format("Starting zookeeper producer of '%s'", configuration.getPath()));
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        if (log.isTraceEnabled()) {
+            log.trace(String.format("Shutting down zookeeper producer of '%s'", configuration.getPath()));
+        }
+        zkm.shutdown();
+    }
+
+    private void asynchronouslyDeleteNode(ZooKeeper connection, ProductionContext context) {
+        if (log.isDebugEnabled()) {
+            log.debug(format("Deleting node '%s', not waiting for confirmation", context.node));
+        }
+        connection.delete(context.node, context.version, new AsyncDeleteCallback(), context);
+
+    }
+
+    private void asynchronouslySetDataOnNode(ZooKeeper connection, ProductionContext context) {
+        if (log.isDebugEnabled()) {
+            log.debug(format("Storing data to node '%s', not waiting for confirmation", context.node));
+        }
+        connection.setData(context.node, context.payload, context.version, new AsyncSetDataCallback(), context);
+    }
+
+    private void updateExchangeWithResult(ProductionContext context, OperationResult result) {
+        ZooKeeperMessage out = new ZooKeeperMessage(context.node, result.getStatistics(), context.in.getHeaders());
+        if (result.isOk()) {
+            out.setBody(result.getResult());
+        } else {
+            context.exchange.setException(result.getException());
+        }
+
+        context.exchange.setOut(out);
+    }
+
+    private OperationResult listChildren(ProductionContext context) throws Exception {
+        return new GetChildrenOperation(context.connection, configuration.getPath()).get();
+    }
+
+    /** Simple container to avoid passing all these around as parameters */
+    private class ProductionContext {
+        ZooKeeper connection;
+        Exchange exchange;
+        Message in;
+        byte[] payload;
+        int version;
+        String node;
+
+        public ProductionContext(ZooKeeper connection, Exchange exchange) {
+            this.connection = connection;
+            this.exchange = exchange;
+            this.in = exchange.getIn();
+            this.node = getNodeFromMessage(in, configuration.getPath());
+            this.version = getVersionFromMessage(in);
+            this.payload = getPayloadFromExchange(exchange);
+        }
+    }
+
+    private class AsyncSetDataCallback implements StatCallback {
+
+        public void processResult(int rc, String node, Object ctx, Stat statistics) {
+            if (Code.NONODE.equals(Code.get(rc))) {
+                if (configuration.isCreate()) {
+                    log.warn(format("Node '%s' did not exist, creating it...", node));
+                    ProductionContext context = (ProductionContext)ctx;
+                    OperationResult<String> result = null;
+                    try {
+                        result = createNode(context);
+                    } catch (Exception e) {
+                        log.error(format("Error trying to create node '%s'", node), e);
+                    }
+
+                    if (result == null || !result.isOk()) {
+                        log.error(format("Error creating node '%s'", node), result.getException());
+                    }
+                }
+            } else {
+                logStoreComplete(node, statistics);
+            }
+        }
+    }
+
+    private class AsyncDeleteCallback implements VoidCallback {
+        @Override
+        public void processResult(int rc, String path, Object ctx) {
+            if (log.isDebugEnabled()) {
+                if (log.isTraceEnabled()) {
+                    log.trace(format("Removed data node '%s'", path));
+                } else {
+                    log.debug(format("Removed data node '%s'", path));
+                }
+            }
+        }
+    }
+
+    private OperationResult<String> createNode(ProductionContext ctx) throws Exception {
+        CreateOperation create = new CreateOperation(ctx.connection, ctx.node);
+        create.setPermissions(getAclListFromMessage(ctx.exchange.getIn()));
+
+        CreateMode mode = null;
+        String modeString = configuration.getCreateMode();
+        if (modeString != null) {
+            try {
+                mode = getCreateModeFromString(modeString, CreateMode.EPHEMERAL);
+            } catch (Exception e) { }
+        } else {
+            mode = getCreateMode(ctx.exchange.getIn(), CreateMode.EPHEMERAL);
+        }
+        create.setCreateMode(mode == null ? CreateMode.EPHEMERAL : mode);
+        create.setData(ctx.payload);
+        return create.get();
+    }
+
+    /**
+     * Tries to set the data first and if a no node error is received then an
+     * attempt will be made to create it instead.
+     */
+    private OperationResult synchronouslySetData(ProductionContext ctx) throws Exception {
+
+        SetDataOperation setData = new SetDataOperation(ctx.connection, ctx.node, ctx.payload);
+        setData.setVersion(ctx.version);
+
+        OperationResult result = setData.get();
+
+        if (!result.isOk() && configuration.isCreate() && result.failedDueTo(Code.NONODE)) {
+            log.warn(format("Node '%s' did not exist, creating it.", ctx.node));
+            result = createNode(ctx);
+        }
+        return result;
+    }
+
+    private OperationResult synchronouslyDelete(ProductionContext ctx) throws Exception {
+        DeleteOperation setData = new DeleteOperation(ctx.connection, ctx.node);
+        setData.setVersion(ctx.version);
+
+        OperationResult result = setData.get();
+
+        if (!result.isOk() && configuration.isCreate() && result.failedDueTo(Code.NONODE)) {
+            log.warn(format("Node '%s' did not exist, creating it.", ctx.node));
+            result = createNode(ctx);
+        }
+        return result;
+    }
+
+
+    private void logStoreComplete(String path, Stat statistics) {
+        if (log.isDebugEnabled()) {
+            if (log.isTraceEnabled()) {
+                log.trace(format("Stored data to node '%s', and receive statistics %s", path, statistics));
+            } else {
+                log.debug(format("Stored data to node '%s'", path));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/9c076c3d/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
deleted file mode 100644
index 2280049..0000000
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper;
-
-import static java.lang.String.format;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.component.zookeeper.operations.CreateOperation;
-import org.apache.camel.component.zookeeper.operations.DeleteOperation;
-import org.apache.camel.component.zookeeper.operations.GetChildrenOperation;
-import org.apache.camel.component.zookeeper.operations.OperationResult;
-import org.apache.camel.component.zookeeper.operations.SetDataOperation;
-import org.apache.camel.impl.DefaultProducer;
-import org.apache.camel.util.ExchangeHelper;
-
-import org.apache.zookeeper.AsyncCallback.StatCallback;
-import org.apache.zookeeper.AsyncCallback.VoidCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-
-import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getAclListFromMessage;
-import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getCreateMode;
-import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getCreateModeFromString;
-import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getNodeFromMessage;
-import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getPayloadFromExchange;
-import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getVersionFromMessage;
-
-/**
- * <code>ZookeeperProducer</code> attempts to set the content of nodes in the
- * {@link ZooKeeper} cluster with the payloads of the of the exchanges it
- * receives.
- */
-@SuppressWarnings("rawtypes")
-public class ZookeeperProducer extends DefaultProducer {
-    public static final String ZK_OPERATION_WRITE  = "WRITE";
-    public static final String ZK_OPERATION_DELETE = "DELETE";
-
-    private final ZooKeeperConfiguration configuration;
-    private ZooKeeperConnectionManager zkm;
-    private ZooKeeper connection;
-
-    public ZookeeperProducer(ZooKeeperEndpoint endpoint) {
-        super(endpoint);
-        this.configuration = endpoint.getConfiguration();
-        this.zkm = endpoint.getConnectionManager();
-    }
-
-    public void process(Exchange exchange) throws Exception {
-        
-        ProductionContext context = new ProductionContext(connection, exchange);
-
-        String operation = exchange.getIn().getHeader(ZooKeeperMessage.ZOOKEEPER_OPERATION, String.class);
-        boolean isDelete = ZK_OPERATION_DELETE.equals(operation);
-        
-        if (ExchangeHelper.isOutCapable(exchange)) {
-            if (isDelete) {
-                if (log.isDebugEnabled()) {
-                    log.debug(format("Deleting znode '%s', waiting for confirmation", context.node));
-                }
-
-                OperationResult result = synchronouslyDelete(context);
-                if (configuration.isListChildren()) {
-                    result = listChildren(context);
-                }
-                updateExchangeWithResult(context, result);
-            } else {
-                if (log.isDebugEnabled()) {
-                    log.debug(format("Storing data to znode '%s', waiting for confirmation", context.node));
-                }
-
-                OperationResult result = synchronouslySetData(context);
-                if (configuration.isListChildren()) {
-                    result = listChildren(context);
-                }
-                updateExchangeWithResult(context, result);
-            }
-        } else {
-            if (isDelete) {
-                asynchronouslyDeleteNode(connection, context);
-            } else {
-                asynchronouslySetDataOnNode(connection, context);
-            }
-        }
-        
-    }
-    
-    @Override
-    protected void doStart() throws Exception {
-        connection = zkm.getConnection();
-        if (log.isTraceEnabled()) {
-            log.trace(String.format("Starting zookeeper producer of '%s'", configuration.getPath()));
-        }
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        super.doStop();
-        if (log.isTraceEnabled()) {
-            log.trace(String.format("Shutting down zookeeper producer of '%s'", configuration.getPath()));
-        }
-        zkm.shutdown();
-    }
-
-    private void asynchronouslyDeleteNode(ZooKeeper connection, ProductionContext context) {
-        if (log.isDebugEnabled()) {
-            log.debug(format("Deleting node '%s', not waiting for confirmation", context.node));
-        }
-        connection.delete(context.node, context.version, new AsyncDeleteCallback(), context);
-
-    }
-
-    private void asynchronouslySetDataOnNode(ZooKeeper connection, ProductionContext context) {
-        if (log.isDebugEnabled()) {
-            log.debug(format("Storing data to node '%s', not waiting for confirmation", context.node));
-        }
-        connection.setData(context.node, context.payload, context.version, new AsyncSetDataCallback(), context);
-    }
-
-    private void updateExchangeWithResult(ProductionContext context, OperationResult result) {
-        ZooKeeperMessage out = new ZooKeeperMessage(context.node, result.getStatistics(), context.in.getHeaders());
-        if (result.isOk()) {
-            out.setBody(result.getResult());
-        } else {
-            context.exchange.setException(result.getException());
-        }
-        
-        context.exchange.setOut(out);
-    }
-
-    private OperationResult listChildren(ProductionContext context) throws Exception {
-        return new GetChildrenOperation(context.connection, configuration.getPath()).get();
-    }
-
-    /** Simple container to avoid passing all these around as parameters */
-    private class ProductionContext {
-        ZooKeeper connection;
-        Exchange exchange;
-        Message in;
-        byte[] payload;
-        int version;
-        String node;
-
-        public ProductionContext(ZooKeeper connection, Exchange exchange) {
-            this.connection = connection;
-            this.exchange = exchange;
-            this.in = exchange.getIn();
-            this.node = getNodeFromMessage(in, configuration.getPath());
-            this.version = getVersionFromMessage(in);
-            this.payload = getPayloadFromExchange(exchange);
-        }
-    }
-
-    private class AsyncSetDataCallback implements StatCallback {
-
-        public void processResult(int rc, String node, Object ctx, Stat statistics) {
-            if (Code.NONODE.equals(Code.get(rc))) {
-                if (configuration.isCreate()) {
-                    log.warn(format("Node '%s' did not exist, creating it...", node));
-                    ProductionContext context = (ProductionContext)ctx;
-                    OperationResult<String> result = null;
-                    try {
-                        result = createNode(context);
-                    } catch (Exception e) {
-                        log.error(format("Error trying to create node '%s'", node), e);
-                    }
-
-                    if (result == null || !result.isOk()) {
-                        log.error(format("Error creating node '%s'", node), result.getException());
-                    }
-                }
-            } else {
-                logStoreComplete(node, statistics);
-            }
-        }
-    }
-
-    private class AsyncDeleteCallback implements VoidCallback {
-        @Override
-        public void processResult(int rc, String path, Object ctx) {
-            if (log.isDebugEnabled()) {
-                if (log.isTraceEnabled()) {
-                    log.trace(format("Removed data node '%s'", path));
-                } else {
-                    log.debug(format("Removed data node '%s'", path));
-                }
-            }
-        }
-    }
-    
-    private OperationResult<String> createNode(ProductionContext ctx) throws Exception {
-        CreateOperation create = new CreateOperation(ctx.connection, ctx.node);
-        create.setPermissions(getAclListFromMessage(ctx.exchange.getIn()));
-        
-        CreateMode mode = null;
-        String modeString = configuration.getCreateMode();
-        if (modeString != null) {
-            try {
-                mode = getCreateModeFromString(modeString, CreateMode.EPHEMERAL);
-            } catch (Exception e) { }
-        } else {
-            mode = getCreateMode(ctx.exchange.getIn(), CreateMode.EPHEMERAL);
-        }
-        create.setCreateMode(mode == null ? CreateMode.EPHEMERAL : mode);
-        create.setData(ctx.payload);
-        return create.get();
-    }
-
-    /**
-     * Tries to set the data first and if a no node error is received then an
-     * attempt will be made to create it instead.
-     */
-    private OperationResult synchronouslySetData(ProductionContext ctx) throws Exception {
-
-        SetDataOperation setData = new SetDataOperation(ctx.connection, ctx.node, ctx.payload);
-        setData.setVersion(ctx.version);
-
-        OperationResult result = setData.get();
-
-        if (!result.isOk() && configuration.isCreate() && result.failedDueTo(Code.NONODE)) {
-            log.warn(format("Node '%s' did not exist, creating it.", ctx.node));
-            result = createNode(ctx);
-        }
-        return result;
-    }
-
-    private OperationResult synchronouslyDelete(ProductionContext ctx) throws Exception {
-        DeleteOperation setData = new DeleteOperation(ctx.connection, ctx.node);
-        setData.setVersion(ctx.version);
-
-        OperationResult result = setData.get();
-
-        if (!result.isOk() && configuration.isCreate() && result.failedDueTo(Code.NONODE)) {
-            log.warn(format("Node '%s' did not exist, creating it.", ctx.node));
-            result = createNode(ctx);
-        }
-        return result;
-    }
-
-    
-    private void logStoreComplete(String path, Stat statistics) {
-        if (log.isDebugEnabled()) {
-            if (log.isTraceEnabled()) {
-                log.trace(format("Stored data to node '%s', and receive statistics %s", path, statistics));
-            } else {
-                log.debug(format("Stored data to node '%s'", path));
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/9c076c3d/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperProducerTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperProducerTest.java
new file mode 100644
index 0000000..ced212f
--- /dev/null
+++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperProducerTest.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.zookeeper.operations.GetChildrenOperation;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
+
+import org.junit.Test;
+
+import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_CREATE_MODE;
+import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_NODE;
+import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_OPERATION;
+
+public class ZooKeeperProducerTest extends ZooKeeperTestSupport {
+
+    private String zookeeperUri;
+    private String testPayload = "TestPayload";
+
+    @Override
+    protected RouteBuilder[] createRouteBuilders() throws Exception {
+        return new RouteBuilder[] {new RouteBuilder() {
+            public void configure() throws Exception {
+                zookeeperUri = "zookeeper://localhost:" + getServerPort() + "/node?create=true";
+                from("direct:roundtrip").to(zookeeperUri).to("mock:producer-out");
+                from(zookeeperUri).to("mock:consumed-from-node");
+            }
+        }, new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct:no-create-fails-set").to("zookeeper://localhost:" + getServerPort() + "/doesnotexist");
+            }
+        }, new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct:node-from-header").to("zookeeper://localhost:" + getServerPort() + "/notset?create=true");
+                from("zookeeper://localhost:" + getServerPort() + "/set?create=true").to("mock:consumed-from-set-node");
+            }
+        }, new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct:create-mode").to("zookeeper://localhost:" + getServerPort() + "/persistent?create=true&createMode=PERSISTENT").to("mock:create-mode");
+            }
+        }, new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct:delete").to("zookeeper://localhost:39913/to-be-deleted").to("mock:delete");
+            }
+        }};
+    }
+
+    @Test
+    public void testRoundtripOfDataToAndFromZnode() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:consumed-from-node");
+        MockEndpoint pipeline = getMockEndpoint("mock:producer-out");
+        mock.expectedMessageCount(1);
+        pipeline.expectedMessageCount(1);
+
+        Exchange e = createExchangeWithBody(testPayload);
+        e.setPattern(ExchangePattern.InOut);
+        template.send("direct:roundtrip", e);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testAsyncRoundtripOfDataToAndFromZnode() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:consumed-from-node");
+        mock.expectedMessageCount(1);
+
+        Exchange e = createExchangeWithBody(testPayload);
+        template.send("direct:roundtrip", e);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void setUsingCreateModeFromHeader() throws Exception {
+        client.createPersistent("/modes-test", "parent for modes");
+        for (CreateMode mode : CreateMode.values()) {
+            Exchange exchange = createExchangeWithBody(testPayload);
+            exchange.getIn().setHeader(ZOOKEEPER_CREATE_MODE, mode);
+            exchange.getIn().setHeader(ZOOKEEPER_NODE, "/modes-test/" + mode);
+            exchange.setPattern(ExchangePattern.InOut);
+            template.send("direct:node-from-header", exchange);
+        }
+        GetChildrenOperation listing = new GetChildrenOperation(getConnection(), "/modes-test");
+        assertEquals(CreateMode.values().length, listing.get().getResult().size());
+    }
+
+    @Test
+    public void createWithOtherCreateMode() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:create-mode");
+        mock.expectedMessageCount(1);
+
+        Exchange e = createExchangeWithBody(testPayload);
+        e.setPattern(ExchangePattern.InOut);
+
+        template.send("direct:create-mode", e);
+
+        assertMockEndpointsSatisfied();
+
+        Stat s = mock.getReceivedExchanges().get(0).getIn().getHeader(ZooKeeperMessage.ZOOKEEPER_STATISTICS, Stat.class);
+        assertEquals(s.getEphemeralOwner(), 0);
+    }
+
+    @Test
+    public void deleteNode() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:delete");
+        mock.expectedMessageCount(1);
+
+        client.createPersistent("/to-be-deleted", "to be deleted");
+        Exchange e = createExchangeWithBody(null);
+        e.setPattern(ExchangePattern.InOut);
+        e.getIn().setHeader(ZOOKEEPER_OPERATION, "DELETE");
+        template.send("direct:delete", e);
+
+        assertMockEndpointsSatisfied();
+
+        assertNull(client.getConnection().exists("/to-be-deleted", false));
+    }
+
+    @Test
+    public void setAndGetListing() throws Exception {
+        client.createPersistent("/set-listing", "parent for set and list test");
+
+        Exchange exchange = createExchangeWithBody(testPayload);
+        exchange.getIn().setHeader(ZOOKEEPER_NODE, "/set-listing/firstborn");
+        exchange.setPattern(ExchangePattern.InOut);
+        template.send("zookeeper://localhost:" + getServerPort() + "/set-listing?create=true&listChildren=true", exchange);
+        List<?> children = exchange.getOut().getMandatoryBody(List.class);
+        assertEquals(1, children.size());
+        assertEquals("firstborn", children.get(0));
+    }
+
+    @Test
+    public void testZookeeperMessage() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:consumed-from-node");
+        mock.expectedMessageCount(1);
+
+        Exchange exchange = createExchangeWithBody(testPayload);
+        template.send("direct:roundtrip", exchange);
+
+        assertMockEndpointsSatisfied();
+
+        Message received = mock.getReceivedExchanges().get(0).getIn();
+        assertEquals("/node", ZooKeeperMessage.getPath(received));
+        assertNotNull(ZooKeeperMessage.getStatistics(received));
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/9c076c3d/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java
deleted file mode 100644
index 3dcb493..0000000
--- a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper;
-
-import java.util.List;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Message;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.component.zookeeper.operations.GetChildrenOperation;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
-
-import org.junit.Test;
-
-import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_CREATE_MODE;
-import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_NODE;
-import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_OPERATION;
-
-public class ZookeeperProducerTest extends ZooKeeperTestSupport {
-
-    private String zookeeperUri;
-    private String testPayload = "TestPayload";
-
-    @Override
-    protected RouteBuilder[] createRouteBuilders() throws Exception {
-        return new RouteBuilder[] {new RouteBuilder() {
-            public void configure() throws Exception {
-                zookeeperUri = "zookeeper://localhost:" + getServerPort() + "/node?create=true";
-                from("direct:roundtrip").to(zookeeperUri).to("mock:producer-out");
-                from(zookeeperUri).to("mock:consumed-from-node");
-            }
-        }, new RouteBuilder() {
-            public void configure() throws Exception {
-                from("direct:no-create-fails-set").to("zookeeper://localhost:" + getServerPort() + "/doesnotexist");
-            }
-        }, new RouteBuilder() {
-            public void configure() throws Exception {
-                from("direct:node-from-header").to("zookeeper://localhost:" + getServerPort() + "/notset?create=true");
-                from("zookeeper://localhost:" + getServerPort() + "/set?create=true").to("mock:consumed-from-set-node");
-            }
-        }, new RouteBuilder() {
-            public void configure() throws Exception {
-                from("direct:create-mode").to("zookeeper://localhost:" + getServerPort() + "/persistent?create=true&createMode=PERSISTENT").to("mock:create-mode");
-            }
-        }, new RouteBuilder() {
-            public void configure() throws Exception {
-                from("direct:delete").to("zookeeper://localhost:39913/to-be-deleted").to("mock:delete");
-            }
-        }};
-    }
-
-    @Test
-    public void testRoundtripOfDataToAndFromZnode() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:consumed-from-node");
-        MockEndpoint pipeline = getMockEndpoint("mock:producer-out");
-        mock.expectedMessageCount(1);
-        pipeline.expectedMessageCount(1);
-
-        Exchange e = createExchangeWithBody(testPayload);
-        e.setPattern(ExchangePattern.InOut);
-        template.send("direct:roundtrip", e);
-
-        assertMockEndpointsSatisfied();
-    }
-
-    @Test
-    public void testAsyncRoundtripOfDataToAndFromZnode() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:consumed-from-node");
-        mock.expectedMessageCount(1);
-
-        Exchange e = createExchangeWithBody(testPayload);
-        template.send("direct:roundtrip", e);
-
-        assertMockEndpointsSatisfied();
-    }
-
-    @Test
-    public void setUsingCreateModeFromHeader() throws Exception {
-        client.createPersistent("/modes-test", "parent for modes");
-        for (CreateMode mode : CreateMode.values()) {
-            Exchange exchange = createExchangeWithBody(testPayload);
-            exchange.getIn().setHeader(ZOOKEEPER_CREATE_MODE, mode);
-            exchange.getIn().setHeader(ZOOKEEPER_NODE, "/modes-test/" + mode);
-            exchange.setPattern(ExchangePattern.InOut);
-            template.send("direct:node-from-header", exchange);
-        }
-        GetChildrenOperation listing = new GetChildrenOperation(getConnection(), "/modes-test");
-        assertEquals(CreateMode.values().length, listing.get().getResult().size());
-    }
-    
-    @Test
-    public void createWithOtherCreateMode() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:create-mode");
-        mock.expectedMessageCount(1);
-        
-        Exchange e = createExchangeWithBody(testPayload);
-        e.setPattern(ExchangePattern.InOut);
-        
-        template.send("direct:create-mode", e);
-
-        assertMockEndpointsSatisfied();
-
-        Stat s = mock.getReceivedExchanges().get(0).getIn().getHeader(ZooKeeperMessage.ZOOKEEPER_STATISTICS, Stat.class);
-        assertEquals(s.getEphemeralOwner(), 0);
-    }
-
-    @Test
-    public void deleteNode() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:delete");
-        mock.expectedMessageCount(1);
-
-        client.createPersistent("/to-be-deleted", "to be deleted");
-        Exchange e = createExchangeWithBody(null);
-        e.setPattern(ExchangePattern.InOut);
-        e.getIn().setHeader(ZOOKEEPER_OPERATION, "DELETE");
-        template.send("direct:delete", e);
-
-        assertMockEndpointsSatisfied();
-
-        assertNull(client.getConnection().exists("/to-be-deleted", false));
-    }
-
-    @Test
-    public void setAndGetListing() throws Exception {
-        client.createPersistent("/set-listing", "parent for set and list test");
-
-        Exchange exchange = createExchangeWithBody(testPayload);
-        exchange.getIn().setHeader(ZOOKEEPER_NODE, "/set-listing/firstborn");
-        exchange.setPattern(ExchangePattern.InOut);
-        template.send("zookeeper://localhost:" + getServerPort() + "/set-listing?create=true&listChildren=true", exchange);
-        List<?> children = exchange.getOut().getMandatoryBody(List.class);
-        assertEquals(1, children.size());
-        assertEquals("firstborn", children.get(0));
-    }
-
-    @Test
-    public void testZookeeperMessage() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:consumed-from-node");
-        mock.expectedMessageCount(1);
-
-        Exchange exchange = createExchangeWithBody(testPayload);
-        template.send("direct:roundtrip", exchange);
-
-        assertMockEndpointsSatisfied();
-
-        Message received = mock.getReceivedExchanges().get(0).getIn();
-        assertEquals("/node", ZooKeeperMessage.getPath(received));
-        assertNotNull(ZooKeeperMessage.getStatistics(received));
-    }
-}