Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/10/21 15:26:10 UTC

[GitHub] [nifi] Lehel44 commented on a change in pull request #5413: NIFI-8676 Added 'Tracking Entities' listing strategy to 'ListS3' and 'ListGCSBucket'

Lehel44 commented on a change in pull request #5413:
URL: https://github.com/apache/nifi/pull/5413#discussion_r733766851



##########
File path: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
##########
@@ -157,6 +164,45 @@
         @WritesAttribute(attribute = URI_ATTR, description = URI_DESC)
 })
 public class ListGCSBucket extends AbstractGCSProcessor {
+    public static final AllowableValue BY_TIMESTAMPS = new AllowableValue("timestamps", "Tracking Timestamps",
+        "This strategy tracks the latest timestamp of listed entity to determine new/updated entities." +
+            " Since it only tracks few timestamps, it can manage listing state efficiently." +
+            " However, any newly added, or updated entity having timestamp older than the tracked latest timestamp can not be picked by this strategy." +
+            " Also may miss files when multiple subdirectories are being written at the same time while listing is running.");

Review comment:
       ```suggestion
               " Also, files may be missed/skipped when multiple subdirectories are being written at the same time while listing is running.");
   ```

##########
File path: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
##########
@@ -157,6 +164,45 @@
         @WritesAttribute(attribute = URI_ATTR, description = URI_DESC)
 })
 public class ListGCSBucket extends AbstractGCSProcessor {
+    public static final AllowableValue BY_TIMESTAMPS = new AllowableValue("timestamps", "Tracking Timestamps",
+        "This strategy tracks the latest timestamp of listed entity to determine new/updated entities." +
+            " Since it only tracks few timestamps, it can manage listing state efficiently." +
+            " However, any newly added, or updated entity having timestamp older than the tracked latest timestamp can not be picked by this strategy." +

Review comment:
       ```suggestion
               " This strategy will not pick any newly added or modified entity with a timestamp older than the recorded latest timestamp." +
   ```

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
##########
@@ -486,6 +546,147 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         }
     }
 
+    private void listByTrackingEntities(ProcessContext context, ProcessSession session) {
+        listedEntityTracker.trackEntities(context, session, justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
+            String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+            S3BucketLister bucketLister = getS3BucketLister(context, getClient(), bucket);
+
+            List<ListableEntityWrapper<S3VersionSummary>> listedEntities = bucketLister.listVersions().getVersionSummaries()
+                .stream()
+                .filter(s3VersionSummary -> s3VersionSummary.getLastModified().getTime() >= minTimestampToList)
+                .map(s3VersionSummary -> new ListableEntityWrapper<S3VersionSummary>(
+                    s3VersionSummary,
+                    S3VersionSummary::getKey,
+                    summary -> summary.getKey() + "_" + summary.getVersionId(),
+                    summary -> summary.getLastModified().getTime(),
+                    S3VersionSummary::getSize
+                ))
+                .collect(Collectors.toList());
+
+            return listedEntities;
+        }, null);
+
+        justElectedPrimaryNode = false;
+    }
+
+    private class ListedS3VersionSummaryTracker extends ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>> {
+        public ListedS3VersionSummaryTracker() {
+            super(getIdentifier(), getLogger(), RecordObjectWriter.RECORD_SCHEMA);
+        }
+
+        @Override
+        protected void createRecordsForEntities(
+            ProcessContext context,
+            ProcessSession session,
+            List<ListableEntityWrapper<S3VersionSummary>> updatedEntities
+        ) throws IOException, SchemaNotFoundException {
+            publishListing(context, session, updatedEntities);
+        }
+
+        @Override
+        protected void createFlowFilesForEntities(
+            ProcessContext context,
+            ProcessSession session,
+            List<ListableEntityWrapper<S3VersionSummary>> updatedEntities,
+            Function<ListableEntityWrapper<S3VersionSummary>, Map<String, String>> createAttributes
+        ) {
+            publishListing(context, session, updatedEntities);
+        }
+
+        private void publishListing(ProcessContext context, ProcessSession session, List<ListableEntityWrapper<S3VersionSummary>> updatedEntities) {
+            final S3ObjectWriter writer;
+            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+            if (writerFactory == null) {
+                writer = new AttributeObjectWriter(session);
+            } else {
+                writer = new RecordObjectWriter(session, writerFactory, getLogger());
+            }
+
+            try {
+                writer.beginListing();
+                final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+                int listCount = 0;
+                for (ListableEntityWrapper<S3VersionSummary> updatedEntity : updatedEntities) {
+                    S3VersionSummary s3VersionSummary = updatedEntity.getRawEntity();
+
+                    GetObjectTaggingResult taggingResult = getTaggingResult(context, getClient(), s3VersionSummary);
+                    ObjectMetadata objectMetadata = getObjectMetadata(context, getClient(), s3VersionSummary);
+
+                    writer.addToListing(s3VersionSummary, taggingResult, objectMetadata);
+
+                    listCount++;
+
+                    if (listCount >= batchSize && writer.isCheckpoint()) {
+                        getLogger().info("Successfully listed {} new files from S3; routing to success", new Object[]{listCount});
+                        session.commitAsync();
+                    }
+
+                    final ListedEntity listedEntity = new ListedEntity(updatedEntity.getTimestamp(), updatedEntity.getSize());
+                    alreadyListedEntities.put(updatedEntity.getIdentifier(), listedEntity);
+                }
+
+                writer.finishListing();
+            } catch (final Exception e) {
+                getLogger().error("Failed to list contents of bucket due to {}", new Object[]{e}, e);
+                writer.finishListingExceptionally(e);
+                session.rollback();
+                context.yield();
+                return;
+            }
+        }
+    }
+
+    private GetObjectTaggingResult getTaggingResult(ProcessContext context, AmazonS3 client, S3VersionSummary versionSummary) {
+        GetObjectTaggingResult taggingResult = null;
+        if (context.getProperty(WRITE_OBJECT_TAGS).asBoolean()) {
+            try {
+                taggingResult = client.getObjectTagging(new GetObjectTaggingRequest(versionSummary.getBucketName(), versionSummary.getKey()));
+            } catch (final Exception e) {
+                getLogger().warn("Failed to obtain Object Tags for S3 Object {} in bucket {}. Will list S3 Object without the object tags",
+                    new Object[] {versionSummary.getKey(), versionSummary.getBucketName()}, e);
+            }
+        }
+        return taggingResult;
+    }
+
+    private ObjectMetadata getObjectMetadata(ProcessContext context, AmazonS3 client, S3VersionSummary versionSummary) {
+        ObjectMetadata objectMetadata = null;
+        if (context.getProperty(WRITE_USER_METADATA).asBoolean()) {
+            try {
+                objectMetadata = client.getObjectMetadata(new GetObjectMetadataRequest(versionSummary.getBucketName(), versionSummary.getKey()));
+            } catch (final Exception e) {
+                getLogger().warn("Failed to obtain User Metadata for S3 Object {} in bucket {}. Will list S3 Object without the user metadata",
+                    new Object[] {versionSummary.getKey(), versionSummary.getBucketName()}, e);
+            }
+        }
+        return objectMetadata;
+    }
+
+    private S3BucketLister getS3BucketLister(ProcessContext context, AmazonS3 client, String bucket) {
+        String delimiter = context.getProperty(DELIMITER).getValue();
+        boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean();
+        String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
+
+        boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean();
+        int listType = context.getProperty(LIST_TYPE).asInteger();
+        S3BucketLister bucketLister = useVersions
+            ? new S3VersionBucketLister(client)
+            : listType == 2
+            ? new S3ObjectBucketListerVersion2(client)
+            : new S3ObjectBucketLister(client);

Review comment:
       Would you please make this more readable? You could extract the nested ternary operator to an if statement + a ternary operator.
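
One possible shape for that refactoring, as a minimal sketch only. The lister types below are hypothetical stand-ins (the real `S3VersionBucketLister`, `S3ObjectBucketListerVersion2` and `S3ObjectBucketLister` take an `AmazonS3` client), and `select` is an illustrative name, not something from the PR:

```java
// Hypothetical stand-ins for the three lister classes in the diff.
interface BucketLister {
    String name();
}

class VersionBucketLister implements BucketLister {
    public String name() { return "versions"; }
}

class ObjectBucketListerV2 implements BucketLister {
    public String name() { return "list-objects-v2"; }
}

class ObjectBucketLister implements BucketLister {
    public String name() { return "list-objects-v1"; }
}

class ListerSelection {
    // An `if` handles the versioned case first; a single ternary then
    // chooses between the two list types, so nothing is nested.
    static BucketLister select(boolean useVersions, int listType) {
        if (useVersions) {
            return new VersionBucketLister();
        }
        return listType == 2 ? new ObjectBucketListerV2() : new ObjectBucketLister();
    }

    public static void main(String[] args) {
        System.out.println(select(true, 2).name());
        System.out.println(select(false, 2).name());
        System.out.println(select(false, 1).name());
    }
}
```

The behavior should match the nested ternary: `useVersions` wins regardless of `listType`, and list type 2 is only considered otherwise.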

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
##########
@@ -486,6 +546,147 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         }
     }
 
+    private void listByTrackingEntities(ProcessContext context, ProcessSession session) {
+        listedEntityTracker.trackEntities(context, session, justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
+            String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+            S3BucketLister bucketLister = getS3BucketLister(context, getClient(), bucket);
+
+            List<ListableEntityWrapper<S3VersionSummary>> listedEntities = bucketLister.listVersions().getVersionSummaries()
+                .stream()
+                .filter(s3VersionSummary -> s3VersionSummary.getLastModified().getTime() >= minTimestampToList)
+                .map(s3VersionSummary -> new ListableEntityWrapper<S3VersionSummary>(
+                    s3VersionSummary,
+                    S3VersionSummary::getKey,
+                    summary -> summary.getKey() + "_" + summary.getVersionId(),
+                    summary -> summary.getLastModified().getTime(),
+                    S3VersionSummary::getSize
+                ))
+                .collect(Collectors.toList());
+
+            return listedEntities;
+        }, null);
+
+        justElectedPrimaryNode = false;
+    }
+
+    private class ListedS3VersionSummaryTracker extends ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>> {
+        public ListedS3VersionSummaryTracker() {
+            super(getIdentifier(), getLogger(), RecordObjectWriter.RECORD_SCHEMA);
+        }
+
+        @Override
+        protected void createRecordsForEntities(
+            ProcessContext context,
+            ProcessSession session,
+            List<ListableEntityWrapper<S3VersionSummary>> updatedEntities
+        ) throws IOException, SchemaNotFoundException {
+            publishListing(context, session, updatedEntities);
+        }
+
+        @Override
+        protected void createFlowFilesForEntities(
+            ProcessContext context,
+            ProcessSession session,
+            List<ListableEntityWrapper<S3VersionSummary>> updatedEntities,
+            Function<ListableEntityWrapper<S3VersionSummary>, Map<String, String>> createAttributes
+        ) {
+            publishListing(context, session, updatedEntities);
+        }
+
+        private void publishListing(ProcessContext context, ProcessSession session, List<ListableEntityWrapper<S3VersionSummary>> updatedEntities) {
+            final S3ObjectWriter writer;
+            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+            if (writerFactory == null) {
+                writer = new AttributeObjectWriter(session);
+            } else {
+                writer = new RecordObjectWriter(session, writerFactory, getLogger());
+            }
+
+            try {
+                writer.beginListing();
+                final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+                int listCount = 0;
+                for (ListableEntityWrapper<S3VersionSummary> updatedEntity : updatedEntities) {
+                    S3VersionSummary s3VersionSummary = updatedEntity.getRawEntity();
+
+                    GetObjectTaggingResult taggingResult = getTaggingResult(context, getClient(), s3VersionSummary);
+                    ObjectMetadata objectMetadata = getObjectMetadata(context, getClient(), s3VersionSummary);
+
+                    writer.addToListing(s3VersionSummary, taggingResult, objectMetadata);
+
+                    listCount++;
+
+                    if (listCount >= batchSize && writer.isCheckpoint()) {
+                        getLogger().info("Successfully listed {} new files from S3; routing to success", new Object[]{listCount});
+                        session.commitAsync();
+                    }
+
+                    final ListedEntity listedEntity = new ListedEntity(updatedEntity.getTimestamp(), updatedEntity.getSize());
+                    alreadyListedEntities.put(updatedEntity.getIdentifier(), listedEntity);
+                }
+
+                writer.finishListing();
+            } catch (final Exception e) {
+                getLogger().error("Failed to list contents of bucket due to {}", new Object[]{e}, e);
+                writer.finishListingExceptionally(e);
+                session.rollback();
+                context.yield();
+                return;

Review comment:
       Is there any reason to return here?

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
##########
@@ -63,7 +63,7 @@
 public class ListedEntityTracker<T extends ListableEntity> {
 
     private final ObjectMapper objectMapper = new ObjectMapper();
-    private volatile Map<String, ListedEntity> alreadyListedEntities;
+    protected volatile Map<String, ListedEntity> alreadyListedEntities;

Review comment:
       The implementation is always a concurrent hash map, which is thread-safe. Does it need to be volatile?
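
To make the question concrete, here is a small sketch of the distinction I have in mind. It assumes the field might be reassigned somewhere outside this hunk (I have not verified that); `TrackerSketch`, `put`, `reset` and `size` are hypothetical names, not the tracker's actual API:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class TrackerSketch {
    // The ConcurrentHashMap makes put/get on the map contents thread-safe.
    // `volatile` is a separate concern: it makes a reassignment of the
    // reference itself visible to other threads.
    private volatile Map<String, Long> entities = new ConcurrentHashMap<>();

    void put(String id, long timestamp) {
        entities.put(id, timestamp); // safe because of the map implementation
    }

    void reset() {
        // Only this kind of reassignment benefits from `volatile`.
        entities = new ConcurrentHashMap<>();
    }

    int size() {
        return entities.size();
    }
}
```

If the reference is never reassigned after construction, `final` would probably be the clearer choice; if it is reassigned, `volatile` still has a purpose even though the map is concurrent.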

##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
##########
@@ -376,7 +376,7 @@ private void createRecordsForEntities(final ProcessContext context, final Proces
         session.transfer(flowFile, REL_SUCCESS);
     }
 
-    private void createFlowFilesForEntities(final ProcessSession session, final List<T> updatedEntities, final Function<T, Map<String, String>> createAttributes) {
+    protected void createFlowFilesForEntities(ProcessContext context, final ProcessSession session, final List<T> updatedEntities, final Function<T, Map<String, String>> createAttributes) {

Review comment:
       The `context` parameter is unused in this method, so you may remove it.

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
##########
@@ -486,6 +546,147 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         }
     }
 
+    private void listByTrackingEntities(ProcessContext context, ProcessSession session) {
+        listedEntityTracker.trackEntities(context, session, justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
+            String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+            S3BucketLister bucketLister = getS3BucketLister(context, getClient(), bucket);
+
+            List<ListableEntityWrapper<S3VersionSummary>> listedEntities = bucketLister.listVersions().getVersionSummaries()
+                .stream()
+                .filter(s3VersionSummary -> s3VersionSummary.getLastModified().getTime() >= minTimestampToList)
+                .map(s3VersionSummary -> new ListableEntityWrapper<S3VersionSummary>(
+                    s3VersionSummary,
+                    S3VersionSummary::getKey,
+                    summary -> summary.getKey() + "_" + summary.getVersionId(),
+                    summary -> summary.getLastModified().getTime(),
+                    S3VersionSummary::getSize
+                ))
+                .collect(Collectors.toList());
+
+            return listedEntities;
+        }, null);
+
+        justElectedPrimaryNode = false;
+    }
+
+    private class ListedS3VersionSummaryTracker extends ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>> {
+        public ListedS3VersionSummaryTracker() {
+            super(getIdentifier(), getLogger(), RecordObjectWriter.RECORD_SCHEMA);
+        }
+
+        @Override
+        protected void createRecordsForEntities(
+            ProcessContext context,
+            ProcessSession session,
+            List<ListableEntityWrapper<S3VersionSummary>> updatedEntities
+        ) throws IOException, SchemaNotFoundException {

Review comment:
       I think you can remove the `throws` clause from the signature.

##########
File path: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
##########
@@ -157,6 +164,45 @@
         @WritesAttribute(attribute = URI_ATTR, description = URI_DESC)
 })
 public class ListGCSBucket extends AbstractGCSProcessor {
+    public static final AllowableValue BY_TIMESTAMPS = new AllowableValue("timestamps", "Tracking Timestamps",
+        "This strategy tracks the latest timestamp of listed entity to determine new/updated entities." +
+            " Since it only tracks few timestamps, it can manage listing state efficiently." +
+            " However, any newly added, or updated entity having timestamp older than the tracked latest timestamp can not be picked by this strategy." +
+            " Also may miss files when multiple subdirectories are being written at the same time while listing is running.");
+
+    public static final AllowableValue BY_ENTITIES = new AllowableValue("entities", "Tracking Entities",
+        "This strategy tracks information of all the listed entities within the latest 'Entity Tracking Time Window' to determine new/updated entities." +
+            " This strategy can pick entities having old timestamp that can be missed with 'Tracing Timestamps'." +
+            " Works even when multiple subdirectories are being written at the same time while listing is running." +
+            " However additional DistributedMapCache controller service is required and more JVM heap memory is used." +

Review comment:
       ```suggestion
               " However, an additional DistributedMapCache controller service is required, as well as more JVM heap capacity." +
   ```

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
##########
@@ -486,6 +546,147 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         }
     }
 
+    private void listByTrackingEntities(ProcessContext context, ProcessSession session) {
+        listedEntityTracker.trackEntities(context, session, justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
+            String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+            S3BucketLister bucketLister = getS3BucketLister(context, getClient(), bucket);
+
+            List<ListableEntityWrapper<S3VersionSummary>> listedEntities = bucketLister.listVersions().getVersionSummaries()
+                .stream()
+                .filter(s3VersionSummary -> s3VersionSummary.getLastModified().getTime() >= minTimestampToList)
+                .map(s3VersionSummary -> new ListableEntityWrapper<S3VersionSummary>(
+                    s3VersionSummary,
+                    S3VersionSummary::getKey,
+                    summary -> summary.getKey() + "_" + summary.getVersionId(),
+                    summary -> summary.getLastModified().getTime(),
+                    S3VersionSummary::getSize
+                ))
+                .collect(Collectors.toList());
+
+            return listedEntities;
+        }, null);
+
+        justElectedPrimaryNode = false;
+    }
+
+    private class ListedS3VersionSummaryTracker extends ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>> {
+        public ListedS3VersionSummaryTracker() {
+            super(getIdentifier(), getLogger(), RecordObjectWriter.RECORD_SCHEMA);
+        }
+
+        @Override
+        protected void createRecordsForEntities(
+            ProcessContext context,
+            ProcessSession session,
+            List<ListableEntityWrapper<S3VersionSummary>> updatedEntities
+        ) throws IOException, SchemaNotFoundException {
+            publishListing(context, session, updatedEntities);
+        }
+
+        @Override
+        protected void createFlowFilesForEntities(
+            ProcessContext context,
+            ProcessSession session,
+            List<ListableEntityWrapper<S3VersionSummary>> updatedEntities,
+            Function<ListableEntityWrapper<S3VersionSummary>, Map<String, String>> createAttributes
+        ) {
+            publishListing(context, session, updatedEntities);
+        }
+
+        private void publishListing(ProcessContext context, ProcessSession session, List<ListableEntityWrapper<S3VersionSummary>> updatedEntities) {
+            final S3ObjectWriter writer;
+            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+            if (writerFactory == null) {
+                writer = new AttributeObjectWriter(session);
+            } else {
+                writer = new RecordObjectWriter(session, writerFactory, getLogger());
+            }
+
+            try {
+                writer.beginListing();
+                final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+                int listCount = 0;
+                for (ListableEntityWrapper<S3VersionSummary> updatedEntity : updatedEntities) {
+                    S3VersionSummary s3VersionSummary = updatedEntity.getRawEntity();
+
+                    GetObjectTaggingResult taggingResult = getTaggingResult(context, getClient(), s3VersionSummary);
+                    ObjectMetadata objectMetadata = getObjectMetadata(context, getClient(), s3VersionSummary);
+
+                    writer.addToListing(s3VersionSummary, taggingResult, objectMetadata);
+
+                    listCount++;
+
+                    if (listCount >= batchSize && writer.isCheckpoint()) {
+                        getLogger().info("Successfully listed {} new files from S3; routing to success", new Object[]{listCount});

Review comment:
       There's a new varargs method for logging, so I think you can remove the array creation.
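
A rough illustration of why the array is redundant with a varargs signature. `SketchLogger` is a hypothetical stand-in that only mimics the shape of a `{}`-placeholder logger with an `Object...` parameter; it is not NiFi's logger:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;

// Hypothetical logger: collects formatted lines instead of writing them.
class SketchLogger {
    final List<String> lines = new ArrayList<>();

    // Varargs method: callers may pass the arguments directly.
    void info(String msg, Object... args) {
        String out = msg;
        for (Object arg : args) {
            // Replace the next "{}" placeholder with the argument's string form.
            out = out.replaceFirst("\\{\\}", Matcher.quoteReplacement(String.valueOf(arg)));
        }
        lines.add(out);
    }
}

class VarargsLogging {
    public static void main(String[] args) {
        SketchLogger logger = new SketchLogger();
        int listCount = 100;

        // Explicit array, as in the diff.
        logger.info("Successfully listed {} new files from S3; routing to success", new Object[]{listCount});
        // Same call without the array; the compiler builds it.
        logger.info("Successfully listed {} new files from S3; routing to success", listCount);

        System.out.println(logger.lines.get(0).equals(logger.lines.get(1)));
    }
}
```

Both calls reach the method with the same one-element argument array, so dropping `new Object[]{...}` should not change the logged message.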

##########
File path: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
##########
@@ -157,6 +164,45 @@
         @WritesAttribute(attribute = URI_ATTR, description = URI_DESC)
 })
 public class ListGCSBucket extends AbstractGCSProcessor {
+    public static final AllowableValue BY_TIMESTAMPS = new AllowableValue("timestamps", "Tracking Timestamps",
+        "This strategy tracks the latest timestamp of listed entity to determine new/updated entities." +
+            " Since it only tracks few timestamps, it can manage listing state efficiently." +
+            " However, any newly added, or updated entity having timestamp older than the tracked latest timestamp can not be picked by this strategy." +
+            " Also may miss files when multiple subdirectories are being written at the same time while listing is running.");
+
+    public static final AllowableValue BY_ENTITIES = new AllowableValue("entities", "Tracking Entities",
+        "This strategy tracks information of all the listed entities within the latest 'Entity Tracking Time Window' to determine new/updated entities." +
+            " This strategy can pick entities having old timestamp that can be missed with 'Tracing Timestamps'." +
+            " Works even when multiple subdirectories are being written at the same time while listing is running." +
+            " However additional DistributedMapCache controller service is required and more JVM heap memory is used." +
+            " See the description of 'Entity Tracking Time Window' property for further details on how it works.");

Review comment:
       ```suggestion
               " For more information on how the 'Entity Tracking Time Window' property works, see the description.");
   ```

##########
File path: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
##########
@@ -358,6 +444,131 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{bucket, listMillis});
     }
 
+    private List<Storage.BlobListOption> getBlobListOptions(ProcessContext context) {
+        final String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
+        final boolean useGenerations = context.getProperty(USE_GENERATIONS).asBoolean();
+
+        final List<Storage.BlobListOption> listOptions = new ArrayList<>();
+
+        if (prefix != null) {
+            listOptions.add(Storage.BlobListOption.prefix(prefix));
+        }
+        if (useGenerations) {
+            listOptions.add(Storage.BlobListOption.versions(true));
+        }
+
+        return listOptions;
+    }
+
+    private void listByTrackingEntities(ProcessContext context, ProcessSession session) {
+        listedEntityTracker.trackEntities(context, session, justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
+            List<ListableBlob> listedEntities = new ArrayList<>();
+
+            Storage storage = getCloudService();
+            String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+            final List<Storage.BlobListOption> listOptions = getBlobListOptions(context);
+
+            Page<Blob> blobPage = storage.list(bucket, listOptions.toArray(new Storage.BlobListOption[0]));
+            int pageNr=0;
+            do {
+                for (final Blob blob : blobPage.getValues()) {
+                    if (blob.getUpdateTime() >= minTimestampToList) {
+                        listedEntities.add(new ListableBlob(
+                            blob,
+                            pageNr
+                        ));
+                    }
+                }
+                blobPage = blobPage.getNextPage();
+                pageNr++;
+            } while (blobPage != null);
+
+            return listedEntities;
+        }, null);
+
+        justElectedPrimaryNode = false;
+    }
+
+    protected class ListedBlobTracker extends ListedEntityTracker<ListableBlob> {
+        public ListedBlobTracker() {
+            super(getIdentifier(), getLogger(), RecordBlobWriter.RECORD_SCHEMA);
+        }
+
+        @Override
+        protected void createRecordsForEntities(ProcessContext context, ProcessSession session, List<ListableBlob> updatedEntities) throws IOException, SchemaNotFoundException {
+            publishListing(context, session, updatedEntities);
+        }
+
+        @Override
+        protected void createFlowFilesForEntities(ProcessContext context, ProcessSession session, List<ListableBlob> updatedEntities, Function<ListableBlob, Map<String, String>> createAttributes) {
+            publishListing(context, session, updatedEntities);
+        }
+
+        private void publishListing(ProcessContext context, ProcessSession session, List<ListableBlob> updatedEntities) {
+            final BlobWriter writer;
+            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+            if (writerFactory == null) {
+                writer = new AttributeBlobWriter(session);
+            } else {
+                writer = new RecordBlobWriter(session, writerFactory, getLogger());
+            }
+
+            long maxTimestamp = 0L;
+            final Set<String> keysMatchingTimestamp = new HashSet<>();
+
+            try {
+                writer.beginListing();
+
+                int listCount = 0;
+                int pageNr = -1;
+                for (ListableBlob listableBlob : updatedEntities) {
+                    Blob blob = listableBlob.getRawEntity();
+                    int currentPageNr = listableBlob.getPageNr();
+
+                    writer.addToListing(blob);
+
+                    listCount++;
+
+                    if (pageNr != -1 && pageNr != currentPageNr && writer.isCheckpoint()) {

Review comment:
       Can `pageNr != -1` be true here at any time?

##########
File path: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
##########
@@ -486,6 +546,147 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         }
     }
 
+    private void listByTrackingEntities(ProcessContext context, ProcessSession session) {
+        listedEntityTracker.trackEntities(context, session, justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
+            String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+            S3BucketLister bucketLister = getS3BucketLister(context, getClient(), bucket);
+
+            List<ListableEntityWrapper<S3VersionSummary>> listedEntities = bucketLister.listVersions().getVersionSummaries()
+                .stream()
+                .filter(s3VersionSummary -> s3VersionSummary.getLastModified().getTime() >= minTimestampToList)
+                .map(s3VersionSummary -> new ListableEntityWrapper<S3VersionSummary>(
+                    s3VersionSummary,
+                    S3VersionSummary::getKey,
+                    summary -> summary.getKey() + "_" + summary.getVersionId(),
+                    summary -> summary.getLastModified().getTime(),
+                    S3VersionSummary::getSize
+                ))
+                .collect(Collectors.toList());
+
+            return listedEntities;

Review comment:
       ```suggestion
       return bucketLister.listVersions().getVersionSummaries()
                   .stream()
                   .filter(s3VersionSummary -> s3VersionSummary.getLastModified().getTime() >= minTimestampToList)
                   .map(s3VersionSummary -> new ListableEntityWrapper<>(
                           s3VersionSummary,
                           S3VersionSummary::getKey,
                           summary -> summary.getKey() + "_" + summary.getVersionId(),
                           summary -> summary.getLastModified().getTime(),
                           S3VersionSummary::getSize
                   ))
                   .collect(Collectors.toList());
           }, null);
   ```

##########
File path: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
##########
@@ -358,6 +444,131 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{bucket, listMillis});
     }
 
+    private List<Storage.BlobListOption> getBlobListOptions(ProcessContext context) {
+        final String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
+        final boolean useGenerations = context.getProperty(USE_GENERATIONS).asBoolean();
+
+        final List<Storage.BlobListOption> listOptions = new ArrayList<>();
+
+        if (prefix != null) {
+            listOptions.add(Storage.BlobListOption.prefix(prefix));
+        }
+        if (useGenerations) {
+            listOptions.add(Storage.BlobListOption.versions(true));
+        }
+
+        return listOptions;
+    }
+
+    private void listByTrackingEntities(ProcessContext context, ProcessSession session) {
+        listedEntityTracker.trackEntities(context, session, justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
+            List<ListableBlob> listedEntities = new ArrayList<>();
+
+            Storage storage = getCloudService();
+            String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+            final List<Storage.BlobListOption> listOptions = getBlobListOptions(context);
+
+            Page<Blob> blobPage = storage.list(bucket, listOptions.toArray(new Storage.BlobListOption[0]));
+            int pageNr=0;
+            do {
+                for (final Blob blob : blobPage.getValues()) {
+                    if (blob.getUpdateTime() >= minTimestampToList) {
+                        listedEntities.add(new ListableBlob(
+                            blob,
+                            pageNr
+                        ));
+                    }
+                }
+                blobPage = blobPage.getNextPage();
+                pageNr++;
+            } while (blobPage != null);
+
+            return listedEntities;
+        }, null);
+
+        justElectedPrimaryNode = false;
+    }
+
+    protected class ListedBlobTracker extends ListedEntityTracker<ListableBlob> {
+        public ListedBlobTracker() {
+            super(getIdentifier(), getLogger(), RecordBlobWriter.RECORD_SCHEMA);
+        }
+
+        @Override
+        protected void createRecordsForEntities(ProcessContext context, ProcessSession session, List<ListableBlob> updatedEntities) throws IOException, SchemaNotFoundException {
+            publishListing(context, session, updatedEntities);
+        }
+
+        @Override
+        protected void createFlowFilesForEntities(ProcessContext context, ProcessSession session, List<ListableBlob> updatedEntities, Function<ListableBlob, Map<String, String>> createAttributes) {
+            publishListing(context, session, updatedEntities);
+        }
+
+        private void publishListing(ProcessContext context, ProcessSession session, List<ListableBlob> updatedEntities) {
+            final BlobWriter writer;
+            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+            if (writerFactory == null) {
+                writer = new AttributeBlobWriter(session);
+            } else {
+                writer = new RecordBlobWriter(session, writerFactory, getLogger());
+            }
+
+            long maxTimestamp = 0L;
+            final Set<String> keysMatchingTimestamp = new HashSet<>();
+
+            try {
+                writer.beginListing();
+
+                int listCount = 0;
+                int pageNr = -1;
+                for (ListableBlob listableBlob : updatedEntities) {
+                    Blob blob = listableBlob.getRawEntity();
+                    int currentPageNr = listableBlob.getPageNr();
+
+                    writer.addToListing(blob);
+
+                    listCount++;
+
+                    if (pageNr != -1 && pageNr != currentPageNr && writer.isCheckpoint()) {
+                        commit(session, listCount, maxTimestamp, keysMatchingTimestamp);
+                        listCount = 0;
+                        pageNr = currentPageNr;
+                    }
+                }
+
+                writer.finishListing();
+            } catch (final Exception e) {
+                getLogger().error("Failed to list contents of bucket due to {}", new Object[] {e}, e);
+                writer.finishListingExceptionally(e);
+                session.rollback();
+                context.yield();
+                return;

Review comment:
       I think this explicit `return` is redundant; we return here anyway.

##########
File path: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
##########
@@ -358,6 +444,131 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{bucket, listMillis});
     }
 
+    private List<Storage.BlobListOption> getBlobListOptions(ProcessContext context) {
+        final String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
+        final boolean useGenerations = context.getProperty(USE_GENERATIONS).asBoolean();
+
+        final List<Storage.BlobListOption> listOptions = new ArrayList<>();
+
+        if (prefix != null) {
+            listOptions.add(Storage.BlobListOption.prefix(prefix));
+        }
+        if (useGenerations) {
+            listOptions.add(Storage.BlobListOption.versions(true));
+        }
+
+        return listOptions;
+    }
+
+    private void listByTrackingEntities(ProcessContext context, ProcessSession session) {
+        listedEntityTracker.trackEntities(context, session, justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
+            List<ListableBlob> listedEntities = new ArrayList<>();
+
+            Storage storage = getCloudService();
+            String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+            final List<Storage.BlobListOption> listOptions = getBlobListOptions(context);
+
+            Page<Blob> blobPage = storage.list(bucket, listOptions.toArray(new Storage.BlobListOption[0]));
+            int pageNr=0;

Review comment:
       ```suggestion
               int pageNumber = 0;
   ```

##########
File path: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
##########
@@ -266,7 +349,19 @@ long getStateTimestamp() {
     }
 
     @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        final String listingStrategy = context.getProperty(LISTING_STRATEGY).getValue();
+
+        if (BY_TIMESTAMPS.equals(listingStrategy)) {
+            listByTrackingTimestamps(context, session);
+        } else if (BY_ENTITIES.equals(listingStrategy)) {
+            listByTrackingEntities(context, session);
+        } else {
+            throw new ProcessException("Unknown listing strategy: " + listingStrategy);

Review comment:
       Is this `else` branch necessary? The listing strategy can only be chosen from the AllowableValues.
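
To illustrate the question, here is a rough sketch of why the final `else` is only a defensive guard when a property is restricted to a fixed set of values. It uses plain Java stand-ins rather than the NiFi API: the `ALLOWED` set and `isValid` mimic validation against allowable values, the `"entities"` value is an assumption (only `"timestamps"` appears in the diff), and `dispatch` returns a method name instead of calling the processor methods.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

class ListingStrategySketch {
    static final String BY_TIMESTAMPS = "timestamps";
    // Assumed value for the entity-tracking strategy; not shown in the diff.
    static final String BY_ENTITIES = "entities";

    // Stand-in for a property restricted to a closed set of allowable values.
    static final Set<String> ALLOWED =
            new LinkedHashSet<>(Arrays.asList(BY_TIMESTAMPS, BY_ENTITIES));

    // Stand-in for validation that happens before the component is run.
    static boolean isValid(String value) {
        return ALLOWED.contains(value);
    }

    // Mirrors the if / else-if / else dispatch in onTrigger.
    static String dispatch(String listingStrategy) {
        if (BY_TIMESTAMPS.equals(listingStrategy)) {
            return "listByTrackingTimestamps";
        } else if (BY_ENTITIES.equals(listingStrategy)) {
            return "listByTrackingEntities";
        } else {
            // Unreachable for any value that passed isValid(); kept as a guard.
            throw new IllegalStateException("Unknown listing strategy: " + listingStrategy);
        }
    }

    public static void main(String[] args) {
        for (String value : ALLOWED) {
            System.out.println(value + " -> " + dispatch(value));
        }
    }
}
```

If every value that reaches `dispatch` has already passed validation, the last branch never runs. Keeping it is still a reasonable choice if a third strategy might be added later without updating the dispatch.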



