Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/09/22 13:36:25 UTC

[GitHub] [nifi] malthe opened a new pull request, #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

malthe opened a new pull request, #6443:
URL: https://github.com/apache/nifi/pull/6443

   <!-- Licensed to the Apache Software Foundation (ASF) under one or more -->
   <!-- contributor license agreements.  See the NOTICE file distributed with -->
   <!-- this work for additional information regarding copyright ownership. -->
   <!-- The ASF licenses this file to You under the Apache License, Version 2.0 -->
   <!-- (the "License"); you may not use this file except in compliance with -->
   <!-- the License.  You may obtain a copy of the License at -->
   <!--     http://www.apache.org/licenses/LICENSE-2.0 -->
   <!-- Unless required by applicable law or agreed to in writing, software -->
   <!-- distributed under the License is distributed on an "AS IS" BASIS, -->
   <!-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -->
   <!-- See the License for the specific language governing permissions and -->
   <!-- limitations under the License. -->
   
   # Summary
   
   [NIFI-10491](https://issues.apache.org/jira/browse/NIFI-10491)
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [x] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
   
   ### Pull Request Tracking
   
   - [x] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [x] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [x] Pull Request based on current revision of the `main` branch
   - [x] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [x] Build completed using `mvn clean install -P contrib-check`
     - [ ] JDK 8
     - [ ] JDK 11
     - [x] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] malthe commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r991318264


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml:
##########
@@ -40,5 +40,9 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-storage-blob</artifactId>
+        </dependency>

Review Comment:
   That's fair, fixed. It doesn't seem like something they'll change any time soon, either.




[GitHub] [nifi] malthe commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r1002224301


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            applyCommonAttributes(attributes, blobClient);
+
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
+
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    long length = flowFile.getSize();
+                    applyUploadResultAttributes(attributes, blob, BlobType.BLOCK_BLOB, length);
 
-            long length = flowFile.getSize();
+                }
+            } catch (BlobStorageException e) {
+                final BlobErrorCode errorCode = e.getErrorCode();
+                session.putAttribute(flowFile, ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());

Review Comment:
   But the tests indicate that the attribute is correctly set. Is that a difference between the mock implementation and the real one?
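In NiFi, FlowFile objects are immutable: `ProcessSession.putAttribute` returns an updated FlowFile instead of changing its argument, so a call like the one in the `catch` block above only takes effect if the returned reference is kept and used afterwards. The thread does not confirm that this is what separates the tests from a live flow. The sketch below only models the pattern with hypothetical stand-in types (`SketchFlowFile`, `SketchSession`), not the NiFi API, and `azure.error.code` is an assumed attribute key.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an immutable FlowFile (NOT the NiFi API).
final class SketchFlowFile {
    private final Map<String, String> attributes;

    SketchFlowFile(Map<String, String> attributes) {
        // Defensive, read-only copy: the object can never change after construction.
        this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
    }

    Map<String, String> getAttributes() {
        return attributes;
    }

    String getAttribute(String key) {
        return attributes.get(key);
    }
}

// Hypothetical stand-in for a session: putAttribute returns a NEW object,
// mirroring how ProcessSession.putAttribute returns the updated FlowFile.
final class SketchSession {
    SketchFlowFile putAttribute(SketchFlowFile flowFile, String key, String value) {
        Map<String, String> copy = new HashMap<>(flowFile.getAttributes());
        copy.put(key, value);
        return new SketchFlowFile(copy);
    }
}

final class ReassignDemo {
    // Return value discarded: the caller still holds the old, unmodified object.
    static SketchFlowFile discardResult(SketchSession session, SketchFlowFile flowFile) {
        session.putAttribute(flowFile, "azure.error.code", "BlobAlreadyExists");
        return flowFile;
    }

    // Return value kept: later steps see the new attribute.
    static SketchFlowFile keepResult(SketchSession session, SketchFlowFile flowFile) {
        flowFile = session.putAttribute(flowFile, "azure.error.code", "BlobAlreadyExists");
        return flowFile;
    }
}
```

In processor code, the equivalent is `flowFile = session.putAttribute(flowFile, ...)` before the FlowFile is transferred.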




[GitHub] [nifi] nandorsoma commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r1002147977


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
-            BlobClient blobClient = containerClient.getBlobClient(blobName);
 
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(ATTR_NAME_CONTAINER, containerName);
+            attributes.put(ATTR_NAME_BLOBNAME, blobName);
+            attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
+            attributes.put(ATTR_NAME_LANG, null);
+            attributes.put(ATTR_NAME_MIME_TYPE, "application/octet-stream");
             long length = flowFile.getSize();
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
+
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    attributes.put(ATTR_NAME_BLOBTYPE, BlobType.BLOCK_BLOB.toString());
+                    attributes.put(ATTR_NAME_ETAG, blob.getETag());
+                    attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
+                    attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(blob.getLastModified()));
+                }
+            } catch (BlobStorageException e) {
+                if (conflictResolution == AzureStorageConflictResolutionStrategy.FAIL_RESOLUTION) {
+                    throw e;
+                }
+                final BlobErrorCode errorCode = e.getErrorCode();
+                if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS) {
+                    getLogger().warn("Blob already exists: remote blob not modified. Transferring {} to success", flowFile);

Review Comment:
   PutHDFS also logs this case at INFO level, so this is fine. Thanks.




[GitHub] [nifi] nandorsoma commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r1015602816


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +148,40 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            applyStandardBlobAttributes(attributes, blobClient);
+            final boolean ignore = conflictResolution == AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION;
+
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
 
-            long length = flowFile.getSize();
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);

Review Comment:
   You don't have to declare the response here because it is not used.




[GitHub] [nifi] nandorsoma commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r994887992


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
-            BlobClient blobClient = containerClient.getBlobClient(blobName);
 
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();

Review Comment:
   These attributes used to be applied after a successful upload. Do I understand correctly that the intention now is to apply attributes that already have a value to the failed FlowFile, without an upload having taken place? Is this a requirement for a specific use case? I'm asking because I worry that it could break an existing flow. For example, what happens if a downstream processor relies on the presence of one of these attributes? What do you think, @exceptionfactory?
   
   If the above is not an issue, I'd still change this part slightly.
   - I wouldn't assume the `MIME_TYPE`. I'd set it after a successful upload, and I'd investigate why `ATTR_NAME_LANG` needed to be present in the failing test you linked in one of your comments.
   - Currently, the code is a bit verbose. The attribute changes distract from the primary function, the upload itself. Therefore, I'd extract this part into something like `applyCommonAttributes()` and lines 170-173 into something like `applyUploadResultAttributes()`. This is just an idea; if you can think of better names, go for it.
   
   Sorry if I didn't notice this in the first review. Could this change have come in with the force push?
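The extraction suggested above might look roughly like the following sketch. To stay self-contained it takes plain strings instead of the Azure SDK types (`BlobClient`, `BlockBlobItem`), and the attribute keys are assumptions, not necessarily the values of the real `BlobAttributes` constants. The split also reflects the first point: only identifiers known before the upload go into the common helper, while ETag, length, and timestamp are added only once an upload has succeeded.

```java
import java.util.HashMap;
import java.util.Map;

final class BlobAttributeHelpers {
    // Assumed attribute keys, for illustration only.
    static final String ATTR_CONTAINER = "azure.container";
    static final String ATTR_BLOBNAME = "azure.blobname";
    static final String ATTR_PRIMARY_URI = "azure.primaryUri";
    static final String ATTR_BLOBTYPE = "azure.blobtype";
    static final String ATTR_ETAG = "azure.etag";
    static final String ATTR_LENGTH = "azure.length";
    static final String ATTR_TIMESTAMP = "azure.timestamp";

    // Attributes that are known before any upload is attempted.
    static void applyCommonAttributes(Map<String, String> attributes,
                                      String container, String blobName, String blobUrl) {
        attributes.put(ATTR_CONTAINER, container);
        attributes.put(ATTR_BLOBNAME, blobName);
        attributes.put(ATTR_PRIMARY_URI, blobUrl);
    }

    // Attributes that only make sense after a successful upload.
    static void applyUploadResultAttributes(Map<String, String> attributes,
                                            String blobType, String eTag, long length, String lastModified) {
        attributes.put(ATTR_BLOBTYPE, blobType);
        attributes.put(ATTR_ETAG, eTag);
        attributes.put(ATTR_LENGTH, String.valueOf(length));
        attributes.put(ATTR_TIMESTAMP, lastModified);
    }
}
```

With this split, the `onTrigger` body reads as "apply common attributes, upload, apply result attributes", which keeps the upload itself in focus.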



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
-            BlobClient blobClient = containerClient.getBlobClient(blobName);
 
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(ATTR_NAME_CONTAINER, containerName);
+            attributes.put(ATTR_NAME_BLOBNAME, blobName);
+            attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
+            attributes.put(ATTR_NAME_LANG, null);
+            attributes.put(ATTR_NAME_MIME_TYPE, "application/octet-stream");
             long length = flowFile.getSize();
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
+
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    attributes.put(ATTR_NAME_BLOBTYPE, BlobType.BLOCK_BLOB.toString());
+                    attributes.put(ATTR_NAME_ETAG, blob.getETag());
+                    attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
+                    attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(blob.getLastModified()));
+                }
+            } catch (BlobStorageException e) {
+                if (conflictResolution == AzureStorageConflictResolutionStrategy.FAIL_RESOLUTION) {
+                    throw e;
+                }
+                final BlobErrorCode errorCode = e.getErrorCode();
+                if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS) {
+                    getLogger().warn("Blob already exists: remote blob not modified. Transferring {} to success", flowFile);
+                    attributes.putAll(createBlobAttributesMap(blobClient));
+                    attributes.put(ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());

Review Comment:
   I wonder whether it is misleading to set `ATTR_NAME_ERROR_CODE` in an expected situation. That we rely on an error code internally is irrelevant from the user's point of view. I'd probably use the "ignored" property instead.
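The suggestion amounts to exposing a user-facing flag instead of an internal service error code. A minimal sketch, assuming a hypothetical attribute key `azure.ignored` (the thread only mentions an "ignored" property, not its exact name) and a hypothetical helper `successAttributes`:

```java
import java.util.HashMap;
import java.util.Map;

final class IgnoredFlagSketch {
    // Hypothetical attribute key; the exact name is not given in the thread.
    static final String ATTR_IGNORED = "azure.ignored";

    // Builds the attributes for a FlowFile routed to success. The flag records whether
    // the upload was skipped because the blob already existed, so no error code is exposed.
    // The input map is copied, never modified.
    static Map<String, String> successAttributes(Map<String, String> common, boolean skippedExistingBlob) {
        Map<String, String> attributes = new HashMap<>(common);
        attributes.put(ATTR_IGNORED, Boolean.toString(skippedExistingBlob));
        return attributes;
    }
}
```

Setting the flag on every successful FlowFile (including `false`) is one option. It lets a downstream processor tell the two cases apart by value rather than by the mere presence of an attribute.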



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
-            BlobClient blobClient = containerClient.getBlobClient(blobName);
 
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(ATTR_NAME_CONTAINER, containerName);
+            attributes.put(ATTR_NAME_BLOBNAME, blobName);
+            attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
+            attributes.put(ATTR_NAME_LANG, null);
+            attributes.put(ATTR_NAME_MIME_TYPE, "application/octet-stream");
             long length = flowFile.getSize();
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
+
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    attributes.put(ATTR_NAME_BLOBTYPE, BlobType.BLOCK_BLOB.toString());
+                    attributes.put(ATTR_NAME_ETAG, blob.getETag());
+                    attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
+                    attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(blob.getLastModified()));
+                }
+            } catch (BlobStorageException e) {
+                if (conflictResolution == AzureStorageConflictResolutionStrategy.FAIL_RESOLUTION) {
+                    throw e;
+                }
+                final BlobErrorCode errorCode = e.getErrorCode();
+                if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS) {
+                    getLogger().warn("Blob already exists: remote blob not modified. Transferring {} to success", flowFile);

Review Comment:
   Since this is an expected scenario, I wouldn't log it at WARN level. I'd use DEBUG instead.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java:
##########
@@ -137,10 +139,15 @@ protected Map<String, String> initCommonExpressionLanguageAttributes() {
         return attributes;
     }
 
-    protected void assertFlowFileBlobAttributes(MockFlowFile flowFile, String containerName, String blobName, int blobLength) {
+    protected void assertFlowFileBlobAttributes(MockFlowFile flowFile, String containerName, String blobName, int blobLength) throws UnsupportedEncodingException {
         flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_CONTAINER, containerName);
         flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_BLOBNAME, blobName);
-        flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI, String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), containerName, blobName));
+        flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI,
+                String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), containerName, URLEncoder.encode(
+                        blobName,
+                        StandardCharsets.US_ASCII.name()
+                ).replace("+", "%20"))

Review Comment:
   After using URLEncoder, this replacement seems to be unnecessary.
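For reference when weighing this comment: `URLEncoder` performs HTML form encoding (`application/x-www-form-urlencoded`), which encodes a space as `+` rather than `%20`, and encodes a literal `+` as `%2B`. The sketch below (hypothetical class and method names) isolates the two steps used in the test so that their output can be compared for a blob name containing a space.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

final class BlobNameEncodingSketch {
    // Form encoding, as in the test: a space becomes "+", a literal "+" becomes "%2B".
    static String formEncode(String blobName) throws UnsupportedEncodingException {
        return URLEncoder.encode(blobName, StandardCharsets.US_ASCII.name());
    }

    // Form encoding followed by the test's replacement of "+" with "%20".
    // Because a literal "+" was already turned into "%2B", only encoded spaces are affected.
    static String pathEncode(String blobName) throws UnsupportedEncodingException {
        return formEncode(blobName).replace("+", "%20");
    }
}
```

For `"my blob.txt"`, `formEncode` yields `my+blob.txt` and `pathEncode` yields `my%20blob.txt`.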



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
-            BlobClient blobClient = containerClient.getBlobClient(blobName);
 
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(ATTR_NAME_CONTAINER, containerName);
+            attributes.put(ATTR_NAME_BLOBNAME, blobName);
+            attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
+            attributes.put(ATTR_NAME_LANG, null);
+            attributes.put(ATTR_NAME_MIME_TYPE, "application/octet-stream");
             long length = flowFile.getSize();
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
+
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    attributes.put(ATTR_NAME_BLOBTYPE, BlobType.BLOCK_BLOB.toString());
+                    attributes.put(ATTR_NAME_ETAG, blob.getETag());
+                    attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
+                    attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(blob.getLastModified()));
+                }
+            } catch (BlobStorageException e) {
+                if (conflictResolution == AzureStorageConflictResolutionStrategy.FAIL_RESOLUTION) {
+                    throw e;
+                }
+                final BlobErrorCode errorCode = e.getErrorCode();
+                if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS) {

Review Comment:
   I would add `&& conflictResolution == IGNORE_RESOLUTION` to this condition to make the code more intuitive. It doesn't change the result, but it makes it easier to understand what is going on.
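A minimal sketch of the conflict-handling decision with the explicit check proposed above. The enums are simplified, hypothetical stand-ins for `AzureStorageConflictResolutionStrategy` and the processor's routing. `"BlobAlreadyExists"` is the service error code behind `BlobErrorCode.BLOB_ALREADY_EXISTS`. Rethrowing every other error is an assumption, since the diff is cut off before that branch.

```java
final class ConflictResolutionSketch {
    // Simplified, hypothetical stand-ins for AzureStorageConflictResolutionStrategy values.
    enum Strategy { FAIL, REPLACE, IGNORE }

    // What the processor does with a storage error (hypothetical names).
    enum Outcome { RETHROW, SUCCESS_IGNORED }

    // Service error code behind BlobErrorCode.BLOB_ALREADY_EXISTS.
    static final String BLOB_ALREADY_EXISTS = "BlobAlreadyExists";

    // Every strategy except REPLACE sends "If-None-Match: *", so the service
    // refuses to overwrite a blob that already exists.
    static boolean requiresIfNoneMatch(Strategy strategy) {
        return strategy != Strategy.REPLACE;
    }

    static Outcome onStorageError(Strategy strategy, String errorCode) {
        if (strategy == Strategy.FAIL) {
            return Outcome.RETHROW;
        }
        // Explicit IGNORE check: REPLACE sends no precondition, so this conflict is not
        // expected there, but the condition now states the intent directly.
        if (BLOB_ALREADY_EXISTS.equals(errorCode) && strategy == Strategy.IGNORE) {
            return Outcome.SUCCESS_IGNORED;
        }
        // Assumption: any other error is propagated.
        return Outcome.RETHROW;
    }
}
```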




[GitHub] [nifi] malthe commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r996266396


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
-            BlobClient blobClient = containerClient.getBlobClient(blobName);
 
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(ATTR_NAME_CONTAINER, containerName);
+            attributes.put(ATTR_NAME_BLOBNAME, blobName);
+            attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
+            attributes.put(ATTR_NAME_LANG, null);
+            attributes.put(ATTR_NAME_MIME_TYPE, "application/octet-stream");
             long length = flowFile.getSize();
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
+
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    attributes.put(ATTR_NAME_BLOBTYPE, BlobType.BLOCK_BLOB.toString());
+                    attributes.put(ATTR_NAME_ETAG, blob.getETag());
+                    attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
+                    attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(blob.getLastModified()));
+                }
+            } catch (BlobStorageException e) {
+                if (conflictResolution == AzureStorageConflictResolutionStrategy.FAIL_RESOLUTION) {
+                    throw e;
+                }
+                final BlobErrorCode errorCode = e.getErrorCode();
+                if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS) {
+                    getLogger().warn("Blob already exists: remote blob not modified. Transferring {} to success", flowFile);

Review Comment:
   I have changed it to INFO. Most processors are configured at WARN, and I think this is slightly more than debug-level information, especially since we have defined success to include these ignored blobs (when that strategy is selected).




[GitHub] [nifi] malthe commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r995686580


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
-            BlobClient blobClient = containerClient.getBlobClient(blobName);
 
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(ATTR_NAME_CONTAINER, containerName);
+            attributes.put(ATTR_NAME_BLOBNAME, blobName);
+            attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
+            attributes.put(ATTR_NAME_LANG, null);
+            attributes.put(ATTR_NAME_MIME_TYPE, "application/octet-stream");
             long length = flowFile.getSize();
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
+
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    attributes.put(ATTR_NAME_BLOBTYPE, BlobType.BLOCK_BLOB.toString());
+                    attributes.put(ATTR_NAME_ETAG, blob.getETag());
+                    attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
+                    attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(blob.getLastModified()));
+                }
+            } catch (BlobStorageException e) {
+                if (conflictResolution == AzureStorageConflictResolutionStrategy.FAIL_RESOLUTION) {
+                    throw e;
+                }
+                final BlobErrorCode errorCode = e.getErrorCode();
+                if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS) {
+                    getLogger().warn("Blob already exists: remote blob not modified. Transferring {} to success", flowFile);

Review Comment:
   I agree, but the ADLS processor does the same thing. Of course, that doesn't mean we shouldn't fix it.
   
   But perhaps in a separate PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] malthe commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r1002231318


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
-            BlobClient blobClient = containerClient.getBlobClient(blobName);
 
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();

Review Comment:
   I see what you mean now, and you're right: when we're not replacing the blob, it makes no sense to set those attributes. Fixed in 438c4518b317b71c24fa2633e591fbdd6ecc35e1.





[GitHub] [nifi] malthe commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r985732544


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,19 +168,42 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            Relationship relationship = REL_SUCCESS;
+
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
 
-            long length = flowFile.getSize();
+            try {
+                if (conflictStrategy == AzureStorageConflictStrategy.OVERWRITE_IF_SOURCE_NEWER) {
+                    OffsetDateTime ifUnmodifiedSince = OffsetDateTime.ofInstant(
+                            Instant.ofEpochMilli(flowFile.getEntryDate()),
+                            ZoneId.systemDefault()
+                    );
+                    blobRequestConditions.setIfUnmodifiedSince(ifUnmodifiedSince);
+                } else if (conflictStrategy != AzureStorageConflictStrategy.OVERWRITE) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                }
+            } catch (BlobStorageException e) {
+                if (conflictStrategy == AzureStorageConflictStrategy.FAIL) throw e;
+                final BlobErrorCode errorCode = e.getErrorCode();
+                if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS || errorCode == BlobErrorCode.CONDITION_NOT_MET) {
+                    relationship = REL_SKIPPED;

Review Comment:
   I have done this now, except that instead of a "skipped" attribute or similar, I have introduced a new "azure.error" attribute. Its value is "BlobAlreadyExists" (the official error code string) when the blob already exists and that case is treated as an error.
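To make the error-code behaviour concrete, here is a minimal, self-contained sketch of what an `If-None-Match: *` condition means for a write: the put succeeds only if no blob exists under that name, and otherwise fails with the `BlobAlreadyExists` code that would end up in the attribute. `InMemoryBlobStore`, `ConditionalPutException` and `ConditionalPutDemo` are hypothetical stand-ins for the Azure service and `BlobStorageException`; no Azure SDK calls are used, and the real service enforces the condition server-side.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for BlobStorageException: it only carries the service error-code string.
class ConditionalPutException extends RuntimeException {
    private final String errorCode;

    ConditionalPutException(String errorCode) {
        super(errorCode);
        this.errorCode = errorCode;
    }

    String getErrorCode() {
        return errorCode;
    }
}

// Hypothetical in-memory model of a blob container; this is not the Azure SDK.
class InMemoryBlobStore {
    private final Map<String, byte[]> blobs = new HashMap<>();

    // ifNoneMatchAny == true models the "If-None-Match: *" request condition:
    // the write is only allowed when no blob with this name exists yet.
    void put(String blobName, byte[] data, boolean ifNoneMatchAny) {
        if (ifNoneMatchAny && blobs.containsKey(blobName)) {
            throw new ConditionalPutException("BlobAlreadyExists");
        }
        blobs.put(blobName, data.clone());
    }
}

class ConditionalPutDemo {
    // Returns the value the "azure.error" attribute would get, or null if the put succeeded.
    static String tryPut(InMemoryBlobStore store, String blobName, byte[] data, boolean ifNoneMatchAny) {
        try {
            store.put(blobName, data, ifNoneMatchAny);
            return null;
        } catch (ConditionalPutException e) {
            return e.getErrorCode();
        }
    }

    public static void main(String[] args) {
        InMemoryBlobStore store = new InMemoryBlobStore();
        byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(tryPut(store, "a.txt", data, true));  // first conditional write succeeds
        System.out.println(tryPut(store, "a.txt", data, true));  // second conditional write is rejected
        System.out.println(tryPut(store, "a.txt", data, false)); // unconditional write overwrites
    }
}
```

In the processor itself, the condition is attached through `BlobRequestConditions.setIfNoneMatch("*")`, as in the diff above.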



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,19 +168,42 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            Relationship relationship = REL_SUCCESS;
+
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
 
-            long length = flowFile.getSize();
+            try {
+                if (conflictStrategy == AzureStorageConflictStrategy.OVERWRITE_IF_SOURCE_NEWER) {
+                    OffsetDateTime ifUnmodifiedSince = OffsetDateTime.ofInstant(
+                            Instant.ofEpochMilli(flowFile.getEntryDate()),
+                            ZoneId.systemDefault()
+                    );
+                    blobRequestConditions.setIfUnmodifiedSince(ifUnmodifiedSince);
+                } else if (conflictStrategy != AzureStorageConflictStrategy.OVERWRITE) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);

Review Comment:
   I have decided to shelve this mode for now.





[GitHub] [nifi] nandorsoma commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r1007251404


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -16,9 +16,17 @@
  */
 package org.apache.nifi.processors.azure.storage;
 
+import com.azure.core.http.rest.Response;

Review Comment:
   Won't be needed when other suggestions are accepted.
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,42 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            applyCommonAttributes(attributes, blobClient);
+            final boolean ignore = conflictResolution == AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION;
+
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
 
-            long length = flowFile.getSize();
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    long length = flowFile.getSize();
+                    applyUploadResultAttributes(attributes, blob, BlobType.BLOCK_BLOB, length);

Review Comment:
   ```suggestion
                       blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
                       applyBlobMetadata(attributes, blobClient);
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -144,4 +194,19 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             session.transfer(flowFile, REL_FAILURE);
         }
     }
+
+    private static void applyUploadResultAttributes(final Map<String, String> attributes, final BlockBlobItem blob, final BlobType blobType, final long length) {
+        attributes.put(ATTR_NAME_BLOBTYPE, blobType.toString());
+        attributes.put(ATTR_NAME_ETAG, blob.getETag());
+        attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
+        attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(blob.getLastModified()));
+        attributes.put(ATTR_NAME_LANG, null);
+        attributes.put(ATTR_NAME_MIME_TYPE, "application/octet-stream");
+    }
+
+    private static void applyCommonAttributes(final Map<String, String> attributes, final BlobClient blobClient) {
+        attributes.put(ATTR_NAME_CONTAINER, blobClient.getContainerName());
+        attributes.put(ATTR_NAME_BLOBNAME, blobClient.getBlobName());
+        attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
+    }

Review Comment:
   See my summary.
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java:
##########
@@ -137,10 +139,18 @@ protected Map<String, String> initCommonExpressionLanguageAttributes() {
         return attributes;
     }
 
-    protected void assertFlowFileBlobAttributes(MockFlowFile flowFile, String containerName, String blobName, int blobLength) {
+    protected void assertFlowFileCommonBlobAttributes(MockFlowFile flowFile, String containerName, String blobName) throws UnsupportedEncodingException {
         flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_CONTAINER, containerName);
         flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_BLOBNAME, blobName);
-        flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI, String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), containerName, blobName));
+        flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI,
+                String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), containerName, URLEncoder.encode(
+                        blobName,
+                        StandardCharsets.US_ASCII.name()
+                ).replace("+", "%20"))
+        );
+    }
+
+    protected void assertFlowFileResultBlobAttributes(MockFlowFile flowFile, int blobLength) throws UnsupportedEncodingException {

Review Comment:
   ```suggestion
       protected void assertFlowFileResultBlobAttributes(MockFlowFile flowFile, int blobLength) {
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java:
##########
@@ -136,7 +139,30 @@ public void testPutBlobToExistingBlob() throws Exception {
 
         runProcessor(BLOB_DATA);
 
-        assertFailure(BLOB_DATA);
+        MockFlowFile flowFile = assertFailure(BLOB_DATA, "BlobAlreadyExists");
+        assertEquals(flowFile.getAttribute(ATTR_NAME_IGNORED), null);
+    }
+
+    @Test
+    public void testPutBlobToExistingBlobConflictStrategyIgnore() throws Exception {
+        uploadBlob(BLOB_NAME, BLOB_DATA);
+        runner.setProperty(PutAzureBlobStorage_v12.CONFLICT_RESOLUTION, AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION.getValue());
+
+        runProcessor(BLOB_DATA);
+
+        MockFlowFile flowFile = assertIgnored(getContainerName(), BLOB_NAME);
+        assertEquals(flowFile.getAttribute(ATTR_NAME_ERROR_CODE), BlobErrorCode.BLOB_ALREADY_EXISTS.toString());

Review Comment:
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -75,7 +88,8 @@
         @WritesAttribute(attribute = ATTR_NAME_MIME_TYPE, description = ATTR_DESCRIPTION_MIME_TYPE),
         @WritesAttribute(attribute = ATTR_NAME_LANG, description = ATTR_DESCRIPTION_LANG),
         @WritesAttribute(attribute = ATTR_NAME_TIMESTAMP, description = ATTR_DESCRIPTION_TIMESTAMP),
-        @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)})
+        @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH),
+        @WritesAttribute(attribute = ATTR_NAME_ERROR_CODE, description = ATTR_DESCRIPTION_ERROR_CODE)})

Review Comment:
   ```suggestion
           @WritesAttribute(attribute = ATTR_NAME_ERROR_CODE, description = ATTR_DESCRIPTION_ERROR_CODE),
           @WritesAttribute(attribute = ATTR_NAME_IGNORED, description = ATTR_DESCRIPTION_IGNORED)})
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,42 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            applyCommonAttributes(attributes, blobClient);

Review Comment:
   ```suggestion
               final Map<String, String> attributes = new HashMap<>();
               applyStandardBlobAttributes(attributes, blobClient);
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -35,18 +43,21 @@
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12;
 import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy;
 
-import java.io.BufferedInputStream;
 import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import static com.azure.core.util.FluxUtil.toFluxByteBuffer;
 import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME;
 import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE;
 import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_CONTAINER;
+import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_ERROR_CODE;
 import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_ETAG;

Review Comment:
   ```suggestion
   import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_ETAG;
   import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_IGNORED;
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,42 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            applyCommonAttributes(attributes, blobClient);
+            final boolean ignore = conflictResolution == AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION;
+
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
 
-            long length = flowFile.getSize();
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    long length = flowFile.getSize();
+                    applyUploadResultAttributes(attributes, blob, BlobType.BLOCK_BLOB, length);
+                    if (ignore) {
+                        attributes.put(ATTR_NAME_IGNORED, "false");
+                    }
+                }
+            } catch (BlobStorageException e) {
+                final BlobErrorCode errorCode = e.getErrorCode();
+                flowFile = session.putAttribute(flowFile, ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());

Review Comment:
   We don't need to apply the error code in the case of the ignore resolution.
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobAttributes.java:
##########
@@ -45,4 +45,8 @@ public final class BlobAttributes {
     public static final String ATTR_NAME_LENGTH = "azure.length";
     public static final String ATTR_DESCRIPTION_LENGTH = "Length of the blob";
 
+    public static final String ATTR_NAME_ERROR_CODE = "azure.error.code";
+    public static final String ATTR_DESCRIPTION_ERROR_CODE = "Error code reported during blob operation";
+
+    public static final String ATTR_NAME_IGNORED = "azure.ignored";

Review Comment:
   Description is missing.
   ```suggestion
       public static final String ATTR_NAME_IGNORED = "azure.ignored";
       public static final String ATTR_DESCRIPTION_IGNORED = "When Conflict Resolution Strategy is 'ignore', " +
               "this attribute will be true or false depending on whether the blob was ignored.";
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -16,9 +16,17 @@
  */
 package org.apache.nifi.processors.azure.storage;
 
+import com.azure.core.http.rest.Response;
+import com.azure.core.util.Context;
 import com.azure.storage.blob.BlobClient;
 import com.azure.storage.blob.BlobContainerClient;
 import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.models.BlobErrorCode;
+import com.azure.storage.blob.models.BlobRequestConditions;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.BlobType;
+import com.azure.storage.blob.models.BlockBlobItem;

Review Comment:
   Won't be needed when other suggestions are accepted.
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,42 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);

Review Comment:
   ```suggestion
               final BlobClient blobClient = containerClient.getBlobClient(blobName);
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,42 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            applyCommonAttributes(attributes, blobClient);
+            final boolean ignore = conflictResolution == AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION;
+
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
 
-            long length = flowFile.getSize();
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    long length = flowFile.getSize();
+                    applyUploadResultAttributes(attributes, blob, BlobType.BLOCK_BLOB, length);
+                    if (ignore) {
+                        attributes.put(ATTR_NAME_IGNORED, "false");
+                    }
+                }
+            } catch (BlobStorageException e) {
+                final BlobErrorCode errorCode = e.getErrorCode();
+                flowFile = session.putAttribute(flowFile, ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+                if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS && ignore) {
+                    getLogger().info("Blob already exists: remote blob not modified. Transferring {} to success", flowFile);
+                    attributes.put(ATTR_NAME_IGNORED, "true");
+                } else {
+                    throw e;

Review Comment:
   We only need to apply an error code when we cannot recover.
   ```suggestion
                       flowFile = session.putAttribute(flowFile, ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());
                       throw e;
   ```
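The attribute rules these suggestions converge on can be summarised in a small, self-contained sketch: on success, `azure.ignored=false` is set only under the ignore strategy; an ignored `BlobAlreadyExists` conflict sets `azure.ignored=true` and no error code; anything else records `azure.error.code`, after which the processor rethrows and routes to failure. The `Resolution` enum and `ConflictAttributes` class are hypothetical stand-ins; only the attribute names come from `BlobAttributes` in this PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the processor's conflict resolution strategies.
enum Resolution { REPLACE, IGNORE, FAIL }

class ConflictAttributes {
    static final String ATTR_NAME_ERROR_CODE = "azure.error.code";
    static final String ATTR_NAME_IGNORED = "azure.ignored";
    static final String BLOB_ALREADY_EXISTS = "BlobAlreadyExists";

    /**
     * Returns the conflict-related attributes a FlowFile would carry after an upload attempt.
     *
     * @param resolution the configured strategy
     * @param errorCode  null if the upload succeeded, otherwise the service error code
     */
    static Map<String, String> attributesFor(Resolution resolution, String errorCode) {
        final Map<String, String> attributes = new HashMap<>();
        final boolean ignore = resolution == Resolution.IGNORE;

        if (errorCode == null) {
            // Upload succeeded; under the ignore strategy, record that nothing was skipped.
            if (ignore) {
                attributes.put(ATTR_NAME_IGNORED, "false");
            }
        } else if (ignore && BLOB_ALREADY_EXISTS.equals(errorCode)) {
            // Recoverable conflict: no error code, the FlowFile is only marked as ignored.
            attributes.put(ATTR_NAME_IGNORED, "true");
        } else {
            // Unrecoverable: record the error code; the processor would then rethrow
            // and route the FlowFile to failure.
            attributes.put(ATTR_NAME_ERROR_CODE, errorCode);
        }
        return attributes;
    }

    public static void main(String[] args) {
        System.out.println(attributesFor(Resolution.IGNORE, BLOB_ALREADY_EXISTS));
        System.out.println(attributesFor(Resolution.FAIL, BLOB_ALREADY_EXISTS));
        System.out.println(attributesFor(Resolution.IGNORE, null));
    }
}
```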



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java:
##########
@@ -164,7 +164,7 @@ protected Map<String, String> createBlobAttributesMap(BlobClient blobClient) {
         Map<String, String> attributes = new HashMap<>();
 
         BlobProperties properties = blobClient.getProperties();
-        String primaryUri = String.format("%s/%s", blobClient.getContainerClient().getBlobContainerUrl(), blobClient.getBlobName());
+        String primaryUri = blobClient.getBlobUrl();

Review Comment:
   Because of this change, some tests in `ITFetchAzureBlobStorage_v12` are failing. I'm not adding a suggestion here because my summary comment would overwrite it.
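A likely source of the failures, assuming `BlobClient.getBlobUrl()` percent-encodes the blob name (which is what the updated test helper in this PR expects), is that the old `String.format` URL and the new one differ for names containing spaces or other reserved characters. The sketch below reproduces both forms with the JDK only; `BlobUrlEncoding` and its methods are hypothetical, and `URLEncoder.encode(String, Charset)` requires Java 10 or later.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

class BlobUrlEncoding {
    private static final String URL_FORMAT = "https://%s.blob.core.windows.net/%s/%s";

    // Old approach: container URL plus the raw blob name, no encoding.
    static String rawUrl(String account, String container, String blobName) {
        return String.format(URL_FORMAT, account, container, blobName);
    }

    // Encoded approach mirroring the updated test helper: URL-encode the name, then
    // replace the form-encoding '+' for spaces with "%20". A literal '+' in the name
    // has already become "%2B", so only spaces are affected by the replace.
    static String encodedUrl(String account, String container, String blobName) {
        String encoded = URLEncoder.encode(blobName, StandardCharsets.UTF_8).replace("+", "%20");
        return String.format(URL_FORMAT, account, container, encoded);
    }

    public static void main(String[] args) {
        System.out.println(rawUrl("account", "container", "my blob.txt"));
        System.out.println(encodedUrl("account", "container", "my blob.txt"));
    }
}
```

Tests that build the expected URI from the raw name would therefore only match for names that need no encoding.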



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java:
##########
@@ -34,6 +34,8 @@
 import org.junit.jupiter.api.BeforeEach;
 
 import java.io.ByteArrayInputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;

Review Comment:
   Won't be needed when other suggestions are accepted.
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java:
##########
@@ -137,10 +139,18 @@ protected Map<String, String> initCommonExpressionLanguageAttributes() {
         return attributes;
     }
 
-    protected void assertFlowFileBlobAttributes(MockFlowFile flowFile, String containerName, String blobName, int blobLength) {
+    protected void assertFlowFileCommonBlobAttributes(MockFlowFile flowFile, String containerName, String blobName) throws UnsupportedEncodingException {

Review Comment:
   ```suggestion
       protected void assertFlowFileCommonBlobAttributes(MockFlowFile flowFile, String containerName, String blobName) {
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java:
##########
@@ -137,10 +139,18 @@ protected Map<String, String> initCommonExpressionLanguageAttributes() {
         return attributes;
     }
 
-    protected void assertFlowFileBlobAttributes(MockFlowFile flowFile, String containerName, String blobName, int blobLength) {
+    protected void assertFlowFileCommonBlobAttributes(MockFlowFile flowFile, String containerName, String blobName) throws UnsupportedEncodingException {
         flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_CONTAINER, containerName);
         flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_BLOBNAME, blobName);
-        flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI, String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), containerName, blobName));
+        flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI,
+                String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), containerName, URLEncoder.encode(
+                        blobName,
+                        StandardCharsets.US_ASCII.name()
+                ).replace("+", "%20"))
+        );

Review Comment:
   ```suggestion
           flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI, String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), containerName, blobName));
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java:
##########
@@ -18,10 +18,12 @@
 
 import com.azure.storage.blob.BlobClient;
 import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.models.BlobErrorCode;

Review Comment:
   Won't be needed when other suggestions are accepted.
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java:
##########
@@ -32,11 +34,12 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_ERROR_CODE;

Review Comment:
   Won't be needed when other suggestions are accepted.
   ```suggestion
   ```





[GitHub] [nifi] nandorsoma commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r984483359


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageConflictStrategy.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import org.apache.nifi.components.AllowableValue;
+
+public enum AzureStorageConflictStrategy {

Review Comment:
   I know it's a bit long, but I'd probably rename this class to `AzureStorageConflictResolutionStrategy`. Later, in a separate PR, we could presumably remove the verbose `AzureStorage` prefix, since the package name already conveys that.
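For reference, a minimal plain-Java sketch of how the renamed enum could look. The constant names match those used elsewhere in this PR (`FAIL_RESOLUTION`, `IGNORE_RESOLUTION`, `REPLACE_RESOLUTION`); the value strings, `requiresIfNoneMatch()` and `fromValue()` are hypothetical, and the NiFi `AllowableValue` that the original file imports is left out.

```java
import java.util.Arrays;

// Hypothetical plain-Java sketch; NiFi's AllowableValue wiring is omitted.
enum ConflictResolutionStrategy {
    FAIL_RESOLUTION("fail"),
    IGNORE_RESOLUTION("ignore"),
    REPLACE_RESOLUTION("replace");

    private final String value;

    ConflictResolutionStrategy(String value) {
        this.value = value;
    }

    // Property value as stored in the processor configuration (assumed strings).
    String getValue() {
        return value;
    }

    // Every strategy except REPLACE sends "If-None-Match: *" so the service refuses
    // to overwrite an existing blob (the same check onTrigger performs in this PR).
    boolean requiresIfNoneMatch() {
        return this != REPLACE_RESOLUTION;
    }

    static ConflictResolutionStrategy fromValue(String value) {
        return Arrays.stream(values())
                .filter(strategy -> strategy.value.equals(value))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Unknown conflict resolution: " + value));
    }
}

class ConflictResolutionDemo {
    public static void main(String[] args) {
        for (ConflictResolutionStrategy strategy : ConflictResolutionStrategy.values()) {
            System.out.println(strategy.getValue() + " -> If-None-Match: * "
                    + (strategy.requiresIfNoneMatch() ? "set" : "not set"));
        }
    }
}
```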



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,19 +168,42 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            Relationship relationship = REL_SUCCESS;
+
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
 
-            long length = flowFile.getSize();
+            try {
+                if (conflictStrategy == AzureStorageConflictStrategy.OVERWRITE_IF_SOURCE_NEWER) {
+                    OffsetDateTime ifUnmodifiedSince = OffsetDateTime.ofInstant(
+                            Instant.ofEpochMilli(flowFile.getEntryDate()),
+                            ZoneId.systemDefault()
+                    );
+                    blobRequestConditions.setIfUnmodifiedSince(ifUnmodifiedSince);
+                } else if (conflictStrategy != AzureStorageConflictStrategy.OVERWRITE) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                }
+            } catch (BlobStorageException e) {
+                if (conflictStrategy == AzureStorageConflictStrategy.FAIL) throw e;
+                final BlobErrorCode errorCode = e.getErrorCode();
+                if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS || errorCode == BlobErrorCode.CONDITION_NOT_MET) {
+                    relationship = REL_SKIPPED;

Review Comment:
   This feature is also implemented in the ADLS processors, where the FlowFile is routed to `REL_SUCCESS` when the file is skipped. I think these processors should follow the same logic.
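The routing suggested here (an already-existing blob goes to success, as in the ADLS processors, instead of to a new `REL_SKIPPED` relationship) can be sketched in plain Java. This is a minimal, hypothetical model: `RoutingStrategy`, `UploadErrorCode` and `Outcome` are stand-ins for the NiFi and Azure SDK types, not their actual APIs. `SUCCESS_IGNORED` means "success relationship, plus an attribute saying nothing was written".

```java
// Hypothetical stand-ins for the strategies and storage error codes discussed here.
enum RoutingStrategy { FAIL, REPLACE, IGNORE }

enum UploadErrorCode { BLOB_ALREADY_EXISTS, CONDITION_NOT_MET, OTHER }

enum Outcome { SUCCESS, SUCCESS_IGNORED, FAILURE }

final class ConflictRouting {
    private ConflictRouting() { }

    // Decides where a FlowFile goes after an upload attempt;
    // error == null means the upload succeeded.
    static Outcome route(RoutingStrategy strategy, UploadErrorCode error) {
        if (error == null) {
            return Outcome.SUCCESS;
        }
        if (strategy == RoutingStrategy.IGNORE && error == UploadErrorCode.BLOB_ALREADY_EXISTS) {
            // Remote blob left untouched: still routed to success, as in the ADLS
            // processors; an attribute (not a new relationship) marks it as skipped.
            return Outcome.SUCCESS_IGNORED;
        }
        // FAIL, or any other error, goes to failure.
        return Outcome.FAILURE;
    }
}
```

Keeping a single success relationship means existing flows do not need a new connection when the strategy is changed.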



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,19 +168,42 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            Relationship relationship = REL_SUCCESS;
+
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
 
-            long length = flowFile.getSize();
+            try {
+                if (conflictStrategy == AzureStorageConflictStrategy.OVERWRITE_IF_SOURCE_NEWER) {
+                    OffsetDateTime ifUnmodifiedSince = OffsetDateTime.ofInstant(
+                            Instant.ofEpochMilli(flowFile.getEntryDate()),

Review Comment:
   Is there a use case where `entryDate` is useful? Almost 100% of the time, `flowFile.getEntryDate()` will be newer than the blob's last-modified date in Azure, so the If-Unmodified-Since condition would nearly always allow the overwrite. I'd imagine the `OVERWRITE_IF_SOURCE_NEWER` option is useful for synchronizing files fetched by GetFile with Azure; in that case, the modified date of the original file could be used instead. All in all, I would rely on a FlowFile attribute instead.
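A minimal sketch of deriving the If-Unmodified-Since value from a FlowFile attribute instead of the entry date. The attribute name `file.lastModifiedTime` and the pattern `yyyy-MM-dd'T'HH:mm:ssZ` are assumptions about what the upstream processor (e.g. GetFile) writes; verify both before relying on them. The helper class is hypothetical.

```java
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.Optional;

final class SourceModifiedTime {
    // Assumed attribute name and format; check against the upstream processor.
    static final String LAST_MODIFIED_ATTRIBUTE = "file.lastModifiedTime";
    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ");

    private SourceModifiedTime() { }

    // Returns the value to use as the If-Unmodified-Since condition, or empty if the
    // attribute is missing (the caller then has to choose a fallback behavior).
    static Optional<OffsetDateTime> ifUnmodifiedSince(Map<String, String> attributes) {
        final String raw = attributes.get(LAST_MODIFIED_ATTRIBUTE);
        if (raw == null || raw.isEmpty()) {
            return Optional.empty();
        }
        // Normalize to UTC so the result does not depend on the JVM's default zone.
        return Optional.of(OffsetDateTime.parse(raw, FORMAT).withOffsetSameInstant(ZoneOffset.UTC));
    }
}
```

Returning an empty `Optional` forces the caller to decide explicitly what happens when the attribute is absent, rather than silently falling back to the entry date.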



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageConflictStrategy.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import org.apache.nifi.components.AllowableValue;
+
+public enum AzureStorageConflictStrategy {
+    FAIL("Fail", "Fail if the target blob already exists"),

Review Comment:
   The same feature is implemented in the ADLS processors. If possible, please use the same strategy names that already exist there.
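A standalone sketch of the renamed enum, reusing the constant names that appear in later revisions of this PR (`FAIL_RESOLUTION`, `REPLACE_RESOLUTION`, `IGNORE_RESOLUTION`). The lowercase values `fail`, `replace` and `ignore` are assumed to mirror the ADLS processors and should be checked against them; the NiFi `AllowableValue` wiring is omitted so the example compiles without NiFi on the classpath.

```java
import java.util.Arrays;
import java.util.Optional;

// Sketch only: values and descriptions are illustrative, not the merged code.
enum AzureStorageConflictResolutionStrategy {
    FAIL_RESOLUTION("fail", "Fail if the blob already exists"),
    IGNORE_RESOLUTION("ignore", "Leave the existing blob untouched and route to success"),
    REPLACE_RESOLUTION("replace", "Replace the blob if it already exists");

    private final String value;
    private final String description;

    AzureStorageConflictResolutionStrategy(String value, String description) {
        this.value = value;
        this.description = description;
    }

    String getValue() { return value; }

    String getDescription() { return description; }

    // Looks up a strategy by its property value, e.g. "replace".
    static Optional<AzureStorageConflictResolutionStrategy> fromValue(String value) {
        return Arrays.stream(values()).filter(s -> s.value.equals(value)).findFirst();
    }

    // Mirrors the diff: every strategy except REPLACE sets If-None-Match: *.
    boolean requiresIfNoneMatch() {
        return this != REPLACE_RESOLUTION;
    }
}
```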



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,19 +168,42 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            Relationship relationship = REL_SUCCESS;
+
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
 
-            long length = flowFile.getSize();
+            try {
+                if (conflictStrategy == AzureStorageConflictStrategy.OVERWRITE_IF_SOURCE_NEWER) {
+                    OffsetDateTime ifUnmodifiedSince = OffsetDateTime.ofInstant(
+                            Instant.ofEpochMilli(flowFile.getEntryDate()),
+                            ZoneId.systemDefault()
+                    );
+                    blobRequestConditions.setIfUnmodifiedSince(ifUnmodifiedSince);
+                } else if (conflictStrategy != AzureStorageConflictStrategy.OVERWRITE) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);

Review Comment:
   If `OVERWRITE_IF_SOURCE_NEWER` is not needed, I'd consider using `upload()` with the `boolean overwrite` parameter (see my summary comment).
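To illustrate why `upload(data, length, overwrite)` would suffice without `OVERWRITE_IF_SOURCE_NEWER`: with `overwrite == false`, the only precondition left is `If-None-Match: *`. The toy in-memory store below is not the Azure SDK; it only mimics how the two preconditions used in this diff are evaluated, with error strings assumed to match the `BLOB_ALREADY_EXISTS` and `CONDITION_NOT_MET` codes.

```java
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.Map;

// Toy model of a conditional blob PUT; not the Azure SDK.
final class ConditionalBlobStore {
    static final class PreconditionFailedException extends RuntimeException {
        PreconditionFailedException(String message) { super(message); }
    }

    private final Map<String, OffsetDateTime> lastModified = new HashMap<>();

    // ifNoneMatch "*": write only if the blob does not exist (null = no condition).
    // ifUnmodifiedSince: write only if the existing blob was not modified after this
    // instant (null = no condition). A missing blob satisfies it in this toy model.
    void upload(String name, String ifNoneMatch, OffsetDateTime ifUnmodifiedSince, OffsetDateTime now) {
        final OffsetDateTime existing = lastModified.get(name);
        if ("*".equals(ifNoneMatch) && existing != null) {
            throw new PreconditionFailedException("BlobAlreadyExists");
        }
        if (ifUnmodifiedSince != null && existing != null && existing.isAfter(ifUnmodifiedSince)) {
            throw new PreconditionFailedException("ConditionNotMet");
        }
        lastModified.put(name, now);
    }

    // Boolean-overwrite style: overwrite == false maps to If-None-Match: *.
    void upload(String name, boolean overwrite, OffsetDateTime now) {
        upload(name, overwrite ? null : "*", null, now);
    }

    boolean exists(String name) {
        return lastModified.containsKey(name);
    }
}
```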



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -91,14 +103,48 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
                     "will fail if the container does not exist.")
             .build();
 
+    public static final PropertyDescriptor CONFLICT_STRATEGY = new PropertyDescriptor.Builder()
+            .name("conflict-strategy")

Review Comment:
   I'd rename this property to `conflict-resolution-strategy`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] malthe commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r991284850


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
-            BlobClient blobClient = containerClient.getBlobClient(blobName);
 
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(ATTR_NAME_CONTAINER, containerName);
+            attributes.put(ATTR_NAME_BLOBNAME, blobName);
+            attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
+            attributes.put(ATTR_NAME_LANG, null);

Review Comment:
   It does, but if this line is omitted, we get:
   ```
   org.opentest4j.AssertionFailedError: Attribute lang does not exist ==> 
   Expected :true
   Actual   :false
   ```
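The failure message ("Attribute lang does not exist") is consistent with a key-presence check: a `HashMap` entry whose value is `null` still counts as present, while an omitted entry does not. A plain-Java illustration; the attribute names and values are hypothetical, and whether the NiFi session keeps null-valued attributes is not verified here.

```java
import java.util.HashMap;
import java.util.Map;

final class NullAttributeDemo {
    private NullAttributeDemo() { }

    static Map<String, String> standardAttributes(boolean includeLang) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("azure.container", "container"); // hypothetical value
        if (includeLang) {
            // HashMap permits null values: the key exists even though its value is null.
            attributes.put("lang", null);
        }
        return attributes;
    }
}
```

Note that `Map.of(...)` rejects null values, so this pattern only works with a mutable map such as `HashMap`.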





[GitHub] [nifi] malthe commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r1015668696


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +148,40 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            applyStandardBlobAttributes(attributes, blobClient);
+            final boolean ignore = conflictResolution == AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION;
+
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
 
-            long length = flowFile.getSize();
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);

Review Comment:
   Fixed in 4ed94f868666295c12d16c1696151a4156e8ac5b.





[GitHub] [nifi] malthe commented on pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
malthe commented on PR #6443:
URL: https://github.com/apache/nifi/pull/6443#issuecomment-1311520820

   @exceptionfactory this should be all good now.




[GitHub] [nifi] nandorsoma commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r1002149676


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
-            BlobClient blobClient = containerClient.getBlobClient(blobName);
 
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(ATTR_NAME_CONTAINER, containerName);
+            attributes.put(ATTR_NAME_BLOBNAME, blobName);
+            attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
+            attributes.put(ATTR_NAME_LANG, null);
+            attributes.put(ATTR_NAME_MIME_TYPE, "application/octet-stream");
             long length = flowFile.getSize();
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
+
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    attributes.put(ATTR_NAME_BLOBTYPE, BlobType.BLOCK_BLOB.toString());
+                    attributes.put(ATTR_NAME_ETAG, blob.getETag());
+                    attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
+                    attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(blob.getLastModified()));
+                }
+            } catch (BlobStorageException e) {
+                if (conflictResolution == AzureStorageConflictResolutionStrategy.FAIL_RESOLUTION) {
+                    throw e;
+                }
+                final BlobErrorCode errorCode = e.getErrorCode();
+                if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS) {
+                    getLogger().warn("Blob already exists: remote blob not modified. Transferring {} to success", flowFile);
+                    attributes.putAll(createBlobAttributesMap(blobClient));
+                    attributes.put(ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());

Review Comment:
   I think applying the error code only makes sense if downstream processors can use it; otherwise, we can get it from the log. I'd remove it, but I don't feel strongly.





[GitHub] [nifi] malthe commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r995688796


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
-            BlobClient blobClient = containerClient.getBlobClient(blobName);
 
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(ATTR_NAME_CONTAINER, containerName);
+            attributes.put(ATTR_NAME_BLOBNAME, blobName);
+            attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
+            attributes.put(ATTR_NAME_LANG, null);
+            attributes.put(ATTR_NAME_MIME_TYPE, "application/octet-stream");
             long length = flowFile.getSize();
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
+
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    attributes.put(ATTR_NAME_BLOBTYPE, BlobType.BLOCK_BLOB.toString());
+                    attributes.put(ATTR_NAME_ETAG, blob.getETag());
+                    attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
+                    attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(blob.getLastModified()));
+                }
+            } catch (BlobStorageException e) {
+                if (conflictResolution == AzureStorageConflictResolutionStrategy.FAIL_RESOLUTION) {
+                    throw e;
+                }
+                final BlobErrorCode errorCode = e.getErrorCode();
+                if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS) {
+                    getLogger().warn("Blob already exists: remote blob not modified. Transferring {} to success", flowFile);
+                    attributes.putAll(createBlobAttributesMap(blobClient));
+                    attributes.put(ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());

Review Comment:
   Perhaps we can always set `ATTR_NAME_ERROR_CODE` in case of an error and, in addition, set an "ignored" attribute (to "true", I suppose) in case of `BLOB_ALREADY_EXISTS`.
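A minimal sketch of this proposal: always record the error code, and add an "ignored" flag only when the blob already existed under the ignore strategy. The attribute names `azure.error.code` and `azure.ignored` are hypothetical, and `BlobAlreadyExists` is assumed to be the string form of `BlobErrorCode.BLOB_ALREADY_EXISTS`.

```java
import java.util.HashMap;
import java.util.Map;

final class ErrorAttributes {
    // Hypothetical attribute names, for illustration only.
    static final String ERROR_CODE = "azure.error.code";
    static final String IGNORED = "azure.ignored";
    static final String BLOB_ALREADY_EXISTS = "BlobAlreadyExists";

    private ErrorAttributes() { }

    // Always records the error code; additionally flags the FlowFile as ignored
    // when the blob already existed and the strategy is "ignore".
    static Map<String, String> forError(String errorCode, boolean ignoreStrategy) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put(ERROR_CODE, errorCode);
        if (ignoreStrategy && BLOB_ALREADY_EXISTS.equals(errorCode)) {
            attributes.put(IGNORED, "true");
        }
        return attributes;
    }
}
```

A separate boolean attribute lets downstream processors route on a simple `"true"` check without having to know every possible error code.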





[GitHub] [nifi] malthe commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r995685308


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
-            BlobClient blobClient = containerClient.getBlobClient(blobName);
 
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();

Review Comment:
   @nandorsoma the attributes are not applied in case of an error; the code will not reach the `session.putAllAttributes(flowFile, attributes)` in this case.
   
   According to the [documentation](https://learn.microsoft.com/en-us/dotnet/api/azure.storage.blobs.models.blobproperties.contenttype?view=azure-dotnet#definition), the default MIME type is "application/octet-stream".
   
   `ATTR_NAME_LANG` seems to be there precisely for backward compatibility: if the upload succeeds, the attribute is expected to be present and, by default, null.
   
   In case of an error, the attribute will not be set (as above).
   
   I'll look into cleaning up the code to make it less verbose around the important bits.





[GitHub] [nifi] malthe commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r996266396


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
-            BlobClient blobClient = containerClient.getBlobClient(blobName);
 
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(ATTR_NAME_CONTAINER, containerName);
+            attributes.put(ATTR_NAME_BLOBNAME, blobName);
+            attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
+            attributes.put(ATTR_NAME_LANG, null);
+            attributes.put(ATTR_NAME_MIME_TYPE, "application/octet-stream");
             long length = flowFile.getSize();
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
+
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    attributes.put(ATTR_NAME_BLOBTYPE, BlobType.BLOCK_BLOB.toString());
+                    attributes.put(ATTR_NAME_ETAG, blob.getETag());
+                    attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
+                    attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(blob.getLastModified()));
+                }
+            } catch (BlobStorageException e) {
+                if (conflictResolution == AzureStorageConflictResolutionStrategy.FAIL_RESOLUTION) {
+                    throw e;
+                }
+                final BlobErrorCode errorCode = e.getErrorCode();
+                if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS) {
+                    getLogger().warn("Blob already exists: remote blob not modified. Transferring {} to success", flowFile);

Review Comment:
   I have changed it to INFO. Most processors are configured to log at WARN, and this message is perhaps slightly more informative than a DEBUG-level one, especially since we have defined success as also including these ignored blobs.





[GitHub] [nifi] nandorsoma commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r1007319900


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,42 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            applyCommonAttributes(attributes, blobClient);

Review Comment:
   ```suggestion
               final Map<String, String> attributes = new HashMap<>();
               applyStandardBlobAttributes(attributes, blobClient);
               flowFile = session.putAllAttributes(flowFile, attributes); // so they will be available in failed flowfiles too
   ```
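The point of the suggestion is ordering: attributes applied before the upload survive on the FlowFile even if the upload throws. A toy model of that ordering, where `FakeFlowFile` and `process` are hypothetical stand-ins for the NiFi FlowFile and `session.putAllAttributes(...)`, and the `Supplier` stands in for the upload call:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

final class AttributesBeforeUpload {
    // Minimal stand-in for a FlowFile: an immutable attribute map.
    static final class FakeFlowFile {
        final Map<String, String> attributes;

        FakeFlowFile(Map<String, String> attributes) {
            this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
        }

        FakeFlowFile withAttributes(Map<String, String> extra) {
            final Map<String, String> merged = new HashMap<>(attributes);
            merged.putAll(extra);
            return new FakeFlowFile(merged);
        }
    }

    static final class Result {
        final FakeFlowFile flowFile;
        final boolean failed;

        Result(FakeFlowFile flowFile, boolean failed) {
            this.flowFile = flowFile;
            this.failed = failed;
        }
    }

    static Result process(FakeFlowFile flowFile, Map<String, String> standard,
                          Supplier<Map<String, String>> upload) {
        // Apply the standard attributes first, so they survive a failed upload.
        flowFile = flowFile.withAttributes(standard);
        try {
            // On success, add the upload-specific attributes (ETag, length, ...).
            return new Result(flowFile.withAttributes(upload.get()), false);
        } catch (RuntimeException e) {
            return new Result(flowFile, true);
        }
    }
}
```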





[GitHub] [nifi] malthe commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r984694899


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,19 +168,42 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            Relationship relationship = REL_SUCCESS;
+
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
 
-            long length = flowFile.getSize();
+            try {
+                if (conflictStrategy == AzureStorageConflictStrategy.OVERWRITE_IF_SOURCE_NEWER) {
+                    OffsetDateTime ifUnmodifiedSince = OffsetDateTime.ofInstant(
+                            Instant.ofEpochMilli(flowFile.getEntryDate()),
+                            ZoneId.systemDefault()
+                    );
+                    blobRequestConditions.setIfUnmodifiedSince(ifUnmodifiedSince);
+                } else if (conflictStrategy != AzureStorageConflictStrategy.OVERWRITE) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                }
+            } catch (BlobStorageException e) {
+                if (conflictStrategy == AzureStorageConflictStrategy.FAIL) throw e;
+                final BlobErrorCode errorCode = e.getErrorCode();
+                if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS || errorCode == BlobErrorCode.CONDITION_NOT_MET) {
+                    relationship = REL_SKIPPED;

Review Comment:
   It's true that we could instead introduce a new FlowFile attribute, such as "skipped", to indicate that the blob already existed and was _not overwritten_.
   
   That might work better than introducing a new relationship actually.





[GitHub] [nifi] malthe closed pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
malthe closed pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)
URL: https://github.com/apache/nifi/pull/6443




[GitHub] [nifi] exceptionfactory closed pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
exceptionfactory closed pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)
URL: https://github.com/apache/nifi/pull/6443




[GitHub] [nifi] malthe commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r995686073


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java:
##########
@@ -137,10 +139,15 @@ protected Map<String, String> initCommonExpressionLanguageAttributes() {
         return attributes;
     }
 
-    protected void assertFlowFileBlobAttributes(MockFlowFile flowFile, String containerName, String blobName, int blobLength) {
+    protected void assertFlowFileBlobAttributes(MockFlowFile flowFile, String containerName, String blobName, int blobLength) throws UnsupportedEncodingException {
         flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_CONTAINER, containerName);
         flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_BLOBNAME, blobName);
-        flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI, String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), containerName, blobName));
+        flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI,
+                String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), containerName, URLEncoder.encode(
+                        blobName,
+                        StandardCharsets.US_ASCII.name()
+                ).replace("+", "%20"))

Review Comment:
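   Unfortunately, this is required for the assertion to pass: `URLEncoder` encodes spaces as "+", and the "+" symbol is not accepted in its place in the blob URL, so it has to be replaced with "%20".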
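A minimal sketch of the encoding the test performs. It assumes blob names without `/` separators (`URLEncoder` would turn `/` into `%2F`), and it uses the `Charset` overload of `URLEncoder.encode` (Java 10+) with UTF-8 rather than the test's US-ASCII; the class and method names are hypothetical. Because a literal "+" in the name is already encoded as `%2B`, replacing the remaining "+" characters only affects spaces.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public final class BlobUrlSketch {

    private BlobUrlSketch() {
    }

    /** Percent-encodes a blob name. URLEncoder uses form encoding, so spaces become "+" and are rewritten to "%20". */
    public static String encodeBlobName(String blobName) {
        return URLEncoder.encode(blobName, StandardCharsets.UTF_8).replace("+", "%20");
    }

    /** Builds the expected primary URI with the same format string the test uses. */
    public static String primaryUri(String accountName, String containerName, String blobName) {
        return String.format("https://%s.blob.core.windows.net/%s/%s",
                accountName, containerName, encodeBlobName(blobName));
    }
}
```

For example, `encodeBlobName("my blob+v1.txt")` yields `my%20blob%2Bv1.txt`: the space ends up as `%20` and the literal plus stays `%2B`.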




[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

exceptionfactory commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r991269688


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
-            BlobClient blobClient = containerClient.getBlobClient(blobName);
 
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(ATTR_NAME_CONTAINER, containerName);
+            attributes.put(ATTR_NAME_BLOBNAME, blobName);
+            attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
+            attributes.put(ATTR_NAME_LANG, null);

Review Comment:
   Adding a `null` here seems unnecessary.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobAttributes.java:
##########
@@ -45,4 +45,6 @@ public final class BlobAttributes {
     public static final String ATTR_NAME_LENGTH = "azure.length";
     public static final String ATTR_DESCRIPTION_LENGTH = "Length of the blob";
 
+    public static final String ATTR_NAME_ERROR = "azure.error";

Review Comment:
   Recommend changing this to `azure.error.code` for more precision, since there could be other error attributes.
   ```suggestion
       public static final String ATTR_NAME_ERROR_CODE = "azure.error.code";
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
-            BlobClient blobClient = containerClient.getBlobClient(blobName);
 
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(ATTR_NAME_CONTAINER, containerName);
+            attributes.put(ATTR_NAME_BLOBNAME, blobName);
+            attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
+            attributes.put(ATTR_NAME_LANG, null);
+            attributes.put(ATTR_NAME_MIME_TYPE, "application/octet-stream");

Review Comment:
   Is there any response property that could indicate the content type as opposed to hard-coding this value?



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/pom.xml:
##########
@@ -40,5 +40,9 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-storage-blob</artifactId>
+        </dependency>

Review Comment:
   Although this module has some other Azure libraries, it is best to avoid introducing third-party libraries in Controller Service API modules as much as possible. It appears the only reason is for the `BlobErrorCode` in `AzureStorageConflictResolutionStrategy`. Recommend removing this dependency and replacing the direct reference to `BLOB_ALREADY_EXISTS` with a regular string description.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml:
##########
@@ -169,6 +169,12 @@
             <version>1.18.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-record-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>

Review Comment:
   Is there a particular reason for the addition of this dependency?



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageConflictResolutionStrategy.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import com.azure.storage.blob.models.BlobErrorCode;
+import org.apache.nifi.components.DescribedValue;
+
+public enum AzureStorageConflictResolutionStrategy implements DescribedValue {
+    FAIL_RESOLUTION("fail", "Fail if the blob already exists"),
+    IGNORE_RESOLUTION("ignore",
+            String.format(
+                "Ignore if the blob already exists; the 'azure.error' attribute will be set to the value '%s'",
+                BlobErrorCode.BLOB_ALREADY_EXISTS.toString()

Review Comment:
   As noted in the comment on the Maven configuration, I recommend removing this reference to avoid the unnecessary dependency.
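A sketch of how the enum could look without the `azure-storage-blob` dependency: the `BlobErrorCode.BLOB_ALREADY_EXISTS` reference is replaced by the plain string `"BlobAlreadyExists"` (the error code string that constant represents), and the description uses the `azure.error.code` attribute name suggested earlier. `DescribedValue` is redeclared locally with the three methods of NiFi's interface so the example compiles on its own; the `REPLACE_RESOLUTION` description and using the value as the display name are assumptions.

```java
public final class ConflictResolutionSketch {

    /** Local stand-in for org.apache.nifi.components.DescribedValue. */
    public interface DescribedValue {
        String getValue();

        String getDisplayName();

        String getDescription();
    }

    /** Plain string literal instead of a reference into the Azure SDK. */
    public static final String BLOB_ALREADY_EXISTS = "BlobAlreadyExists";

    public enum AzureStorageConflictResolutionStrategy implements DescribedValue {
        FAIL_RESOLUTION("fail", "Fail if the blob already exists"),
        IGNORE_RESOLUTION("ignore", String.format(
                "Ignore if the blob already exists; the 'azure.error.code' attribute will be set to the value '%s'",
                BLOB_ALREADY_EXISTS)),
        // Hypothetical description for the replace option.
        REPLACE_RESOLUTION("replace", "Replace the blob contents if the blob already exists");

        private final String value;
        private final String description;

        AzureStorageConflictResolutionStrategy(String value, String description) {
            this.value = value;
            this.description = description;
        }

        @Override
        public String getValue() {
            return value;
        }

        @Override
        public String getDisplayName() {
            return value;
        }

        @Override
        public String getDescription() {
            return description;
        }
    }
}
```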




[GitHub] [nifi] nandorsoma commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

nandorsoma commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r1002150279


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
-            BlobClient blobClient = containerClient.getBlobClient(blobName);
 
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();

Review Comment:
   Sorry, I wasn't clear; my bad. I meant the case where execution jumps to the catch block and recovers under the `ignore` resolution. In the meantime, though, I realized that since `ignore` is not the default behavior, it won't break anything.
   
   Nevertheless, I still have concerns about the two attributes I mentioned previously. Applying `mime.type` and `lang` before the upload is not OK, because the upload can fail, and then these values are meaningless. I think it's better to apply them after the upload, when we can read the values from the client, which is more precise.
   
   The test you mentioned was written before the `ignore` resolution existed; that's probably why it is failing. I think we can modify it accordingly, though. Which test is it?
   
   Also, applying the other three attributes before uploading could have a purpose. Still, if we don't have a use case for that, then I think it would be clearer to apply all attributes after a successful upload, like in the original version.
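A minimal sketch of the "apply attributes after a successful upload" ordering, using a hypothetical `Uploader` interface and `UploadResult` holder in place of the Azure SDK client. Attribute names other than `azure.length`, `mime.type`, and `lang` are assumptions. Absent values are skipped rather than stored as `null`.

```java
import java.util.HashMap;
import java.util.Map;

public final class AttributesAfterUploadSketch {

    /** Hypothetical values read back from the client after a successful upload. */
    public static final class UploadResult {
        final String eTag;
        final String contentType;     // may be null if the service reports none
        final String contentLanguage; // may be null if the service reports none
        final long length;

        public UploadResult(String eTag, String contentType, String contentLanguage, long length) {
            this.eTag = eTag;
            this.contentType = contentType;
            this.contentLanguage = contentLanguage;
            this.length = length;
        }
    }

    /** Hypothetical uploader; a real processor would call the Azure SDK here and throw on failure. */
    public interface Uploader {
        UploadResult upload(String blobName, byte[] content);
    }

    public static Map<String, String> uploadThenCollect(Uploader uploader, String blobName, byte[] content) {
        // Upload first: if it throws, no attributes are built, so none can describe a blob that was never written.
        final UploadResult result = uploader.upload(blobName, content);

        final Map<String, String> attributes = new HashMap<>();
        attributes.put("azure.blobname", blobName);
        attributes.put("azure.etag", result.eTag);
        attributes.put("azure.length", String.valueOf(result.length));
        // Values reported by the service, not hard-coded defaults.
        putIfNotNull(attributes, "mime.type", result.contentType);
        putIfNotNull(attributes, "lang", result.contentLanguage);
        return attributes;
    }

    private static void putIfNotNull(Map<String, String> attributes, String name, String value) {
        if (value != null) {
            attributes.put(name, value);
        }
    }
}
```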





[GitHub] [nifi] malthe commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r1002223463


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
-            BlobClient blobClient = containerClient.getBlobClient(blobName);
 
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(ATTR_NAME_CONTAINER, containerName);
+            attributes.put(ATTR_NAME_BLOBNAME, blobName);
+            attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
+            attributes.put(ATTR_NAME_LANG, null);
+            attributes.put(ATTR_NAME_MIME_TYPE, "application/octet-stream");
             long length = flowFile.getSize();
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
+
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    attributes.put(ATTR_NAME_BLOBTYPE, BlobType.BLOCK_BLOB.toString());
+                    attributes.put(ATTR_NAME_ETAG, blob.getETag());
+                    attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
+                    attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(blob.getLastModified()));
+                }
+            } catch (BlobStorageException e) {
+                if (conflictResolution == AzureStorageConflictResolutionStrategy.FAIL_RESOLUTION) {
+                    throw e;
+                }
+                final BlobErrorCode errorCode = e.getErrorCode();
+                if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS) {
+                    getLogger().warn("Blob already exists: remote blob not modified. Transferring {} to success", flowFile);
+                    attributes.putAll(createBlobAttributesMap(blobClient));
+                    attributes.put(ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());

Review Comment:
   I do think it's useful to have the error code available as an attribute, because in the log it is not isolated particularly well.




[GitHub] [nifi] malthe commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r1004604470


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            applyCommonAttributes(attributes, blobClient);
+
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
+
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    long length = flowFile.getSize();
+                    applyUploadResultAttributes(attributes, blob, BlobType.BLOCK_BLOB, length);
 
-            long length = flowFile.getSize();
+                }
+            } catch (BlobStorageException e) {
+                final BlobErrorCode errorCode = e.getErrorCode();
+                session.putAttribute(flowFile, ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());

Review Comment:
   From reading the code, I don't see how it could work without your suggestion. How strange!




[GitHub] [nifi] nandorsoma commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

nandorsoma commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r1002157224


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            applyCommonAttributes(attributes, blobClient);
+
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
+
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    long length = flowFile.getSize();
+                    applyUploadResultAttributes(attributes, blob, BlobType.BLOCK_BLOB, length);
 
-            long length = flowFile.getSize();
+                }
+            } catch (BlobStorageException e) {
+                final BlobErrorCode errorCode = e.getErrorCode();
+                session.putAttribute(flowFile, ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());

Review Comment:
   When you put an attribute on a FlowFile, you need to use the returned FlowFile instead of the original one.
   ```suggestion
                   flowFile = session.putAttribute(flowFile, ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());
   ```
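NiFi FlowFiles are immutable: `ProcessSession.putAttribute` returns a new FlowFile reference rather than modifying its argument, which is why the result has to be reassigned. The sketch below models that contract with hypothetical stand-in types (not the NiFi API).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public final class ImmutableFlowFileSketch {

    /** Hypothetical stand-in for a FlowFile: its attributes never change after creation. */
    public static final class SimpleFlowFile {
        private final Map<String, String> attributes;

        private SimpleFlowFile(Map<String, String> attributes) {
            this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
        }

        public String getAttribute(String name) {
            return attributes.get(name);
        }
    }

    public static SimpleFlowFile create() {
        return new SimpleFlowFile(new HashMap<>());
    }

    /** Like ProcessSession.putAttribute, returns a new FlowFile instead of modifying the argument. */
    public static SimpleFlowFile putAttribute(SimpleFlowFile flowFile, String name, String value) {
        final Map<String, String> updated = new HashMap<>(flowFile.attributes);
        updated.put(name, value);
        return new SimpleFlowFile(updated);
    }
}
```

Calling `putAttribute(flowFile, ...)` without reassignment leaves `flowFile` unchanged; `flowFile = putAttribute(flowFile, ...)` is the pattern the suggestion applies.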



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobAttributes.java:
##########
@@ -45,4 +45,8 @@ public final class BlobAttributes {
     public static final String ATTR_NAME_LENGTH = "azure.length";
     public static final String ATTR_DESCRIPTION_LENGTH = "Length of the blob";
 
+    public static final String ATTR_NAME_ERROR_CODE = "azure.error.code";
+    public static final String ATTR_DESCRIPTION_ERROR_CODE = "Error code reported during blob operation";
+
+    public static final String ATTR_NAME_IGNORED = "ignored";

Review Comment:
   I'd prefix this attribute with `azure.` to prevent attribute name collision with other processors.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            applyCommonAttributes(attributes, blobClient);
+
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
+
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    long length = flowFile.getSize();
+                    applyUploadResultAttributes(attributes, blob, BlobType.BLOCK_BLOB, length);
 
-            long length = flowFile.getSize();
+                }
+            } catch (BlobStorageException e) {
+                final BlobErrorCode errorCode = e.getErrorCode();
+                session.putAttribute(flowFile, ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+                final boolean alreadyExists = errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS;
+                final boolean ignore = conflictResolution == AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION;
+
+                // If the blob already exists, we always add an attribute "ignored"; depending on the value of
+                // conflict resolution, this will be true or false.
+                if (alreadyExists) {
+                    session.putAttribute(flowFile, ATTR_NAME_IGNORED, String.valueOf(ignore));

Review Comment:
   The comment is inaccurate: when the resolution is `replace`, this attribute is not applied, even though the blob may already exist. I'd apply this attribute only for the `ignore` resolution and set its value depending on whether the blob already existed.
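A sketch of the rule proposed here, as a pure function over the strategy and whether the blob already existed (that is, whether the conditional upload was rejected). It assumes the prefixed attribute name `azure.ignored`; the enum is a hypothetical local stand-in for `AzureStorageConflictResolutionStrategy`.

```java
import java.util.HashMap;
import java.util.Map;

public final class IgnoredAttributeSketch {

    /** Hypothetical local stand-in for the processor's conflict resolution strategy. */
    public enum Strategy { FAIL, IGNORE, REPLACE }

    /** Assumed name, following the suggestion to prefix the attribute with "azure.". */
    public static final String ATTR_NAME_IGNORED = "azure.ignored";

    public static Map<String, String> resolutionAttributes(Strategy strategy, boolean blobAlreadyExisted) {
        final Map<String, String> attributes = new HashMap<>();
        // Only the ignore strategy reports the attribute; its value says whether the upload was skipped.
        if (strategy == Strategy.IGNORE) {
            attributes.put(ATTR_NAME_IGNORED, String.valueOf(blobAlreadyExisted));
        }
        return attributes;
    }
}
```

Under `replace` the attribute is never set, even if a blob was overwritten; under `fail` an existing blob leads to an exception before attributes matter.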




[GitHub] [nifi] malthe commented on pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

malthe commented on PR #6443:
URL: https://github.com/apache/nifi/pull/6443#issuecomment-1305146623

   Bump



[GitHub] [nifi] malthe commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r996273173


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
-            BlobClient blobClient = containerClient.getBlobClient(blobName);
 
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(ATTR_NAME_CONTAINER, containerName);
+            attributes.put(ATTR_NAME_BLOBNAME, blobName);
+            attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
+            attributes.put(ATTR_NAME_LANG, null);
+            attributes.put(ATTR_NAME_MIME_TYPE, "application/octet-stream");
             long length = flowFile.getSize();
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
+
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    attributes.put(ATTR_NAME_BLOBTYPE, BlobType.BLOCK_BLOB.toString());
+                    attributes.put(ATTR_NAME_ETAG, blob.getETag());
+                    attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
+                    attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(blob.getLastModified()));
+                }
+            } catch (BlobStorageException e) {
+                if (conflictResolution == AzureStorageConflictResolutionStrategy.FAIL_RESOLUTION) {
+                    throw e;
+                }
+                final BlobErrorCode errorCode = e.getErrorCode();
+                if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS) {

Review Comment:
   In 0d394ed99f93f2694df444ad86db13621fcb9c7f this has now become clearer for the reader:
   ```java
   if (alreadyExists && ignore) {
       getLogger().info("Blob already exists: remote blob not modified. Transferring {} to success", flowFile);
   } else {
       throw e;
   }
   ```
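To make the `setIfNoneMatch("*")` condition and the branch above concrete, here is an in-memory simulation with hypothetical types (no Azure SDK). A write carrying the `If-None-Match: *` condition succeeds only when no blob exists under that name; otherwise the service reports `BlobAlreadyExists`, and only the `ignore` strategy turns that into a success.

```java
import java.util.HashMap;
import java.util.Map;

public final class ConditionalUploadSketch {

    public enum Strategy { FAIL, IGNORE, REPLACE }

    /** Hypothetical stand-in for a BlobStorageException carrying the BlobAlreadyExists error code. */
    public static final class BlobAlreadyExistsException extends RuntimeException {
        public BlobAlreadyExistsException(String blobName) {
            super("BlobAlreadyExists: " + blobName);
        }
    }

    private final Map<String, byte[]> blobs = new HashMap<>();

    /** Models a Put Blob request; ifNoneMatchAny stands in for the "If-None-Match: *" condition. */
    private void put(String blobName, byte[] content, boolean ifNoneMatchAny) {
        if (ifNoneMatchAny && blobs.containsKey(blobName)) {
            throw new BlobAlreadyExistsException(blobName);
        }
        blobs.put(blobName, content.clone());
    }

    /** Returns true if the blob was written, false if an existing blob was left untouched. */
    public boolean upload(String blobName, byte[] content, Strategy strategy) {
        try {
            // Only REPLACE omits the condition, matching the REPLACE_RESOLUTION check in the diff.
            put(blobName, content, strategy != Strategy.REPLACE);
            return true;
        } catch (BlobAlreadyExistsException e) {
            if (strategy == Strategy.IGNORE) {
                return false;
            }
            throw e;
        }
    }

    public byte[] get(String blobName) {
        return blobs.get(blobName);
    }
}
```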




[GitHub] [nifi] malthe commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r1002231096


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            applyCommonAttributes(attributes, blobClient);
+
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
+
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    long length = flowFile.getSize();
+                    applyUploadResultAttributes(attributes, blob, BlobType.BLOCK_BLOB, length);
 
-            long length = flowFile.getSize();
+                }
+            } catch (BlobStorageException e) {
+                final BlobErrorCode errorCode = e.getErrorCode();
+                session.putAttribute(flowFile, ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+                final boolean alreadyExists = errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS;
+                final boolean ignore = conflictResolution == AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION;
+
+                // If the blob already exists, we always add an attribute "ignored"; depending on the value of
+                // conflict resolution, this will be true or false.
+                if (alreadyExists) {
+                    session.putAttribute(flowFile, ATTR_NAME_IGNORED, String.valueOf(ignore));

Review Comment:
   Good point. I have changed this in 4a822e15df25ad5eb24ddf02d54a55799213a003 now.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobAttributes.java:
##########
@@ -45,4 +45,8 @@ public final class BlobAttributes {
     public static final String ATTR_NAME_LENGTH = "azure.length";
     public static final String ATTR_DESCRIPTION_LENGTH = "Length of the blob";
 
+    public static final String ATTR_NAME_ERROR_CODE = "azure.error.code";
+    public static final String ATTR_DESCRIPTION_ERROR_CODE = "Error code reported during blob operation";
+
+    public static final String ATTR_NAME_IGNORED = "ignored";

Review Comment:
   Changed in ddb9b0cb4394c015e3ad89bf71a2d3b369b06ca4.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] nandorsoma commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r1004368761


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            applyCommonAttributes(attributes, blobClient);
+
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
+
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    long length = flowFile.getSize();
+                    applyUploadResultAttributes(attributes, blob, BlobType.BLOCK_BLOB, length);
 
-            long length = flowFile.getSize();
+                }
+            } catch (BlobStorageException e) {
+                final BlobErrorCode errorCode = e.getErrorCode();
+                session.putAttribute(flowFile, ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());

Review Comment:
   Yes, there is a difference. It may work in a production environment, but there are scenarios in which an attribute set this way will be ignored. To be honest, I'm not familiar with the internals of this mechanism, but I have received similar comments from more experienced contributors in the past.
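In NiFi's `ProcessSession` API, `putAttribute` returns the updated `FlowFile`, and the usual idiom is `flowFile = session.putAttribute(flowFile, key, value);`. The line under review discards that return value. The sketch below models the problem without NiFi. `Flow` and `putAttribute` are hypothetical stand-ins that make every update produce a new immutable version. NiFi's real session tracks FlowFiles differently, which may explain why a discarded return value can appear to work in some environments.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ReassignFlowFileSketch {

    // Hypothetical immutable stand-in for a FlowFile: every update yields a new version.
    static final class Flow {
        final Map<String, String> attributes;

        Flow(Map<String, String> attributes) {
            this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
        }
    }

    // Hypothetical stand-in for session.putAttribute: returns the updated version
    // and leaves the version passed in unchanged.
    static Flow putAttribute(Flow flow, String key, String value) {
        Map<String, String> updated = new HashMap<>(flow.attributes);
        updated.put(key, value);
        return new Flow(updated);
    }

    public static void main(String[] args) {
        Flow flowFile = new Flow(Collections.<String, String>emptyMap());

        // Return value discarded: flowFile still refers to the old version.
        putAttribute(flowFile, "azure.error.code", "BlobAlreadyExists");
        System.out.println(flowFile.attributes.containsKey("azure.error.code")); // false

        // Reassigned: later code sees the attribute.
        flowFile = putAttribute(flowFile, "azure.error.code", "BlobAlreadyExists");
        System.out.println(flowFile.attributes.get("azure.error.code")); // BlobAlreadyExists
    }
}
```

Always reassigning the returned reference is harmless even where the old reference would have worked, so it is the safer habit.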





[GitHub] [nifi] nandorsoma commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r1007319900


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,42 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            applyCommonAttributes(attributes, blobClient);

Review Comment:
   ```suggestion
               final Map<String, String> attributes = new HashMap<>();
               applyStandardBlobAttributes(attributes, blobClient);
   ```





[GitHub] [nifi] malthe commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r1007602431


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java:
##########
@@ -137,10 +139,18 @@ protected Map<String, String> initCommonExpressionLanguageAttributes() {
         return attributes;
     }
 
-    protected void assertFlowFileBlobAttributes(MockFlowFile flowFile, String containerName, String blobName, int blobLength) {
+    protected void assertFlowFileCommonBlobAttributes(MockFlowFile flowFile, String containerName, String blobName) throws UnsupportedEncodingException {
         flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_CONTAINER, containerName);
         flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_BLOBNAME, blobName);
-        flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI, String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), containerName, blobName));
+        flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI,
+                String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), containerName, URLEncoder.encode(
+                        blobName,
+                        StandardCharsets.US_ASCII.name()
+                ).replace("+", "%20"))
+        );

Review Comment:
   This breaks real live code.
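The test helper above builds the expected primary URI by form-encoding the blob name and then turning `+` into `%20`. The sketch below repeats those exact steps so their effect on a few names is visible. `encodeBlobName`, `primaryUri` and the account and container names are hypothetical. A literal `+` is encoded to `%2B` before the replacement, so only spaces become `%20`. A `/` is also encoded (to `%2F`). Whether that matches the URL the SDK returns for names containing `/` is not verified here and is worth checking.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class BlobUriEncodingSketch {

    // Same steps as the test helper: form-encode, then turn "+" (an encoded space) into "%20".
    static String encodeBlobName(String blobName) {
        try {
            return URLEncoder.encode(blobName, StandardCharsets.US_ASCII.name()).replace("+", "%20");
        } catch (UnsupportedEncodingException e) {
            // US-ASCII is a standard charset, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    static String primaryUri(String account, String container, String blobName) {
        return String.format("https://%s.blob.core.windows.net/%s/%s", account, container, encodeBlobName(blobName));
    }

    public static void main(String[] args) {
        System.out.println(encodeBlobName("my blob.txt"));  // my%20blob.txt
        System.out.println(encodeBlobName("a+b.txt"));      // a%2Bb.txt
        System.out.println(encodeBlobName("dir/file.txt")); // dir%2Ffile.txt
        System.out.println(primaryUri("myaccount", "container", "my blob.txt"));
    }
}
```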





[GitHub] [nifi] nandorsoma commented on pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
nandorsoma commented on PR #6443:
URL: https://github.com/apache/nifi/pull/6443#issuecomment-1313757028

   @malthe, if it is not mandatory, please try to avoid using force push.




[GitHub] [nifi] malthe commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r1015668696


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +148,40 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            applyStandardBlobAttributes(attributes, blobClient);
+            final boolean ignore = conflictResolution == AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION;
+
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
 
-            long length = flowFile.getSize();
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);

Review Comment:
   Fixed in 38b7ff1fb49eaca13c10999acbf407589ae6f53e.





[GitHub] [nifi] malthe commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r991283281


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
-            BlobClient blobClient = containerClient.getBlobClient(blobName);
 
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            attributes.put(ATTR_NAME_CONTAINER, containerName);
+            attributes.put(ATTR_NAME_BLOBNAME, blobName);
+            attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
+            attributes.put(ATTR_NAME_LANG, null);
+            attributes.put(ATTR_NAME_MIME_TYPE, "application/octet-stream");

Review Comment:
   As far as I can tell, there is not, but this is the value you get if you don't set it during upload, and we currently do not support setting it.
   
   (If we did support it and the user had specified a value, we would know what that value was.)
   
   I think it's a reasonable bet that the default MIME type will not change. The service basically assumes the content is binary if you don't specify a type.





[GitHub] [nifi] malthe commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r991317413


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml:
##########
@@ -169,6 +169,12 @@
             <version>1.18.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-record-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>

Review Comment:
   I don't think so; I have removed it now. It must have been an artifact of the coding process.





[GitHub] [nifi] malthe commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r991317749


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobAttributes.java:
##########
@@ -45,4 +45,6 @@ public final class BlobAttributes {
     public static final String ATTR_NAME_LENGTH = "azure.length";
     public static final String ATTR_DESCRIPTION_LENGTH = "Length of the blob";
 
+    public static final String ATTR_NAME_ERROR = "azure.error";

Review Comment:
   Agreed – and fixed.





[GitHub] [nifi] malthe commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r984699844


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,19 +168,42 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            Relationship relationship = REL_SUCCESS;
+
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
 
-            long length = flowFile.getSize();
+            try {
+                if (conflictStrategy == AzureStorageConflictStrategy.OVERWRITE_IF_SOURCE_NEWER) {
+                    OffsetDateTime ifUnmodifiedSince = OffsetDateTime.ofInstant(
+                            Instant.ofEpochMilli(flowFile.getEntryDate()),
+                            ZoneId.systemDefault()
+                    );
+                    blobRequestConditions.setIfUnmodifiedSince(ifUnmodifiedSince);
+                } else if (conflictStrategy != AzureStorageConflictStrategy.OVERWRITE) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);

Review Comment:
   Some background on `OVERWRITE_IF_SOURCE_NEWER`: I have a follow-up PR more or less ready that extends this processor to copy not from the FlowFile content but from another blob (possibly in another storage account), using the [Put Blob From URL](https://learn.microsoft.com/en-us/rest/api/storageservices/put-blob-from-url) service call.
   
   See https://issues.apache.org/jira/browse/NIFI-9972.
   
   This option is also supported by azcopy, because the service supports it directly: the service can check the condition before processing the request.
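For `OVERWRITE_IF_SOURCE_NEWER`, the diff sets `blobRequestConditions.setIfUnmodifiedSince(...)` from the FlowFile entry date. The service then performs the write only if the existing destination blob has not been modified since that time. The sketch below models that precondition for a destination that already exists. It assumes the service evaluates the condition like a standard HTTP `If-Unmodified-Since` header with one-second resolution. `writeAllowed` is a hypothetical helper, and the case where the destination does not exist is not modeled.

```java
import java.time.Instant;

public class OverwriteIfSourceNewerSketch {

    // Models If-Unmodified-Since against an existing destination blob: the write proceeds
    // only if the destination was not modified after the given time.
    // Assumption: HTTP dates have one-second resolution, so both sides are compared in whole seconds.
    static boolean writeAllowed(Instant destinationLastModified, Instant ifUnmodifiedSince) {
        return destinationLastModified.getEpochSecond() <= ifUnmodifiedSince.getEpochSecond();
    }

    public static void main(String[] args) {
        Instant sourceTime = Instant.parse("2022-10-01T12:00:00Z");
        // Destination older than the source: the overwrite proceeds.
        System.out.println(writeAllowed(Instant.parse("2022-09-30T08:00:00Z"), sourceTime)); // true
        // Destination newer than the source: the precondition fails and the blob is left untouched.
        System.out.println(writeAllowed(Instant.parse("2022-10-02T08:00:00Z"), sourceTime)); // false
    }
}
```

Because the service evaluates the condition, no separate "read properties, then upload" round trip is needed, and there is no window in which the destination could change between the check and the write.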





[GitHub] [nifi] malthe commented on a diff in pull request #6443: NIFI-10491: Azure Blob Storage should have conflict resolution (overwrite mode)

Posted by GitBox <gi...@apache.org>.
malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r985298456


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,19 +168,42 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            Relationship relationship = REL_SUCCESS;
+
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
 
-            long length = flowFile.getSize();
+            try {
+                if (conflictStrategy == AzureStorageConflictStrategy.OVERWRITE_IF_SOURCE_NEWER) {
+                    OffsetDateTime ifUnmodifiedSince = OffsetDateTime.ofInstant(
+                            Instant.ofEpochMilli(flowFile.getEntryDate()),

Review Comment:
   I did experiment with a pair of properties to control the format and attribute name to use when the resolution strategy is set to `OVERWRITE_IF_SOURCE_NEWER`, but I ultimately decided that there is probably not a strong enough use case to warrant the added complexity.
   
   It is easy to add this functionality if the need arises.
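Without those properties, the comparison time is always derived from `flowFile.getEntryDate()`, which is when the FlowFile entered the flow, not a modification time carried in its attributes. The diff converts it with `ZoneId.systemDefault()`. The sketch below shows that the zone choice changes only the offset, not the instant sent as the condition. `fromEntryDate` is a hypothetical helper and the timestamp is a made-up example.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class EntryDateConversionSketch {

    // Same conversion as in the diff: epoch millis -> OffsetDateTime in the JVM default zone.
    static OffsetDateTime fromEntryDate(long entryDateMillis) {
        return OffsetDateTime.ofInstant(Instant.ofEpochMilli(entryDateMillis), ZoneId.systemDefault());
    }

    public static void main(String[] args) {
        long entryDate = 1664884800000L; // hypothetical FlowFile entry date (2022-10-04T12:00:00Z)
        OffsetDateTime local = fromEntryDate(entryDate);
        OffsetDateTime utc = OffsetDateTime.ofInstant(Instant.ofEpochMilli(entryDate), ZoneOffset.UTC);

        // Different offsets, same instant: isEqual compares the point on the time-line.
        System.out.println(local.isEqual(utc)); // true
        System.out.println(local.toInstant().toEpochMilli() == entryDate); // true
    }
}
```

Using `ZoneOffset.UTC` instead would produce the same condition while avoiding any dependence on the JVM's default zone.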


