You are viewing a plain-text version of this content; the link to its canonical version was lost in this plain-text rendering.
Posted to commits@nifi.apache.org by pv...@apache.org on 2020/10/02 17:12:51 UTC

[nifi] branch main updated: NIFI-7874: Fixed bug that results in buffering all S3 elements in heap before emitting any of them. Added Batch Size property to ListS3

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 5d37809  NIFI-7874: Fixed bug that results in buffering all S3 elements in heap before emitting any of them. Added Batch Size property to ListS3
5d37809 is described below

commit 5d37809d30aa8037c2306c875a0240248767d076
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Oct 2 10:42:08 2020 -0400

    NIFI-7874: Fixed bug that results in buffering all S3 elements in heap before emitting any of them. Added Batch Size property to ListS3
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4566.
---
 .../org/apache/nifi/processors/aws/s3/ListS3.java  | 38 +++++++++++++++-------
 .../nifi/processors/aws/s3/AbstractS3IT.java       |  4 +++
 2 files changed, 31 insertions(+), 11 deletions(-)

diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index b9ebcd6..4fa7079 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -56,6 +56,7 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
@@ -82,6 +83,7 @@ import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 
+
 @PrimaryNodeOnly
 @TriggerSerially
 @TriggerWhenEmpty
@@ -111,7 +113,7 @@ import org.apache.nifi.serialization.record.RecordSchema;
 @SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
 public class ListS3 extends AbstractS3Processor {
 
-    public static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor DELIMITER = new Builder()
             .name("delimiter")
             .displayName("Delimiter")
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
@@ -121,7 +123,7 @@ public class ListS3 extends AbstractS3Processor {
                     "for the correct use of this field.")
             .build();
 
-    public static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor PREFIX = new Builder()
             .name("prefix")
             .displayName("Prefix")
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
@@ -130,7 +132,7 @@ public class ListS3 extends AbstractS3Processor {
             .description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').")
             .build();
 
-    public static final PropertyDescriptor USE_VERSIONS = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor USE_VERSIONS = new Builder()
             .name("use-versions")
             .displayName("Use Versions")
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
@@ -141,7 +143,7 @@ public class ListS3 extends AbstractS3Processor {
             .description("Specifies whether to use S3 versions, if applicable.  If false, only the latest version of each object will be returned.")
             .build();
 
-    public static final PropertyDescriptor LIST_TYPE = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor LIST_TYPE = new Builder()
             .name("list-type")
             .displayName("List Type")
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
@@ -154,7 +156,7 @@ public class ListS3 extends AbstractS3Processor {
             .description("Specifies whether to use the original List Objects or the newer List Objects Version 2 endpoint.")
             .build();
 
-    public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor MIN_AGE = new Builder()
             .name("min-age")
             .displayName("Minimum Object Age")
             .description("The minimum age that an S3 object must be in order to be considered; any object younger than this amount of time (according to last modification date) will be ignored")
@@ -163,7 +165,7 @@ public class ListS3 extends AbstractS3Processor {
             .defaultValue("0 sec")
             .build();
 
-    public static final PropertyDescriptor WRITE_OBJECT_TAGS = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor WRITE_OBJECT_TAGS = new Builder()
             .name("write-s3-object-tags")
             .displayName("Write Object Tags")
             .description("If set to 'True', the tags associated with the S3 object will be written as FlowFile attributes")
@@ -171,7 +173,7 @@ public class ListS3 extends AbstractS3Processor {
             .allowableValues(new AllowableValue("true", "True"), new AllowableValue("false", "False"))
             .defaultValue("false")
             .build();
-    public static final PropertyDescriptor REQUESTER_PAYS = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor REQUESTER_PAYS = new Builder()
             .name("requester-pays")
             .displayName("Requester Pays")
             .required(true)
@@ -185,7 +187,7 @@ public class ListS3 extends AbstractS3Processor {
             .defaultValue("false")
             .build();
 
-    public static final PropertyDescriptor WRITE_USER_METADATA = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor WRITE_USER_METADATA = new Builder()
             .name("write-s3-user-metadata")
             .displayName("Write User Metadata")
             .description("If set to 'True', the user defined metadata associated with the S3 object will be added to FlowFile attributes/records")
@@ -194,7 +196,7 @@ public class ListS3 extends AbstractS3Processor {
             .defaultValue("false")
             .build();
 
-    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+    public static final PropertyDescriptor RECORD_WRITER = new Builder()
         .name("record-writer")
         .displayName("Record Writer")
         .description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each entity that is listed. If the Record Writer is specified, " +
@@ -203,6 +205,18 @@ public class ListS3 extends AbstractS3Processor {
         .identifiesControllerService(RecordSetWriterFactory.class)
         .build();
 
+    static final PropertyDescriptor BATCH_SIZE = new Builder()
+        .name("Listing Batch Size")
+        .displayName("Listing Batch Size")
+        .description("If not using a Record Writer, this property dictates how many S3 objects should be listed in a single batch. Once this number is reached, the FlowFiles that have been created " +
+            "will be transferred out of the Processor. Setting this value lower may result in lower latency by sending out the FlowFiles before the complete listing has finished. However, it can " +
+            "significantly reduce performance. Larger values may take more memory to store all of the information before sending the FlowFiles out. This property is ignored if using a Record " +
+            "Writer, as one of the main benefits of the Record Writer is being able to emit the entire listing as a single FlowFile.")
+        .required(false)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .defaultValue("100")
+        .build();
+
 
     public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
         BUCKET,
@@ -211,6 +225,7 @@ public class ListS3 extends AbstractS3Processor {
         SECRET_KEY,
         RECORD_WRITER,
         MIN_AGE,
+        BATCH_SIZE,
         WRITE_OBJECT_TAGS,
         WRITE_USER_METADATA,
         CREDENTIALS_FILE,
@@ -317,6 +332,7 @@ public class ListS3 extends AbstractS3Processor {
         final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
         final long listingTimestamp = System.currentTimeMillis();
         final boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean();
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
 
         final AmazonS3 client = getClient();
         int listCount = 0;
@@ -412,7 +428,7 @@ public class ListS3 extends AbstractS3Processor {
 
                     totalListCount += listCount;
 
-                    if (listCount > 0 && writer.isCheckpoint()) {
+                    if (listCount >= batchSize && writer.isCheckpoint()) {
                         getLogger().info("Successfully listed {} new files from S3; routing to success", new Object[] {listCount});
                         session.commit();
                     }
@@ -831,7 +847,7 @@ public class ListS3 extends AbstractS3Processor {
 
         @Override
         public boolean isCheckpoint() {
-            return false;
+            return true;
         }
     }
 }
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
index 0860085..83ebb45 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
@@ -121,6 +121,10 @@ public abstract class AbstractS3IT {
     public static void oneTimeTearDown() {
         // Empty the bucket before deleting it.
         try {
+            if (client == null) {
+                return;
+            }
+
             ObjectListing objectListing = client.listObjects(BUCKET_NAME);
 
             while (true) {