Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/10/13 17:21:20 UTC

[GitHub] [nifi] nandorsoma commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

nandorsoma commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r994887992


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
-            BlobClient blobClient = containerClient.getBlobClient(blobName);
 
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();

Review Comment:
   Previously, these attributes were applied only after a successful upload. Do I understand correctly that the intention now is to also apply the attributes that already have values to the failed FlowFile, without a successful upload? Is this a requirement for a specific use case? I'm asking because I worry it could break an existing flow. For example, what happens if a downstream processor relies on the presence of one of these attributes? What do you think, @exceptionfactory?
   
   If the above is not an issue, I'd still change this part slightly:
   - I wouldn't assume the `MIME_TYPE`. I'd set it only after a successful upload, and I'd investigate why the presence of `ATTR_NAME_LANG` was needed in the failing test you linked in one of your comments.
   - Currently, the code is a bit verbose; the attribute changes distract attention from the primary function, the upload itself. I'd therefore extract this part into something like `applyCommonAttributes()` and lines 170-173 into something like `applyUploadResultAttributes()`, as sketched below. This is just an idea; if you can think of better names, go for it.
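
   For illustration, a rough sketch of that extraction (the method names are placeholders, and the `ATTR_NAME_*` constants and Azure SDK types are the ones already used in `PutAzureBlobStorage_v12`):

   ```java
   // Sketch only: naming and placement are up to you.
   // Attributes known before the upload is attempted:
   private Map<String, String> applyCommonAttributes(final String containerName, final String blobName, final BlobClient blobClient) {
       final Map<String, String> attributes = new HashMap<>();
       attributes.put(ATTR_NAME_CONTAINER, containerName);
       attributes.put(ATTR_NAME_BLOBNAME, blobName);
       attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
       return attributes;
   }

   // Attributes that only make sense after a successful upload:
   private void applyUploadResultAttributes(final Map<String, String> attributes, final BlockBlobItem blob, final long length) {
       attributes.put(ATTR_NAME_BLOBTYPE, BlobType.BLOCK_BLOB.toString());
       attributes.put(ATTR_NAME_ETAG, blob.getETag());
       attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
       attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(blob.getLastModified()));
   }
   ```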
   
   Sorry if I missed this in the first review. Could this change have come in with the force push?



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
-            BlobClient blobClient = containerClient.getBlobClient(blobName);
 
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(ATTR_NAME_CONTAINER, containerName);
+            attributes.put(ATTR_NAME_BLOBNAME, blobName);
+            attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
+            attributes.put(ATTR_NAME_LANG, null);
+            attributes.put(ATTR_NAME_MIME_TYPE, "application/octet-stream");
             long length = flowFile.getSize();
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
+
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    attributes.put(ATTR_NAME_BLOBTYPE, BlobType.BLOCK_BLOB.toString());
+                    attributes.put(ATTR_NAME_ETAG, blob.getETag());
+                    attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
+                    attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(blob.getLastModified()));
+                }
+            } catch (BlobStorageException e) {
+                if (conflictResolution == AzureStorageConflictResolutionStrategy.FAIL_RESOLUTION) {
+                    throw e;
+                }
+                final BlobErrorCode errorCode = e.getErrorCode();
+                if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS) {
+                    getLogger().warn("Blob already exists: remote blob not modified. Transferring {} to success", flowFile);
+                    attributes.putAll(createBlobAttributesMap(blobClient));
+                    attributes.put(ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());

Review Comment:
   I wonder whether it is misleading to apply `ATTR_NAME_ERROR_CODE` in an expected situation. The fact that we rely on an error code internally is irrelevant from the user's point of view. I'd probably use the "ignored" property instead.
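
   For illustration, if "ignored" is meant as a FlowFile attribute, the branch could look roughly like this (the attribute name below is purely hypothetical, not an existing constant):

   ```java
   // Sketch only: "azure.blob.ignored" is a made-up attribute name for illustration.
   if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS) {
       attributes.putAll(createBlobAttributesMap(blobClient));
       // mark the skipped upload explicitly instead of surfacing the internal error code
       attributes.put("azure.blob.ignored", Boolean.TRUE.toString());
   }
   ```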



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
-            BlobClient blobClient = containerClient.getBlobClient(blobName);
 
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(ATTR_NAME_CONTAINER, containerName);
+            attributes.put(ATTR_NAME_BLOBNAME, blobName);
+            attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
+            attributes.put(ATTR_NAME_LANG, null);
+            attributes.put(ATTR_NAME_MIME_TYPE, "application/octet-stream");
             long length = flowFile.getSize();
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
+
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    attributes.put(ATTR_NAME_BLOBTYPE, BlobType.BLOCK_BLOB.toString());
+                    attributes.put(ATTR_NAME_ETAG, blob.getETag());
+                    attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
+                    attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(blob.getLastModified()));
+                }
+            } catch (BlobStorageException e) {
+                if (conflictResolution == AzureStorageConflictResolutionStrategy.FAIL_RESOLUTION) {
+                    throw e;
+                }
+                final BlobErrorCode errorCode = e.getErrorCode();
+                if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS) {
+                    getLogger().warn("Blob already exists: remote blob not modified. Transferring {} to success", flowFile);

Review Comment:
   Since this is an expected scenario, I wouldn't log at warning level; I would use debug instead.
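
   For example (same message, just at debug level):

   ```java
   // Sketch: the conflict is an expected outcome, so debug is enough
   getLogger().debug("Blob already exists: remote blob not modified. Transferring {} to success", flowFile);
   ```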



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java:
##########
@@ -137,10 +139,15 @@ protected Map<String, String> initCommonExpressionLanguageAttributes() {
         return attributes;
     }
 
-    protected void assertFlowFileBlobAttributes(MockFlowFile flowFile, String containerName, String blobName, int blobLength) {
+    protected void assertFlowFileBlobAttributes(MockFlowFile flowFile, String containerName, String blobName, int blobLength) throws UnsupportedEncodingException {
         flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_CONTAINER, containerName);
         flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_BLOBNAME, blobName);
-        flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI, String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), containerName, blobName));
+        flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI,
+                String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), containerName, URLEncoder.encode(
+                        blobName,
+                        StandardCharsets.US_ASCII.name()
+                ).replace("+", "%20"))

Review Comment:
   After using URLEncoder, this replacement seems to be unnecessary.
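
   For reference, a quick standalone check of how `URLEncoder` handles spaces, since that is what the `replace` targets (illustrative only, independent of this test class):

   ```java
   import java.io.UnsupportedEncodingException;
   import java.net.URLEncoder;
   import java.nio.charset.StandardCharsets;

   public class UrlEncoderSpaceCheck {
       public static void main(String[] args) throws UnsupportedEncodingException {
           // URLEncoder produces application/x-www-form-urlencoded output,
           // so a space becomes "+" rather than "%20".
           System.out.println(URLEncoder.encode("my blob.txt", StandardCharsets.US_ASCII.name()));
           // prints: my+blob.txt
       }
   }
   ```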



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
-            BlobClient blobClient = containerClient.getBlobClient(blobName);
 
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(ATTR_NAME_CONTAINER, containerName);
+            attributes.put(ATTR_NAME_BLOBNAME, blobName);
+            attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
+            attributes.put(ATTR_NAME_LANG, null);
+            attributes.put(ATTR_NAME_MIME_TYPE, "application/octet-stream");
             long length = flowFile.getSize();
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
+
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    attributes.put(ATTR_NAME_BLOBTYPE, BlobType.BLOCK_BLOB.toString());
+                    attributes.put(ATTR_NAME_ETAG, blob.getETag());
+                    attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
+                    attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(blob.getLastModified()));
+                }
+            } catch (BlobStorageException e) {
+                if (conflictResolution == AzureStorageConflictResolutionStrategy.FAIL_RESOLUTION) {
+                    throw e;
+                }
+                final BlobErrorCode errorCode = e.getErrorCode();
+                if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS) {

Review Comment:
   I would add `&& conflictResolution == IGNORE_RESOLUTION` to this condition to make the code more intuitive. It doesn't change the result, but it makes it easier to understand what is going on.
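
   Roughly (behaviour is unchanged, since the FAIL case is already rethrown above; the intent just becomes explicit):

   ```java
   // Sketch: make it explicit that only the IGNORE strategy falls through here
   if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS
           && conflictResolution == AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION) {
       // existing handling of the ignored conflict (attributes, transfer to success) stays as-is
       attributes.putAll(createBlobAttributesMap(blobClient));
   }
   ```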



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org