You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by se...@apache.org on 2017/02/22 16:41:21 UTC

camel git commit: [CAMEL-10786] Prototyping QueueService producer/consumer; untested so far because no account is available

Repository: camel
Updated Branches:
  refs/heads/master 46f83c924 -> 9a5357e66


[CAMEL-10786] Prototyping QueueService producer/consumer; untested so far because no account is available


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9a5357e6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9a5357e6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9a5357e6

Branch: refs/heads/master
Commit: 9a5357e669a0fd466fd9c6c2463ccdc651daee03
Parents: 46f83c9
Author: Sergey Beryozkin <sb...@gmail.com>
Authored: Wed Feb 22 16:41:06 2017 +0000
Committer: Sergey Beryozkin <sb...@gmail.com>
Committed: Wed Feb 22 16:41:06 2017 +0000

----------------------------------------------------------------------
 .../azure/blob/BlobServiceConstants.java        |   1 +
 .../azure/blob/BlobServiceProducer.java         |  12 +-
 .../azure/blob/BlobServiceRequestOptions.java   |  14 +-
 .../component/azure/blob/BlobServiceUtil.java   |   3 +-
 .../common/AbstractServiceRequestOptions.java   |  31 ++++
 .../azure/queue/QueueServiceConfiguration.java  |  46 +++++-
 .../azure/queue/QueueServiceConstants.java      |  11 +-
 .../azure/queue/QueueServiceConsumer.java       |  12 +-
 .../azure/queue/QueueServiceOperations.java     |  11 +-
 .../azure/queue/QueueServiceProducer.java       | 145 +++++++++++++++++--
 .../azure/queue/QueueServiceRequestOptions.java |  33 +++++
 .../component/azure/queue/QueueServiceUtil.java |  30 ++++
 12 files changed, 312 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConstants.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConstants.java
