You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/11/15 20:40:39 UTC

[nifi] branch main updated: NIFI-10491 Added Conflict Resolution Strategy to PutAzureBlobStorage_v12

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new a71556f115 NIFI-10491 Added Conflict Resolution Strategy to PutAzureBlobStorage_v12
a71556f115 is described below

commit a71556f115af551e40f0004694dfe82a7d86535a
Author: Malthe Borch <mb...@gmail.com>
AuthorDate: Sat Sep 24 08:43:10 2022 +0200

    NIFI-10491 Added Conflict Resolution Strategy to PutAzureBlobStorage_v12
    
    This closes #6443
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../java/org/apache/nifi/util/MockFlowFile.java    |  2 +-
 .../azure/AbstractAzureBlobProcessor_v12.java      | 17 ++++--
 .../azure/storage/PutAzureBlobStorage_v12.java     | 69 ++++++++++++++++++----
 .../azure/storage/utils/BlobAttributes.java        |  6 ++
 .../storage/AbstractAzureBlobStorage_v12IT.java    | 14 ++++-
 .../azure/storage/ITFetchAzureBlobStorage_v12.java |  3 +-
 .../azure/storage/ITListAzureBlobStorage_v12.java  |  7 ++-
 .../azure/storage/ITPutAzureBlobStorage_v12.java   | 59 ++++++++++++++----
 ...=> AzureStorageConflictResolutionStrategy.java} | 33 +++++++----
 .../azure/storage/AzureStorageCredentialsType.java |  1 +
 10 files changed, 165 insertions(+), 46 deletions(-)

diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
index 36c95652eb..52c20e08fd 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
@@ -69,7 +69,7 @@ public class MockFlowFile implements FlowFileRecord {
     public MockFlowFile(final long id, final FlowFile toCopy) {
         this.creationTime = System.nanoTime();
         this.id = id;
-        entryDate = System.currentTimeMillis();
+        entryDate = toCopy.getEntryDate();
 
         final Map<String, String> attributesToCopy = toCopy.getAttributes();
         String filename = attributesToCopy.get(CoreAttributes.FILENAME.key());
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java
index 991409d5b8..be2197a01a 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java
@@ -161,21 +161,26 @@ public abstract class AbstractAzureBlobProcessor_v12 extends AbstractProcessor {
     }
 
     protected Map<String, String> createBlobAttributesMap(BlobClient blobClient) {
-        Map<String, String> attributes = new HashMap<>();
-
-        BlobProperties properties = blobClient.getProperties();
-        String primaryUri = String.format("%s/%s", blobClient.getContainerClient().getBlobContainerUrl(), blobClient.getBlobName());
+        final Map<String, String> attributes = new HashMap<>();
+        applyStandardBlobAttributes(attributes, blobClient);
+        applyBlobMetadata(attributes, blobClient);
+        return attributes;
+    }
 
+    protected void applyStandardBlobAttributes(Map<String, String> attributes, BlobClient blobClient) {
+        String primaryUri = blobClient.getBlobUrl().replace("%2F", "/");
         attributes.put(ATTR_NAME_CONTAINER, blobClient.getContainerName());
         attributes.put(ATTR_NAME_BLOBNAME, blobClient.getBlobName());
         attributes.put(ATTR_NAME_PRIMARY_URI, primaryUri);
+    }
+
+    protected void applyBlobMetadata(Map<String, String> attributes, BlobClient blobClient) {
+        BlobProperties properties = blobClient.getProperties();
         attributes.put(ATTR_NAME_ETAG, properties.getETag());
         attributes.put(ATTR_NAME_BLOBTYPE, properties.getBlobType().toString());
         attributes.put(ATTR_NAME_MIME_TYPE, properties.getContentType());
         attributes.put(ATTR_NAME_LANG, properties.getContentLanguage());
         attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(properties.getLastModified()));
         attributes.put(ATTR_NAME_LENGTH, String.valueOf(properties.getBlobSize()));
-
-        return attributes;
     }
 }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
