You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2023/01/26 22:21:25 UTC

[nifi] branch main updated: NIFI-10721 Avoid querying properties after Azure Blob upload

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 41649660be NIFI-10721 Avoid querying properties after Azure Blob upload
41649660be is described below

commit 41649660be11431ffb02e4e1618c6559b1eeac28
Author: Malthe Borch <mb...@gmail.com>
AuthorDate: Tue Nov 29 19:04:13 2022 +0100

    NIFI-10721 Avoid querying properties after Azure Blob upload
    
    This closes #6730
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../azure/AbstractAzureBlobProcessor_v12.java      | 28 ++++++++++++++++------
 .../azure/storage/PutAzureBlobStorage_v12.java     | 18 +++++++++++++-
 .../azure/storage/ITPutAzureBlobStorage_v12.java   | 19 ++++++++++++++-
 3 files changed, 56 insertions(+), 9 deletions(-)

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java
index be2197a01a..c1dd3dfdad 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java
@@ -46,6 +46,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Supplier;
 
 import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions;
 import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME;
@@ -175,12 +176,25 @@ public abstract class AbstractAzureBlobProcessor_v12 extends AbstractProcessor {
     }
 
     protected void applyBlobMetadata(Map<String, String> attributes, BlobClient blobClient) {
-        BlobProperties properties = blobClient.getProperties();
-        attributes.put(ATTR_NAME_ETAG, properties.getETag());
-        attributes.put(ATTR_NAME_BLOBTYPE, properties.getBlobType().toString());
-        attributes.put(ATTR_NAME_MIME_TYPE, properties.getContentType());
-        attributes.put(ATTR_NAME_LANG, properties.getContentLanguage());
-        attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(properties.getLastModified()));
-        attributes.put(ATTR_NAME_LENGTH, String.valueOf(properties.getBlobSize()));
+        Supplier<BlobProperties> props = new Supplier() {
+            BlobProperties properties;
+            public BlobProperties get() {
+                if (properties == null) {
+                    properties = blobClient.getProperties();
+                }
+                return properties;
+            }
+        };
+
+        attributes.computeIfAbsent(ATTR_NAME_ETAG, key -> props.get().getETag());
+        attributes.computeIfAbsent(ATTR_NAME_BLOBTYPE, key -> props.get().getBlobType().toString());
+        attributes.computeIfAbsent(ATTR_NAME_MIME_TYPE, key -> props.get().getContentType());
+        attributes.computeIfAbsent(ATTR_NAME_TIMESTAMP, key -> String.valueOf(props.get().getLastModified()));
+        attributes.computeIfAbsent(ATTR_NAME_LENGTH, key -> String.valueOf(props.get().getBlobSize()));
+
+        // The LANG attribute is a special case because we allow it to be null.
+        if (!attributes.containsKey(ATTR_NAME_LANG)) {
+            attributes.put(ATTR_NAME_LANG, props.get().getContentLanguage());
+        }
     }
 }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
index 8390438f84..3b7896e5c3 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.azure.storage;
 
+import com.azure.core.http.rest.Response;
 import com.azure.core.util.Context;
 import com.azure.storage.blob.BlobClient;
 import com.azure.storage.blob.BlobContainerClient;
@@ -23,6 +24,8 @@ import com.azure.storage.blob.BlobServiceClient;
 import com.azure.storage.blob.models.BlobErrorCode;
 import com.azure.storage.blob.models.BlobRequestConditions;
 import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.BlobType;
+import com.azure.storage.blob.models.BlockBlobItem;
 import com.azure.storage.blob.options.BlobParallelUploadOptions;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -50,6 +53,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import static com.azure.core.http.ContentType.APPLICATION_OCTET_STREAM;
 import static com.azure.core.util.FluxUtil.toFluxByteBuffer;
 import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME;
 import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE;
@@ -161,7 +165,10 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
                 try (InputStream rawIn = session.read(flowFile)) {
                     final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
                     blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
-                    blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    long length = flowFile.getSize();
+                    applyUploadResultAttributes(attributes, blob, BlobType.BLOCK_BLOB, length);
                     applyBlobMetadata(attributes, blobClient);
                     if (ignore) {
                         attributes.put(ATTR_NAME_IGNORED, "false");
@@ -191,4 +198,13 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
             session.transfer(flowFile, REL_FAILURE);
         }
     }
+
+    private static void applyUploadResultAttributes(final Map<String, String> attributes, final BlockBlobItem blob, final BlobType blobType, final long length) {
+        attributes.put(ATTR_NAME_BLOBTYPE, blobType.toString());
+        attributes.put(ATTR_NAME_ETAG, blob.getETag());
+        attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
+        attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(blob.getLastModified()));
+        attributes.put(ATTR_NAME_LANG, null);
+        attributes.put(ATTR_NAME_MIME_TYPE, APPLICATION_OCTET_STREAM);
+    }
 }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java
index a648296e02..e3481d194c 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java
@@ -40,9 +40,18 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
+    public static class ITProcessor extends PutAzureBlobStorage_v12 {
+        public boolean blobMetadataApplied = false;
+        @Override
+        protected void applyBlobMetadata(Map<String, String> attributes, BlobClient blobClient) {
+            super.applyBlobMetadata(attributes, blobClient);
+            blobMetadataApplied = true;
+        }
+    }
+
     @Override
     protected Class<? extends Processor> getProcessorClass() {
-        return PutAzureBlobStorage_v12.class;
+        return ITProcessor.class;
     }
 
     @BeforeEach
@@ -57,6 +66,14 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
         assertSuccess(getContainerName(), BLOB_NAME, BLOB_DATA);
     }
 
+    @Test
+    public void testPutBlobApplyBlobMetadata() throws Exception {
+        runProcessor(BLOB_DATA);
+
+        assertSuccess(getContainerName(), BLOB_NAME, BLOB_DATA);
+        assertTrue(((ITProcessor) runner.getProcessor()).blobMetadataApplied);
+    }
+
     @Test
     public void testPutBlobWithSimpleNameUsingProxyConfigurationService() throws Exception {
         configureProxyService();