You are viewing a plain-text version of this content. (The original page linked to the canonical version here; that link was not preserved in this plain-text copy.)
Posted to commits@nifi.apache.org by tu...@apache.org on 2020/04/16 09:53:34 UTC

[nifi] branch master updated: NIFI-7334 Adding FetchDataLakeStorage Processor to provide native support for Azure Data lake Gen 2 Storage.

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 58118cf  NIFI-7334 Adding FetchDataLakeStorage Processor to provide native support for Azure Data lake Gen 2 Storage.
58118cf is described below

commit 58118cf904cfb58ea119e5507a9a9549cda53bd9
Author: muazmaz <mu...@microsoft.com>
AuthorDate: Tue Apr 14 23:36:19 2020 -0700

    NIFI-7334 Adding FetchDataLakeStorage Processor to provide native support for Azure Data lake Gen 2 Storage.
    
    NIFI-7334 Update to FetchDataLakeStorage Processor
    
    This closes #4212.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../AbstractAzureDataLakeStorageProcessor.java     |  2 +-
 .../azure/storage/DeleteAzureDataLakeStorage.java  |  2 +-
 ...Storage.java => FetchAzureDataLakeStorage.java} | 19 ++++----
 .../azure/storage/PutAzureDataLakeStorage.java     |  2 +-
 .../services/org.apache.nifi.processor.Processor   |  3 +-
 .../azure/storage/ITFetchAzureDataLakeStorage.java | 56 ++++++++++++++++++++++
 6 files changed, 72 insertions(+), 12 deletions(-)

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
index 83c5c14..40d276c 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
@@ -107,7 +107,7 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .required(true)
-            .defaultValue("nifi.${uuid}")
+            .defaultValue("${azure.filename}")
             .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description(
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
index 5cbf7f0..8403841 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
@@ -35,7 +35,7 @@ import com.azure.storage.file.datalake.DataLakeServiceClient;
 import java.util.concurrent.TimeUnit;
 
 @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
-@SeeAlso({PutAzureDataLakeStorage.class})
+@SeeAlso({PutAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class})
 @CapabilityDescription("Deletes the provided file from Azure Data Lake Storage")
 @InputRequirement(Requirement.INPUT_REQUIRED)
 public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
similarity index 87%
copy from nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
copy to nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
index 5cbf7f0..d3068fb 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.processors.azure.storage;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -28,17 +30,16 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
 
 import com.azure.storage.file.datalake.DataLakeDirectoryClient;
-import com.azure.storage.file.datalake.DataLakeFileClient;
 import com.azure.storage.file.datalake.DataLakeFileSystemClient;
 import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeFileClient;
 
-import java.util.concurrent.TimeUnit;
 
 @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
-@SeeAlso({PutAzureDataLakeStorage.class})
-@CapabilityDescription("Deletes the provided file from Azure Data Lake Storage")
+@SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class})
+@CapabilityDescription("Fetch the provided file from Azure Data Lake Storage")
 @InputRequirement(Requirement.INPUT_REQUIRED)
-public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
+public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor {
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
@@ -49,6 +50,7 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc
         }
 
         final long startNanos = System.nanoTime();
+
         try {
             final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue();
             final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
@@ -56,15 +58,16 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc
             final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
             final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem);
             final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory);
-
             final DataLakeFileClient fileClient = directoryClient.getFileClient(fileName);
-            fileClient.delete();
+
+            flowFile = session.write(flowFile, os -> fileClient.read(os));
+            session.getProvenanceReporter().modifyContent(flowFile);
             session.transfer(flowFile, REL_SUCCESS);
 
             final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
             session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl(), transferMillis);
         } catch (Exception e) {
-            getLogger().error("Failed to delete the specified file from Azure Data Lake Storage,  due to {}", e);
+            getLogger().error("Failure to fetch file from Azure Data Lake Storage, due to {}", e);
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
         }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
index f67c98e..b59c366 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
@@ -42,7 +42,7 @@ import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
 
 
 @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
-@SeeAlso({DeleteAzureDataLakeStorage.class})
+@SeeAlso({DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class})
 @CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2")
 @WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"),
         @WritesAttribute(attribute = "azure.directory", description = "The name of the Azure Directory"),
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 9f8417d..6e09330 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -22,4 +22,5 @@ org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage
 org.apache.nifi.processors.azure.storage.queue.PutAzureQueueStorage
 org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage
 org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage
-org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage
\ No newline at end of file
+org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage
+org.apache.nifi.processors.azure.storage.FetchAzureDataLakeStorage
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
new file mode 100644
index 0000000..93a414f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+public class ITFetchAzureDataLakeStorage extends AbstractAzureBlobStorageIT { // Integration test for FetchAzureDataLakeStorage. NOTE(review): extends the Blob Storage IT base rather than a Data Lake one - confirm its fixture is what FILESYSTEM/DIRECTORY resolve to
+
+    @Override
+    protected Class<? extends Processor> getProcessorClass() { // processor under test; presumably used by the base class to build `runner` - confirm
+        return FetchAzureDataLakeStorage.class;
+    }
+
+    @Before
+    public void setUp() throws Exception { // NOTE(review): only FILE is set here; FILESYSTEM and DIRECTORY, which the processor also reads, are presumably configured by the base class - confirm
+        runner.setProperty(PutAzureDataLakeStorage.FILE, TEST_FILE_NAME); // FILE is referenced via PutAzureDataLakeStorage although FetchAzureDataLakeStorage is under test; presumably the same inherited constant
+
+    }
+
+    @Test
+    public void testFetchFile() throws Exception {
+        runner.assertValid();
+        runner.enqueue(new byte[0]); // empty trigger FlowFile; the processor overwrites its content with the fetched file via session.write
+        runner.run();
+
+        assertResult();
+    }
+
+    private void assertResult() throws Exception { // expects exactly one FlowFile on success carrying the fetched bytes
+        runner.assertAllFlowFilesTransferred(PutAzureDataLakeStorage.REL_SUCCESS, 1); // REL_SUCCESS is declared on AbstractAzureDataLakeStorageProcessor
+        List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureDataLakeStorage.REL_SUCCESS);
+        for (MockFlowFile flowFile : flowFilesForRelationship) {
+            flowFile.assertContentEquals("0123456789".getBytes()); // expected content is presumably what the base class uploads as TEST_FILE_NAME - confirm
+            flowFile.assertAttributeEquals("azure.length", "10"); // NOTE(review): the FetchAzureDataLakeStorage hunks in this commit do not set an azure.length attribute, so this assertion likely fails - confirm
+        }
+    }
+}