You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/11/15 20:40:39 UTC
[nifi] branch main updated: NIFI-10491 Added Conflict Resolution Strategy to PutAzureBlobStorage_v12
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new a71556f115 NIFI-10491 Added Conflict Resolution Strategy to PutAzureBlobStorage_v12
a71556f115 is described below
commit a71556f115af551e40f0004694dfe82a7d86535a
Author: Malthe Borch <mb...@gmail.com>
AuthorDate: Sat Sep 24 08:43:10 2022 +0200
NIFI-10491 Added Conflict Resolution Strategy to PutAzureBlobStorage_v12
This closes #6443
Signed-off-by: David Handermann <ex...@apache.org>
---
.../java/org/apache/nifi/util/MockFlowFile.java | 2 +-
.../azure/AbstractAzureBlobProcessor_v12.java | 17 ++++--
.../azure/storage/PutAzureBlobStorage_v12.java | 69 ++++++++++++++++++----
.../azure/storage/utils/BlobAttributes.java | 6 ++
.../storage/AbstractAzureBlobStorage_v12IT.java | 14 ++++-
.../azure/storage/ITFetchAzureBlobStorage_v12.java | 3 +-
.../azure/storage/ITListAzureBlobStorage_v12.java | 7 ++-
.../azure/storage/ITPutAzureBlobStorage_v12.java | 59 ++++++++++++++----
...=> AzureStorageConflictResolutionStrategy.java} | 33 +++++++----
.../azure/storage/AzureStorageCredentialsType.java | 1 +
10 files changed, 165 insertions(+), 46 deletions(-)
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
index 36c95652eb..52c20e08fd 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
@@ -69,7 +69,7 @@ public class MockFlowFile implements FlowFileRecord {
public MockFlowFile(final long id, final FlowFile toCopy) {
this.creationTime = System.nanoTime();
this.id = id;
- entryDate = System.currentTimeMillis();
+ entryDate = toCopy.getEntryDate();
final Map<String, String> attributesToCopy = toCopy.getAttributes();
String filename = attributesToCopy.get(CoreAttributes.FILENAME.key());
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java
index 991409d5b8..be2197a01a 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java
@@ -161,21 +161,26 @@ public abstract class AbstractAzureBlobProcessor_v12 extends AbstractProcessor {
}
protected Map<String, String> createBlobAttributesMap(BlobClient blobClient) {
- Map<String, String> attributes = new HashMap<>();
-
- BlobProperties properties = blobClient.getProperties();
- String primaryUri = String.format("%s/%s", blobClient.getContainerClient().getBlobContainerUrl(), blobClient.getBlobName());
+ final Map<String, String> attributes = new HashMap<>();
+ applyStandardBlobAttributes(attributes, blobClient);
+ applyBlobMetadata(attributes, blobClient);
+ return attributes;
+ }
+ protected void applyStandardBlobAttributes(Map<String, String> attributes, BlobClient blobClient) {
+ String primaryUri = blobClient.getBlobUrl().replace("%2F", "/");
attributes.put(ATTR_NAME_CONTAINER, blobClient.getContainerName());
attributes.put(ATTR_NAME_BLOBNAME, blobClient.getBlobName());
attributes.put(ATTR_NAME_PRIMARY_URI, primaryUri);
+ }
+
+ protected void applyBlobMetadata(Map<String, String> attributes, BlobClient blobClient) {
+ BlobProperties properties = blobClient.getProperties();
attributes.put(ATTR_NAME_ETAG, properties.getETag());
attributes.put(ATTR_NAME_BLOBTYPE, properties.getBlobType().toString());
attributes.put(ATTR_NAME_MIME_TYPE, properties.getContentType());
attributes.put(ATTR_NAME_LANG, properties.getContentLanguage());
attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(properties.getLastModified()));
attributes.put(ATTR_NAME_LENGTH, String.valueOf(properties.getBlobSize()));
-
- return attributes;
}
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
index 805832ecd8..8390438f84 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
@@ -16,9 +16,14 @@
*/
package org.apache.nifi.processors.azure.storage;
+import com.azure.core.util.Context;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.models.BlobErrorCode;
+import com.azure.storage.blob.models.BlobRequestConditions;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.options.BlobParallelUploadOptions;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -35,19 +40,23 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy;
-import java.io.BufferedInputStream;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import static com.azure.core.util.FluxUtil.toFluxByteBuffer;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_CONTAINER;
+import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_ERROR_CODE;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_ETAG;
+import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_IGNORED;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_LANG;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_LENGTH;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_MIME_TYPE;
@@ -56,7 +65,9 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBTYPE;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_CONTAINER;
+import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_ERROR_CODE;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_ETAG;
+import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_IGNORED;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_LANG;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_LENGTH;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_MIME_TYPE;
@@ -75,7 +86,9 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR
@WritesAttribute(attribute = ATTR_NAME_MIME_TYPE, description = ATTR_DESCRIPTION_MIME_TYPE),
@WritesAttribute(attribute = ATTR_NAME_LANG, description = ATTR_DESCRIPTION_LANG),
@WritesAttribute(attribute = ATTR_NAME_TIMESTAMP, description = ATTR_DESCRIPTION_TIMESTAMP),
- @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)})
+ @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH),
+ @WritesAttribute(attribute = ATTR_NAME_ERROR_CODE, description = ATTR_DESCRIPTION_ERROR_CODE),
+ @WritesAttribute(attribute = ATTR_NAME_IGNORED, description = ATTR_DESCRIPTION_IGNORED)})
public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
public static final PropertyDescriptor CREATE_CONTAINER = new PropertyDescriptor.Builder()
@@ -91,10 +104,21 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
"will fail if the container does not exist.")
.build();
+ public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
+ .name("conflict-resolution-strategy")
+ .displayName("Conflict Resolution Strategy")
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .required(true)
+ .allowableValues(AzureStorageConflictResolutionStrategy.class)
+ .defaultValue(AzureStorageConflictResolutionStrategy.FAIL_RESOLUTION.getValue())
+ .description("Specifies whether an existing blob will have its contents replaced upon conflict.")
+ .build();
+
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
STORAGE_CREDENTIALS_SERVICE,
AzureStorageUtils.CONTAINER,
CREATE_CONTAINER,
+ CONFLICT_RESOLUTION,
BLOB_NAME,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
));
@@ -110,9 +134,10 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
return;
}
- String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
- boolean createContainer = context.getProperty(CREATE_CONTAINER).asBoolean();
- String blobName = context.getProperty(BLOB_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String containerName = context.getProperty(AzureStorageUtils.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
+ final boolean createContainer = context.getProperty(CREATE_CONTAINER).asBoolean();
+ final String blobName = context.getProperty(BLOB_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final AzureStorageConflictResolutionStrategy conflictResolution = AzureStorageConflictResolutionStrategy.valueOf(context.getProperty(CONFLICT_RESOLUTION).getValue());
long startNanos = System.nanoTime();
try {
@@ -121,18 +146,40 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
if (createContainer && !containerClient.exists()) {
containerClient.create();
}
+
BlobClient blobClient = containerClient.getBlobClient(blobName);
+ final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+ Map<String, String> attributes = new HashMap<>();
+ applyStandardBlobAttributes(attributes, blobClient);
+ final boolean ignore = conflictResolution == AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION;
+
+ try {
+ if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+ blobRequestConditions.setIfNoneMatch("*");
+ }
- long length = flowFile.getSize();
+ try (InputStream rawIn = session.read(flowFile)) {
+ final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+ blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+ blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+ applyBlobMetadata(attributes, blobClient);
+ if (ignore) {
+ attributes.put(ATTR_NAME_IGNORED, "false");
+ }
+ }
+ } catch (BlobStorageException e) {
+ final BlobErrorCode errorCode = e.getErrorCode();
+ flowFile = session.putAttribute(flowFile, ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());
- try (InputStream rawIn = session.read(flowFile);
- BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
- blobClient.upload(bufferedIn, length);
+ if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS && ignore) {
+ getLogger().info("Blob already exists: remote blob not modified. Transferring {} to success", flowFile);
+ attributes.put(ATTR_NAME_IGNORED, "true");
+ } else {
+ throw e;
+ }
}
- Map<String, String> attributes = createBlobAttributesMap(blobClient);
flowFile = session.putAllAttributes(flowFile, attributes);
-
session.transfer(flowFile, REL_SUCCESS);
long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobAttributes.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobAttributes.java
index 7eaf2b801d..7ce30c1da7 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobAttributes.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobAttributes.java
@@ -45,4 +45,10 @@ public final class BlobAttributes {
public static final String ATTR_NAME_LENGTH = "azure.length";
public static final String ATTR_DESCRIPTION_LENGTH = "Length of the blob";
+ public static final String ATTR_NAME_ERROR_CODE = "azure.error.code";
+ public static final String ATTR_DESCRIPTION_ERROR_CODE = "Error code reported during blob operation";
+
+ public static final String ATTR_NAME_IGNORED = "azure.ignored";
+ public static final String ATTR_DESCRIPTION_IGNORED = "When Conflict Resolution Strategy is 'ignore', " +
+ "this property will be true/false depending on whether the blob was ignored.";
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java
index b95d18a2c7..9d2af5d068 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java
@@ -34,6 +34,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import java.io.ByteArrayInputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
@@ -137,10 +139,18 @@ public abstract class AbstractAzureBlobStorage_v12IT extends AbstractAzureStorag
return attributes;
}
- protected void assertFlowFileBlobAttributes(MockFlowFile flowFile, String containerName, String blobName, int blobLength) {
+ protected void assertFlowFileCommonBlobAttributes(MockFlowFile flowFile, String containerName, String blobName) throws UnsupportedEncodingException {
flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_CONTAINER, containerName);
flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_BLOBNAME, blobName);
- flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI, String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), containerName, blobName));
+ flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI,
+ String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), containerName, URLEncoder.encode(
+ blobName,
+ StandardCharsets.US_ASCII.name()
+ ).replace("+", "%20").replace("%2F", "/"))
+ );
+ }
+
+ protected void assertFlowFileResultBlobAttributes(MockFlowFile flowFile, int blobLength) {
flowFile.assertAttributeExists(BlobAttributes.ATTR_NAME_ETAG);
flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_BLOBTYPE, BlobType.BLOCK_BLOB.toString());
flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_MIME_TYPE, "application/octet-stream");
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage_v12.java
index 4218b6c726..270948fb8e 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage_v12.java
@@ -191,7 +191,8 @@ public class ITFetchAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT
MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchAzureBlobStorage_v12.REL_SUCCESS).get(0);
- assertFlowFileBlobAttributes(flowFile, getContainerName(), blobName, originalLength);
+ assertFlowFileCommonBlobAttributes(flowFile, getContainerName(), blobName);
+ assertFlowFileResultBlobAttributes(flowFile, originalLength);
flowFile.assertContentEquals(blobData);
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage_v12.java
index 5655cf622f..7421b5f74d 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage_v12.java
@@ -174,7 +174,8 @@ public class ITListAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
runner.assertAllFlowFilesTransferred(ListAzureBlobStorage_v12.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListAzureBlobStorage_v12.REL_SUCCESS).get(0);
- assertFlowFileBlobAttributes(flowFile, getContainerName(), "blob5", "Test".length());
+ assertFlowFileCommonBlobAttributes(flowFile, getContainerName(), "blob5");
+ assertFlowFileResultBlobAttributes(flowFile, "Test".length());
}
private void uploadBlobs() throws Exception {
@@ -207,8 +208,8 @@ public class ITListAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
}
private void assertFlowFile(MockFlowFile flowFile, String blobName) throws Exception {
- assertFlowFileBlobAttributes(flowFile, getContainerName(), blobName, BLOB_DATA.length);
-
+ assertFlowFileCommonBlobAttributes(flowFile, getContainerName(), blobName);
+ assertFlowFileResultBlobAttributes(flowFile, BLOB_DATA.length);
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), blobName.substring(blobName.lastIndexOf('/') + 1));
flowFile.assertContentEquals(EMPTY_CONTENT);
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java
index 762012246f..a648296e02 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java
@@ -18,10 +18,12 @@ package org.apache.nifi.processors.azure.storage;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.models.BlobErrorCode;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy;
import org.apache.nifi.util.MockFlowFile;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -32,11 +34,12 @@ import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_ERROR_CODE;
+import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_IGNORED;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
-
@Override
protected Class<? extends Processor> getProcessorClass() {
return PutAzureBlobStorage_v12.class;
@@ -101,7 +104,7 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
runProcessor(BLOB_DATA);
- assertFailure(BLOB_DATA);
+ assertFailure(BLOB_DATA, BlobErrorCode.CONTAINER_NOT_FOUND);
}
@Test
@@ -136,7 +139,29 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
runProcessor(BLOB_DATA);
- assertFailure(BLOB_DATA);
+ MockFlowFile flowFile = assertFailure(BLOB_DATA, BlobErrorCode.BLOB_ALREADY_EXISTS);
+ assertEquals(flowFile.getAttribute(ATTR_NAME_IGNORED), null);
+ }
+
+ @Test
+ public void testPutBlobToExistingBlobConflictStrategyIgnore() throws Exception {
+ uploadBlob(BLOB_NAME, BLOB_DATA);
+ runner.setProperty(PutAzureBlobStorage_v12.CONFLICT_RESOLUTION, AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION.getValue());
+
+ runProcessor(BLOB_DATA);
+
+ MockFlowFile flowFile = assertIgnored(getContainerName(), BLOB_NAME);
+ assertEquals(flowFile.getAttribute(ATTR_NAME_IGNORED), "true");
+ }
+
+ @Test
+ public void testPutBlobToExistingBlobConflictStrategyReplace() throws Exception {
+ uploadBlob(BLOB_NAME, BLOB_DATA);
+ runner.setProperty(PutAzureBlobStorage_v12.CONFLICT_RESOLUTION, AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION.getValue());
+
+ runProcessor(BLOB_DATA);
+
+ assertSuccess(getContainerName(), BLOB_NAME, BLOB_DATA);
}
@Test
@@ -158,20 +183,30 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
runner.run();
}
- private void assertSuccess(String containerName, String blobName, byte[] blobData) throws Exception {
- assertFlowFile(containerName, blobName, blobData);
+ private MockFlowFile assertSuccess(String containerName, String blobName, byte[] blobData) throws Exception {
+ MockFlowFile flowFile = assertFlowFile(containerName, blobName, blobData);
assertAzureBlob(containerName, blobName, blobData);
assertProvenanceEvents();
+ return flowFile;
}
- private void assertFlowFile(String containerName, String blobName, byte[] blobData) throws Exception {
- runner.assertAllFlowFilesTransferred(PutAzureDataLakeStorage.REL_SUCCESS, 1);
+ private MockFlowFile assertIgnored(String containerName, String blobName) throws Exception {
+ MockFlowFile flowFile = assertFlowFile(containerName, blobName, null);
+ assertProvenanceEvents();
+ return flowFile;
+ }
- MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutAzureDataLakeStorage.REL_SUCCESS).get(0);
+ private MockFlowFile assertFlowFile(String containerName, String blobName, byte[] blobData) throws Exception {
+ runner.assertAllFlowFilesTransferred(PutAzureBlobStorage_v12.REL_SUCCESS, 1);
- assertFlowFileBlobAttributes(flowFile, containerName, blobName, blobData.length);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutAzureBlobStorage_v12.REL_SUCCESS).get(0);
- flowFile.assertContentEquals(blobData);
+ assertFlowFileCommonBlobAttributes(flowFile, containerName, blobName);
+ if (blobData != null) {
+ assertFlowFileResultBlobAttributes(flowFile, blobData.length);
+ flowFile.assertContentEquals(blobData);
+ }
+ return flowFile;
}
private void assertAzureBlob(String containerName, String blobName, byte[] blobData) {
@@ -191,10 +226,12 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
assertEquals(expectedEventTypes, actualEventTypes);
}
- private void assertFailure(byte[] blobData) throws Exception {
+ private MockFlowFile assertFailure(byte[] blobData, BlobErrorCode errorCode) throws Exception {
runner.assertAllFlowFilesTransferred(PutAzureBlobStorage_v12.REL_FAILURE, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(DeleteAzureBlobStorage_v12.REL_FAILURE).get(0);
flowFile.assertContentEquals(blobData);
+ flowFile.assertAttributeEquals(ATTR_NAME_ERROR_CODE, errorCode.toString());
+ return flowFile;
}
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsType.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageConflictResolutionStrategy.java
similarity index 51%
copy from nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsType.java
copy to nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageConflictResolutionStrategy.java
index ad68440eee..30ecd49bc9 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsType.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageConflictResolutionStrategy.java
@@ -16,26 +16,37 @@
*/
package org.apache.nifi.services.azure.storage;
-import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.DescribedValue;
-public enum AzureStorageCredentialsType {
-
- ACCOUNT_KEY("Account Key", "The primary or secondary Account Key of the storage account that provides full access to the resources in the account"),
- SAS_TOKEN("SAS Token", "SAS (Shared Access Signature) Token generated for accessing resources in the storage account"),
- MANAGED_IDENTITY("Managed Identity", "Azure Virtual Machine Managed Identity (it can only be used when NiFi is running on Azure)"),
- SERVICE_PRINCIPAL("Service Principal", "Azure Active Directory Service Principal with Client Id / Client Secret of a registered application"),
- ACCESS_TOKEN("Access Token", "Access Token provided by custom controller service implementations");
+public enum AzureStorageConflictResolutionStrategy implements DescribedValue {
+ FAIL_RESOLUTION("fail", "Fail if the blob already exists"),
+ IGNORE_RESOLUTION("ignore",
+ String.format(
+ "Ignore if the blob already exists; the 'azure.error' attribute will be set to the value 'BLOB_ALREADY_EXISTS'"
+ )
+ ),
+ REPLACE_RESOLUTION("replace", "Replace blob contents if the blob already exist");
private final String label;
private final String description;
- AzureStorageCredentialsType(String label, String description) {
+ AzureStorageConflictResolutionStrategy(String label, String description) {
this.label = label;
this.description = description;
}
- public AllowableValue getAllowableValue() {
- return new AllowableValue(name(), label, description);
+ @Override
+ public String getValue() {
+ return this.name();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return label;
}
+ @Override
+ public String getDescription() {
+ return description;
+ }
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsType.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsType.java
index ad68440eee..4b83dcd3a2 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsType.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsType.java
@@ -39,3 +39,4 @@ public enum AzureStorageCredentialsType {
}
}
+