You are viewing a plain text version of this content. A canonical hyperlinked version is available in the original mailing-list archive.
Posted to commits@nifi.apache.org by tu...@apache.org on 2020/05/27 07:14:34 UTC
[nifi] branch master updated: NIFI-7445: Add Conflict Resolution
property to PutAzureDataLakeStorage processor
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 1dd0e92 NIFI-7445: Add Conflict Resolution property to PutAzureDataLakeStorage processor
1dd0e92 is described below
commit 1dd0e920402d20917bf3bf421ce14ab3dc0749a5
Author: Peter Gyori <pe...@gmail.com>
AuthorDate: Fri May 15 18:04:00 2020 +0200
NIFI-7445: Add Conflict Resolution property to PutAzureDataLakeStorage processor
NIFI-7445: Add Conflict Resolution property to PutAzureDataLakeStorage processor
Made warning and error messages more informative.
Refactored flowFile assertion in the tests.
This closes #4287.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../azure/storage/PutAzureDataLakeStorage.java | 92 +++++++++++++++++-----
.../azure/storage/ITPutAzureDataLakeStorage.java | 73 ++++++++++++++---
2 files changed, 136 insertions(+), 29 deletions(-)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
index 7d395a3..081372f 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
@@ -20,6 +20,7 @@ import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.models.DataLakeStorageException;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -28,15 +29,20 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import java.io.BufferedInputStream;
import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -51,6 +57,33 @@ import java.util.concurrent.TimeUnit;
@InputRequirement(Requirement.INPUT_REQUIRED)
public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
+ public static final String FAIL_RESOLUTION = "fail";
+ public static final String REPLACE_RESOLUTION = "replace";
+ public static final String IGNORE_RESOLUTION = "ignore";
+
+ public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
+ .name("conflict-resolution-strategy")
+ .displayName("Conflict Resolution Strategy")
+ .description("Indicates what should happen when a file with the same name already exists in the output directory")
+ .required(true)
+ .defaultValue(FAIL_RESOLUTION)
+ .allowableValues(FAIL_RESOLUTION, REPLACE_RESOLUTION, IGNORE_RESOLUTION)
+ .build();
+
+ private List<PropertyDescriptor> properties;
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> props = new ArrayList<>(super.getSupportedPropertyDescriptors());
+ props.add(CONFLICT_RESOLUTION);
+ properties = Collections.unmodifiableList(props);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
@@ -76,29 +109,50 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem);
final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
- final DataLakeFileClient fileClient = directoryClient.createFile(fileName);
+ final DataLakeFileClient fileClient;
+
+ final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
+ boolean overwrite = conflictResolution.equals(REPLACE_RESOLUTION);
+
+ try {
+ fileClient = directoryClient.createFile(fileName, overwrite);
+
+ final long length = flowFile.getSize();
+ if (length > 0) {
+ try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) {
+ fileClient.append(in, 0, length);
+ }
+ }
+ fileClient.flush(length);
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("azure.filesystem", fileSystem);
+ attributes.put("azure.directory", directory);
+ attributes.put("azure.filename", fileName);
+ attributes.put("azure.primaryUri", fileClient.getFileUrl());
+ attributes.put("azure.length", String.valueOf(length));
+ flowFile = session.putAllAttributes(flowFile, attributes);
- final long length = flowFile.getSize();
- if (length > 0) {
- try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) {
- fileClient.append(in, 0, length);
+ session.transfer(flowFile, REL_SUCCESS);
+ final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl(), transferMillis);
+ } catch (DataLakeStorageException dlsException) {
+ if (dlsException.getStatusCode() == 409) {
+ if (conflictResolution.equals(IGNORE_RESOLUTION)) {
+ session.transfer(flowFile, REL_SUCCESS);
+ String warningMessage = String.format("File with the same name already exists. " +
+ "Remote file not modified. " +
+ "Transferring {} to success due to %s being set to '%s'.", CONFLICT_RESOLUTION.getDisplayName(), conflictResolution);
+ getLogger().warn(warningMessage, new Object[]{flowFile});
+ } else {
+ throw dlsException;
+ }
+ } else {
+ throw dlsException;
}
}
- fileClient.flush(length);
-
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("azure.filesystem", fileSystem);
- attributes.put("azure.directory", directory);
- attributes.put("azure.filename", fileName);
- attributes.put("azure.primaryUri", fileClient.getFileUrl());
- attributes.put("azure.length", String.valueOf(length));
- flowFile = session.putAllAttributes(flowFile, attributes);
-
- session.transfer(flowFile, REL_SUCCESS);
- final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl(), transferMillis);
} catch (Exception e) {
- getLogger().error("Failed to create file", e);
+ getLogger().error("Failed to create file on Azure Data Lake Storage", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
index 2cf53ec..049031e 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
@@ -69,6 +69,28 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
}
@Test
+ public void testPutFileToExistingDirectoryWithReplaceResolution() throws Exception {
+ fileSystemClient.createDirectory(DIRECTORY);
+
+ runner.setProperty(PutAzureDataLakeStorage.CONFLICT_RESOLUTION, PutAzureDataLakeStorage.REPLACE_RESOLUTION);
+
+ runProcessor(FILE_DATA);
+
+ assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
+ }
+
+ @Test
+ public void testPutFileToExistingDirectoryWithIgnoreResolution() throws Exception {
+ fileSystemClient.createDirectory(DIRECTORY);
+
+ runner.setProperty(PutAzureDataLakeStorage.CONFLICT_RESOLUTION, PutAzureDataLakeStorage.IGNORE_RESOLUTION);
+
+ runProcessor(FILE_DATA);
+
+ assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
+ }
+
+ @Test
public void testPutFileToNonExistingDirectory() throws Exception {
runProcessor(FILE_DATA);
@@ -156,11 +178,8 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess(directory, fileName, FILE_DATA);
}
- @Ignore
- // the existing file gets overwritten without error
- // seems to be a bug in the Azure lib
@Test
- public void testPutFileToExistingFile() {
+ public void testPutFileToExistingFileWithFailResolution() {
fileSystemClient.createFile(String.format("%s/%s", DIRECTORY, FILE_NAME));
runProcessor(FILE_DATA);
@@ -169,6 +188,29 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
}
@Test
+ public void testPutFileToExistingFileWithReplaceResolution() throws Exception {
+ fileSystemClient.createFile(String.format("%s/%s", DIRECTORY, FILE_NAME));
+
+ runner.setProperty(PutAzureDataLakeStorage.CONFLICT_RESOLUTION, PutAzureDataLakeStorage.REPLACE_RESOLUTION);
+
+ runProcessor(FILE_DATA);
+
+ assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
+ }
+
+ @Test
+ public void testPutFileToExistingFileWithIgnoreResolution() throws Exception {
+ String azureFileContent = "AzureFileContent";
+ createDirectoryAndUploadFile(DIRECTORY, FILE_NAME, azureFileContent);
+
+ runner.setProperty(PutAzureDataLakeStorage.CONFLICT_RESOLUTION, PutAzureDataLakeStorage.IGNORE_RESOLUTION);
+
+ runProcessor(FILE_DATA);
+
+ assertSuccessWithIgnoreResolution(DIRECTORY, FILE_NAME, FILE_DATA, azureFileContent.getBytes());
+ }
+
+ @Test
public void testPutFileWithEL() throws Exception {
Map<String, String> attributes = createAttributesMap();
setELProperties();
@@ -227,17 +269,18 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
}
private void assertSuccess(String directory, String fileName, byte[] fileData) throws Exception {
- assertFlowFile(directory, fileName, fileData);
+ assertFlowFile(fileData, fileName, directory);
assertAzureFile(directory, fileName, fileData);
assertProvenanceEvents();
}
- private void assertFlowFile(String directory, String fileName, byte[] fileData) throws Exception {
- runner.assertAllFlowFilesTransferred(PutAzureDataLakeStorage.REL_SUCCESS, 1);
-
- MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutAzureDataLakeStorage.REL_SUCCESS).get(0);
+ private void assertSuccessWithIgnoreResolution(String directory, String fileName, byte[] fileData, byte[] azureFileData) throws Exception {
+ assertFlowFile(fileData);
+ assertAzureFile(directory, fileName, azureFileData);
+ }
- flowFile.assertContentEquals(fileData);
+ private void assertFlowFile(byte[] fileData, String fileName, String directory) throws Exception {
+ MockFlowFile flowFile = assertFlowFile(fileData);
flowFile.assertAttributeEquals("azure.filesystem", fileSystemName);
flowFile.assertAttributeEquals("azure.directory", directory);
@@ -253,6 +296,16 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
flowFile.assertAttributeEquals("azure.length", Integer.toString(fileData.length));
}
+ private MockFlowFile assertFlowFile(byte[] fileData) throws Exception {
+ runner.assertAllFlowFilesTransferred(PutAzureDataLakeStorage.REL_SUCCESS, 1);
+
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutAzureDataLakeStorage.REL_SUCCESS).get(0);
+
+ flowFile.assertContentEquals(fileData);
+
+ return flowFile;
+ }
+
private void assertAzureFile(String directory, String fileName, byte[] fileData) {
DataLakeFileClient fileClient;
if (StringUtils.isNotEmpty(directory)) {