Posted to commits@nifi.apache.org by tu...@apache.org on 2020/05/13 13:03:04 UTC
[nifi] branch master updated: NIFI-7367: Add tests for FetchAzureDataLakeStorage
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 9608fe3 NIFI-7367: Add tests for FetchAzureDataLakeStorage
9608fe3 is described below
commit 9608fe33fa4a3f39ee6ddca54184ccc85ef1a163
Author: Peter Gyori <pe...@gmail.com>
AuthorDate: Wed May 6 18:28:47 2020 +0200
NIFI-7367: Add tests for FetchAzureDataLakeStorage
NIFI-7367: Negative test cases for expression language in FetchAzureDataLakeStorage
FetchAzureDataLakeStorage throws an exception when the filesystem or filename is blank.
Fixed the logged error messages in all three of the Delete, Fetch and Put ADLS processors.
The testFetchDirectory test case is marked as ignored.
This closes #4257.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../azure/storage/DeleteAzureDataLakeStorage.java | 2 +-
.../azure/storage/FetchAzureDataLakeStorage.java | 29 +-
.../azure/storage/PutAzureDataLakeStorage.java | 2 +-
.../storage/AbstractAzureDataLakeStorageIT.java | 19 +
.../azure/storage/ITFetchAzureDataLakeStorage.java | 413 ++++++++++++++++++++-
.../azure/storage/ITPutAzureDataLakeStorage.java | 15 +
6 files changed, 450 insertions(+), 30 deletions(-)
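
For context on the "Fixed logged error messages" item: NiFi's ComponentLog follows SLF4J conventions, so an exception passed to fill a {} placeholder is rendered via its toString() and the stack trace is dropped, while an exception passed as the trailing Throwable argument is logged with the full stack trace. A minimal before/after sketch of the pattern applied in all three processors (surrounding processor code elided):

    // Before: e fills the {} placeholder; only e.toString() is logged, no stack trace.
    getLogger().error("Failed to delete the specified file from Azure Data Lake Storage, due to {}", e);

    // After: e is the trailing Throwable argument; the message plus the full stack trace are logged.
    getLogger().error("Failed to delete the specified file from Azure Data Lake Storage", e);
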
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
index bf29087..86b23aa 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
@@ -64,7 +64,7 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().invokeRemoteProcess(flowFile, fileClient.getFileUrl(), "File deleted");
} catch (Exception e) {
- getLogger().error("Failed to delete the specified file from Azure Data Lake Storage, due to {}", e);
+ getLogger().error("Failed to delete the specified file from Azure Data Lake Storage", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
index 1d0e8d4..a616439 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
@@ -16,8 +16,11 @@
*/
package org.apache.nifi.processors.azure.storage;
-import java.util.concurrent.TimeUnit;
-
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.DataLakeFileSystemClient;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -29,11 +32,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
-import com.azure.storage.file.datalake.DataLakeDirectoryClient;
-import com.azure.storage.file.datalake.DataLakeFileSystemClient;
-import com.azure.storage.file.datalake.DataLakeServiceClient;
-import com.azure.storage.file.datalake.DataLakeFileClient;
-
+import java.util.concurrent.TimeUnit;
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class})
@@ -44,17 +43,25 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
-
if (flowFile == null) {
return;
}
final long startNanos = System.nanoTime();
-
try {
final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue();
+
+ if (StringUtils.isBlank(fileSystem)) {
+ throw new ProcessException(FILESYSTEM.getDisplayName() + " property evaluated to empty string. " +
+ FILESYSTEM.getDisplayName() + " must be specified as a non-empty string.");
+ }
+ if (StringUtils.isBlank(fileName)) {
+ throw new ProcessException(FILE.getDisplayName() + " property evaluated to empty string. " +
+ FILE.getDisplayName() + " must be specified as a non-empty string.");
+ }
+
final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
@@ -67,9 +74,9 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().fetch(flowFile, fileClient.getFileUrl(), transferMillis);
} catch (Exception e) {
- getLogger().error("Failure to fetch file from Azure Data Lake Storage, due to {}", e);
+ getLogger().error("Failure to fetch file from Azure Data Lake Storage", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
}
-}
+}
\ No newline at end of file
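
The new guard in onTrigger() relies on Commons Lang's StringUtils.isBlank(), which treats null, empty, and whitespace-only strings alike, so a property whose Expression Language reference resolves to nothing now fails fast with a ProcessException instead of reaching the Azure client. A small standalone sketch of that behavior (the demo class name is illustrative, not part of the commit):

    import org.apache.commons.lang3.StringUtils;

    public class IsBlankDemo {
        public static void main(String[] args) {
            // All of these count as blank, so the processor would reject them:
            System.out.println(StringUtils.isBlank(null));  // true
            System.out.println(StringUtils.isBlank(""));    // true
            System.out.println(StringUtils.isBlank("   ")); // true
            // A real filesystem or file name passes the guard:
            System.out.println(StringUtils.isBlank("myfilesystem")); // false
        }
    }
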
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
index 94fa90f..7d395a3 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
@@ -98,7 +98,7 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl(), transferMillis);
} catch (Exception e) {
- getLogger().error("Failed to create file, due to {}", e);
+ getLogger().error("Failed to create file", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
index a9b2bd9..e0b591a 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
@@ -17,6 +17,8 @@
package org.apache.nifi.processors.azure.storage;
import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.file.datalake.DataLakeDirectoryClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
@@ -24,6 +26,7 @@ import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.junit.After;
import org.junit.Before;
+import java.io.ByteArrayInputStream;
import java.util.UUID;
public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorageIT {
@@ -54,4 +57,20 @@ public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorag
.credential(new StorageSharedKeyCredential(getAccountName(), getAccountKey()))
.buildClient();
}
+
+ protected void uploadFile(String directory, String filename, String fileContent) {
+ byte[] fileContentBytes = fileContent.getBytes();
+
+ DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
+ DataLakeFileClient fileClient = directoryClient.createFile(filename);
+
+ fileClient.append(new ByteArrayInputStream(fileContentBytes), 0, fileContentBytes.length);
+ fileClient.flush(fileContentBytes.length);
+ }
+
+ protected void createDirectoryAndUploadFile(String directory, String filename, String fileContent) {
+ fileSystemClient.createDirectory(directory);
+
+ uploadFile(directory, filename, fileContent);
+ }
}
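
The uploadFile() helper follows the Data Lake SDK's two-step write model: append() stages bytes at a given offset without committing them, and flush(length) commits everything up to that position. A short usage sketch, matching how the integration tests below call these helpers (fileSystemClient is provided by the abstract base class):

    // Upload to the filesystem root: no directory is created.
    uploadFile("", "rootFile.txt", "content at the filesystem root");

    // Create the directory first, then stage and commit the file inside it.
    createDirectoryAndUploadFile("TestDirectory", "testFile.txt", "AzureFileContent");
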
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
index a242ede..f08b75e 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
@@ -16,43 +16,422 @@
*/
package org.apache.nifi.processors.azure.storage;
+import com.azure.storage.file.datalake.models.DataLakeStorageException;
+import com.google.common.collect.Sets;
import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
-import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
-import java.util.List;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
-public class ITFetchAzureDataLakeStorage extends AbstractAzureBlobStorageIT {
+import static org.junit.Assert.assertEquals;
+
+public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
@Override
protected Class<? extends Processor> getProcessorClass() {
return FetchAzureDataLakeStorage.class;
}
- @Before
- public void setUp() throws Exception {
- runner.setProperty(PutAzureDataLakeStorage.FILE, TEST_FILE_NAME);
+ @Test
+ public void testFetchFileFromDirectory() {
+ // GIVEN
+ String directory = "TestDirectory";
+ String filename = "testFile.txt";
+ String fileContent = "AzureFileContent";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ createDirectoryAndUploadFile(directory, filename, fileContent);
+
+ // WHEN
+ // THEN
+ testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
+ }
+
+ @Test
+ public void testFetchFileFromRoot() {
+ // GIVEN
+ String directory= "";
+ String filename = "testFile.txt";
+ String fileContent = "AzureFileContent";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ uploadFile(directory, filename, fileContent);
+
+ // WHEN
+ // THEN
+ testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
+ }
+
+ @Test
+ public void testFetchFileFromDirectoryWithWhitespace() {
+ // GIVEN
+ String directory= "A Test Directory";
+ String filename = "testFile.txt";
+ String fileContent = "AzureFileContent";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ createDirectoryAndUploadFile(directory, filename, fileContent);
+
+ // WHEN
+ // THEN
+ testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
+ }
+
+ @Test
+ public void testFetchFileWithWhitespaceFromDirectory() {
+ // GIVEN
+ String directory= "TestDirectory";
+ String filename = "A test file.txt";
+ String fileContent = "AzureFileContent";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ createDirectoryAndUploadFile(directory, filename, fileContent);
+
+ // WHEN
+ // THEN
+ testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
+ }
+
+ @Test
+ public void testFetchFileCaseSensitiveFilename() {
+ // GIVEN
+ String directory = "TestDirectory";
+ String filename1 = "testFile.txt";
+ String filename2 = "testfile.txt";
+ String fileContent1 = "ContentOfFile1";
+ String fileContent2 = "ContentOfFile2";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ createDirectoryAndUploadFile(directory, filename1, fileContent1);
+ createDirectoryAndUploadFile(directory, filename2, fileContent2);
+
+ // WHEN
+ // THEN
+ testSuccessfulFetch(fileSystemName, directory, filename1, inputFlowFileContent, fileContent1);
+ runner.clearProvenanceEvents();
+ runner.clearTransferState();
+ testSuccessfulFetch(fileSystemName, directory, filename2, inputFlowFileContent, fileContent2);
+ }
+
+ @Test
+ public void testFetchFileCaseSensitiveDirectoryName() {
+ // GIVEN
+ String directory1 = "TestDirectory";
+ String directory2 = "Testdirectory";
+ String filename1 = "testFile1.txt";
+ String filename2 = "testFile2.txt";
+ String fileContent1 = "ContentOfFile1";
+ String fileContent2 = "ContentOfFile2";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ createDirectoryAndUploadFile(directory1, filename1, fileContent1);
+ createDirectoryAndUploadFile(directory2, filename2, fileContent2);
+
+ // WHEN
+ // THEN
+ testSuccessfulFetch(fileSystemName, directory1, filename1, inputFlowFileContent, fileContent1);
+ runner.clearProvenanceEvents();
+ runner.clearTransferState();
+ testSuccessfulFetch(fileSystemName, directory2, filename2, inputFlowFileContent, fileContent2);
+ }
+
+ @Test
+ public void testFetchFileFromDeepDirectoryStructure() {
+ // GIVEN
+ String directory= "Directory01/Directory02/Directory03/Directory04/Directory05/Directory06/Directory07/"
+ + "Directory08/Directory09/Directory10/Directory11/Directory12/Directory13/Directory14/Directory15/"
+ + "Directory16/Directory17/Directory18/Directory19/Directory20/TestDirectory";
+ String filename = "testFile.txt";
+ String fileContent = "AzureFileContent";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ createDirectoryAndUploadFile(directory, filename, fileContent);
+
+ // WHEN
+ // THEN
+ testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
+ }
+
+ @Ignore("Fetching a directory currently produces an empty flowfile. This will change in the future, and this test case will need to be modified.")
+ @Test
+ public void testFetchDirectory() {
+ // GIVEN
+ String parentDirectory = "ParentDirectory";
+ String childDirectory = "ChildDirectory";
+ String filename = "testFile.txt";
+ String fileContent = "AzureFileContent";
+ String inputFlowFileContent = "InputFlowFileContent";
+ String expectedFlowFileContent = "";
+
+ createDirectoryAndUploadFile(parentDirectory + "/" + childDirectory, filename, fileContent);
+
+ // WHEN
+ // THEN
+ testSuccessfulFetch(fileSystemName, parentDirectory, childDirectory, inputFlowFileContent, expectedFlowFileContent);
+ }
+
+ @Test
+ public void testFetchNonExistentFileSystem() {
+ // GIVEN
+ String fileSystem = "NonExistentFileSystem";
+ String directory = "TestDirectory";
+ String filename = "testFile.txt";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ // WHEN
+ // THEN
+ testFailedFetch(fileSystem, directory, filename, inputFlowFileContent, inputFlowFileContent, 400);
+ }
+
+ @Test
+ public void testFetchNonExistentDirectory() {
+ // GIVEN
+ String directory = "TestDirectory";
+ String filename = "testFile.txt";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ // WHEN
+ // THEN
+ testFailedFetch(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent, 404);
+ }
+
+ @Test
+ public void testFetchNonExistentFile() {
+ // GIVEN
+ String directory = "TestDirectory";
+ String filename = "testFile.txt";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ fileSystemClient.createDirectory(directory);
+
+ // WHEN
+ // THEN
+ testFailedFetch(fileSystemName, directory, filename, inputFlowFileContent, inputFlowFileContent, 404);
+ }
+
+ @Ignore("Takes some time, only recommended for manual testing.")
+ @Test
+ public void testFetchLargeFile() {
+ // GIVEN
+ String directory = "TestDirectory";
+ String filename = "testFile.txt";
+ byte[] fileContentBytes = new byte[100_000_000];
+ String fileContent = new String(fileContentBytes);
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ createDirectoryAndUploadFile(directory, filename, fileContent);
+
+ // WHEN
+ // THEN
+ testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, fileContent);
+ }
+
+ @Test
+ public void testFetchInvalidDirectoryName() {
+ // GIVEN
+ String directory = "TestDirectory";
+ String invalidDirectoryName = "Test/\\Directory";
+ String filename = "testFile.txt";
+ String fileContent = "AzureFileContent";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ createDirectoryAndUploadFile(directory, filename, fileContent);
+
+ // WHEN
+ // THEN
+ testFailedFetch(fileSystemName, invalidDirectoryName, filename, inputFlowFileContent, inputFlowFileContent, 404);
+ }
+
+ @Test
+ public void testFetchInvalidFilename() {
+ // GIVEN
+ String directory = "TestDirectory";
+ String filename = "testFile.txt";
+ String invalidFilename = "test/\\File.txt";
+ String fileContent = "AzureFileContent";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ createDirectoryAndUploadFile(directory, filename, fileContent);
+
+ // WHEN
+ // THEN
+ testFailedFetch(fileSystemName, directory, invalidFilename, inputFlowFileContent, inputFlowFileContent, 404);
+ }
+
+ @Test
+ public void testFetchUsingExpressionLanguage() {
+ // GIVEN
+ String expLangFileSystem = "az.filesystem";
+ String expLangDirectory = "az.directory";
+ String expLangFilename = "az.filename";
+
+ String directory = "TestDirectory";
+ String filename = "testFile.txt";
+ String fileContent = "AzureFileContent";
+
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(expLangFileSystem, fileSystemName);
+ attributes.put(expLangDirectory, directory);
+ attributes.put(expLangFilename, filename);
+
+ createDirectoryAndUploadFile(directory, filename, fileContent);
+
+ // WHEN
+ // THEN
+ testSuccessfulFetch("${" + expLangFileSystem + "}",
+ "${" + expLangDirectory + "}",
+ "${" + expLangFilename + "}",
+ attributes,
+ inputFlowFileContent,
+ fileContent);
+ }
+
+ @Test
+ public void testFetchUsingExpressionLanguageFileSystemIsNotSpecified() {
+ // GIVEN
+ String expLangFileSystem = "az.filesystem";
+ String expLangDirectory = "az.directory";
+ String expLangFilename = "az.filename";
+
+ String directory = "TestDirectory";
+ String filename = "testFile.txt";
+ String fileContent = "AzureFileContent";
+
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(expLangDirectory, directory);
+ attributes.put(expLangFilename, filename);
+ createDirectoryAndUploadFile(directory, filename, fileContent);
+
+ // WHEN
+ // THEN
+ testFailedFetchWithProcessException("${" + expLangFileSystem + "}",
+ "${" + expLangDirectory + "}",
+ "${" + expLangFilename + "}",
+ attributes,
+ inputFlowFileContent,
+ inputFlowFileContent);
}
- @Ignore
@Test
- public void testFetchFile() throws Exception {
+ public void testFetchUsingExpressionLanguageFilenameIsNotSpecified() {
+ // GIVEN
+ String expLangFileSystem = "az.filesystem";
+ String expLangDirectory = "az.directory";
+ String expLangFilename = "az.filename";
+
+ String directory = "TestDirectory";
+ String filename = "testFile.txt";
+ String fileContent = "AzureFileContent";
+
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(expLangFileSystem, fileSystemName);
+ attributes.put(expLangDirectory, directory);
+
+ createDirectoryAndUploadFile(directory, filename, fileContent);
+
+ // WHEN
+ // THEN
+ testFailedFetchWithProcessException("${" + expLangFileSystem + "}",
+ "${" + expLangDirectory + "}",
+ "${" + expLangFilename + "}",
+ attributes,
+ inputFlowFileContent,
+ inputFlowFileContent);
+ }
+
+ private void testSuccessfulFetch(String fileSystem, String directory, String filename, String inputFlowFileContent, String expectedFlowFileContent) {
+ testSuccessfulFetch(fileSystem, directory, filename, Collections.emptyMap(), inputFlowFileContent, expectedFlowFileContent);
+ }
+
+ private void testSuccessfulFetch(String fileSystem, String directory, String filename, Map<String, String> attributes, String inputFlowFileContent, String expectedFlowFileContent) {
+ // GIVEN
+ Set<ProvenanceEventType> expectedEventTypes = Sets.newHashSet(ProvenanceEventType.CONTENT_MODIFIED, ProvenanceEventType.FETCH);
+
+ setRunnerProperties(fileSystem, directory, filename);
+
+ // WHEN
+ startRunner(inputFlowFileContent, attributes);
+
+ // THEN
+ assertSuccess(expectedFlowFileContent, expectedEventTypes);
+ }
+
+ private void testFailedFetch(String fileSystem, String directory, String filename, String inputFlowFileContent, String expectedFlowFileContent, int expectedErrorCode) {
+ testFailedFetch(fileSystem, directory, filename, Collections.emptyMap(), inputFlowFileContent, expectedFlowFileContent, expectedErrorCode);
+ }
+
+ private void testFailedFetch(String fileSystem, String directory, String filename, Map<String, String> attributes,
+ String inputFlowFileContent, String expectedFlowFileContent, int expectedErrorCode) {
+ // GIVEN
+ setRunnerProperties(fileSystem, directory, filename);
+
+ // WHEN
+ startRunner(inputFlowFileContent, attributes);
+
+ // THEN
+ DataLakeStorageException e = (DataLakeStorageException)runner.getLogger().getErrorMessages().get(0).getThrowable();
+ assertEquals(expectedErrorCode, e.getStatusCode());
+
+ assertFailure(expectedFlowFileContent);
+ }
+
+ private void testFailedFetchWithProcessException(String fileSystem, String directory, String filename, Map<String, String> attributes,
+ String inputFlowFileContent, String expectedFlowFileContent) {
+ // GIVEN
+ setRunnerProperties(fileSystem, directory, filename);
+
+ // WHEN
+ startRunner(inputFlowFileContent, attributes);
+
+ // THEN
+ Throwable exception = runner.getLogger().getErrorMessages().get(0).getThrowable();
+ assertEquals(ProcessException.class, exception.getClass());
+
+ assertFailure(expectedFlowFileContent);
+ }
+
+ private void setRunnerProperties(String fileSystem, String directory, String filename) {
+ runner.setProperty(FetchAzureDataLakeStorage.FILESYSTEM, fileSystem);
+ runner.setProperty(FetchAzureDataLakeStorage.DIRECTORY, directory);
+ runner.setProperty(FetchAzureDataLakeStorage.FILE, filename);
runner.assertValid();
- runner.enqueue(new byte[0]);
+ }
+
+ private void startRunner(String inputFlowFileContent, Map<String, String> attributes) {
+ runner.enqueue(inputFlowFileContent, attributes);
runner.run();
+ }
+
+ private void assertSuccess(String expectedFlowFileContent, Set<ProvenanceEventType> expectedEventTypes) {
+ runner.assertAllFlowFilesTransferred(FetchAzureDataLakeStorage.REL_SUCCESS, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchAzureDataLakeStorage.REL_SUCCESS).get(0);
+ flowFile.assertContentEquals(expectedFlowFileContent);
- assertResult();
+ Set<ProvenanceEventType> actualEventTypes = runner.getProvenanceEvents().stream()
+ .map(ProvenanceEventRecord::getEventType)
+ .collect(Collectors.toSet());
+ assertEquals(expectedEventTypes, actualEventTypes);
}
- private void assertResult() throws Exception {
- runner.assertAllFlowFilesTransferred(PutAzureDataLakeStorage.REL_SUCCESS, 1);
- List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureDataLakeStorage.REL_SUCCESS);
- for (MockFlowFile flowFile : flowFilesForRelationship) {
- flowFile.assertContentEquals("0123456789".getBytes());
- flowFile.assertAttributeEquals("azure.length", "10");
- }
+ private void assertFailure(String expectedFlowFileContent) {
+ runner.assertAllFlowFilesTransferred(FetchAzureDataLakeStorage.REL_FAILURE, 1);
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(FetchAzureDataLakeStorage.REL_FAILURE).get(0);
+ flowFile.assertContentEquals(expectedFlowFileContent);
}
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
index 1a6105f..2cf53ec 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
@@ -18,9 +18,12 @@ package org.apache.nifi.processors.azure.storage;
import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.google.common.collect.Sets;
import com.google.common.net.UrlEscapers;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processor.Processor;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
import org.junit.Before;
import org.junit.Ignore;
@@ -29,6 +32,8 @@ import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -224,6 +229,7 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
private void assertSuccess(String directory, String fileName, byte[] fileData) throws Exception {
assertFlowFile(directory, fileName, fileData);
assertAzureFile(directory, fileName, fileData);
+ assertProvenanceEvents();
}
private void assertFlowFile(String directory, String fileName, byte[] fileData) throws Exception {
@@ -262,6 +268,15 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertEquals(fileData.length, fileClient.getProperties().getFileSize());
}
+ private void assertProvenanceEvents() {
+ Set<ProvenanceEventType> expectedEventTypes = Sets.newHashSet(ProvenanceEventType.SEND);
+
+ Set<ProvenanceEventType> actualEventTypes = runner.getProvenanceEvents().stream()
+ .map(ProvenanceEventRecord::getEventType)
+ .collect(Collectors.toSet());
+ assertEquals(expectedEventTypes, actualEventTypes);
+ }
+
private void assertFailure() {
runner.assertAllFlowFilesTransferred(PutAzureDataLakeStorage.REL_FAILURE, 1);
}