index 805832ecd8..8390438f84 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
@@ -16,9 +16,14 @@
  */
 package org.apache.nifi.processors.azure.storage;
 
+import com.azure.core.util.Context;
 import com.azure.storage.blob.BlobClient;
 import com.azure.storage.blob.BlobContainerClient;
 import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.models.BlobErrorCode;
+import com.azure.storage.blob.models.BlobRequestConditions;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.options.BlobParallelUploadOptions;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -35,19 +40,23 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12;
 import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy;
 
-import java.io.BufferedInputStream;
 import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import static com.azure.core.util.FluxUtil.toFluxByteBuffer;
 import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME;
 import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE;
 import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_CONTAINER;
+import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_ERROR_CODE;
 import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_ETAG;
+import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_IGNORED;
 import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_LANG;
 import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_LENGTH;
 import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_MIME_TYPE;
@@ -56,7 +65,9 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR
 import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME;
 import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBTYPE;
 import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_CONTAINER;
+import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_ERROR_CODE;
 import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_ETAG;
+import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_IGNORED;
 import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_LANG;
 import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_LENGTH;
 import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_MIME_TYPE;
@@ -75,7 +86,9 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR
         @WritesAttribute(attribute = ATTR_NAME_MIME_TYPE, description = ATTR_DESCRIPTION_MIME_TYPE),
         @WritesAttribute(attribute = ATTR_NAME_LANG, description = ATTR_DESCRIPTION_LANG),
         @WritesAttribute(attribute = ATTR_NAME_TIMESTAMP, description = ATTR_DESCRIPTION_TIMESTAMP),
-        @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)})
+        @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH),
+        @WritesAttribute(attribute = ATTR_NAME_ERROR_CODE, description = ATTR_DESCRIPTION_ERROR_CODE),
+        @WritesAttribute(attribute = ATTR_NAME_IGNORED, description = ATTR_DESCRIPTION_IGNORED)})
 public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
 
     public static final PropertyDescriptor CREATE_CONTAINER = new PropertyDescriptor.Builder()
@@ -91,10 +104,21 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
                     "will fail if the container does not exist.")
             .build();
 
+    public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
+            .name("conflict-resolution-strategy")
+            .displayName("Conflict Resolution Strategy")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .required(true)
+            .allowableValues(AzureStorageConflictResolutionStrategy.class)
+            .defaultValue(AzureStorageConflictResolutionStrategy.FAIL_RESOLUTION.getValue())
+            .description("Specifies whether an existing blob will have its contents replaced upon conflict.")
+            .build();
+
     private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
             STORAGE_CREDENTIALS_SERVICE,
             AzureStorageUtils.CONTAINER,
             CREATE_CONTAINER,
+            CONFLICT_RESOLUTION,
             BLOB_NAME,
             AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
     ));
@@ -110,9 +134,10 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
             return;
         }
 
