You are viewing a plain text version of this content. The canonical (HTML) version of this message is available in the Apache mailing list archives; the original hyperlink was lost in plain-text extraction.
Posted to commits@nifi.apache.org by tu...@apache.org on 2020/05/27 07:14:34 UTC

[nifi] branch master updated: NIFI-7445: Add Conflict Resolution property to PutAzureDataLakeStorage processor

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1dd0e92  NIFI-7445: Add Conflict Resolution property to PutAzureDataLakeStorage processor
1dd0e92 is described below

commit 1dd0e920402d20917bf3bf421ce14ab3dc0749a5
Author: Peter Gyori <pe...@gmail.com>
AuthorDate: Fri May 15 18:04:00 2020 +0200

    NIFI-7445: Add Conflict Resolution property to PutAzureDataLakeStorage processor
    
    NIFI-7445: Add Conflict Resolution property to PutAzureDataLakeStorage processor
    Made warning and error messages more informative.
    Refactored flowFile assertion in the tests.
    
    This closes #4287.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../azure/storage/PutAzureDataLakeStorage.java     | 92 +++++++++++++++++-----
 .../azure/storage/ITPutAzureDataLakeStorage.java   | 73 ++++++++++++++---
 2 files changed, 136 insertions(+), 29 deletions(-)

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
index 7d395a3..081372f 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
@@ -20,6 +20,7 @@ import com.azure.storage.file.datalake.DataLakeDirectoryClient;
 import com.azure.storage.file.datalake.DataLakeFileClient;
 import com.azure.storage.file.datalake.DataLakeFileSystemClient;
 import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.models.DataLakeStorageException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -28,15 +29,20 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
 
 import java.io.BufferedInputStream;
 import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -51,6 +57,33 @@ import java.util.concurrent.TimeUnit;
 @InputRequirement(Requirement.INPUT_REQUIRED)
 public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
 
+    public static final String FAIL_RESOLUTION = "fail";
+    public static final String REPLACE_RESOLUTION = "replace";
+    public static final String IGNORE_RESOLUTION = "ignore";
+
+    public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
+            .name("conflict-resolution-strategy")
+            .displayName("Conflict Resolution Strategy")
+            .description("Indicates what should happen when a file with the same name already exists in the output directory")
+            .required(true)
+            .defaultValue(FAIL_RESOLUTION)
+            .allowableValues(FAIL_RESOLUTION, REPLACE_RESOLUTION, IGNORE_RESOLUTION)
+            .build();
+
+    private List<PropertyDescriptor> properties;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> props = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        props.add(CONFLICT_RESOLUTION);
+        properties = Collections.unmodifiableList(props);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         FlowFile flowFile = session.get();
@@ -76,29 +109,50 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
             final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
             final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
             final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
-            final DataLakeFileClient fileClient = directoryClient.createFile(fileName);
+            final DataLakeFileClient fileClient;
+
+            final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
+            boolean overwrite = conflictResolution.equals(REPLACE_RESOLUTION);
+
+            try {
+                fileClient = directoryClient.createFile(fileName, overwrite);
+
+                final long length = flowFile.getSize();
+                if (length > 0) {
+                    try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) {
+                        fileClient.append(in, 0, length);
+                    }
+                }
+                fileClient.flush(length);
+
+                final Map<String, String> attributes = new HashMap<>();
+                attributes.put("azure.filesystem", fileSystem);
+                attributes.put("azure.directory", directory);
+                attributes.put("azure.filename", fileName);
+                attributes.put("azure.primaryUri", fileClient.getFileUrl());
+                attributes.put("azure.length", String.valueOf(length));
+                flowFile = session.putAllAttributes(flowFile, attributes);
 
-            final long length = flowFile.getSize();
-            if (length > 0) {
-                try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) {
-                    fileClient.append(in, 0, length);
+                session.transfer(flowFile, REL_SUCCESS);
+                final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+                session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl(), transferMillis);
+            } catch (DataLakeStorageException dlsException) {
+                if (dlsException.getStatusCode() == 409) {
+                    if (conflictResolution.equals(IGNORE_RESOLUTION)) {
+                        session.transfer(flowFile, REL_SUCCESS);
+                        String warningMessage = String.format("File with the same name already exists. " +
+                                "Remote file not modified. " +
+                                "Transferring {} to success due to %s being set to '%s'.", CONFLICT_RESOLUTION.getDisplayName(), conflictResolution);
+                        getLogger().warn(warningMessage, new Object[]{flowFile});
+                    } else {
+                        throw dlsException;
+                    }
+                } else {
+                    throw dlsException;
                 }
             }
-            fileClient.flush(length);
-
-            final Map<String, String> attributes = new HashMap<>();
-            attributes.put("azure.filesystem", fileSystem);
-            attributes.put("azure.directory", directory);
-            attributes.put("azure.filename", fileName);
-            attributes.put("azure.primaryUri", fileClient.getFileUrl());
-            attributes.put("azure.length", String.valueOf(length));
-            flowFile = session.putAllAttributes(flowFile, attributes);
-
-            session.transfer(flowFile, REL_SUCCESS);
-            final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-            session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl(), transferMillis);
         } catch (Exception e) {
-            getLogger().error("Failed to create file", e);
+            getLogger().error("Failed to create file on Azure Data Lake Storage", e);
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
         }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
