You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2020/05/18 21:52:29 UTC
[nifi] branch master updated: NIFI-7446: FetchAzureDataLakeStorage
processor now throws exception when the specified path points to a
directory
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 9aae58f NIFI-7446: FetchAzureDataLakeStorage processor now throws exception when the specified path points to a directory
9aae58f is described below
commit 9aae58f1178cacb6ee4814f26288bf8ca5150d71
Author: Peter Gyori <pe...@gmail.com>
AuthorDate: Thu May 14 14:35:34 2020 +0200
NIFI-7446: FetchAzureDataLakeStorage processor now throws exception when the specified path points to a directory
A newer version (12.1.1) of azure-storage-file-datalake is imported.
This closes #4273.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../nifi-azure-bundle/nifi-azure-processors/pom.xml | 2 +-
.../processors/azure/storage/FetchAzureDataLakeStorage.java | 4 ++++
.../processors/azure/storage/ITFetchAzureDataLakeStorage.java | 10 ++++++----
3 files changed, 11 insertions(+), 5 deletions(-)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index b0e80df..0a3602d 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -73,7 +73,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-file-datalake</artifactId>
- <version>12.0.1</version>
+ <version>12.1.1</version>
</dependency>
<!-- overriding jackson-core in azure-storage -->
<dependency>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
index a616439..2d80ea3 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
@@ -67,6 +67,10 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce
final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
final DataLakeFileClient fileClient = directoryClient.getFileClient(fileName);
+ if (fileClient.getProperties().isDirectory()) {
+ throw new ProcessException(FILE.getDisplayName() + " (" + fileName + ") points to a directory. Full path: " + fileClient.getFilePath());
+ }
+
flowFile = session.write(flowFile, os -> fileClient.read(os));
session.getProvenanceReporter().modifyContent(flowFile);
session.transfer(flowFile, REL_SUCCESS);
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
index f08b75e..de35508 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
@@ -112,7 +112,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
String inputFlowFileContent = "InputFlowFileContent";
createDirectoryAndUploadFile(directory, filename1, fileContent1);
- createDirectoryAndUploadFile(directory, filename2, fileContent2);
+ uploadFile(directory, filename2, fileContent2);
// WHEN
// THEN
@@ -161,7 +161,6 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
}
- @Ignore("Fetching a directory currently produces an empty flowfile. This will change in the future, and this test case will need to be modified.")
@Test
public void testFetchDirectory() {
// GIVEN
@@ -170,13 +169,12 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
String filename = "testFile.txt";
String fileContent = "AzureFileContent";
String inputFlowFileContent = "InputFlowFileContent";
- String expectedFlowFileContent = "";
createDirectoryAndUploadFile(parentDirectory + "/" + childDirectory, filename, fileContent);
// WHEN
// THEN
- testSuccessfulFetch(fileSystemName, parentDirectory, childDirectory, inputFlowFileContent, expectedFlowFileContent);
+ testFailedFetchWithProcessException(fileSystemName, parentDirectory, childDirectory, inputFlowFileContent, inputFlowFileContent);
}
@Test
@@ -391,6 +389,10 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
assertFailure(expectedFlowFileContent);
}
+ private void testFailedFetchWithProcessException(String fileSystem, String directory, String filename, String inputFlowFileContent, String expectedFlowFileContent) {
+ testFailedFetchWithProcessException(fileSystem, directory, filename, Collections.emptyMap(), inputFlowFileContent, expectedFlowFileContent);
+ }
+
private void testFailedFetchWithProcessException(String fileSystem, String directory, String filename, Map<String, String> attributes,
String inputFlowFileContent, String expectedFlowFileContent) {
// GIVEN