-        String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
-        boolean createContainer = context.getProperty(CREATE_CONTAINER).asBoolean();
-        String blobName = context.getProperty(BLOB_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
+        final boolean createContainer = context.getProperty(CREATE_CONTAINER).asBoolean();
+        final String blobName = context.getProperty(BLOB_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final AzureStorageConflictResolutionStrategy conflictResolution = AzureStorageConflictResolutionStrategy.valueOf(context.getProperty(CONFLICT_RESOLUTION).getValue());
 
         long startNanos = System.nanoTime();
         try {
@@ -121,18 +146,40 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            applyStandardBlobAttributes(attributes, blobClient);
+            final boolean ignore = conflictResolution == AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION;
+
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
 
-            long length = flowFile.getSize();
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    applyBlobMetadata(attributes, blobClient);
+                    if (ignore) {
+                        attributes.put(ATTR_NAME_IGNORED, "false");
+                    }
+                }
+            } catch (BlobStorageException e) {
+                final BlobErrorCode errorCode = e.getErrorCode();
+                flowFile = session.putAttribute(flowFile, ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+                if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS && ignore) {
+                    getLogger().info("Blob already exists: remote blob not modified. Transferring {} to success", flowFile);
+                    attributes.put(ATTR_NAME_IGNORED, "true");
+                } else {
+                    throw e;
+                }
             }
 
-            Map<String, String> attributes = createBlobAttributesMap(blobClient);
             flowFile = session.putAllAttributes(flowFile, attributes);
-
             session.transfer(flowFile, REL_SUCCESS);
 
             long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobAttributes.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobAttributes.java
index 7eaf2b801d..7ce30c1da7 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobAttributes.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobAttributes.java
@@ -45,4 +45,10 @@ public final class BlobAttributes {
     public static final String ATTR_NAME_LENGTH = "azure.length";
     public static final String ATTR_DESCRIPTION_LENGTH = "Length of the blob";
 
+    public static final String ATTR_NAME_ERROR_CODE = "azure.error.code";
+    public static final String ATTR_DESCRIPTION_ERROR_CODE = "Error code reported during blob operation";
+
+    public static final String ATTR_NAME_IGNORED = "azure.ignored";
+    public static final String ATTR_DESCRIPTION_IGNORED = "When Conflict Resolution Strategy is 'ignore', " +
+            "this property will be true/false depending on whether the blob was ignored.";
 }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java
index b95d18a2c7..9d2af5d068 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java
@@ -34,6 +34,8 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 
 import java.io.ByteArrayInputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
@@ -137,10 +139,18 @@ public abstract class AbstractAzureBlobStorage_v12IT extends AbstractAzureStorag
         return attributes;
     }
 
-    protected void assertFlowFileBlobAttributes(MockFlowFile flowFile, String containerName, String blobName, int blobLength) {
+    protected void assertFlowFileCommonBlobAttributes(MockFlowFile flowFile, String containerName, String blobName) throws UnsupportedEncodingException {
         flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_CONTAINER, containerName);
         flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_BLOBNAME, blobName);
-        flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI, String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), containerName, blobName));
+        flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI,
+                String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), containerName, URLEncoder.encode(
+                        blobName,
+                        StandardCharsets.US_ASCII.name()
+                ).replace("+", "%20").replace("%2F", "/"))
+        );
+    }
+
+    protected void assertFlowFileResultBlobAttributes(MockFlowFile flowFile, int blobLength) {
         flowFile.assertAttributeExists(BlobAttributes.ATTR_NAME_ETAG);
         flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_BLOBTYPE, BlobType.BLOCK_BLOB.toString());
         flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_MIME_TYPE, "application/octet-stream");
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage_v12.java
index 4218b6c726..270948fb8e 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage_v12.java
@@ -191,7 +191,8 @@ public class ITFetchAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT
 
         MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchAzureBlobStorage_v12.REL_SUCCESS).get(0);
 
-        assertFlowFileBlobAttributes(flowFile, getContainerName(), blobName, originalLength);
+        assertFlowFileCommonBlobAttributes(flowFile, getContainerName(), blobName);
+        assertFlowFileResultBlobAttributes(flowFile, originalLength);
 
         flowFile.assertContentEquals(blobData);
     }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage_v12.java
index 5655cf622f..7421b5f74d 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage_v12.java
@@ -174,7 +174,8 @@ public class ITListAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
 
         runner.assertAllFlowFilesTransferred(ListAzureBlobStorage_v12.REL_SUCCESS, 1);
         MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListAzureBlobStorage_v12.REL_SUCCESS).get(0);
