You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/01/31 10:59:01 UTC

(camel) branch main updated: CAMEL-20202: Read Azure BinaryData as stream (#12953)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 5e3331b3da1 CAMEL-20202: Read Azure BinaryData as stream (#12953)
5e3331b3da1 is described below

commit 5e3331b3da15f549660889f4a4998f3e0e688672
Author: Jono Morris <jo...@apache.org>
AuthorDate: Wed Jan 31 23:58:56 2024 +1300

    CAMEL-20202: Read Azure BinaryData as stream (#12953)
    
    * CAMEL-20202: Read Azure BinaryData as stream
    
    * CAMEL-20202: pass InputStream instead of String
    
    * CAMEL-20202: restore original code
    
    * CAMEL-20202: read data stream and add synchronization adapter
    
    * CAMEL-20202: remove IOException that is no longer thrown
---
 .../camel/component/azure/storage/queue/QueueConsumer.java | 13 ++++++++++++-
 .../storage/queue/integration/StorageQueueConsumerIT.java  | 14 +++++++++++---
 2 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/components/camel-azure/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java b/components/camel-azure/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java
index 46980783b2f..c5f409890a1 100644
--- a/components/camel-azure/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java
+++ b/components/camel-azure/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.azure.storage.queue;
 
+import java.io.InputStream;
 import java.time.Duration;
 import java.util.LinkedList;
 import java.util.List;
@@ -35,7 +36,9 @@ import org.apache.camel.component.azure.storage.queue.client.QueueClientWrapper;
 import org.apache.camel.component.azure.storage.queue.operations.QueueOperations;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
+import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -160,9 +163,17 @@ public class QueueConsumer extends ScheduledBatchPollingConsumer {
         final Message message = exchange.getIn();
 
         BinaryData data = messageItem.getBody();
-        message.setBody(data == null ? null : data.toString());
+        InputStream is = data == null ? null : data.toStream();
+        message.setBody(is);
         message.setHeaders(QueueExchangeHeaders.createQueueExchangeHeadersFromQueueMessageItem(messageItem).toMap());
 
+        exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() {
+            @Override
+            public void onDone(Exchange exchange) {
+                IOHelper.close(is);
+            }
+        });
+
         return exchange;
     }
 
diff --git a/components/camel-azure/camel-azure-storage-queue/src/test/java/org/apache/camel/component/azure/storage/queue/integration/StorageQueueConsumerIT.java b/components/camel-azure/camel-azure-storage-queue/src/test/java/org/apache/camel/component/azure/storage/queue/integration/StorageQueueConsumerIT.java
index 2b427ada6bc..f4da18dd7e3 100644
--- a/components/camel-azure/camel-azure-storage-queue/src/test/java/org/apache/camel/component/azure/storage/queue/integration/StorageQueueConsumerIT.java
+++ b/components/camel-azure/camel-azure-storage-queue/src/test/java/org/apache/camel/component/azure/storage/queue/integration/StorageQueueConsumerIT.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.azure.storage.queue.integration;
 
+import java.io.InputStream;
 import java.util.List;
 
 import org.apache.camel.EndpointInject;
@@ -26,6 +27,7 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 class StorageQueueConsumerIT extends StorageQueueBase {
 
@@ -52,9 +54,15 @@ class StorageQueueConsumerIT extends StorageQueueBase {
         result.message(2).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(2);
         result.expectedPropertyReceived(Exchange.BATCH_SIZE, 3);
 
-        assertEquals("test-message-1", exchanges.get(0).getMessage().getBody());
-        assertEquals("test-message-2", exchanges.get(1).getMessage().getBody());
-        assertEquals("test-message-3", exchanges.get(2).getMessage().getBody());
+        assertEquals("test-message-1", convertToString(exchanges.get(0)));
+        assertEquals("test-message-2", convertToString(exchanges.get(1)));
+        assertEquals("test-message-3", convertToString(exchanges.get(2)));
+    }
+
+    private String convertToString(Exchange exchange) {
+        InputStream is = exchange.getMessage().getBody(InputStream.class);
+        assertNotNull(is);
+        return exchange.getContext().getTypeConverter().convertTo(String.class, is);
     }
 
     @Override