You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/12/12 16:59:05 UTC

[camel] 01/02: [Camel-18808] add support for CamelAzureStorageBlobBlobUploadSize in uploadBlockBlob operation (#8881)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b57a75d0ee349017442cc3e9fe7f95ed9a591bf7
Author: Jussi Wallin <ju...@beanbakers.fi>
AuthorDate: Mon Dec 12 18:41:59 2022 +0200

    [Camel-18808] add support for CamelAzureStorageBlobBlobUploadSize in uploadBlockBlob operation (#8881)
    
    * CAMEL-18808: add BLOB_SIZE support to uploadBlockBlob operation
    
    This commit adds support for the CamelAzureStorageBlobBlobSize header in the producer's uploadBlockBlob operation.
    
    This is necessary to allow a user to upload blobs from streams that do not support mark and reset, e.g. a FileInputStream.
    
    * CAMEL-18808: separate the consumer and producer blob size headers
    
    Separate the two headers to avoid propagating the header from a consumer to a subsequent producer.
    
    Also remove the header at the end of the uploadBlockBlob operation to avoid affecting any following uploads.
    
    * CAMEL-18808: fix formatting
    
    * CAMEL-18808: move header removal to a more intuitive spot
---
 .../azure/storage/blob/azure-storage-blob.json     |  1 +
 .../azure/storage/blob/BlobConstants.java          |  7 ++++-
 .../azure/storage/blob/BlobStreamAndLength.java    |  7 +++--
 .../storage/blob/client/BlobClientWrapper.java     | 10 +++++--
 .../storage/blob/integration/BlobOperationsIT.java | 33 ++++++++++++++++++++++
 5 files changed, 53 insertions(+), 5 deletions(-)

diff --git a/components/camel-azure/camel-azure-storage-blob/src/generated/resources/org/apache/camel/component/azure/storage/blob/azure-storage-blob.json b/components/camel-azure/camel-azure-storage-blob/src/generated/resources/org/apache/camel/component/azure/storage/blob/azure-storage-blob.json
index 6d296819229..9ebf681a396 100644
--- a/components/camel-azure/camel-azure-storage-blob/src/generated/resources/org/apache/camel/component/azure/storage/blob/azure-storage-blob.json
+++ b/components/camel-azure/camel-azure-storage-blob/src/generated/resources/org/apache/camel/component/azure/storage/blob/azure-storage-blob.json
@@ -68,6 +68,7 @@
     "CamelAzureStorageBlobContentLanguage": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Content language specified for the blob.", "constantName": "org.apache.camel.component.azure.storage.blob.BlobConstants#CONTENT_LANGUAGE" },
     "CamelAzureStorageBlobCacheControl": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Cache control specified for the blob.", "constantName": "org.apache.camel.component.azure.storage.blob.BlobConstants#CACHE_CONTROL" },
     "CamelAzureStorageBlobBlobSize": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The size of the blob.", "constantName": "org.apache.camel.component.azure.storage.blob.BlobConstants#BLOB_SIZE" },
+    "CamelAzureStorageBlobBlobUploadSize": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "When uploading a blob with the uploadBlockBlob-operation this can be used to tell the client what the length of an InputStream is.", "constantName": "org.apache.camel.component.azure.storage.blob.BlobConstants#BLOB_UPLOAD_SIZE" },
     "CamelAzureStorageBlobSequenceNumber": { "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "(producer) (createPageBlob) A user-controlled value that you can use to track requests. The value of the sequence number must be between 0 and 263 - 1. The default value is 0. (consumer) The current sequence number for a page blob.", "constan [...]
     "CamelAzureStorageBlobBlobType": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "org.apache.camel.component.azure.storage.blob.BlobType", "enum": [ "blockblob", "appendblob", "pageblob" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The type of the blob.", "constantName": "org.apache.camel.component.azure.storage.blob.BlobConstants#BLOB_TYPE" },
     "CamelAzureStorageBlobLeaseStatus": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "com.azure.storage.blob.models.LeaseStatusType", "enum": [ "locked", "unlocked" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Status of the lease on the blob.", "constantName": "org.apache.camel.component.azure.storage.blob.BlobConstants#LEASE_STATUS" },
diff --git a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConstants.java b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConstants.java
index ec068d8bd35..282b077232f 100644
--- a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConstants.java
+++ b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConstants.java
@@ -57,8 +57,13 @@ public final class BlobConstants {
     public static final String CONTENT_LANGUAGE = HEADER_PREFIX + "ContentLanguage";
     @Metadata(label = "consumer", description = "Cache control specified for the blob.", javaType = "String")
     public static final String CACHE_CONTROL = HEADER_PREFIX + "CacheControl";
-    @Metadata(label = "consumer", description = "The size of the blob.", javaType = "long")
+    @Metadata(label = "consumer", description = "The size of the blob.",
+              javaType = "long")
     public static final String BLOB_SIZE = HEADER_PREFIX + "BlobSize";
+    @Metadata(label = "producer",
+              description = "When uploading a blob with the uploadBlockBlob-operation this can be used to tell the client what the length of an InputStream is.",
+              javaType = "long")
+    public static final String BLOB_UPLOAD_SIZE = HEADER_PREFIX + "BlobUploadSize";
     @Metadata(description = "(producer) (createPageBlob) A user-controlled value that you can use to track requests. " +
                             "The value of the sequence number must be between 0 and 2^63 - 1. The default value is 0.\n" +
                             "(consumer) The current sequence number for a page blob.",
diff --git a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobStreamAndLength.java b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobStreamAndLength.java
index a38a6040799..34bd2e3d688 100644
--- a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobStreamAndLength.java
+++ b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobStreamAndLength.java
@@ -40,6 +40,8 @@ public final class BlobStreamAndLength {
     @SuppressWarnings("rawtypes")
     public static BlobStreamAndLength createBlobStreamAndLengthFromExchangeBody(final Exchange exchange) throws IOException {
         Object body = exchange.getIn().getBody();
+        Long blobSize = exchange.getIn().getHeader(BlobConstants.BLOB_UPLOAD_SIZE, () -> null, Long.class);
+        exchange.getIn().removeHeader(BlobConstants.BLOB_UPLOAD_SIZE); // remove to avoid issues for further uploads
 
         if (body instanceof WrappedFile) {
             // unwrap file
@@ -47,7 +49,8 @@ public final class BlobStreamAndLength {
         }
 
         if (body instanceof InputStream) {
-            return new BlobStreamAndLength((InputStream) body, BlobUtils.getInputStreamLength((InputStream) body));
+            return new BlobStreamAndLength(
+                    (InputStream) body, blobSize != null ? blobSize : BlobUtils.getInputStreamLength((InputStream) body));
         }
         if (body instanceof File) {
             return new BlobStreamAndLength(new BufferedInputStream(new FileInputStream((File) body)), ((File) body).length());
@@ -65,7 +68,7 @@ public final class BlobStreamAndLength {
             throw new IllegalArgumentException("Unsupported blob type:" + body.getClass().getName());
         }
 
-        return new BlobStreamAndLength(inputStream, BlobUtils.getInputStreamLength(inputStream));
+        return new BlobStreamAndLength(inputStream, blobSize != null ? blobSize : BlobUtils.getInputStreamLength(inputStream));
     }
 
     public InputStream getInputStream() {
diff --git a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/client/BlobClientWrapper.java b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/client/BlobClientWrapper.java
index b99cff0bd14..f020b542fdf 100644
--- a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/client/BlobClientWrapper.java
+++ b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/client/BlobClientWrapper.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.azure.storage.blob.client;
 
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.time.OffsetDateTime;
 import java.util.HashMap;
@@ -52,6 +53,7 @@ import com.azure.storage.blob.models.PageBlobRequestConditions;
 import com.azure.storage.blob.models.PageRange;
 import com.azure.storage.blob.models.PageRangeItem;
 import com.azure.storage.blob.models.ParallelTransferOptions;
+import com.azure.storage.blob.options.BlockBlobSimpleUploadOptions;
 import com.azure.storage.blob.options.ListPageRangesOptions;
 import com.azure.storage.blob.sas.BlobSasPermission;
 import com.azure.storage.blob.sas.BlobServiceSasSignatureValues;
@@ -60,7 +62,9 @@ import com.azure.storage.blob.specialized.BlobInputStream;
 import com.azure.storage.blob.specialized.BlockBlobClient;
 import com.azure.storage.blob.specialized.PageBlobClient;
 import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.common.Utility;
 import org.apache.camel.util.ObjectHelper;
+import reactor.core.publisher.Flux;
 
 public class BlobClientWrapper {
     private static final String SERVICE_URI_SEGMENT = ".blob.core.windows.net";
@@ -120,8 +124,10 @@ public class BlobClientWrapper {
             final Map<String, String> metadata, AccessTier tier, final byte[] contentMd5,
             final BlobRequestConditions requestConditions,
             final Duration timeout) {
-        return getBlockBlobClient().uploadWithResponse(data, length, headers, metadata, tier, contentMd5, requestConditions,
-                timeout, Context.NONE);
+        Flux<ByteBuffer> dataBuffer = Utility.convertStreamToByteBuffer(data, length, 4194304, false);
+        BlockBlobSimpleUploadOptions uploadOptions = new BlockBlobSimpleUploadOptions(dataBuffer, length).setHeaders(headers)
+                .setMetadata(metadata).setTier(tier).setContentMd5(contentMd5).setRequestConditions(requestConditions);
+        return getBlockBlobClient().uploadWithResponse(uploadOptions, timeout, Context.NONE);
     }
 
     public HttpHeaders stageBlockBlob(
diff --git a/components/camel-azure/camel-azure-storage-blob/src/test/java/org/apache/camel/component/azure/storage/blob/integration/BlobOperationsIT.java b/components/camel-azure/camel-azure-storage-blob/src/test/java/org/apache/camel/component/azure/storage/blob/integration/BlobOperationsIT.java
index 88d5d9a99f7..6fe837e8f86 100644
--- a/components/camel-azure/camel-azure-storage-blob/src/test/java/org/apache/camel/component/azure/storage/blob/integration/BlobOperationsIT.java
+++ b/components/camel-azure/camel-azure-storage-blob/src/test/java/org/apache/camel/component/azure/storage/blob/integration/BlobOperationsIT.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.azure.storage.blob.integration;
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -59,6 +60,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class BlobOperationsIT extends Base {
@@ -229,6 +231,37 @@ class BlobOperationsIT extends Base {
         blobClientWrapper.delete(null, null, null);
     }
 
+    @Test
+    void testUploadBlockBlobAsStreamWithBlobSizeHeader() throws Exception {
+        final BlobClientWrapper blobClientWrapper = blobContainerClientWrapper.getBlobClientWrapper("upload_test_file");
+        final BlobOperations operations = new BlobOperations(configuration, blobClientWrapper);
+
+        final File fileToUpload
+                = new File(Objects.requireNonNull(getClass().getClassLoader().getResource("upload_test_file")).getFile());
+        final Exchange exchange = new DefaultExchange(context);
+        exchange.getIn().setBody(new FileInputStream(fileToUpload));
+        exchange.getIn().setHeader(BlobConstants.BLOB_UPLOAD_SIZE, fileToUpload.length());
+
+        final BlobOperationResponse response = operations.uploadBlockBlob(exchange);
+
+        assertNotNull(response);
+        assertTrue((boolean) response.getBody());
+        // check for eTag and md5 to make sure is uploaded
+        assertNotNull(response.getHeaders().get(BlobConstants.E_TAG));
+        assertNotNull(response.getHeaders().get(BlobConstants.CONTENT_MD5));
+
+        // check that the size header got removed
+        assertNull(exchange.getIn().getHeader(BlobConstants.BLOB_UPLOAD_SIZE));
+
+        // check content
+        final BlobOperationResponse getBlobResponse = operations.getBlob(null);
+
+        assertEquals("awesome camel to upload!",
+                IOUtils.toString((InputStream) getBlobResponse.getBody(), Charset.defaultCharset()));
+
+        blobClientWrapper.delete(null, null, null);
+    }
+
     @Test
     void testCommitAndStageBlockBlob() throws Exception {
         final BlobClientWrapper blobClientWrapper = blobContainerClientWrapper.getBlobClientWrapper("upload_test_file");