-        assertFlowFileBlobAttributes(flowFile, getContainerName(), "blob5", "Test".length());
+        assertFlowFileCommonBlobAttributes(flowFile, getContainerName(), "blob5");
+        assertFlowFileResultBlobAttributes(flowFile, "Test".length());
     }
 
     private void uploadBlobs() throws Exception {
@@ -207,8 +208,8 @@ public class ITListAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
     }
 
     private void assertFlowFile(MockFlowFile flowFile, String blobName) throws Exception {
-        assertFlowFileBlobAttributes(flowFile, getContainerName(), blobName, BLOB_DATA.length);
-
+        assertFlowFileCommonBlobAttributes(flowFile, getContainerName(), blobName);
+        assertFlowFileResultBlobAttributes(flowFile, BLOB_DATA.length);
         flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), blobName.substring(blobName.lastIndexOf('/') + 1));
 
         flowFile.assertContentEquals(EMPTY_CONTENT);
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java
index 762012246f..a648296e02 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java
@@ -18,10 +18,12 @@ package org.apache.nifi.processors.azure.storage;
 
 import com.azure.storage.blob.BlobClient;
 import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.models.BlobErrorCode;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy;
 import org.apache.nifi.util.MockFlowFile;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -32,11 +34,12 @@ import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_ERROR_CODE;
+import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_IGNORED;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
-
     @Override
     protected Class<? extends Processor> getProcessorClass() {
         return PutAzureBlobStorage_v12.class;
@@ -101,7 +104,7 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
 
         runProcessor(BLOB_DATA);
 
-        assertFailure(BLOB_DATA);
+        assertFailure(BLOB_DATA, BlobErrorCode.CONTAINER_NOT_FOUND);
     }
 
     @Test
@@ -136,7 +139,29 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
 
         runProcessor(BLOB_DATA);
 
-        assertFailure(BLOB_DATA);
+        MockFlowFile flowFile = assertFailure(BLOB_DATA, BlobErrorCode.BLOB_ALREADY_EXISTS);
+        assertEquals(flowFile.getAttribute(ATTR_NAME_IGNORED), null);
+    }
+
+    @Test
+    public void testPutBlobToExistingBlobConflictStrategyIgnore() throws Exception {
+        uploadBlob(BLOB_NAME, BLOB_DATA);
+        runner.setProperty(PutAzureBlobStorage_v12.CONFLICT_RESOLUTION, AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION.getValue());
+
+        runProcessor(BLOB_DATA);
+
+        MockFlowFile flowFile = assertIgnored(getContainerName(), BLOB_NAME);
+        assertEquals(flowFile.getAttribute(ATTR_NAME_IGNORED), "true");
+    }
+
+    @Test
+    public void testPutBlobToExistingBlobConflictStrategyReplace() throws Exception {
+        uploadBlob(BLOB_NAME, BLOB_DATA);
+        runner.setProperty(PutAzureBlobStorage_v12.CONFLICT_RESOLUTION, AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION.getValue());
+
+        runProcessor(BLOB_DATA);
+
+        assertSuccess(getContainerName(), BLOB_NAME, BLOB_DATA);
     }
 
     @Test
@@ -158,20 +183,30 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
         runner.run();
     }
 