index f8f644a..2fd1a99 100644
--- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConstants.java
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConstants.java
@@ -22,6 +22,7 @@ public interface BlobServiceConstants {
     String BLOB_CLIENT = "AzureBlobClient";
     
     String SERVICE_URI_SEGMENT = ".blob.core.windows.net";
+    String BLOB_SERVICE_REQUEST_OPTIONS = "BlobServiceRequestOptions";
     String ACCESS_CONDITION = "BlobAccessCondition";
     String BLOB_REQUEST_OPTIONS = "BlobRequestOptions";
     String OPERATION_CONTEXT = "BlobOperationContext";

http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceProducer.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceProducer.java
index 0f01588..360b4aa 100644
--- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceProducer.java
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceProducer.java
@@ -120,9 +120,15 @@ public class BlobServiceProducer extends DefaultProducer {
         LOG.trace("Getting the blob list from the container [{}] from exchange [{}]...", 
                   getConfiguration().getContainerName(), exchange);
         BlobServiceConfiguration cfg = getConfiguration();
-        @SuppressWarnings("unchecked")
-        EnumSet<BlobListingDetails> details = 
-            (EnumSet<BlobListingDetails>)exchange.getIn().getHeader(BlobServiceConstants.BLOB_LISTING_DETAILS); 
+        EnumSet<BlobListingDetails> details = null;
+        Object detailsObject = exchange.getIn().getHeader(BlobServiceConstants.BLOB_LISTING_DETAILS);
+        if (detailsObject instanceof EnumSet) {
+            @SuppressWarnings("unchecked")
+            EnumSet<BlobListingDetails> theDetails = (EnumSet<BlobListingDetails>)detailsObject;
+            details = theDetails;
+        } else if (detailsObject instanceof BlobListingDetails) {
+            details = EnumSet.of((BlobListingDetails)detailsObject);
+        }
         Iterable<ListBlobItem> items = 
             client.listBlobs(cfg.getBlobPrefix(), cfg.isUseFlatListing(), 
                              details, opts.getRequestOpts(), opts.getOpContext());

http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceRequestOptions.java
----------------------------------------------------------------------
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceRequestOptions.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceRequestOptions.java
index 3d1342e..bbb9385 100644
--- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceRequestOptions.java
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceRequestOptions.java
@@ -17,14 +17,13 @@
 package org.apache.camel.component.azure.blob;
 
 import com.microsoft.azure.storage.AccessCondition;
-import com.microsoft.azure.storage.OperationContext;
 import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import org.apache.camel.component.azure.common.AbstractServiceRequestOptions;
 
-public class BlobServiceRequestOptions {
+public class BlobServiceRequestOptions extends AbstractServiceRequestOptions {
     private AccessCondition accessCond;
     private BlobRequestOptions requestOpts;
-    private OperationContext opContext;
-
+    
     public AccessCondition getAccessCond() {
         return accessCond;
     }
@@ -41,11 +40,4 @@ public class BlobServiceRequestOptions {
         this.requestOpts = requestOpts;
     }
 
-    public OperationContext getOpContext() {
-        return opContext;
-    }
-
-    public void setOpContext(OperationContext opContext) {
-        this.opContext = opContext;
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceUtil.java
----------------------------------------------------------------------
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceUtil.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceUtil.java
index dcf6dc3..0f89819 100644
--- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceUtil.java
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceUtil.java
@@ -216,7 +216,8 @@ public final class BlobServiceUtil {
 
 
     public static BlobServiceRequestOptions getRequestOptions(Exchange exchange) {
-        BlobServiceRequestOptions opts = exchange.getIn().getBody(BlobServiceRequestOptions.class);
+        BlobServiceRequestOptions opts = exchange.getIn().getHeader(
+            BlobServiceConstants.BLOB_SERVICE_REQUEST_OPTIONS, BlobServiceRequestOptions.class);
         if (opts != null) {
             return opts;
         } else {

http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/common/AbstractServiceRequestOptions.java
----------------------------------------------------------------------
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/common/AbstractServiceRequestOptions.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/common/AbstractServiceRequestOptions.java
new file mode 100644
index 0000000..db9b635
--- /dev/null
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/common/AbstractServiceRequestOptions.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.azure.common;
+
+import com.microsoft.azure.storage.OperationContext;
+
+public abstract class AbstractServiceRequestOptions {
+    private OperationContext opContext;
+
+    public OperationContext getOpContext() {
+        return opContext;
+    }
+
+    public void setOpContext(OperationContext opContext) {
+        this.opContext = opContext;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConfiguration.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConfiguration.java
index d211d7e..e85cd2c 100644
--- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConfiguration.java
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConfiguration.java
@@ -28,8 +28,17 @@ public class QueueServiceConfiguration extends AbstractConfiguration {
     @UriParam
     private CloudQueue azureQueueClient;
     
-    @UriParam(label = "producer", defaultValue = "getMessage")
-    private QueueServiceOperations operation = QueueServiceOperations.getMessage;
+    @UriParam(label = "producer", defaultValue = "listQueues")
+    private QueueServiceOperations operation = QueueServiceOperations.listQueues;
+    
+    @UriParam(label = "producer")
+    private int messageTimeToLive;
+    
+    @UriParam(label = "producer")
+    private int messageVisibilityDelay;
+    
+    @UriParam(label = "producer")
+    private String queuePrefix;
     
     public String getQueueName() {
         return queueName;
@@ -63,4 +72,37 @@ public class QueueServiceConfiguration extends AbstractConfiguration {
     public void setOperation(QueueServiceOperations operation) {
         this.operation = operation;
     }
+
+    public int getMessageTimeToLive() {
+        return messageTimeToLive;
+    }
+
+    /**
+     * Message Time To Live in seconds 
+     */
+    public void setMessageTimeToLive(int messageTimeToLive) {
+        this.messageTimeToLive = messageTimeToLive;
+    }
+
+    public int getMessageVisibilityDelay() {
+        return messageVisibilityDelay;
+    }
+
+    /**
+     * Message Visibility Delay in seconds 
+     */
+    public void setMessageVisibilityDelay(int messageVisibilityDelay) {
+        this.messageVisibilityDelay = messageVisibilityDelay;
+    }
+
+    public String getQueuePrefix() {
+        return queuePrefix;
+    }
+
+    /**
+     * Set a prefix which can be used for listing the queues 
+     */
+    public void setQueuePrefix(String queuePrefix) {
+        this.queuePrefix = queuePrefix;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConstants.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConstants.java
index 0e7a523..ba2bd0f 100644
--- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConstants.java
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConstants.java
@@ -22,7 +22,12 @@ public interface QueueServiceConstants {
     String QUEUE_CLIENT = "AzureQueueClient";
     
     String SERVICE_URI_SEGMENT = ".queue.core.windows.net";
-    String ACCESS_CONDITION = "BlobAccessCondition";
-    String BLOB_REQUEST_OPTIONS = "BlobRequestOptions";
-    String OPERATION_CONTEXT = "BlobOperationContext";
+    String QUEUE_SERVICE_REQUEST_OPTIONS = "QueueServiceRequestOptions";
+    String QUEUE_REQUEST_OPTIONS = "QueueRequestOptions";
+    String OPERATION_CONTEXT = "QueueOperationContext";
+    String MESSAGE_UPDATE_FIELDS = "QueueMessageUpdateFields";
+    String QUEUE_LISTING_DETAILS = "QueueListingDetails";
+    
+    String QUEUE_CREATED = "QueueCreated";
+    
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java
index 9793a1c..a70ad46 100644
--- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java
@@ -38,7 +38,8 @@ public class QueueServiceConsumer extends ScheduledPollConsumer {
     protected int poll() throws Exception {
         Exchange exchange = super.getEndpoint().createExchange();
         try {
-            getMessage(exchange);
+            LOG.trace("Retrieving a message");
+            retrieveMessage(exchange);
             super.getAsyncProcessor().process(exchange);
             return 1;
         } catch (StorageException ex) {
@@ -50,10 +51,11 @@ public class QueueServiceConsumer extends ScheduledPollConsumer {
         }
     }
     
-    private void getMessage(Exchange exchange) throws Exception {
-        LOG.trace("Getting the message from the queue [{}] from exchange [{}]...", 
-                  getConfiguration().getQueueName(), exchange);
-        throw new UnsupportedOperationException();
+    private void retrieveMessage(Exchange exchange) throws Exception {
+        //TODO: Support the batch processing if needed, given that it is possible
+        // to retrieve more than 1 message in one go, similarly to camel-aws/s3 consumer. 
+        QueueServiceUtil.retrieveMessage(exchange, getConfiguration());
+        
     }
         
     protected QueueServiceConfiguration getConfiguration() {

http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceOperations.java
----------------------------------------------------------------------
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceOperations.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceOperations.java
index a1e0898..536b47e 100644
--- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceOperations.java
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceOperations.java
@@ -17,6 +17,13 @@
 package org.apache.camel.component.azure.queue;
 
 public enum QueueServiceOperations {
-    getMessage,
-    putMessage    
+    listQueues,
+    createQueue,
+    deleteQueue,
+    addMessage,
+    retrieveMessage,
+    peekMessage,
+    updateMessage,
+    deleteMessage
+    
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceProducer.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceProducer.java
index 434ed2d..ff15369 100644
--- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceProducer.java
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceProducer.java
@@ -16,9 +16,16 @@
  */
 package org.apache.camel.component.azure.queue;
 
+import java.util.EnumSet;
+
+import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import com.microsoft.azure.storage.queue.MessageUpdateFields;
+import com.microsoft.azure.storage.queue.QueueListingDetails;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.azure.blob.BlobServiceConstants;
+import org.apache.camel.component.azure.common.ExchangeUtil;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.URISupport;
@@ -40,14 +47,32 @@ public class QueueServiceProducer extends DefaultProducer {
     public void process(final Exchange exchange) throws Exception {
         QueueServiceOperations operation = determineOperation(exchange);
         if (ObjectHelper.isEmpty(operation)) {
-            operation = QueueServiceOperations.getMessage;
+            operation = QueueServiceOperations.listQueues;
         } else {
             switch (operation) {
-            case getMessage:
-                getMessage(exchange);
+            case retrieveMessage:
+                retrieveMessage(exchange);
+                break;
+            case peekMessage:
+                peekMessage(exchange);
+                break;    
+            case createQueue:
+                createQueue(exchange);
+                break;
+            case deleteQueue:
+                deleteQueue(exchange);
+                break;    
+            case addMessage:
+                addMessage(exchange);
+                break;
+            case updateMessage:
+                updateMessage(exchange);
                 break;
-            case putMessage:
-                putMessage(exchange);
+            case deleteMessage:
+                deleteMessage(exchange);
+                break;
+            case listQueues:
+                listQueues(exchange);
                 break;    
             default:
                 throw new IllegalArgumentException("Unsupported operation");
@@ -56,15 +81,115 @@ public class QueueServiceProducer extends DefaultProducer {
              
     }
     
-    private void getMessage(Exchange exchange) {
-        LOG.trace("Getting the message from the queue [{}] from exchange [{}]...", 
+    private void listQueues(Exchange exchange) throws Exception {
+        CloudQueue client = QueueServiceUtil.createQueueClient(getConfiguration());
+        QueueServiceRequestOptions opts = QueueServiceUtil.getRequestOptions(exchange);
+        QueueListingDetails details = (QueueListingDetails)exchange.getIn().getHeader(QueueServiceConstants.QUEUE_LISTING_DETAILS);
+        if (details == null) {
+            details = QueueListingDetails.ALL;
+        }
+        Iterable<CloudQueue> list = client.getServiceClient().listQueues(
+            getConfiguration().getQueuePrefix(), details, 
+            opts.getRequestOpts(), opts.getOpContext());
+        ExchangeUtil.getMessageForResponse(exchange).setBody(list);
+    }
+    
+    private void createQueue(Exchange exchange) throws Exception {
+        CloudQueue client = QueueServiceUtil.createQueueClient(getConfiguration());
+        QueueServiceRequestOptions opts = QueueServiceUtil.getRequestOptions(exchange);
+        doCreateQueue(client, opts, exchange);
+    }
+    
+    private void doCreateQueue(CloudQueue client, QueueServiceRequestOptions opts, Exchange exchange) throws Exception {
+        LOG.trace("Creating the queue [{}] from exchange [{}]...", 
                   getConfiguration().getQueueName(), exchange);
-        throw new UnsupportedOperationException();    
+        client.createIfNotExists(opts.getRequestOpts(), opts.getOpContext());
+        ExchangeUtil.getMessageForResponse(exchange)
+            .setHeader(QueueServiceConstants.QUEUE_CREATED, Boolean.TRUE);
     }
-    private void putMessage(Exchange exchange) {
+    
+    private void deleteQueue(Exchange exchange) throws Exception {
+        LOG.trace("Deleting the queue [{}] from exchange [{}]...", 
+                  getConfiguration().getQueueName(), exchange);
+        CloudQueue client = QueueServiceUtil.createQueueClient(getConfiguration());
+        QueueServiceRequestOptions opts = QueueServiceUtil.getRequestOptions(exchange);
+        client.delete(opts.getRequestOpts(), opts.getOpContext());
+    }
+    
+    private void addMessage(Exchange exchange) throws Exception {
         LOG.trace("Putting the message into the queue [{}] from exchange [{}]...", 
                   getConfiguration().getQueueName(), exchange);
-        throw new UnsupportedOperationException();    
+        CloudQueue client = QueueServiceUtil.createQueueClient(getConfiguration());
+        QueueServiceRequestOptions opts = QueueServiceUtil.getRequestOptions(exchange);
+        
+        Boolean queueCreated = exchange.getIn().getHeader(QueueServiceConstants.QUEUE_CREATED, 
+                                                          Boolean.class);
+        if (Boolean.TRUE != queueCreated) {
+            doCreateQueue(client, opts, exchange);
+        }
+        
+        CloudQueueMessage message = getCloudQueueMessage(exchange);
+        client.addMessage(message, 
+                          getConfiguration().getMessageTimeToLive(), 
+                          getConfiguration().getMessageVisibilityDelay(),
+                          opts.getRequestOpts(), opts.getOpContext());
+    }
+    private void updateMessage(Exchange exchange) throws Exception {
+        CloudQueue client = QueueServiceUtil.createQueueClient(getConfiguration());
+        QueueServiceRequestOptions opts = QueueServiceUtil.getRequestOptions(exchange);
+        
+        CloudQueueMessage message = getCloudQueueMessage(exchange);
+        LOG.trace("Updating the message in the queue [{}] from exchange [{}]...", 
+                  getConfiguration().getQueueName(), exchange);
+        
+        EnumSet<MessageUpdateFields> fields = null;
+        Object fieldsObject = exchange.getIn().getHeader(QueueServiceConstants.MESSAGE_UPDATE_FIELDS);
+        if (fieldsObject instanceof EnumSet) {
+            @SuppressWarnings("unchecked")
+            EnumSet<MessageUpdateFields> theFields = (EnumSet<MessageUpdateFields>)fieldsObject;
+            fields = theFields;
+        } else if (fieldsObject instanceof MessageUpdateFields) {
+            fields = EnumSet.of((MessageUpdateFields)fieldsObject);
+        }
+        client.updateMessage(message, 
+                          getConfiguration().getMessageVisibilityDelay(),
+                          fields,
+                          opts.getRequestOpts(), opts.getOpContext());
+    }
+    
+    private void deleteMessage(Exchange exchange) throws Exception {
+        LOG.trace("Deleting the message from the queue [{}] from exchange [{}]...", 
+                  getConfiguration().getQueueName(), exchange);
+        CloudQueue client = QueueServiceUtil.createQueueClient(getConfiguration());
+        QueueServiceRequestOptions opts = QueueServiceUtil.getRequestOptions(exchange);
+        CloudQueueMessage message = getCloudQueueMessage(exchange);
+        client.deleteMessage(message, opts.getRequestOpts(), opts.getOpContext());
+    }
+
+    private void retrieveMessage(Exchange exchange) throws Exception {
+        QueueServiceUtil.retrieveMessage(exchange, getConfiguration());
+    }
+    
+    private void peekMessage(Exchange exchange) throws Exception {
+        CloudQueue client = QueueServiceUtil.createQueueClient(getConfiguration());
+        QueueServiceRequestOptions opts = QueueServiceUtil.getRequestOptions(exchange);  
+        CloudQueueMessage message = client.peekMessage(opts.getRequestOpts(), opts.getOpContext());
+        ExchangeUtil.getMessageForResponse(exchange).setBody(message);
+    }
+    
+    
+    private CloudQueueMessage getCloudQueueMessage(Exchange exchange) throws Exception {
+        Object body = exchange.getIn().getMandatoryBody();
+        CloudQueueMessage message = null;
+        if (body instanceof CloudQueueMessage) {
+            message = (CloudQueueMessage)body;
+        } else if (body instanceof String) {
+            message = new CloudQueueMessage((String)body);
+        }
+        if (message == null) {
+            throw new IllegalArgumentException("Unsupported queue message type:" + body.getClass().getName());
+        }
+        return message;
     }
 
     private QueueServiceOperations determineOperation(Exchange exchange) {

http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceRequestOptions.java
----------------------------------------------------------------------
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceRequestOptions.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceRequestOptions.java
new file mode 100644
index 0000000..f10b83a
--- /dev/null
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceRequestOptions.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.azure.queue;
+
+import com.microsoft.azure.storage.queue.QueueRequestOptions;
+import org.apache.camel.component.azure.common.AbstractServiceRequestOptions;
+
+public class QueueServiceRequestOptions extends AbstractServiceRequestOptions {
+    private QueueRequestOptions requestOpts;
+    
+    public QueueRequestOptions getRequestOpts() {
+        return requestOpts;
+    }
+
+    public void setRequestOpts(QueueRequestOptions requestOpts) {
+        this.requestOpts = requestOpts;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceUtil.java
----------------------------------------------------------------------
diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceUtil.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceUtil.java
index 17121b0..6f09418 100644
--- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceUtil.java
+++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceUtil.java
@@ -18,8 +18,13 @@ package org.apache.camel.component.azure.queue;
 
 import java.net.URI;
 
+import com.microsoft.azure.storage.OperationContext;
 import com.microsoft.azure.storage.StorageCredentials;
 import com.microsoft.azure.storage.queue.CloudQueue;
+import com.microsoft.azure.storage.queue.CloudQueueMessage;
+import com.microsoft.azure.storage.queue.QueueRequestOptions;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.azure.common.ExchangeUtil;
 
 public final class QueueServiceUtil {
     private QueueServiceUtil() { 
@@ -64,4 +69,29 @@ public final class QueueServiceUtil {
     public static StorageCredentials getAccountCredentials(QueueServiceConfiguration cfg) {
         return cfg.getCredentials();
     }
+    
+    public static void retrieveMessage(Exchange exchange, QueueServiceConfiguration cfg) throws Exception {
+        CloudQueue client = createQueueClient(cfg);
+        QueueServiceRequestOptions opts = getRequestOptions(exchange);  
+        CloudQueueMessage message = client.retrieveMessage(cfg.getMessageVisibilityDelay(),
+                               opts.getRequestOpts(), opts.getOpContext());
+        ExchangeUtil.getMessageForResponse(exchange).setBody(message);
+    }
+    
+    public static QueueServiceRequestOptions getRequestOptions(Exchange exchange) {
+        QueueServiceRequestOptions opts = exchange.getIn().getHeader(
+            QueueServiceConstants.QUEUE_SERVICE_REQUEST_OPTIONS, QueueServiceRequestOptions.class);
+        if (opts != null) {
+            return opts;
+        } else {
+            opts = new QueueServiceRequestOptions();
+        }
+        QueueRequestOptions requestOpts =
+            exchange.getIn().getHeader(QueueServiceConstants.QUEUE_REQUEST_OPTIONS, QueueRequestOptions.class);
+        OperationContext opContext =
+            exchange.getIn().getHeader(QueueServiceConstants.OPERATION_CONTEXT, OperationContext.class);
+        opts.setOpContext(opContext);
+        opts.setRequestOpts(requestOpts);
+        return opts;
+    }
 }