You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by se...@apache.org on 2017/02/22 16:41:21 UTC
camel git commit: [CAMEL-10786] Prototyping QueueService producer/consumer, with no test tries so far due to the account unavailability
Repository: camel
Updated Branches:
refs/heads/master 46f83c924 -> 9a5357e66
[CAMEL-10786] Prototyping QueueService producer/consumer, with no test tries so far due to the account unavailability
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9a5357e6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9a5357e6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9a5357e6
Branch: refs/heads/master
Commit: 9a5357e669a0fd466fd9c6c2463ccdc651daee03
Parents: 46f83c9
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Wed Feb 22 16:41:06 2017 +0000
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Wed Feb 22 16:41:06 2017 +0000
----------------------------------------------------------------------
.../azure/blob/BlobServiceConstants.java | 1 +
.../azure/blob/BlobServiceProducer.java | 12 +-
.../azure/blob/BlobServiceRequestOptions.java | 14 +-
.../component/azure/blob/BlobServiceUtil.java | 3 +-
.../common/AbstractServiceRequestOptions.java | 31 ++++
.../azure/queue/QueueServiceConfiguration.java | 46 +++++-
.../azure/queue/QueueServiceConstants.java | 11 +-
.../azure/queue/QueueServiceConsumer.java | 12 +-
.../azure/queue/QueueServiceOperations.java | 11 +-
.../azure/queue/QueueServiceProducer.java | 145 +++++++++++++++++--
.../azure/queue/QueueServiceRequestOptions.java | 33 +++++
.../component/azure/queue/QueueServiceUtil.java | 30 ++++
12 files changed, 312 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConstants.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConstants.java
index f8f644a..2fd1a99 100644
--- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConstants.java
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConstants.java
@@ -22,6 +22,7 @@ public interface BlobServiceConstants {
String BLOB_CLIENT = "AzureBlobClient";
String SERVICE_URI_SEGMENT = ".blob.core.windows.net";
+ String BLOB_SERVICE_REQUEST_OPTIONS = "BlobServiceRequestOptions";
String ACCESS_CONDITION = "BlobAccessCondition";
String BLOB_REQUEST_OPTIONS = "BlobRequestOptions";
String OPERATION_CONTEXT = "BlobOperationContext";
http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceProducer.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceProducer.java
index 0f01588..360b4aa 100644
--- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceProducer.java
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceProducer.java
@@ -120,9 +120,15 @@ public class BlobServiceProducer extends DefaultProducer {
LOG.trace("Getting the blob list from the container [{}] from exchange [{}]...",
getConfiguration().getContainerName(), exchange);
BlobServiceConfiguration cfg = getConfiguration();
- @SuppressWarnings("unchecked")
- EnumSet<BlobListingDetails> details =
- (EnumSet<BlobListingDetails>)exchange.getIn().getHeader(BlobServiceConstants.BLOB_LISTING_DETAILS);
+ EnumSet<BlobListingDetails> details = null;
+ Object detailsObject = exchange.getIn().getHeader(BlobServiceConstants.BLOB_LISTING_DETAILS);
+ if (detailsObject instanceof EnumSet) {
+ @SuppressWarnings("unchecked")
+ EnumSet<BlobListingDetails> theDetails = (EnumSet<BlobListingDetails>)detailsObject;
+ details = theDetails;
+ } else if (detailsObject instanceof BlobListingDetails) {
+ details = EnumSet.of((BlobListingDetails)detailsObject);
+ }
Iterable<ListBlobItem> items =
client.listBlobs(cfg.getBlobPrefix(), cfg.isUseFlatListing(),
details, opts.getRequestOpts(), opts.getOpContext());
http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceRequestOptions.java
----------------------------------------------------------------------
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceRequestOptions.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceRequestOptions.java
index 3d1342e..bbb9385 100644
--- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceRequestOptions.java
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceRequestOptions.java
@@ -17,14 +17,13 @@
package org.apache.camel.component.azure.blob;
import com.microsoft.azure.storage.AccessCondition;
-import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import org.apache.camel.component.azure.common.AbstractServiceRequestOptions;
-public class BlobServiceRequestOptions {
+public class BlobServiceRequestOptions extends AbstractServiceRequestOptions {
private AccessCondition accessCond;
private BlobRequestOptions requestOpts;
- private OperationContext opContext;
-
+
public AccessCondition getAccessCond() {
return accessCond;
}
@@ -41,11 +40,4 @@ public class BlobServiceRequestOptions {
this.requestOpts = requestOpts;
}
- public OperationContext getOpContext() {
- return opContext;
- }
-
- public void setOpContext(OperationContext opContext) {
- this.opContext = opContext;
- }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceUtil.java
----------------------------------------------------------------------
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceUtil.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceUtil.java
index dcf6dc3..0f89819 100644
--- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceUtil.java
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceUtil.java
@@ -216,7 +216,8 @@ public final class BlobServiceUtil {
public static BlobServiceRequestOptions getRequestOptions(Exchange exchange) {
- BlobServiceRequestOptions opts = exchange.getIn().getBody(BlobServiceRequestOptions.class);
+ BlobServiceRequestOptions opts = exchange.getIn().getHeader(
+ BlobServiceConstants.BLOB_SERVICE_REQUEST_OPTIONS, BlobServiceRequestOptions.class);
if (opts != null) {
return opts;
} else {
http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/common/AbstractServiceRequestOptions.java
----------------------------------------------------------------------
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/common/AbstractServiceRequestOptions.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/common/AbstractServiceRequestOptions.java
new file mode 100644
index 0000000..db9b635
--- /dev/null
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/common/AbstractServiceRequestOptions.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.azure.common;
+
+import com.microsoft.azure.storage.OperationContext;
+
+public abstract class AbstractServiceRequestOptions {
+ private OperationContext opContext;
+
+ public OperationContext getOpContext() {
+ return opContext;
+ }
+
+ public void setOpContext(OperationContext opContext) {
+ this.opContext = opContext;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConfiguration.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConfiguration.java
index d211d7e..e85cd2c 100644
--- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConfiguration.java
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConfiguration.java
@@ -28,8 +28,17 @@ public class QueueServiceConfiguration extends AbstractConfiguration {
@UriParam
private CloudQueue azureQueueClient;
- @UriParam(label = "producer", defaultValue = "getMessage")
- private QueueServiceOperations operation = QueueServiceOperations.getMessage;
+ @UriParam(label = "producer", defaultValue = "listQueues")
+ private QueueServiceOperations operation = QueueServiceOperations.listQueues;
+
+ @UriParam(label = "producer")
+ private int messageTimeToLive;
+
+ @UriParam(label = "producer")
+ private int messageVisibilityDelay;
+
+ @UriParam(label = "producer")
+ private String queuePrefix;
public String getQueueName() {
return queueName;
@@ -63,4 +72,37 @@ public class QueueServiceConfiguration extends AbstractConfiguration {
public void setOperation(QueueServiceOperations operation) {
this.operation = operation;
}
+
+ public int getMessageTimeToLive() {
+ return messageTimeToLive;
+ }
+
+ /**
+ * Message Time To Live in seconds
+ */
+ public void setMessageTimeToLive(int messageTimeToLive) {
+ this.messageTimeToLive = messageTimeToLive;
+ }
+
+ public int getMessageVisibilityDelay() {
+ return messageVisibilityDelay;
+ }
+
+ /**
+ * Message Visibility Delay in seconds
+ */
+ public void setMessageVisibilityDelay(int messageVisibilityDelay) {
+ this.messageVisibilityDelay = messageVisibilityDelay;
+ }
+
+ public String getQueuePrefix() {
+ return queuePrefix;
+ }
+
+ /**
+ * Set a prefix which can be used for listing the queues
+ */
+ public void setQueuePrefix(String queuePrefix) {
+ this.queuePrefix = queuePrefix;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConstants.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConstants.java
index 0e7a523..ba2bd0f 100644
--- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConstants.java
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConstants.java
@@ -22,7 +22,12 @@ public interface QueueServiceConstants {
String QUEUE_CLIENT = "AzureQueueClient";
String SERVICE_URI_SEGMENT = ".queue.core.windows.net";
- String ACCESS_CONDITION = "BlobAccessCondition";
- String BLOB_REQUEST_OPTIONS = "BlobRequestOptions";
- String OPERATION_CONTEXT = "BlobOperationContext";
+ String QUEUE_SERVICE_REQUEST_OPTIONS = "QueueServiceRequestOptions";
+ String QUEUE_REQUEST_OPTIONS = "QueueRequestOptions";
+ String OPERATION_CONTEXT = "QueueOperationContext";
+ String MESSAGE_UPDATE_FIELDS = "QueueMessageUpdateFields";
+ String QUEUE_LISTING_DETAILS = "QueueListingDetails";
+
+ String QUEUE_CREATED = "QueueCreated";
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java
index 9793a1c..a70ad46 100644
--- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java
@@ -38,7 +38,8 @@ public class QueueServiceConsumer extends ScheduledPollConsumer {
protected int poll() throws Exception {
Exchange exchange = super.getEndpoint().createExchange();
try {
- getMessage(exchange);
+ LOG.trace("Retrieving a message");
+ retrieveMessage(exchange);
super.getAsyncProcessor().process(exchange);
return 1;
} catch (StorageException ex) {
@@ -50,10 +51,11 @@ public class QueueServiceConsumer extends ScheduledPollConsumer {
}
}
- private void getMessage(Exchange exchange) throws Exception {
- LOG.trace("Getting the message from the queue [{}] from exchange [{}]...",
- getConfiguration().getQueueName(), exchange);
- throw new UnsupportedOperationException();
+ private void retrieveMessage(Exchange exchange) throws Exception {
+ //TODO: Support the batch processing if needed, given that it is possible
+ // to retrieve more than 1 message in one go, similarly to camel-aws/s3 consumer.
+ QueueServiceUtil.retrieveMessage(exchange, getConfiguration());
+
}
protected QueueServiceConfiguration getConfiguration() {
http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceOperations.java
----------------------------------------------------------------------
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceOperations.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceOperations.java
index a1e0898..536b47e 100644
--- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceOperations.java
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceOperations.java
@@ -17,6 +17,13 @@
package org.apache.camel.component.azure.queue;
public enum QueueServiceOperations {
- getMessage,
- putMessage
+ listQueues,
+ createQueue,
+ deleteQueue,
+ addMessage,
+ retrieveMessage,
+ peekMessage,
+ updateMessage,
+ deleteMessage
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceProducer.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceProducer.java
index 434ed2d..ff15369 100644
--- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceProducer.java
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceProducer.java
@@ -16,9 +16,16 @@
*/
package org.apache.camel.component.azure.queue;
+import java.util.EnumSet;
+
+import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import com.microsoft.azure.storage.queue.MessageUpdateFields;
+import com.microsoft.azure.storage.queue.QueueListingDetails;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.component.azure.blob.BlobServiceConstants;
+import org.apache.camel.component.azure.common.ExchangeUtil;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
@@ -40,14 +47,32 @@ public class QueueServiceProducer extends DefaultProducer {
public void process(final Exchange exchange) throws Exception {
QueueServiceOperations operation = determineOperation(exchange);
if (ObjectHelper.isEmpty(operation)) {
- operation = QueueServiceOperations.getMessage;
+ operation = QueueServiceOperations.listQueues;
} else {
switch (operation) {
- case getMessage:
- getMessage(exchange);
+ case retrieveMessage:
+ retrieveMessage(exchange);
+ break;
+ case peekMessage:
+ peekMessage(exchange);
+ break;
+ case createQueue:
+ createQueue(exchange);
+ break;
+ case deleteQueue:
+ deleteQueue(exchange);
+ break;
+ case addMessage:
+ addMessage(exchange);
+ break;
+ case updateMessage:
+ updateMessage(exchange);
break;
- case putMessage:
- putMessage(exchange);
+ case deleteMessage:
+ deleteMessage(exchange);
+ break;
+ case listQueues:
+ listQueues(exchange);
break;
default:
throw new IllegalArgumentException("Unsupported operation");
@@ -56,15 +81,115 @@ public class QueueServiceProducer extends DefaultProducer {
}
- private void getMessage(Exchange exchange) {
- LOG.trace("Getting the message from the queue [{}] from exchange [{}]...",
+ private void listQueues(Exchange exchange) throws Exception {
+ CloudQueue client = QueueServiceUtil.createQueueClient(getConfiguration());
+ QueueServiceRequestOptions opts = QueueServiceUtil.getRequestOptions(exchange);
+ QueueListingDetails details = (QueueListingDetails)exchange.getIn().getHeader(QueueServiceConstants.QUEUE_LISTING_DETAILS);
+ if (details == null) {
+ details = QueueListingDetails.ALL;
+ }
+ Iterable<CloudQueue> list = client.getServiceClient().listQueues(
+ getConfiguration().getQueuePrefix(), details,
+ opts.getRequestOpts(), opts.getOpContext());
+ ExchangeUtil.getMessageForResponse(exchange).setBody(list);
+ }
+
+ private void createQueue(Exchange exchange) throws Exception {
+ CloudQueue client = QueueServiceUtil.createQueueClient(getConfiguration());
+ QueueServiceRequestOptions opts = QueueServiceUtil.getRequestOptions(exchange);
+ doCreateQueue(client, opts, exchange);
+ }
+
+ private void doCreateQueue(CloudQueue client, QueueServiceRequestOptions opts, Exchange exchange) throws Exception {
+ LOG.trace("Creating the queue [{}] from exchange [{}]...",
getConfiguration().getQueueName(), exchange);
- throw new UnsupportedOperationException();
+ client.createIfNotExists(opts.getRequestOpts(), opts.getOpContext());
+ ExchangeUtil.getMessageForResponse(exchange)
+ .setHeader(QueueServiceConstants.QUEUE_CREATED, Boolean.TRUE);
}
- private void putMessage(Exchange exchange) {
+
+ private void deleteQueue(Exchange exchange) throws Exception {
+ LOG.trace("Deleting the queue [{}] from exchange [{}]...",
+ getConfiguration().getQueueName(), exchange);
+ CloudQueue client = QueueServiceUtil.createQueueClient(getConfiguration());
+ QueueServiceRequestOptions opts = QueueServiceUtil.getRequestOptions(exchange);
+ client.delete(opts.getRequestOpts(), opts.getOpContext());
+ }
+
+ private void addMessage(Exchange exchange) throws Exception {
LOG.trace("Putting the message into the queue [{}] from exchange [{}]...",
getConfiguration().getQueueName(), exchange);
- throw new UnsupportedOperationException();
+ CloudQueue client = QueueServiceUtil.createQueueClient(getConfiguration());
+ QueueServiceRequestOptions opts = QueueServiceUtil.getRequestOptions(exchange);
+
+ Boolean queueCreated = exchange.getIn().getHeader(QueueServiceConstants.QUEUE_CREATED,
+ Boolean.class);
+ if (Boolean.TRUE != queueCreated) {
+ doCreateQueue(client, opts, exchange);
+ }
+
+ CloudQueueMessage message = getCloudQueueMessage(exchange);
+ client.addMessage(message,
+ getConfiguration().getMessageTimeToLive(),
+ getConfiguration().getMessageVisibilityDelay(),
+ opts.getRequestOpts(), opts.getOpContext());
+ }
+ private void updateMessage(Exchange exchange) throws Exception {
+ CloudQueue client = QueueServiceUtil.createQueueClient(getConfiguration());
+ QueueServiceRequestOptions opts = QueueServiceUtil.getRequestOptions(exchange);
+
+ CloudQueueMessage message = getCloudQueueMessage(exchange);
+ LOG.trace("Updating the message in the queue [{}] from exchange [{}]...",
+ getConfiguration().getQueueName(), exchange);
+
+ EnumSet<MessageUpdateFields> fields = null;
+ Object fieldsObject = exchange.getIn().getHeader(QueueServiceConstants.MESSAGE_UPDATE_FIELDS);
+ if (fieldsObject instanceof EnumSet) {
+ @SuppressWarnings("unchecked")
+ EnumSet<MessageUpdateFields> theFields = (EnumSet<MessageUpdateFields>)fieldsObject;
+ fields = theFields;
+ } else if (fieldsObject instanceof MessageUpdateFields) {
+ fields = EnumSet.of((MessageUpdateFields)fieldsObject);
+ }
+ client.updateMessage(message,
+ getConfiguration().getMessageVisibilityDelay(),
+ fields,
+ opts.getRequestOpts(), opts.getOpContext());
+ }
+
+ private void deleteMessage(Exchange exchange) throws Exception {
+ LOG.trace("Deleting the message from the queue [{}] from exchange [{}]...",
+ getConfiguration().getQueueName(), exchange);
+ CloudQueue client = QueueServiceUtil.createQueueClient(getConfiguration());
+ QueueServiceRequestOptions opts = QueueServiceUtil.getRequestOptions(exchange);
+ CloudQueueMessage message = getCloudQueueMessage(exchange);
+ client.deleteMessage(message, opts.getRequestOpts(), opts.getOpContext());
+ }
+
+ private void retrieveMessage(Exchange exchange) throws Exception {
+ QueueServiceUtil.retrieveMessage(exchange, getConfiguration());
+ }
+
+ private void peekMessage(Exchange exchange) throws Exception {
+ CloudQueue client = QueueServiceUtil.createQueueClient(getConfiguration());
+ QueueServiceRequestOptions opts = QueueServiceUtil.getRequestOptions(exchange);
+ CloudQueueMessage message = client.peekMessage(opts.getRequestOpts(), opts.getOpContext());
+ ExchangeUtil.getMessageForResponse(exchange).setBody(message);
+ }
+
+
+ private CloudQueueMessage getCloudQueueMessage(Exchange exchange) throws Exception {
+ Object body = exchange.getIn().getMandatoryBody();
+ CloudQueueMessage message = null;
+ if (body instanceof CloudQueueMessage) {
+ message = (CloudQueueMessage)body;
+ } else if (body instanceof String) {
+ message = new CloudQueueMessage((String)body);
+ }
+ if (message == null) {
+ throw new IllegalArgumentException("Unsupported queue message type:" + body.getClass().getName());
+ }
+ return message;
}
private QueueServiceOperations determineOperation(Exchange exchange) {
http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceRequestOptions.java
----------------------------------------------------------------------
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceRequestOptions.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceRequestOptions.java
new file mode 100644
index 0000000..f10b83a
--- /dev/null
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceRequestOptions.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.azure.queue;
+
+import com.microsoft.azure.storage.queue.QueueRequestOptions;
+import org.apache.camel.component.azure.common.AbstractServiceRequestOptions;
+
+public class QueueServiceRequestOptions extends AbstractServiceRequestOptions {
+ private QueueRequestOptions requestOpts;
+
+ public QueueRequestOptions getRequestOpts() {
+ return requestOpts;
+ }
+
+ public void setRequestOpts(QueueRequestOptions requestOpts) {
+ this.requestOpts = requestOpts;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceUtil.java
----------------------------------------------------------------------
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceUtil.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceUtil.java
index 17121b0..6f09418 100644
--- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceUtil.java
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceUtil.java
@@ -18,8 +18,13 @@ package org.apache.camel.component.azure.queue;
import java.net.URI;
+import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import com.microsoft.azure.storage.queue.QueueRequestOptions;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.azure.common.ExchangeUtil;
public final class QueueServiceUtil {
private QueueServiceUtil() {
@@ -64,4 +69,29 @@ public final class QueueServiceUtil {
public static StorageCredentials getAccountCredentials(QueueServiceConfiguration cfg) {
return cfg.getCredentials();
}
+
+ public static void retrieveMessage(Exchange exchange, QueueServiceConfiguration cfg) throws Exception {
+ CloudQueue client = createQueueClient(cfg);
+ QueueServiceRequestOptions opts = getRequestOptions(exchange);
+ CloudQueueMessage message = client.retrieveMessage(cfg.getMessageVisibilityDelay(),
+ opts.getRequestOpts(), opts.getOpContext());
+ ExchangeUtil.getMessageForResponse(exchange).setBody(message);
+ }
+
+ public static QueueServiceRequestOptions getRequestOptions(Exchange exchange) {
+ QueueServiceRequestOptions opts = exchange.getIn().getHeader(
+ QueueServiceConstants.QUEUE_SERVICE_REQUEST_OPTIONS, QueueServiceRequestOptions.class);
+ if (opts != null) {
+ return opts;
+ } else {
+ opts = new QueueServiceRequestOptions();
+ }
+ QueueRequestOptions requestOpts =
+ exchange.getIn().getHeader(QueueServiceConstants.QUEUE_REQUEST_OPTIONS, QueueRequestOptions.class);
+ OperationContext opContext =
+ exchange.getIn().getHeader(QueueServiceConstants.OPERATION_CONTEXT, OperationContext.class);
+ opts.setOpContext(opContext);
+ opts.setRequestOpts(requestOpts);
+ return opts;
+ }
}