-    private void assertSuccess(String containerName, String blobName, byte[] blobData) throws Exception {
-        assertFlowFile(containerName, blobName, blobData);
+    private MockFlowFile assertSuccess(String containerName, String blobName, byte[] blobData) throws Exception {
+        MockFlowFile flowFile = assertFlowFile(containerName, blobName, blobData);
         assertAzureBlob(containerName, blobName, blobData);
         assertProvenanceEvents();
+        return flowFile;
     }
 
-    private void assertFlowFile(String containerName, String blobName, byte[] blobData) throws Exception {
-        runner.assertAllFlowFilesTransferred(PutAzureDataLakeStorage.REL_SUCCESS, 1);
+    private MockFlowFile assertIgnored(String containerName, String blobName) throws Exception {
+        MockFlowFile flowFile = assertFlowFile(containerName, blobName, null);
+        assertProvenanceEvents();
+        return flowFile;
+    }
 
-        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutAzureDataLakeStorage.REL_SUCCESS).get(0);
+    private MockFlowFile assertFlowFile(String containerName, String blobName, byte[] blobData) throws Exception {
+        runner.assertAllFlowFilesTransferred(PutAzureBlobStorage_v12.REL_SUCCESS, 1);
 
-        assertFlowFileBlobAttributes(flowFile, containerName, blobName, blobData.length);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutAzureBlobStorage_v12.REL_SUCCESS).get(0);
 
-        flowFile.assertContentEquals(blobData);
+        assertFlowFileCommonBlobAttributes(flowFile, containerName, blobName);
+        if (blobData != null) {
+            assertFlowFileResultBlobAttributes(flowFile, blobData.length);
+            flowFile.assertContentEquals(blobData);
+        }
+        return flowFile;
     }
 
     private void assertAzureBlob(String containerName, String blobName, byte[] blobData) {
@@ -191,10 +226,12 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
         assertEquals(expectedEventTypes, actualEventTypes);
     }
 
-    private void assertFailure(byte[] blobData) throws Exception {
+    private MockFlowFile assertFailure(byte[] blobData, BlobErrorCode errorCode) throws Exception {
         runner.assertAllFlowFilesTransferred(PutAzureBlobStorage_v12.REL_FAILURE, 1);
 
         MockFlowFile flowFile = runner.getFlowFilesForRelationship(DeleteAzureBlobStorage_v12.REL_FAILURE).get(0);
         flowFile.assertContentEquals(blobData);
+        flowFile.assertAttributeEquals(ATTR_NAME_ERROR_CODE, errorCode.toString());
+        return flowFile;
     }
 }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsType.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageConflictResolutionStrategy.java
similarity index 51%
copy from nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsType.java
copy to nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageConflictResolutionStrategy.java
index ad68440eee..30ecd49bc9 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsType.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageConflictResolutionStrategy.java
@@ -16,26 +16,37 @@
  */
 package org.apache.nifi.services.azure.storage;
 
-import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.DescribedValue;
 
-public enum AzureStorageCredentialsType {
-
-    ACCOUNT_KEY("Account Key", "The primary or secondary Account Key of the storage account that provides full access to the resources in the account"),
-    SAS_TOKEN("SAS Token", "SAS (Shared Access Signature) Token generated for accessing resources in the storage account"),
-    MANAGED_IDENTITY("Managed Identity", "Azure Virtual Machine Managed Identity (it can only be used when NiFi is running on Azure)"),
-    SERVICE_PRINCIPAL("Service Principal", "Azure Active Directory Service Principal with Client Id / Client Secret of a registered application"),
-    ACCESS_TOKEN("Access Token", "Access Token provided by custom controller service implementations");
+public enum AzureStorageConflictResolutionStrategy implements DescribedValue {
+    FAIL_RESOLUTION("fail", "Fail if the blob already exists"),
+    IGNORE_RESOLUTION("ignore",
+            String.format(
+                "Ignore if the blob already exists; the 'azure.error' attribute will be set to the value 'BLOB_ALREADY_EXISTS'"
+            )
+    ),
+    REPLACE_RESOLUTION("replace", "Replace blob contents if the blob already exist");
 
     private final String label;
     private final String description;
 
-    AzureStorageCredentialsType(String label, String description) {
+    AzureStorageConflictResolutionStrategy(String label, String description) {
         this.label = label;
         this.description = description;
     }
 
-    public AllowableValue getAllowableValue() {
-        return new AllowableValue(name(), label, description);
+    @Override
+    public String getValue() {
+        return this.name();
+    }
+
+    @Override
+    public String getDisplayName() {
+        return label;
     }
 
+    @Override
+    public String getDescription() {
+        return description;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsType.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsType.java
index ad68440eee..4b83dcd3a2 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsType.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsType.java
@@ -39,3 +39,4 @@ public enum AzureStorageCredentialsType {
     }
 
 }
+