You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/04/27 07:37:43 UTC
[2/2] camel git commit: Renamed the producer name from Zookeeper to
ZooKeeper
Renamed the producer name from Zookeeper to ZooKeeper
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9c076c3d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9c076c3d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9c076c3d
Branch: refs/heads/master
Commit: 9c076c3d573435a71bbbe560717930b7dc377a70
Parents: 3c3202e
Author: Tatsuya Hoshino <ta...@gmail.com>
Authored: Wed Apr 27 08:04:41 2016 +0900
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Apr 27 07:37:31 2016 +0200
----------------------------------------------------------------------
.../component/zookeeper/ZooKeeperEndpoint.java | 4 +-
.../component/zookeeper/ZooKeeperProducer.java | 266 +++++++++++++++++++
.../component/zookeeper/ZookeeperProducer.java | 266 -------------------
.../zookeeper/ZooKeeperProducerTest.java | 167 ++++++++++++
.../zookeeper/ZookeeperProducerTest.java | 167 ------------
5 files changed, 435 insertions(+), 435 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/9c076c3d/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java
index e85876d..fa646f6 100644
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperEndpoint.java
@@ -45,7 +45,7 @@ public class ZooKeeperEndpoint extends DefaultEndpoint {
}
public Producer createProducer() throws Exception {
- return new ZookeeperProducer(this);
+ return new ZooKeeperProducer(this);
}
public Consumer createConsumer(Processor processor) throws Exception {
@@ -167,7 +167,7 @@ public class ZooKeeperEndpoint extends DefaultEndpoint {
public void setSendEmptyMessageOnDelete(boolean sendEmptyMessageOnDelete) {
getConfiguration().setSendEmptyMessageOnDelete(sendEmptyMessageOnDelete);
}
-
+
@Override
protected void doStop() throws Exception {
if (connectionManager != null) {
http://git-wip-us.apache.org/repos/asf/camel/blob/9c076c3d/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperProducer.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperProducer.java
new file mode 100644
index 0000000..e5428af
--- /dev/null
+++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZooKeeperProducer.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper;
+
+import static java.lang.String.format;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.component.zookeeper.operations.CreateOperation;
+import org.apache.camel.component.zookeeper.operations.DeleteOperation;
+import org.apache.camel.component.zookeeper.operations.GetChildrenOperation;
+import org.apache.camel.component.zookeeper.operations.OperationResult;
+import org.apache.camel.component.zookeeper.operations.SetDataOperation;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ExchangeHelper;
+
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getAclListFromMessage;
+import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getCreateMode;
+import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getCreateModeFromString;
+import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getNodeFromMessage;
+import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getPayloadFromExchange;
+import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getVersionFromMessage;
+
+/**
+ * <code>ZooKeeperProducer</code> attempts to set the content of nodes in the
+ * {@link ZooKeeper} cluster with the payloads of the of the exchanges it
+ * receives.
+ */
+@SuppressWarnings("rawtypes")
+public class ZooKeeperProducer extends DefaultProducer {
+ public static final String ZK_OPERATION_WRITE = "WRITE";
+ public static final String ZK_OPERATION_DELETE = "DELETE";
+
+ private final ZooKeeperConfiguration configuration;
+ private ZooKeeperConnectionManager zkm;
+ private ZooKeeper connection;
+
+ public ZooKeeperProducer(ZooKeeperEndpoint endpoint) {
+ super(endpoint);
+ this.configuration = endpoint.getConfiguration();
+ this.zkm = endpoint.getConnectionManager();
+ }
+
+ public void process(Exchange exchange) throws Exception {
+
+ ProductionContext context = new ProductionContext(connection, exchange);
+
+ String operation = exchange.getIn().getHeader(ZooKeeperMessage.ZOOKEEPER_OPERATION, String.class);
+ boolean isDelete = ZK_OPERATION_DELETE.equals(operation);
+
+ if (ExchangeHelper.isOutCapable(exchange)) {
+ if (isDelete) {
+ if (log.isDebugEnabled()) {
+ log.debug(format("Deleting znode '%s', waiting for confirmation", context.node));
+ }
+
+ OperationResult result = synchronouslyDelete(context);
+ if (configuration.isListChildren()) {
+ result = listChildren(context);
+ }
+ updateExchangeWithResult(context, result);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(format("Storing data to znode '%s', waiting for confirmation", context.node));
+ }
+
+ OperationResult result = synchronouslySetData(context);
+ if (configuration.isListChildren()) {
+ result = listChildren(context);
+ }
+ updateExchangeWithResult(context, result);
+ }
+ } else {
+ if (isDelete) {
+ asynchronouslyDeleteNode(connection, context);
+ } else {
+ asynchronouslySetDataOnNode(connection, context);
+ }
+ }
+
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ connection = zkm.getConnection();
+ if (log.isTraceEnabled()) {
+ log.trace(String.format("Starting zookeeper producer of '%s'", configuration.getPath()));
+ }
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+ if (log.isTraceEnabled()) {
+ log.trace(String.format("Shutting down zookeeper producer of '%s'", configuration.getPath()));
+ }
+ zkm.shutdown();
+ }
+
+ private void asynchronouslyDeleteNode(ZooKeeper connection, ProductionContext context) {
+ if (log.isDebugEnabled()) {
+ log.debug(format("Deleting node '%s', not waiting for confirmation", context.node));
+ }
+ connection.delete(context.node, context.version, new AsyncDeleteCallback(), context);
+
+ }
+
+ private void asynchronouslySetDataOnNode(ZooKeeper connection, ProductionContext context) {
+ if (log.isDebugEnabled()) {
+ log.debug(format("Storing data to node '%s', not waiting for confirmation", context.node));
+ }
+ connection.setData(context.node, context.payload, context.version, new AsyncSetDataCallback(), context);
+ }
+
+ private void updateExchangeWithResult(ProductionContext context, OperationResult result) {
+ ZooKeeperMessage out = new ZooKeeperMessage(context.node, result.getStatistics(), context.in.getHeaders());
+ if (result.isOk()) {
+ out.setBody(result.getResult());
+ } else {
+ context.exchange.setException(result.getException());
+ }
+
+ context.exchange.setOut(out);
+ }
+
+ private OperationResult listChildren(ProductionContext context) throws Exception {
+ return new GetChildrenOperation(context.connection, configuration.getPath()).get();
+ }
+
+ /** Simple container to avoid passing all these around as parameters */
+ private class ProductionContext {
+ ZooKeeper connection;
+ Exchange exchange;
+ Message in;
+ byte[] payload;
+ int version;
+ String node;
+
+ public ProductionContext(ZooKeeper connection, Exchange exchange) {
+ this.connection = connection;
+ this.exchange = exchange;
+ this.in = exchange.getIn();
+ this.node = getNodeFromMessage(in, configuration.getPath());
+ this.version = getVersionFromMessage(in);
+ this.payload = getPayloadFromExchange(exchange);
+ }
+ }
+
+ private class AsyncSetDataCallback implements StatCallback {
+
+ public void processResult(int rc, String node, Object ctx, Stat statistics) {
+ if (Code.NONODE.equals(Code.get(rc))) {
+ if (configuration.isCreate()) {
+ log.warn(format("Node '%s' did not exist, creating it...", node));
+ ProductionContext context = (ProductionContext)ctx;
+ OperationResult<String> result = null;
+ try {
+ result = createNode(context);
+ } catch (Exception e) {
+ log.error(format("Error trying to create node '%s'", node), e);
+ }
+
+ if (result == null || !result.isOk()) {
+ log.error(format("Error creating node '%s'", node), result.getException());
+ }
+ }
+ } else {
+ logStoreComplete(node, statistics);
+ }
+ }
+ }
+
+ private class AsyncDeleteCallback implements VoidCallback {
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ if (log.isDebugEnabled()) {
+ if (log.isTraceEnabled()) {
+ log.trace(format("Removed data node '%s'", path));
+ } else {
+ log.debug(format("Removed data node '%s'", path));
+ }
+ }
+ }
+ }
+
+ private OperationResult<String> createNode(ProductionContext ctx) throws Exception {
+ CreateOperation create = new CreateOperation(ctx.connection, ctx.node);
+ create.setPermissions(getAclListFromMessage(ctx.exchange.getIn()));
+
+ CreateMode mode = null;
+ String modeString = configuration.getCreateMode();
+ if (modeString != null) {
+ try {
+ mode = getCreateModeFromString(modeString, CreateMode.EPHEMERAL);
+ } catch (Exception e) { }
+ } else {
+ mode = getCreateMode(ctx.exchange.getIn(), CreateMode.EPHEMERAL);
+ }
+ create.setCreateMode(mode == null ? CreateMode.EPHEMERAL : mode);
+ create.setData(ctx.payload);
+ return create.get();
+ }
+
+ /**
+ * Tries to set the data first and if a no node error is received then an
+ * attempt will be made to create it instead.
+ */
+ private OperationResult synchronouslySetData(ProductionContext ctx) throws Exception {
+
+ SetDataOperation setData = new SetDataOperation(ctx.connection, ctx.node, ctx.payload);
+ setData.setVersion(ctx.version);
+
+ OperationResult result = setData.get();
+
+ if (!result.isOk() && configuration.isCreate() && result.failedDueTo(Code.NONODE)) {
+ log.warn(format("Node '%s' did not exist, creating it.", ctx.node));
+ result = createNode(ctx);
+ }
+ return result;
+ }
+
+ private OperationResult synchronouslyDelete(ProductionContext ctx) throws Exception {
+ DeleteOperation setData = new DeleteOperation(ctx.connection, ctx.node);
+ setData.setVersion(ctx.version);
+
+ OperationResult result = setData.get();
+
+ if (!result.isOk() && configuration.isCreate() && result.failedDueTo(Code.NONODE)) {
+ log.warn(format("Node '%s' did not exist, creating it.", ctx.node));
+ result = createNode(ctx);
+ }
+ return result;
+ }
+
+
+ private void logStoreComplete(String path, Stat statistics) {
+ if (log.isDebugEnabled()) {
+ if (log.isTraceEnabled()) {
+ log.trace(format("Stored data to node '%s', and receive statistics %s", path, statistics));
+ } else {
+ log.debug(format("Stored data to node '%s'", path));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/9c076c3d/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
deleted file mode 100644
index 2280049..0000000
--- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/ZookeeperProducer.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper;
-
-import static java.lang.String.format;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.component.zookeeper.operations.CreateOperation;
-import org.apache.camel.component.zookeeper.operations.DeleteOperation;
-import org.apache.camel.component.zookeeper.operations.GetChildrenOperation;
-import org.apache.camel.component.zookeeper.operations.OperationResult;
-import org.apache.camel.component.zookeeper.operations.SetDataOperation;
-import org.apache.camel.impl.DefaultProducer;
-import org.apache.camel.util.ExchangeHelper;
-
-import org.apache.zookeeper.AsyncCallback.StatCallback;
-import org.apache.zookeeper.AsyncCallback.VoidCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-
-import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getAclListFromMessage;
-import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getCreateMode;
-import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getCreateModeFromString;
-import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getNodeFromMessage;
-import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getPayloadFromExchange;
-import static org.apache.camel.component.zookeeper.ZooKeeperUtils.getVersionFromMessage;
-
-/**
- * <code>ZookeeperProducer</code> attempts to set the content of nodes in the
- * {@link ZooKeeper} cluster with the payloads of the of the exchanges it
- * receives.
- */
-@SuppressWarnings("rawtypes")
-public class ZookeeperProducer extends DefaultProducer {
- public static final String ZK_OPERATION_WRITE = "WRITE";
- public static final String ZK_OPERATION_DELETE = "DELETE";
-
- private final ZooKeeperConfiguration configuration;
- private ZooKeeperConnectionManager zkm;
- private ZooKeeper connection;
-
- public ZookeeperProducer(ZooKeeperEndpoint endpoint) {
- super(endpoint);
- this.configuration = endpoint.getConfiguration();
- this.zkm = endpoint.getConnectionManager();
- }
-
- public void process(Exchange exchange) throws Exception {
-
- ProductionContext context = new ProductionContext(connection, exchange);
-
- String operation = exchange.getIn().getHeader(ZooKeeperMessage.ZOOKEEPER_OPERATION, String.class);
- boolean isDelete = ZK_OPERATION_DELETE.equals(operation);
-
- if (ExchangeHelper.isOutCapable(exchange)) {
- if (isDelete) {
- if (log.isDebugEnabled()) {
- log.debug(format("Deleting znode '%s', waiting for confirmation", context.node));
- }
-
- OperationResult result = synchronouslyDelete(context);
- if (configuration.isListChildren()) {
- result = listChildren(context);
- }
- updateExchangeWithResult(context, result);
- } else {
- if (log.isDebugEnabled()) {
- log.debug(format("Storing data to znode '%s', waiting for confirmation", context.node));
- }
-
- OperationResult result = synchronouslySetData(context);
- if (configuration.isListChildren()) {
- result = listChildren(context);
- }
- updateExchangeWithResult(context, result);
- }
- } else {
- if (isDelete) {
- asynchronouslyDeleteNode(connection, context);
- } else {
- asynchronouslySetDataOnNode(connection, context);
- }
- }
-
- }
-
- @Override
- protected void doStart() throws Exception {
- connection = zkm.getConnection();
- if (log.isTraceEnabled()) {
- log.trace(String.format("Starting zookeeper producer of '%s'", configuration.getPath()));
- }
- }
-
- @Override
- protected void doStop() throws Exception {
- super.doStop();
- if (log.isTraceEnabled()) {
- log.trace(String.format("Shutting down zookeeper producer of '%s'", configuration.getPath()));
- }
- zkm.shutdown();
- }
-
- private void asynchronouslyDeleteNode(ZooKeeper connection, ProductionContext context) {
- if (log.isDebugEnabled()) {
- log.debug(format("Deleting node '%s', not waiting for confirmation", context.node));
- }
- connection.delete(context.node, context.version, new AsyncDeleteCallback(), context);
-
- }
-
- private void asynchronouslySetDataOnNode(ZooKeeper connection, ProductionContext context) {
- if (log.isDebugEnabled()) {
- log.debug(format("Storing data to node '%s', not waiting for confirmation", context.node));
- }
- connection.setData(context.node, context.payload, context.version, new AsyncSetDataCallback(), context);
- }
-
- private void updateExchangeWithResult(ProductionContext context, OperationResult result) {
- ZooKeeperMessage out = new ZooKeeperMessage(context.node, result.getStatistics(), context.in.getHeaders());
- if (result.isOk()) {
- out.setBody(result.getResult());
- } else {
- context.exchange.setException(result.getException());
- }
-
- context.exchange.setOut(out);
- }
-
- private OperationResult listChildren(ProductionContext context) throws Exception {
- return new GetChildrenOperation(context.connection, configuration.getPath()).get();
- }
-
- /** Simple container to avoid passing all these around as parameters */
- private class ProductionContext {
- ZooKeeper connection;
- Exchange exchange;
- Message in;
- byte[] payload;
- int version;
- String node;
-
- public ProductionContext(ZooKeeper connection, Exchange exchange) {
- this.connection = connection;
- this.exchange = exchange;
- this.in = exchange.getIn();
- this.node = getNodeFromMessage(in, configuration.getPath());
- this.version = getVersionFromMessage(in);
- this.payload = getPayloadFromExchange(exchange);
- }
- }
-
- private class AsyncSetDataCallback implements StatCallback {
-
- public void processResult(int rc, String node, Object ctx, Stat statistics) {
- if (Code.NONODE.equals(Code.get(rc))) {
- if (configuration.isCreate()) {
- log.warn(format("Node '%s' did not exist, creating it...", node));
- ProductionContext context = (ProductionContext)ctx;
- OperationResult<String> result = null;
- try {
- result = createNode(context);
- } catch (Exception e) {
- log.error(format("Error trying to create node '%s'", node), e);
- }
-
- if (result == null || !result.isOk()) {
- log.error(format("Error creating node '%s'", node), result.getException());
- }
- }
- } else {
- logStoreComplete(node, statistics);
- }
- }
- }
-
- private class AsyncDeleteCallback implements VoidCallback {
- @Override
- public void processResult(int rc, String path, Object ctx) {
- if (log.isDebugEnabled()) {
- if (log.isTraceEnabled()) {
- log.trace(format("Removed data node '%s'", path));
- } else {
- log.debug(format("Removed data node '%s'", path));
- }
- }
- }
- }
-
- private OperationResult<String> createNode(ProductionContext ctx) throws Exception {
- CreateOperation create = new CreateOperation(ctx.connection, ctx.node);
- create.setPermissions(getAclListFromMessage(ctx.exchange.getIn()));
-
- CreateMode mode = null;
- String modeString = configuration.getCreateMode();
- if (modeString != null) {
- try {
- mode = getCreateModeFromString(modeString, CreateMode.EPHEMERAL);
- } catch (Exception e) { }
- } else {
- mode = getCreateMode(ctx.exchange.getIn(), CreateMode.EPHEMERAL);
- }
- create.setCreateMode(mode == null ? CreateMode.EPHEMERAL : mode);
- create.setData(ctx.payload);
- return create.get();
- }
-
- /**
- * Tries to set the data first and if a no node error is received then an
- * attempt will be made to create it instead.
- */
- private OperationResult synchronouslySetData(ProductionContext ctx) throws Exception {
-
- SetDataOperation setData = new SetDataOperation(ctx.connection, ctx.node, ctx.payload);
- setData.setVersion(ctx.version);
-
- OperationResult result = setData.get();
-
- if (!result.isOk() && configuration.isCreate() && result.failedDueTo(Code.NONODE)) {
- log.warn(format("Node '%s' did not exist, creating it.", ctx.node));
- result = createNode(ctx);
- }
- return result;
- }
-
- private OperationResult synchronouslyDelete(ProductionContext ctx) throws Exception {
- DeleteOperation setData = new DeleteOperation(ctx.connection, ctx.node);
- setData.setVersion(ctx.version);
-
- OperationResult result = setData.get();
-
- if (!result.isOk() && configuration.isCreate() && result.failedDueTo(Code.NONODE)) {
- log.warn(format("Node '%s' did not exist, creating it.", ctx.node));
- result = createNode(ctx);
- }
- return result;
- }
-
-
- private void logStoreComplete(String path, Stat statistics) {
- if (log.isDebugEnabled()) {
- if (log.isTraceEnabled()) {
- log.trace(format("Stored data to node '%s', and receive statistics %s", path, statistics));
- } else {
- log.debug(format("Stored data to node '%s'", path));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/9c076c3d/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperProducerTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperProducerTest.java
new file mode 100644
index 0000000..ced212f
--- /dev/null
+++ b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZooKeeperProducerTest.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.zookeeper;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.zookeeper.operations.GetChildrenOperation;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
+
+import org.junit.Test;
+
+import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_CREATE_MODE;
+import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_NODE;
+import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_OPERATION;
+
+public class ZooKeeperProducerTest extends ZooKeeperTestSupport {
+
+ private String zookeeperUri;
+ private String testPayload = "TestPayload";
+
+ @Override
+ protected RouteBuilder[] createRouteBuilders() throws Exception {
+ return new RouteBuilder[] {new RouteBuilder() {
+ public void configure() throws Exception {
+ zookeeperUri = "zookeeper://localhost:" + getServerPort() + "/node?create=true";
+ from("direct:roundtrip").to(zookeeperUri).to("mock:producer-out");
+ from(zookeeperUri).to("mock:consumed-from-node");
+ }
+ }, new RouteBuilder() {
+ public void configure() throws Exception {
+ from("direct:no-create-fails-set").to("zookeeper://localhost:" + getServerPort() + "/doesnotexist");
+ }
+ }, new RouteBuilder() {
+ public void configure() throws Exception {
+ from("direct:node-from-header").to("zookeeper://localhost:" + getServerPort() + "/notset?create=true");
+ from("zookeeper://localhost:" + getServerPort() + "/set?create=true").to("mock:consumed-from-set-node");
+ }
+ }, new RouteBuilder() {
+ public void configure() throws Exception {
+ from("direct:create-mode").to("zookeeper://localhost:" + getServerPort() + "/persistent?create=true&createMode=PERSISTENT").to("mock:create-mode");
+ }
+ }, new RouteBuilder() {
+ public void configure() throws Exception {
+ from("direct:delete").to("zookeeper://localhost:39913/to-be-deleted").to("mock:delete");
+ }
+ }};
+ }
+
+ @Test
+ public void testRoundtripOfDataToAndFromZnode() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:consumed-from-node");
+ MockEndpoint pipeline = getMockEndpoint("mock:producer-out");
+ mock.expectedMessageCount(1);
+ pipeline.expectedMessageCount(1);
+
+ Exchange e = createExchangeWithBody(testPayload);
+ e.setPattern(ExchangePattern.InOut);
+ template.send("direct:roundtrip", e);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test
+ public void testAsyncRoundtripOfDataToAndFromZnode() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:consumed-from-node");
+ mock.expectedMessageCount(1);
+
+ Exchange e = createExchangeWithBody(testPayload);
+ template.send("direct:roundtrip", e);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test
+ public void setUsingCreateModeFromHeader() throws Exception {
+ client.createPersistent("/modes-test", "parent for modes");
+ for (CreateMode mode : CreateMode.values()) {
+ Exchange exchange = createExchangeWithBody(testPayload);
+ exchange.getIn().setHeader(ZOOKEEPER_CREATE_MODE, mode);
+ exchange.getIn().setHeader(ZOOKEEPER_NODE, "/modes-test/" + mode);
+ exchange.setPattern(ExchangePattern.InOut);
+ template.send("direct:node-from-header", exchange);
+ }
+ GetChildrenOperation listing = new GetChildrenOperation(getConnection(), "/modes-test");
+ assertEquals(CreateMode.values().length, listing.get().getResult().size());
+ }
+
+ @Test
+ public void createWithOtherCreateMode() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:create-mode");
+ mock.expectedMessageCount(1);
+
+ Exchange e = createExchangeWithBody(testPayload);
+ e.setPattern(ExchangePattern.InOut);
+
+ template.send("direct:create-mode", e);
+
+ assertMockEndpointsSatisfied();
+
+ Stat s = mock.getReceivedExchanges().get(0).getIn().getHeader(ZooKeeperMessage.ZOOKEEPER_STATISTICS, Stat.class);
+ assertEquals(s.getEphemeralOwner(), 0);
+ }
+
+ @Test
+ public void deleteNode() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:delete");
+ mock.expectedMessageCount(1);
+
+ client.createPersistent("/to-be-deleted", "to be deleted");
+ Exchange e = createExchangeWithBody(null);
+ e.setPattern(ExchangePattern.InOut);
+ e.getIn().setHeader(ZOOKEEPER_OPERATION, "DELETE");
+ template.send("direct:delete", e);
+
+ assertMockEndpointsSatisfied();
+
+ assertNull(client.getConnection().exists("/to-be-deleted", false));
+ }
+
+ @Test
+ public void setAndGetListing() throws Exception {
+ client.createPersistent("/set-listing", "parent for set and list test");
+
+ Exchange exchange = createExchangeWithBody(testPayload);
+ exchange.getIn().setHeader(ZOOKEEPER_NODE, "/set-listing/firstborn");
+ exchange.setPattern(ExchangePattern.InOut);
+ template.send("zookeeper://localhost:" + getServerPort() + "/set-listing?create=true&listChildren=true", exchange);
+ List<?> children = exchange.getOut().getMandatoryBody(List.class);
+ assertEquals(1, children.size());
+ assertEquals("firstborn", children.get(0));
+ }
+
+ @Test
+ public void testZookeeperMessage() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:consumed-from-node");
+ mock.expectedMessageCount(1);
+
+ Exchange exchange = createExchangeWithBody(testPayload);
+ template.send("direct:roundtrip", exchange);
+
+ assertMockEndpointsSatisfied();
+
+ Message received = mock.getReceivedExchanges().get(0).getIn();
+ assertEquals("/node", ZooKeeperMessage.getPath(received));
+ assertNotNull(ZooKeeperMessage.getStatistics(received));
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/9c076c3d/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java b/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java
deleted file mode 100644
index 3dcb493..0000000
--- a/components/camel-zookeeper/src/test/java/org/apache/camel/component/zookeeper/ZookeeperProducerTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.zookeeper;
-
-import java.util.List;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Message;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.component.zookeeper.operations.GetChildrenOperation;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
-
-import org.junit.Test;
-
-import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_CREATE_MODE;
-import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_NODE;
-import static org.apache.camel.component.zookeeper.ZooKeeperMessage.ZOOKEEPER_OPERATION;
-
-public class ZookeeperProducerTest extends ZooKeeperTestSupport {
-
- private String zookeeperUri;
- private String testPayload = "TestPayload";
-
- @Override
- protected RouteBuilder[] createRouteBuilders() throws Exception {
- return new RouteBuilder[] {new RouteBuilder() {
- public void configure() throws Exception {
- zookeeperUri = "zookeeper://localhost:" + getServerPort() + "/node?create=true";
- from("direct:roundtrip").to(zookeeperUri).to("mock:producer-out");
- from(zookeeperUri).to("mock:consumed-from-node");
- }
- }, new RouteBuilder() {
- public void configure() throws Exception {
- from("direct:no-create-fails-set").to("zookeeper://localhost:" + getServerPort() + "/doesnotexist");
- }
- }, new RouteBuilder() {
- public void configure() throws Exception {
- from("direct:node-from-header").to("zookeeper://localhost:" + getServerPort() + "/notset?create=true");
- from("zookeeper://localhost:" + getServerPort() + "/set?create=true").to("mock:consumed-from-set-node");
- }
- }, new RouteBuilder() {
- public void configure() throws Exception {
- from("direct:create-mode").to("zookeeper://localhost:" + getServerPort() + "/persistent?create=true&createMode=PERSISTENT").to("mock:create-mode");
- }
- }, new RouteBuilder() {
- public void configure() throws Exception {
- from("direct:delete").to("zookeeper://localhost:39913/to-be-deleted").to("mock:delete");
- }
- }};
- }
-
- @Test
- public void testRoundtripOfDataToAndFromZnode() throws Exception {
- MockEndpoint mock = getMockEndpoint("mock:consumed-from-node");
- MockEndpoint pipeline = getMockEndpoint("mock:producer-out");
- mock.expectedMessageCount(1);
- pipeline.expectedMessageCount(1);
-
- Exchange e = createExchangeWithBody(testPayload);
- e.setPattern(ExchangePattern.InOut);
- template.send("direct:roundtrip", e);
-
- assertMockEndpointsSatisfied();
- }
-
- @Test
- public void testAsyncRoundtripOfDataToAndFromZnode() throws Exception {
- MockEndpoint mock = getMockEndpoint("mock:consumed-from-node");
- mock.expectedMessageCount(1);
-
- Exchange e = createExchangeWithBody(testPayload);
- template.send("direct:roundtrip", e);
-
- assertMockEndpointsSatisfied();
- }
-
- @Test
- public void setUsingCreateModeFromHeader() throws Exception {
- client.createPersistent("/modes-test", "parent for modes");
- for (CreateMode mode : CreateMode.values()) {
- Exchange exchange = createExchangeWithBody(testPayload);
- exchange.getIn().setHeader(ZOOKEEPER_CREATE_MODE, mode);
- exchange.getIn().setHeader(ZOOKEEPER_NODE, "/modes-test/" + mode);
- exchange.setPattern(ExchangePattern.InOut);
- template.send("direct:node-from-header", exchange);
- }
- GetChildrenOperation listing = new GetChildrenOperation(getConnection(), "/modes-test");
- assertEquals(CreateMode.values().length, listing.get().getResult().size());
- }
-
- @Test
- public void createWithOtherCreateMode() throws Exception {
- MockEndpoint mock = getMockEndpoint("mock:create-mode");
- mock.expectedMessageCount(1);
-
- Exchange e = createExchangeWithBody(testPayload);
- e.setPattern(ExchangePattern.InOut);
-
- template.send("direct:create-mode", e);
-
- assertMockEndpointsSatisfied();
-
- Stat s = mock.getReceivedExchanges().get(0).getIn().getHeader(ZooKeeperMessage.ZOOKEEPER_STATISTICS, Stat.class);
- assertEquals(s.getEphemeralOwner(), 0);
- }
-
- @Test
- public void deleteNode() throws Exception {
- MockEndpoint mock = getMockEndpoint("mock:delete");
- mock.expectedMessageCount(1);
-
- client.createPersistent("/to-be-deleted", "to be deleted");
- Exchange e = createExchangeWithBody(null);
- e.setPattern(ExchangePattern.InOut);
- e.getIn().setHeader(ZOOKEEPER_OPERATION, "DELETE");
- template.send("direct:delete", e);
-
- assertMockEndpointsSatisfied();
-
- assertNull(client.getConnection().exists("/to-be-deleted", false));
- }
-
- @Test
- public void setAndGetListing() throws Exception {
- client.createPersistent("/set-listing", "parent for set and list test");
-
- Exchange exchange = createExchangeWithBody(testPayload);
- exchange.getIn().setHeader(ZOOKEEPER_NODE, "/set-listing/firstborn");
- exchange.setPattern(ExchangePattern.InOut);
- template.send("zookeeper://localhost:" + getServerPort() + "/set-listing?create=true&listChildren=true", exchange);
- List<?> children = exchange.getOut().getMandatoryBody(List.class);
- assertEquals(1, children.size());
- assertEquals("firstborn", children.get(0));
- }
-
- @Test
- public void testZookeeperMessage() throws Exception {
- MockEndpoint mock = getMockEndpoint("mock:consumed-from-node");
- mock.expectedMessageCount(1);
-
- Exchange exchange = createExchangeWithBody(testPayload);
- template.send("direct:roundtrip", exchange);
-
- assertMockEndpointsSatisfied();
-
- Message received = mock.getReceivedExchanges().get(0).getIn();
- assertEquals("/node", ZooKeeperMessage.getPath(received));
- assertNotNull(ZooKeeperMessage.getStatistics(received));
- }
-}