You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/12/12 16:59:05 UTC
[camel] 01/02: [Camel-18808] add support for CamelAzureStorageBlobBlobUploadSize in uploadBlockBlob operation (#8881)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git
commit b57a75d0ee349017442cc3e9fe7f95ed9a591bf7
Author: Jussi Wallin <ju...@beanbakers.fi>
AuthorDate: Mon Dec 12 18:41:59 2022 +0200
[Camel-18808] add support for CamelAzureStorageBlobBlobUploadSize in uploadBlockBlob operation (#8881)
* CAMEL-18808: add BLOB_SIZE support to uploadBlockBlob operation
This commit adds support for the CamelAzureStorageBlobBlobSize header in the producer's uploadBlockBlob operation.
This is necessary to allow a user to upload blobs from streams that do not support mark and reset, e.g. a FileInputStream.
* CAMEL-18808: separate the consumer and producer blob size headers
Separate the two headers to prevent the header from propagating from a consumer to a subsequent producer.
Also remove the header at the end of the uploadBlockBlob operation to avoid affecting any following uploads.
* CAMEL-18808: fix formatting
* CAMEL-18808: move header removal to more intuitive spot
---
.../azure/storage/blob/azure-storage-blob.json | 1 +
.../azure/storage/blob/BlobConstants.java | 7 ++++-
.../azure/storage/blob/BlobStreamAndLength.java | 7 +++--
.../storage/blob/client/BlobClientWrapper.java | 10 +++++--
.../storage/blob/integration/BlobOperationsIT.java | 33 ++++++++++++++++++++++
5 files changed, 53 insertions(+), 5 deletions(-)
diff --git a/components/camel-azure/camel-azure-storage-blob/src/generated/resources/org/apache/camel/component/azure/storage/blob/azure-storage-blob.json b/components/camel-azure/camel-azure-storage-blob/src/generated/resources/org/apache/camel/component/azure/storage/blob/azure-storage-blob.json
index 6d296819229..9ebf681a396 100644
--- a/components/camel-azure/camel-azure-storage-blob/src/generated/resources/org/apache/camel/component/azure/storage/blob/azure-storage-blob.json
+++ b/components/camel-azure/camel-azure-storage-blob/src/generated/resources/org/apache/camel/component/azure/storage/blob/azure-storage-blob.json
@@ -68,6 +68,7 @@
"CamelAzureStorageBlobContentLanguage": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Content language specified for the blob.", "constantName": "org.apache.camel.component.azure.storage.blob.BlobConstants#CONTENT_LANGUAGE" },
"CamelAzureStorageBlobCacheControl": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Cache control specified for the blob.", "constantName": "org.apache.camel.component.azure.storage.blob.BlobConstants#CACHE_CONTROL" },
"CamelAzureStorageBlobBlobSize": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The size of the blob.", "constantName": "org.apache.camel.component.azure.storage.blob.BlobConstants#BLOB_SIZE" },
+ "CamelAzureStorageBlobBlobUploadSize": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "When uploading a blob with the uploadBlockBlob-operation this can be used to tell the client what the length of an InputStream is.", "constantName": "org.apache.camel.component.azure.storage.blob.BlobConstants#BLOB_UPLOAD_SIZE" },
"CamelAzureStorageBlobSequenceNumber": { "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "(producer) (createPageBlob) A user-controlled value that you can use to track requests. The value of the sequence number must be between 0 and 263 - 1. The default value is 0. (consumer) The current sequence number for a page blob.", "constan [...]
"CamelAzureStorageBlobBlobType": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "org.apache.camel.component.azure.storage.blob.BlobType", "enum": [ "blockblob", "appendblob", "pageblob" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The type of the blob.", "constantName": "org.apache.camel.component.azure.storage.blob.BlobConstants#BLOB_TYPE" },
"CamelAzureStorageBlobLeaseStatus": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "com.azure.storage.blob.models.LeaseStatusType", "enum": [ "locked", "unlocked" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Status of the lease on the blob.", "constantName": "org.apache.camel.component.azure.storage.blob.BlobConstants#LEASE_STATUS" },
diff --git a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConstants.java b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConstants.java
index ec068d8bd35..282b077232f 100644
--- a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConstants.java
+++ b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConstants.java
@@ -57,8 +57,13 @@ public final class BlobConstants {
public static final String CONTENT_LANGUAGE = HEADER_PREFIX + "ContentLanguage";
@Metadata(label = "consumer", description = "Cache control specified for the blob.", javaType = "String")
public static final String CACHE_CONTROL = HEADER_PREFIX + "CacheControl";
- @Metadata(label = "consumer", description = "The size of the blob.", javaType = "long")
+ @Metadata(label = "consumer", description = "The size of the blob.",
+ javaType = "long")
public static final String BLOB_SIZE = HEADER_PREFIX + "BlobSize";
+ @Metadata(label = "producer",
+ description = "When uploading a blob with the uploadBlockBlob-operation this can be used to tell the client what the length of an InputStream is.",
+ javaType = "long")
+ public static final String BLOB_UPLOAD_SIZE = HEADER_PREFIX + "BlobUploadSize";
@Metadata(description = "(producer) (createPageBlob) A user-controlled value that you can use to track requests. " +
"The value of the sequence number must be between 0 and 2^63 - 1. The default value is 0.\n" +
"(consumer) The current sequence number for a page blob.",
diff --git a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobStreamAndLength.java b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobStreamAndLength.java
index a38a6040799..34bd2e3d688 100644
--- a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobStreamAndLength.java
+++ b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobStreamAndLength.java
@@ -40,6 +40,8 @@ public final class BlobStreamAndLength {
@SuppressWarnings("rawtypes")
public static BlobStreamAndLength createBlobStreamAndLengthFromExchangeBody(final Exchange exchange) throws IOException {
Object body = exchange.getIn().getBody();
+ Long blobSize = exchange.getIn().getHeader(BlobConstants.BLOB_UPLOAD_SIZE, () -> null, Long.class);
+ exchange.getIn().removeHeader(BlobConstants.BLOB_UPLOAD_SIZE); // remove to avoid issues for further uploads
if (body instanceof WrappedFile) {
// unwrap file
@@ -47,7 +49,8 @@ public final class BlobStreamAndLength {
}
if (body instanceof InputStream) {
- return new BlobStreamAndLength((InputStream) body, BlobUtils.getInputStreamLength((InputStream) body));
+ return new BlobStreamAndLength(
+ (InputStream) body, blobSize != null ? blobSize : BlobUtils.getInputStreamLength((InputStream) body));
}
if (body instanceof File) {
return new BlobStreamAndLength(new BufferedInputStream(new FileInputStream((File) body)), ((File) body).length());
@@ -65,7 +68,7 @@ public final class BlobStreamAndLength {
throw new IllegalArgumentException("Unsupported blob type:" + body.getClass().getName());
}
- return new BlobStreamAndLength(inputStream, BlobUtils.getInputStreamLength(inputStream));
+ return new BlobStreamAndLength(inputStream, blobSize != null ? blobSize : BlobUtils.getInputStreamLength(inputStream));
}
public InputStream getInputStream() {
diff --git a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/client/BlobClientWrapper.java b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/client/BlobClientWrapper.java
index b99cff0bd14..f020b542fdf 100644
--- a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/client/BlobClientWrapper.java
+++ b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/client/BlobClientWrapper.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.azure.storage.blob.client;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.HashMap;
@@ -52,6 +53,7 @@ import com.azure.storage.blob.models.PageBlobRequestConditions;
import com.azure.storage.blob.models.PageRange;
import com.azure.storage.blob.models.PageRangeItem;
import com.azure.storage.blob.models.ParallelTransferOptions;
+import com.azure.storage.blob.options.BlockBlobSimpleUploadOptions;
import com.azure.storage.blob.options.ListPageRangesOptions;
import com.azure.storage.blob.sas.BlobSasPermission;
import com.azure.storage.blob.sas.BlobServiceSasSignatureValues;
@@ -60,7 +62,9 @@ import com.azure.storage.blob.specialized.BlobInputStream;
import com.azure.storage.blob.specialized.BlockBlobClient;
import com.azure.storage.blob.specialized.PageBlobClient;
import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.common.Utility;
import org.apache.camel.util.ObjectHelper;
+import reactor.core.publisher.Flux;
public class BlobClientWrapper {
private static final String SERVICE_URI_SEGMENT = ".blob.core.windows.net";
@@ -120,8 +124,10 @@ public class BlobClientWrapper {
final Map<String, String> metadata, AccessTier tier, final byte[] contentMd5,
final BlobRequestConditions requestConditions,
final Duration timeout) {
- return getBlockBlobClient().uploadWithResponse(data, length, headers, metadata, tier, contentMd5, requestConditions,
- timeout, Context.NONE);
+ Flux<ByteBuffer> dataBuffer = Utility.convertStreamToByteBuffer(data, length, 4194304, false);
+ BlockBlobSimpleUploadOptions uploadOptions = new BlockBlobSimpleUploadOptions(dataBuffer, length).setHeaders(headers)
+ .setMetadata(metadata).setTier(tier).setContentMd5(contentMd5).setRequestConditions(requestConditions);
+ return getBlockBlobClient().uploadWithResponse(uploadOptions, timeout, Context.NONE);
}
public HttpHeaders stageBlockBlob(
diff --git a/components/camel-azure/camel-azure-storage-blob/src/test/java/org/apache/camel/component/azure/storage/blob/integration/BlobOperationsIT.java b/components/camel-azure/camel-azure-storage-blob/src/test/java/org/apache/camel/component/azure/storage/blob/integration/BlobOperationsIT.java
index 88d5d9a99f7..6fe837e8f86 100644
--- a/components/camel-azure/camel-azure-storage-blob/src/test/java/org/apache/camel/component/azure/storage/blob/integration/BlobOperationsIT.java
+++ b/components/camel-azure/camel-azure-storage-blob/src/test/java/org/apache/camel/component/azure/storage/blob/integration/BlobOperationsIT.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.azure.storage.blob.integration;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -59,6 +60,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
class BlobOperationsIT extends Base {
@@ -229,6 +231,37 @@ class BlobOperationsIT extends Base {
blobClientWrapper.delete(null, null, null);
}
+ @Test
+ void testUploadBlockBlobAsStreamWithBlobSizeHeader() throws Exception {
+ final BlobClientWrapper blobClientWrapper = blobContainerClientWrapper.getBlobClientWrapper("upload_test_file");
+ final BlobOperations operations = new BlobOperations(configuration, blobClientWrapper);
+
+ final File fileToUpload
+ = new File(Objects.requireNonNull(getClass().getClassLoader().getResource("upload_test_file")).getFile());
+ final Exchange exchange = new DefaultExchange(context);
+ exchange.getIn().setBody(new FileInputStream(fileToUpload));
+ exchange.getIn().setHeader(BlobConstants.BLOB_UPLOAD_SIZE, fileToUpload.length());
+
+ final BlobOperationResponse response = operations.uploadBlockBlob(exchange);
+
+ assertNotNull(response);
+ assertTrue((boolean) response.getBody());
+ // check for eTag and md5 to make sure is uploaded
+ assertNotNull(response.getHeaders().get(BlobConstants.E_TAG));
+ assertNotNull(response.getHeaders().get(BlobConstants.CONTENT_MD5));
+
+ // check that the size header got removed
+ assertNull(exchange.getIn().getHeader(BlobConstants.BLOB_UPLOAD_SIZE));
+
+ // check content
+ final BlobOperationResponse getBlobResponse = operations.getBlob(null);
+
+ assertEquals("awesome camel to upload!",
+ IOUtils.toString((InputStream) getBlobResponse.getBody(), Charset.defaultCharset()));
+
+ blobClientWrapper.delete(null, null, null);
+ }
+
@Test
void testCommitAndStageBlockBlob() throws Exception {
final BlobClientWrapper blobClientWrapper = blobContainerClientWrapper.getBlobClientWrapper("upload_test_file");