Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2020/05/20 16:15:05 UTC

[GitHub] [nifi] pgyori opened a new pull request #4287: NIFI-7445: Add Conflict Resolution property to PutAzureDataLakeStorage processor

pgyori opened a new pull request #4287:
URL: https://github.com/apache/nifi/pull/4287


   https://issues.apache.org/jira/browse/NIFI-7445
   
   #### Description of PR
   
   The PutAzureDataLakeStorage processor now has a Conflict Resolution property. With this property, the user can specify how the processor should behave when there is a name conflict while uploading a file to Azure.
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
        in the commit message?
   
   - [ ] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically `master`)?
   
   - [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [ ] Have you written or updated unit tests to verify your changes?
   - [ ] Have you verified that the full build is successful on both JDK 8 and JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to `.name` (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] turcsanyip commented on a change in pull request #4287: NIFI-7445: Add Conflict Resolution property to PutAzureDataLakeStorage processor

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4287:
URL: https://github.com/apache/nifi/pull/4287#discussion_r430546794



##########
File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
##########
@@ -76,27 +109,45 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
             final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
             final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
-            final DataLakeFileClient fileClient = directoryClient.createFile(fileName);
+            final DataLakeFileClient fileClient;
+
+            final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
+            boolean overwrite = conflictResolution.equals(REPLACE_RESOLUTION);
+
+            try {
+                fileClient = directoryClient.createFile(fileName, overwrite);
+
+                final long length = flowFile.getSize();
+                if (length > 0) {
+                    try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) {
+                        fileClient.append(in, 0, length);
+                    }
+                }
+                fileClient.flush(length);
+
+                final Map<String, String> attributes = new HashMap<>();
+                attributes.put("azure.filesystem", fileSystem);
+                attributes.put("azure.directory", directory);
+                attributes.put("azure.filename", fileName);
+                attributes.put("azure.primaryUri", fileClient.getFileUrl());
+                attributes.put("azure.length", String.valueOf(length));
+                flowFile = session.putAllAttributes(flowFile, attributes);
 
-            final long length = flowFile.getSize();
-            if (length > 0) {
-                try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) {
-                    fileClient.append(in, 0, length);
+                session.transfer(flowFile, REL_SUCCESS);
+                final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+                session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl(), transferMillis);
+            } catch (DataLakeStorageException dlsException) {
+                if (dlsException.getStatusCode() == 409) {
+                    if (conflictResolution.equals(IGNORE_RESOLUTION)) {
+                        session.transfer(flowFile, REL_SUCCESS);
+                        getLogger().warn("Transferring {} to success because file with same name already exists", new Object[]{flowFile});

Review comment:
       The warning message does not properly describe cause and effect: it reads as "file exists => transfer to success".
   The actual reason for transferring to success is the 'Ignore' resolution policy.
   The message should also mention that the file has not been overwritten in Azure.
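
To illustrate the point, here is a minimal, self-contained sketch of the decision being discussed. It is not the processor's code: the `Policy` and `Route` enums, the method names, and the message wording are hypothetical, and the `FAIL` policy and the "everything else goes to failure" branch are assumptions, since the diff above only shows the replace and ignore cases.

```java
public class ConflictResolutionSketch {

    // Hypothetical policy names; the diff only shows REPLACE_RESOLUTION and
    // IGNORE_RESOLUTION, so FAIL is an assumption made for this sketch.
    public enum Policy { FAIL, REPLACE, IGNORE }

    // Hypothetical stand-in for the processor's relationships.
    public enum Route { SUCCESS, FAILURE }

    public static final int HTTP_CONFLICT = 409;

    // Only the replace policy asks the service to overwrite an existing file.
    public static boolean overwrite(Policy policy) {
        return policy == Policy.REPLACE;
    }

    // Decide where the FlowFile goes after the upload failed with this status.
    // Assumption: any error other than "409 with ignore" is routed to failure.
    public static Route routeOnError(int statusCode, Policy policy) {
        if (statusCode == HTTP_CONFLICT && policy == Policy.IGNORE) {
            return Route.SUCCESS;
        }
        return Route.FAILURE;
    }

    // A message that names the policy as the cause and states that the
    // existing file was left untouched.
    public static String ignoreMessage(String fileName) {
        return "File " + fileName + " already exists in Azure and was not overwritten. "
                + "Transferring to success because the conflict resolution policy is 'ignore'.";
    }

    public static void main(String[] args) {
        System.out.println(overwrite(Policy.REPLACE));
        System.out.println(routeOnError(HTTP_CONFLICT, Policy.IGNORE));
        System.out.println(routeOnError(HTTP_CONFLICT, Policy.FAIL));
        System.out.println(ignoreMessage("example.txt"));
    }
}
```

The message built by `ignoreMessage` states both halves the comment asks for: the policy is the reason for the success route, and the file in Azure is unchanged.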
   

##########
File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
##########
@@ -253,6 +300,14 @@ private void assertFlowFile(String directory, String fileName, byte[] fileData)
         flowFile.assertAttributeEquals("azure.length", Integer.toString(fileData.length));
     }
 
+    private void assertSimpleFlowFile(byte[] fileData) throws Exception {

Review comment:
       It could be called from `assertFlowFile`, because the first section of that method is the same.
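
A rough sketch of that delegation, using plain Java rather than the NiFi test classes. `FakeFlowFile` is a hypothetical stand-in for the mock FlowFile, and the shared section is assumed here to be a content comparison; the diff does not show what `assertSimpleFlowFile` actually checks.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class AssertHelperSketch {

    // Hypothetical stand-in for a test FlowFile: raw content plus attributes.
    public static final class FakeFlowFile {
        final byte[] content;
        final Map<String, String> attributes;

        public FakeFlowFile(byte[] content, Map<String, String> attributes) {
            this.content = content;
            this.attributes = attributes;
        }
    }

    // Shared checks (assumed for this sketch to be a content comparison).
    public static void assertSimpleFlowFile(FakeFlowFile flowFile, byte[] fileData) {
        if (!Arrays.equals(fileData, flowFile.content)) {
            throw new AssertionError("content mismatch");
        }
    }

    // Full checks: delegate to the shared part, then verify the attributes.
    public static void assertFlowFile(FakeFlowFile flowFile, String directory,
                                      String fileName, byte[] fileData) {
        assertSimpleFlowFile(flowFile, fileData);
        assertAttribute(flowFile, "azure.directory", directory);
        assertAttribute(flowFile, "azure.filename", fileName);
        assertAttribute(flowFile, "azure.length", Integer.toString(fileData.length));
    }

    private static void assertAttribute(FakeFlowFile flowFile, String name, String expected) {
        String actual = flowFile.attributes.get(name);
        if (!expected.equals(actual)) {
            throw new AssertionError(name + ": expected " + expected + " but was " + actual);
        }
    }

    public static void main(String[] args) {
        byte[] data = "abc".getBytes(StandardCharsets.UTF_8);
        Map<String, String> attributes = new HashMap<>();
        attributes.put("azure.directory", "dir");
        attributes.put("azure.filename", "file.txt");
        attributes.put("azure.length", "3");
        assertFlowFile(new FakeFlowFile(data, attributes), "dir", "file.txt", data);
        System.out.println("all assertions passed");
    }
}
```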







[GitHub] [nifi] pgyori commented on pull request #4287: NIFI-7445: Add Conflict Resolution property to PutAzureDataLakeStorage processor

Posted by GitBox <gi...@apache.org>.
pgyori commented on pull request #4287:
URL: https://github.com/apache/nifi/pull/4287#issuecomment-634140865


   Thank you @turcsanyip ! Fixed your findings in the next commit.





[GitHub] [nifi] asfgit closed pull request #4287: NIFI-7445: Add Conflict Resolution property to PutAzureDataLakeStorage processor

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #4287:
URL: https://github.com/apache/nifi/pull/4287


   

