You are viewing a plain-text version of this content; the hyperlink to the canonical version ("here") was lost in the conversion.
Posted to commits@nifi.apache.org by tu...@apache.org on 2020/04/16 09:53:34 UTC
[nifi] branch master updated: NIFI-7334 Adding FetchDataLakeStorage
Processor to provide native support for Azure Data lake Gen 2 Storage.
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 58118cf NIFI-7334 Adding FetchDataLakeStorage Processor to provide native support for Azure Data lake Gen 2 Storage.
58118cf is described below
commit 58118cf904cfb58ea119e5507a9a9549cda53bd9
Author: muazmaz <mu...@microsoft.com>
AuthorDate: Tue Apr 14 23:36:19 2020 -0700
NIFI-7334 Adding FetchDataLakeStorage Processor to provide native support for Azure Data lake Gen 2 Storage.
NIFI-7334 Update to FetchDataLakeStorage Processor
This closes #4212.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../AbstractAzureDataLakeStorageProcessor.java | 2 +-
.../azure/storage/DeleteAzureDataLakeStorage.java | 2 +-
...Storage.java => FetchAzureDataLakeStorage.java} | 19 ++++----
.../azure/storage/PutAzureDataLakeStorage.java | 2 +-
.../services/org.apache.nifi.processor.Processor | 3 +-
.../azure/storage/ITFetchAzureDataLakeStorage.java | 56 ++++++++++++++++++++++
6 files changed, 72 insertions(+), 12 deletions(-)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
index 83c5c14..40d276c 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
@@ -107,7 +107,7 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
- .defaultValue("nifi.${uuid}")
+ .defaultValue("${azure.filename}")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description(
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
index 5cbf7f0..8403841 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
@@ -35,7 +35,7 @@ import com.azure.storage.file.datalake.DataLakeServiceClient;
import java.util.concurrent.TimeUnit;
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
-@SeeAlso({PutAzureDataLakeStorage.class})
+@SeeAlso({PutAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class})
@CapabilityDescription("Deletes the provided file from Azure Data Lake Storage")
@InputRequirement(Requirement.INPUT_REQUIRED)
public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
similarity index 87%
copy from nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
copy to nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
index 5cbf7f0..d3068fb 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.processors.azure.storage;
+import java.util.concurrent.TimeUnit;
+
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -28,17 +30,16 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import com.azure.storage.file.datalake.DataLakeDirectoryClient;
-import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
-import java.util.concurrent.TimeUnit;
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
-@SeeAlso({PutAzureDataLakeStorage.class})
-@CapabilityDescription("Deletes the provided file from Azure Data Lake Storage")
+@SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class})
+@CapabilityDescription("Fetch the provided file from Azure Data Lake Storage")
@InputRequirement(Requirement.INPUT_REQUIRED)
-public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
+public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
@@ -49,6 +50,7 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc
}
final long startNanos = System.nanoTime();
+
try {
final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
@@ -56,15 +58,16 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc
final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
-
final DataLakeFileClient fileClient = directoryClient.getFileClient(fileName);
- fileClient.delete();
+
+ flowFile = session.write(flowFile, os -> fileClient.read(os));
+ session.getProvenanceReporter().modifyContent(flowFile);
session.transfer(flowFile, REL_SUCCESS);
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl(), transferMillis);
} catch (Exception e) {
- getLogger().error("Failed to delete the specified file from Azure Data Lake Storage, due to {}", e);
+ getLogger().error("Failure to fetch file from Azure Data Lake Storage, due to {}", e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
index f67c98e..b59c366 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
@@ -42,7 +42,7 @@ import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
-@SeeAlso({DeleteAzureDataLakeStorage.class})
+@SeeAlso({DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class})
@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
@WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"),
@WritesAttribute(attribute = "azure.directory", description = "The name of the Azure Directory"),
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 9f8417d..6e09330 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -22,4 +22,5 @@ org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage
org.apache.nifi.processors.azure.storage.queue.PutAzureQueueStorage
org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage
org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage
-org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage
\ No newline at end of file
+org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage
+org.apache.nifi.processors.azure.storage.FetchAzureDataLakeStorage
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
new file mode 100644
index 0000000..93a414f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+/** Integration test (IT) for {@link FetchAzureDataLakeStorage}: fetches TEST_FILE_NAME and checks the FlowFile content. */
+public class ITFetchAzureDataLakeStorage extends AbstractAzureBlobStorageIT {
+ // NOTE(review): the base class is the Blob-storage IT; confirm it prepares TEST_FILE_NAME in ADLS Gen2 and configures the account properties this processor needs.
+ @Override
+ protected Class<? extends Processor> getProcessorClass() {
+ return FetchAzureDataLakeStorage.class;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ runner.setProperty(PutAzureDataLakeStorage.FILE, TEST_FILE_NAME);
+ // NOTE(review): only FILE is set here; the processor also reads FILESYSTEM and DIRECTORY - TODO confirm they are configured elsewhere, or runner.assertValid() may fail.
+ }
+
+ @Test
+ public void testFetchFile() throws Exception {
+ runner.assertValid();
+ runner.enqueue(new byte[0]); // empty input; onTrigger overwrites the FlowFile content with the remote file's bytes
+ runner.run();
+
+ assertResult();
+ }
+ // Expects exactly one FlowFile on success whose content is "0123456789" (presumably the fixture prepared by the base class - confirm).
+ private void assertResult() throws Exception {
+ runner.assertAllFlowFilesTransferred(PutAzureDataLakeStorage.REL_SUCCESS, 1); // REL_SUCCESS is declared on AbstractAzureDataLakeStorageProcessor; presumably the same relationship Fetch uses
+ List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureDataLakeStorage.REL_SUCCESS);
+ for (MockFlowFile flowFile : flowFilesForRelationship) {
+ flowFile.assertContentEquals("0123456789".getBytes());
+ flowFile.assertAttributeEquals("azure.length", "10"); // NOTE(review): the visible FetchAzureDataLakeStorage hunk writes no attributes - verify "azure.length" is actually set, else this assertion fails
+ }
+ }
+}