You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2020/09/30 09:16:42 UTC

[nifi] branch main updated: NIFI-7830: Support large files in PutAzureDataLakeStorage

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f9ae3bb  NIFI-7830: Support large files in PutAzureDataLakeStorage
f9ae3bb is described below

commit f9ae3bb9c970cd8d6d1d9e10f07cab9bdb66baa9
Author: Peter Turcsanyi <tu...@apache.org>
AuthorDate: Sat Sep 26 00:32:23 2020 +0200

    NIFI-7830: Support large files in PutAzureDataLakeStorage
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4556.
---
 .../nifi-azure-processors/pom.xml                  |  2 +-
 .../azure/storage/PutAzureDataLakeStorage.java     | 29 +++++++++++++++++++---
 .../storage/AbstractAzureDataLakeStorageIT.java    |  3 +--
 .../azure/storage/ITFetchAzureDataLakeStorage.java |  5 +++-
 .../azure/storage/ITPutAzureDataLakeStorage.java   |  5 +++-
 5 files changed, 36 insertions(+), 8 deletions(-)

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 3edcb78..2837f3d 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -83,7 +83,7 @@
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-storage-file-datalake</artifactId>
-            <version>12.1.1</version>
+            <version>12.2.0</version>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
index ae67e3e..410320d 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
@@ -21,6 +21,8 @@ import com.azure.storage.file.datalake.DataLakeFileClient;
 import com.azure.storage.file.datalake.DataLakeFileSystemClient;
 import com.azure.storage.file.datalake.DataLakeServiceClient;
 import com.azure.storage.file.datalake.models.DataLakeStorageException;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.io.input.BoundedInputStream;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -71,6 +73,8 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
     public static final String REPLACE_RESOLUTION = "replace";
     public static final String IGNORE_RESOLUTION = "ignore";
 
+    public static long MAX_CHUNK_SIZE = 100 * 1024 * 1024; // current chunk limit is 100 MiB on Azure
+
     public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
             .name("conflict-resolution-strategy")
             .displayName("Conflict Resolution Strategy")
@@ -120,11 +124,10 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
 
                 final long length = flowFile.getSize();
                 if (length > 0) {
-                    try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) {
-                        fileClient.append(in, 0, length);
+                    try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
+                        uploadContent(fileClient, bufferedIn, length);
                     }
                 }
-                fileClient.flush(length);
 
                 final Map<String, String> attributes = new HashMap<>();
                 attributes.put(ATTR_NAME_FILESYSTEM, fileSystem);
@@ -158,4 +161,24 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
             session.transfer(flowFile, REL_FAILURE);
         }
     }
+
+    @VisibleForTesting
+    static void uploadContent(DataLakeFileClient fileClient, InputStream in, long length) { // appends `length` bytes of `in` in chunks of at most MAX_CHUNK_SIZE, then flushes once
+        long chunkStart = 0; // offset of the next chunk; passed to append() as its offset argument
+        long chunkSize;
+
+        while (chunkStart < length) {
+            chunkSize = Math.min(length - chunkStart, MAX_CHUNK_SIZE); // the final chunk may be shorter than MAX_CHUNK_SIZE
+
+            // Cap the shared stream at chunkSize bytes: com.azure.storage.common.Utility.convertStreamToByteBuffer()
+            // throws an exception if more bytes are available in the stream after reading the chunk.
+            BoundedInputStream boundedIn = new BoundedInputStream(in, chunkSize); // NOTE(review): never closed here; presumably deliberate so `in` stays open for the next chunk - confirm
+
+            fileClient.append(boundedIn, chunkStart, chunkSize);
+
+            chunkStart += chunkSize;
+        }
+
+        fileClient.flush(length); // single flush with the total length, issued only after every chunk has been appended
+    }
 }
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
index 553621b..fccdbf9 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java
@@ -84,8 +84,7 @@ public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorag
         DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
         DataLakeFileClient fileClient = directoryClient.createFile(filename);
 
-        fileClient.append(new ByteArrayInputStream(fileContentBytes), 0, fileContentBytes.length);
-        fileClient.flush(fileContentBytes.length);
+        PutAzureDataLakeStorage.uploadContent(fileClient, new ByteArrayInputStream(fileContentBytes), fileContentBytes.length);
     }
 
     protected void uploadFile(TestFile testFile) {
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
index de35508..4dc9123 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
@@ -29,6 +29,7 @@ import org.junit.Test;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -222,7 +223,9 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
         // GIVEN
         String directory = "TestDirectory";
         String filename = "testFile.txt";
-        byte[] fileContentBytes = new byte[100_000_000];
+        Random random = new Random();
+        byte[] fileContentBytes = new byte[120_000_000];
+        random.nextBytes(fileContentBytes);
         String fileContent = new String(fileContentBytes);
         String inputFlowFileContent = "InputFlowFileContent";
 
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
index 324e20d..fa3c684 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
@@ -32,6 +32,7 @@ import org.junit.Test;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -137,7 +138,9 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
     // ignore excessive test with larger file size
     @Test
     public void testPutBigFile() throws Exception {
-        byte[] fileData = new byte[100_000_000];
+        Random random = new Random();
+        byte[] fileData = new byte[120_000_000];
+        random.nextBytes(fileData);
 
         runProcessor(fileData);