index 2cf53ec..049031e 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
@@ -69,6 +69,28 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
     }
 
     @Test
+    public void testPutFileToExistingDirectoryWithReplaceResolution() throws Exception {
+        fileSystemClient.createDirectory(DIRECTORY);
+
+        runner.setProperty(PutAzureDataLakeStorage.CONFLICT_RESOLUTION, PutAzureDataLakeStorage.REPLACE_RESOLUTION);
+
+        runProcessor(FILE_DATA);
+
+        assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
+    }
+
+    @Test
+    public void testPutFileToExistingDirectoryWithIgnoreResolution() throws Exception {
+        fileSystemClient.createDirectory(DIRECTORY);
+
+        runner.setProperty(PutAzureDataLakeStorage.CONFLICT_RESOLUTION, PutAzureDataLakeStorage.IGNORE_RESOLUTION);
+
+        runProcessor(FILE_DATA);
+
+        assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
+    }
+
+    @Test
     public void testPutFileToNonExistingDirectory() throws Exception {
         runProcessor(FILE_DATA);
 
@@ -156,11 +178,8 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
         assertSuccess(directory, fileName, FILE_DATA);
     }
 
-    @Ignore
-    // the existing file gets overwritten without error
-    // seems to be a bug in the Azure lib
     @Test
-    public void testPutFileToExistingFile() {
+    public void testPutFileToExistingFileWithFailResolution() {
         fileSystemClient.createFile(String.format("%s/%s", DIRECTORY, FILE_NAME));
 
         runProcessor(FILE_DATA);
@@ -169,6 +188,29 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
     }
 
     @Test
+    public void testPutFileToExistingFileWithReplaceResolution() throws Exception {
+        fileSystemClient.createFile(String.format("%s/%s", DIRECTORY, FILE_NAME));
+
+        runner.setProperty(PutAzureDataLakeStorage.CONFLICT_RESOLUTION, PutAzureDataLakeStorage.REPLACE_RESOLUTION);
+
+        runProcessor(FILE_DATA);
+
+        assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
+    }
+
+    @Test
+    public void testPutFileToExistingFileWithIgnoreResolution() throws Exception {
+        String azureFileContent = "AzureFileContent";
+        createDirectoryAndUploadFile(DIRECTORY, FILE_NAME, azureFileContent);
+
+        runner.setProperty(PutAzureDataLakeStorage.CONFLICT_RESOLUTION, PutAzureDataLakeStorage.IGNORE_RESOLUTION);
+
+        runProcessor(FILE_DATA);
+
+        assertSuccessWithIgnoreResolution(DIRECTORY, FILE_NAME, FILE_DATA, azureFileContent.getBytes());
+    }
+
+    @Test
     public void testPutFileWithEL() throws Exception {
         Map<String, String> attributes = createAttributesMap();
         setELProperties();
@@ -227,17 +269,18 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
     }
 
     private void assertSuccess(String directory, String fileName, byte[] fileData) throws Exception {
-        assertFlowFile(directory, fileName, fileData);
+        assertFlowFile(fileData, fileName, directory);
         assertAzureFile(directory, fileName, fileData);
         assertProvenanceEvents();
     }
 
-    private void assertFlowFile(String directory, String fileName, byte[] fileData) throws Exception {
-        runner.assertAllFlowFilesTransferred(PutAzureDataLakeStorage.REL_SUCCESS, 1);
-
-        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutAzureDataLakeStorage.REL_SUCCESS).get(0);
+    private void assertSuccessWithIgnoreResolution(String directory, String fileName, byte[] fileData, byte[] azureFileData) throws Exception {
+        assertFlowFile(fileData);
+        assertAzureFile(directory, fileName, azureFileData);
+    }
 
-        flowFile.assertContentEquals(fileData);
+    private void assertFlowFile(byte[] fileData, String fileName, String directory) throws Exception {
+        MockFlowFile flowFile = assertFlowFile(fileData);
 
         flowFile.assertAttributeEquals("azure.filesystem", fileSystemName);
         flowFile.assertAttributeEquals("azure.directory", directory);
@@ -253,6 +296,16 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
         flowFile.assertAttributeEquals("azure.length", Integer.toString(fileData.length));
     }
 
+    private MockFlowFile assertFlowFile(byte[] fileData) throws Exception {
+        runner.assertAllFlowFilesTransferred(PutAzureDataLakeStorage.REL_SUCCESS, 1);
+
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutAzureDataLakeStorage.REL_SUCCESS).get(0);
+
+        flowFile.assertContentEquals(fileData);
+
+        return flowFile;
+    }
+
     private void assertAzureFile(String directory, String fileName, byte[] fileData) {
         DataLakeFileClient fileClient;
         if (StringUtils.isNotEmpty(directory)) {