You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2022/01/10 19:36:08 UTC

[nifi] branch main updated: NIFI-8676 Added 'Tracking Entities' listing strategy to 'ListS3' and 'ListGCSBucket'

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 990285b  NIFI-8676 Added 'Tracking Entities' listing strategy to 'ListS3' and 'ListGCSBucket'
990285b is described below

commit 990285ba1c1429b436af1cab4b36ab8a3797bb14
Author: Tamas Palfy <ta...@gmail.com>
AuthorDate: Thu Sep 23 18:36:03 2021 +0200

    NIFI-8676 Added 'Tracking Entities' listing strategy to 'ListS3' and 'ListGCSBucket'
    
    This closes #5413.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../nifi-aws-bundle/nifi-aws-processors/pom.xml    |   4 +
 .../org/apache/nifi/processors/aws/s3/ListS3.java  | 321 +++++++++++++++++----
 .../processor/util/list/ListableEntityWrapper.java |  67 +++++
 .../processor/util/list/ListedEntityTracker.java   |  18 +-
 .../nifi-gcp-bundle/nifi-gcp-processors/pom.xml    |   4 +
 .../nifi/processors/gcp/storage/ListGCSBucket.java | 235 ++++++++++++++-
 6 files changed, 566 insertions(+), 83 deletions(-)

diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
index 03e2b41..1c8b255 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
@@ -31,6 +31,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-processor-utils</artifactId>
             <version>1.16.0-SNAPSHOT</version>
         </dependency>
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index 5ee1a83..57065f3 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -43,6 +43,9 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.ConfigVerificationResult.Outcome;
@@ -60,8 +63,12 @@ import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.VerifiableProcessor;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.list.ListableEntityWrapper;
+import org.apache.nifi.processor.util.list.ListedEntity;
+import org.apache.nifi.processor.util.list.ListedEntityTracker;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
@@ -87,6 +94,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 @PrimaryNodeOnly
 @TriggerSerially
@@ -117,6 +126,45 @@ import java.util.concurrent.atomic.AtomicReference;
 @SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
 public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
 
+    /**
+     * Listing strategy that decides what is new or updated from the latest listed timestamp.
+     * This is the default value of {@link #LISTING_STRATEGY}.
+     */
+    public static final AllowableValue BY_TIMESTAMPS = new AllowableValue("timestamps", "Tracking Timestamps",
+        "This strategy tracks the latest timestamp of listed entity to determine new/updated entities." +
+            " Since it only tracks few timestamps, it can manage listing state efficiently." +
+            " This strategy will not pick up any newly added or modified entity if their timestamps are older than the tracked latest timestamp." +
+            " Also may miss files when multiple subdirectories are being written at the same time while listing is running.");
+
+    /**
+     * Listing strategy that tracks every listed entity inside the configured tracking time window.
+     * The description refers to the other strategy by its display name, "Tracking Timestamps".
+     */
+    public static final AllowableValue BY_ENTITIES = new AllowableValue("entities", "Tracking Entities",
+        "This strategy tracks information of all the listed entities within the latest 'Entity Tracking Time Window' to determine new/updated entities." +
+            " This strategy can pick entities having old timestamp that can be missed with 'Tracking Timestamps'." +
+            " Works even when multiple subdirectories are being written at the same time while listing is running." +
+            " However an additional DistributedMapCache controller service is required and more JVM heap memory is used." +
+            " For more information on how the 'Entity Tracking Time Window' property works, see the description.");
+
+    /**
+     * Required property selecting between {@link #BY_TIMESTAMPS} (default) and {@link #BY_ENTITIES}.
+     */
+    public static final PropertyDescriptor LISTING_STRATEGY = new Builder()
+        .name("listing-strategy")
+        .displayName("Listing Strategy")
+        .description("Specify how to determine new/updated entities. See each strategy descriptions for detail.")
+        .required(true)
+        .allowableValues(BY_TIMESTAMPS, BY_ENTITIES)
+        .defaultValue(BY_TIMESTAMPS.getValue())
+        .build();
+
+    /**
+     * Copy of {@link ListedEntityTracker#TRACKING_STATE_CACHE} that is required and depends on
+     * {@link #LISTING_STRATEGY} being {@link #BY_ENTITIES}.
+     */
+    public static final PropertyDescriptor TRACKING_STATE_CACHE = new PropertyDescriptor.Builder()
+        .fromPropertyDescriptor(ListedEntityTracker.TRACKING_STATE_CACHE)
+        .dependsOn(LISTING_STRATEGY, BY_ENTITIES)
+        .required(true)
+        .build();
+
+    /**
+     * Copy of {@link ListedEntityTracker#INITIAL_LISTING_TARGET} that depends on
+     * {@link #LISTING_STRATEGY} being {@link #BY_ENTITIES}.
+     */
+    public static final PropertyDescriptor INITIAL_LISTING_TARGET = new PropertyDescriptor.Builder()
+        .fromPropertyDescriptor(ListedEntityTracker.INITIAL_LISTING_TARGET)
+        .dependsOn(LISTING_STRATEGY, BY_ENTITIES)
+        .build();
+
+    /**
+     * Copy of {@link ListedEntityTracker#TRACKING_TIME_WINDOW} that is required and depends on
+     * {@link #INITIAL_LISTING_TARGET} being {@link ListedEntityTracker#INITIAL_LISTING_TARGET_WINDOW}.
+     * NOTE(review): with this dependency the window is not configurable when the initial listing
+     * target is "All Available" - confirm that is intended.
+     */
+    public static final PropertyDescriptor TRACKING_TIME_WINDOW = new PropertyDescriptor.Builder()
+        .fromPropertyDescriptor(ListedEntityTracker.TRACKING_TIME_WINDOW)
+        .dependsOn(INITIAL_LISTING_TARGET, ListedEntityTracker.INITIAL_LISTING_TARGET_WINDOW)
+        .required(true)
+        .build();
+
     public static final PropertyDescriptor DELIMITER = new Builder()
             .name("delimiter")
             .displayName("Delimiter")
@@ -223,6 +271,10 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
 
 
     public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
+        LISTING_STRATEGY,
+        TRACKING_STATE_CACHE,
+        INITIAL_LISTING_TARGET,
+        TRACKING_TIME_WINDOW,
         BUCKET,
         REGION,
         ACCESS_KEY,
@@ -257,6 +309,40 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
     // State tracking
     private final AtomicReference<ListingSnapshot> listing = new AtomicReference<>(new ListingSnapshot(0L, Collections.emptySet()));
 
+    // Set by onPrimaryNodeChange; handed to the entity tracker and cleared again after each entity-tracking listing.
+    private volatile boolean justElectedPrimaryNode = false;
+    // When true, initListedEntityTracker clears the tracked entities on the next schedule and resets the flag.
+    // NOTE(review): nothing in the visible part of this class sets it to true - confirm where it is raised.
+    private volatile boolean resetEntityTrackingState = false;
+    // Non-null only while the 'Tracking Entities' strategy is configured (managed by initListedEntityTracker).
+    private volatile ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>> listedEntityTracker;
+
+    /**
+     * Remembers whether the latest primary-node notification was an election of this node.
+     *
+     * @param newState the state reported by the framework
+     */
+    @OnPrimaryNodeStateChange
+    public void onPrimaryNodeChange(final PrimaryNodeState newState) {
+        final boolean electedAsPrimary = PrimaryNodeState.ELECTED_PRIMARY_NODE == newState;
+        justElectedPrimaryNode = electedAsPrimary;
+    }
+
+    /**
+     * Prepares the entity tracker for the configured listing strategy each time the processor is scheduled.
+     * <p>
+     * An existing tracker is cleared when a reset was requested or when the strategy is no longer
+     * 'Tracking Entities'. Afterwards a tracker is created if the strategy needs one and none exists,
+     * or dropped if the strategy does not use it.
+     *
+     * @param context used to read the configured listing strategy
+     * @throws RuntimeException if the previously listed entities cannot be cleared
+     */
+    @OnScheduled
+    public void initListedEntityTracker(ProcessContext context) {
+        final String configuredStrategy = context.getProperty(LISTING_STRATEGY).getValue();
+        final boolean trackingEntities = BY_ENTITIES.getValue().equals(configuredStrategy);
+
+        if (listedEntityTracker != null && (resetEntityTrackingState || !trackingEntities)) {
+            try {
+                listedEntityTracker.clearListedEntities();
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to reset previously listed entities due to " + e, e);
+            }
+        }
+        // The reset request is consumed whether or not there was anything to clear.
+        resetEntityTrackingState = false;
+
+        if (!trackingEntities) {
+            listedEntityTracker = null;
+            return;
+        }
+
+        if (listedEntityTracker == null) {
+            listedEntityTracker = createListedEntityTracker();
+        }
+    }
+
+    protected ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>> createListedEntityTracker() {
+        return new ListedS3VersionSummaryTracker();
+    }
+
     private static Validator createRequesterPaysValidator() {
         return new Validator() {
             @Override
@@ -334,6 +420,18 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        // Dispatch to the listing implementation that matches the configured strategy.
+        final String listingStrategy = context.getProperty(LISTING_STRATEGY).getValue();
+
+        if (BY_ENTITIES.getValue().equals(listingStrategy)) {
+            listByTrackingEntities(context, session);
+            return;
+        }
+        if (BY_TIMESTAMPS.getValue().equals(listingStrategy)) {
+            listByTrackingTimestamps(context, session);
+            return;
+        }
+
+        throw new ProcessException("Unknown listing strategy: " + listingStrategy);
+    }
+
+    private void listByTrackingTimestamps(ProcessContext context, ProcessSession session) {
         try {
             restoreState(session);
         } catch (IOException ioe) {
@@ -360,7 +458,6 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
         int totalListCount = 0;
         long latestListedTimestampInThisCycle = currentTimestamp;
 
-        VersionListing versionListing;
         final Set<String> listedKeys = new HashSet<>();
         getLogger().trace("Start listing, listingTimestamp={}, currentTimestamp={}, currentKeys={}", new Object[]{listingTimestamp, currentTimestamp, currentKeys});
 
@@ -375,69 +472,51 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
         try {
             writer.beginListing();
 
-                do {
-                    versionListing = bucketLister.listVersions();
-                    for (S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
-                        long lastModified = versionSummary.getLastModified().getTime();
-                        if (lastModified < currentTimestamp
-                            || lastModified == currentTimestamp && currentKeys.contains(versionSummary.getKey())
-                            || lastModified > (listingTimestamp - minAgeMilliseconds)) {
-                            continue;
-                        }
-
-                        getLogger().trace("Listed key={}, lastModified={}, currentKeys={}", new Object[]{versionSummary.getKey(), lastModified, currentKeys});
-
-                        // Get object tags if configured to do so
-                        GetObjectTaggingResult taggingResult = null;
-                        if (context.getProperty(WRITE_OBJECT_TAGS).asBoolean()) {
-                            try {
-                                taggingResult = client.getObjectTagging(new GetObjectTaggingRequest(versionSummary.getBucketName(), versionSummary.getKey()));
-                            } catch (final Exception e) {
-                                getLogger().warn("Failed to obtain Object Tags for S3 Object {} in bucket {}. Will list S3 Object without the object tags",
-                                    new Object[] {versionSummary.getKey(), versionSummary.getBucketName()}, e);
-                            }
-                        }
-
-                        // Get user metadata if configured to do so
-                        ObjectMetadata objectMetadata = null;
-                        if (context.getProperty(WRITE_USER_METADATA).asBoolean()) {
-                            try {
-                                objectMetadata = client.getObjectMetadata(new GetObjectMetadataRequest(versionSummary.getBucketName(), versionSummary.getKey()));
-                            } catch (final Exception e) {
-                                getLogger().warn("Failed to obtain User Metadata for S3 Object {} in bucket {}. Will list S3 Object without the user metadata",
-                                    new Object[] {versionSummary.getKey(), versionSummary.getBucketName()}, e);
-                            }
-                        }
-
-                        // Write the entity to the listing
-                        writer.addToListing(versionSummary, taggingResult, objectMetadata);
-
-                        // Track the latest lastModified timestamp and keys having that timestamp.
-                        // NOTE: Amazon S3 lists objects in UTF-8 character encoding in lexicographical order. Not ordered by timestamps.
-                        if (lastModified > latestListedTimestampInThisCycle) {
-                            latestListedTimestampInThisCycle = lastModified;
-                            listedKeys.clear();
-                            listedKeys.add(versionSummary.getKey());
-
-                        } else if (lastModified == latestListedTimestampInThisCycle) {
-                            listedKeys.add(versionSummary.getKey());
-                        }
-
-                        listCount++;
+            do {
+                VersionListing versionListing = bucketLister.listVersions();
+                for (S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
+                    long lastModified = versionSummary.getLastModified().getTime();
+                    if (lastModified < currentTimestamp
+                        || lastModified == currentTimestamp && currentKeys.contains(versionSummary.getKey())
+                        || lastModified > (listingTimestamp - minAgeMilliseconds)) {
+                        continue;
                     }
-                    bucketLister.setNextMarker();
 
-                    totalListCount += listCount;
+                    getLogger().trace("Listed key={}, lastModified={}, currentKeys={}", new Object[]{versionSummary.getKey(), lastModified, currentKeys});
 
-                    if (listCount >= batchSize && writer.isCheckpoint()) {
-                        getLogger().info("Successfully listed {} new files from S3; routing to success", new Object[] {listCount});
-                        session.commitAsync();
+                    GetObjectTaggingResult taggingResult = getTaggingResult(context, client, versionSummary);
+
+                    ObjectMetadata objectMetadata = getObjectMetadata(context, client, versionSummary);
+
+                    // Write the entity to the listing
+                    writer.addToListing(versionSummary, taggingResult, objectMetadata);
+
+                    // Track the latest lastModified timestamp and keys having that timestamp.
+                    // NOTE: Amazon S3 lists objects in UTF-8 character encoding in lexicographical order. Not ordered by timestamps.
+                    if (lastModified > latestListedTimestampInThisCycle) {
+                        latestListedTimestampInThisCycle = lastModified;
+                        listedKeys.clear();
+                        listedKeys.add(versionSummary.getKey());
+
+                    } else if (lastModified == latestListedTimestampInThisCycle) {
+                        listedKeys.add(versionSummary.getKey());
                     }
 
-                    listCount = 0;
-                } while (bucketLister.isTruncated());
+                    listCount++;
+                }
+                bucketLister.setNextMarker();
 
-                writer.finishListing();
+                totalListCount += listCount;
+
+                if (listCount >= batchSize && writer.isCheckpoint()) {
+                    getLogger().info("Successfully listed {} new files from S3; routing to success", listCount);
+                    session.commitAsync();
+                }
+
+                listCount = 0;
+            } while (bucketLister.isTruncated());
+
+            writer.finishListing();
         } catch (final Exception e) {
             getLogger().error("Failed to list contents of bucket due to {}", new Object[] {e}, e);
             writer.finishListingExceptionally(e);
@@ -468,7 +547,123 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
         }
     }
 
-    private S3BucketLister getS3BucketLister(final ProcessContext context, final AmazonS3 client) {
+    private void listByTrackingEntities(ProcessContext context, ProcessSession session) {
+        listedEntityTracker.trackEntities(context, session, justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
+            S3BucketLister bucketLister = getS3BucketLister(context, getClient());
+
+            List<ListableEntityWrapper<S3VersionSummary>> listedEntities = bucketLister.listVersions().getVersionSummaries()
+                .stream()
+                .filter(s3VersionSummary -> s3VersionSummary.getLastModified().getTime() >= minTimestampToList)
+                .map(s3VersionSummary -> new ListableEntityWrapper<S3VersionSummary>(
+                    s3VersionSummary,
+                    S3VersionSummary::getKey,
+                    summary -> summary.getKey() + "_" + summary.getVersionId(),
+                    summary -> summary.getLastModified().getTime(),
+                    S3VersionSummary::getSize
+                ))
+                .collect(Collectors.toList());
+
+            return listedEntities;
+        }, null);
+
+        justElectedPrimaryNode = false;
+    }
+
+    /**
+     * Entity tracker for S3 version summaries that publishes updated entities through this
+     * processor's {@code S3ObjectWriter} implementations, for both record and flow-file output.
+     */
+    private class ListedS3VersionSummaryTracker extends ListedEntityTracker<ListableEntityWrapper<S3VersionSummary>> {
+        public ListedS3VersionSummaryTracker() {
+            super(getIdentifier(), getLogger(), RecordObjectWriter.RECORD_SCHEMA);
+        }
+
+        /** Delegates to {@link #publishListing}; the writer is chosen from the Record Writer property. */
+        @Override
+        protected void createRecordsForEntities(
+            ProcessContext context,
+            ProcessSession session,
+            List<ListableEntityWrapper<S3VersionSummary>> updatedEntities
+        ) throws IOException, SchemaNotFoundException {
+            publishListing(context, session, updatedEntities);
+        }
+
+        /** Delegates to {@link #publishListing}; {@code createAttributes} is not used. */
+        @Override
+        protected void createFlowFilesForEntities(
+            ProcessContext context,
+            ProcessSession session,
+            List<ListableEntityWrapper<S3VersionSummary>> updatedEntities,
+            Function<ListableEntityWrapper<S3VersionSummary>, Map<String, String>> createAttributes
+        ) {
+            publishListing(context, session, updatedEntities);
+        }
+
+        /**
+         * Writes each updated entity (with optional tags and user metadata) to the listing and
+         * records it as already listed. On any failure the listing is finished exceptionally,
+         * the session is rolled back and the processor yields.
+         *
+         * @param context used to read the Record Writer and Batch Size properties
+         * @param session the session the writer emits into
+         * @param updatedEntities the entities to publish
+         */
+        private void publishListing(ProcessContext context, ProcessSession session, List<ListableEntityWrapper<S3VersionSummary>> updatedEntities) {
+            final S3ObjectWriter writer;
+            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+            if (writerFactory == null) {
+                writer = new AttributeObjectWriter(session);
+            } else {
+                writer = new RecordObjectWriter(session, writerFactory, getLogger());
+            }
+
+            try {
+                writer.beginListing();
+                final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+                int listCount = 0;
+                for (ListableEntityWrapper<S3VersionSummary> updatedEntity : updatedEntities) {
+                    S3VersionSummary s3VersionSummary = updatedEntity.getRawEntity();
+
+                    GetObjectTaggingResult taggingResult = getTaggingResult(context, getClient(), s3VersionSummary);
+                    ObjectMetadata objectMetadata = getObjectMetadata(context, getClient(), s3VersionSummary);
+
+                    writer.addToListing(s3VersionSummary, taggingResult, objectMetadata);
+
+                    listCount++;
+
+                    if (listCount >= batchSize && writer.isCheckpoint()) {
+                        getLogger().info("Successfully listed {} new files from S3; routing to success", listCount);
+                        session.commitAsync();
+                        // Start a new batch; otherwise every following entity would trigger another commit.
+                        listCount = 0;
+                    }
+
+                    final ListedEntity listedEntity = new ListedEntity(updatedEntity.getTimestamp(), updatedEntity.getSize());
+                    alreadyListedEntities.put(updatedEntity.getIdentifier(), listedEntity);
+                }
+
+                writer.finishListing();
+            } catch (final Exception e) {
+                getLogger().error("Failed to list contents of bucket due to {}", new Object[]{e}, e);
+                writer.finishListingExceptionally(e);
+                session.rollback();
+                context.yield();
+            }
+        }
+    }
+
+    private GetObjectTaggingResult getTaggingResult(ProcessContext context, AmazonS3 client, S3VersionSummary versionSummary) {
+        GetObjectTaggingResult taggingResult = null;
+        if (context.getProperty(WRITE_OBJECT_TAGS).asBoolean()) {
+            try {
+                taggingResult = client.getObjectTagging(new GetObjectTaggingRequest(versionSummary.getBucketName(), versionSummary.getKey()));
+            } catch (final Exception e) {
+                getLogger().warn("Failed to obtain Object Tags for S3 Object {} in bucket {}. Will list S3 Object without the object tags",
+                    new Object[] {versionSummary.getKey(), versionSummary.getBucketName()}, e);
+            }
+        }
+        return taggingResult;
+    }
+
+    private ObjectMetadata getObjectMetadata(ProcessContext context, AmazonS3 client, S3VersionSummary versionSummary) {
+        ObjectMetadata objectMetadata = null;
+        if (context.getProperty(WRITE_USER_METADATA).asBoolean()) {
+            try {
+                objectMetadata = client.getObjectMetadata(new GetObjectMetadataRequest(versionSummary.getBucketName(), versionSummary.getKey()));
+            } catch (final Exception e) {
+                getLogger().warn("Failed to obtain User Metadata for S3 Object {} in bucket {}. Will list S3 Object without the user metadata",
+                    new Object[] {versionSummary.getKey(), versionSummary.getBucketName()}, e);
+            }
+        }
+        return objectMetadata;
+    }
+
+    private S3BucketLister getS3BucketLister(ProcessContext context, AmazonS3 client) {
         final boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean();
         final boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean();
 
@@ -479,10 +674,10 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor {
         final int listType = context.getProperty(LIST_TYPE).asInteger();
 
         final S3BucketLister bucketLister = useVersions
-                ? new S3VersionBucketLister(client)
-                : listType == 2
-                ? new S3ObjectBucketListerVersion2(client)
-                : new S3ObjectBucketLister(client);
+            ? new S3VersionBucketLister(client)
+            : listType == 2
+            ? new S3ObjectBucketListerVersion2(client)
+            : new S3ObjectBucketLister(client);
 
         bucketLister.setBucketName(bucket);
         bucketLister.setRequesterPays(requesterPays);
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntityWrapper.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntityWrapper.java
new file mode 100644
index 0000000..7d8de91
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListableEntityWrapper.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processor.util.list;
+
+import java.util.function.Function;
+
+/**
+ * Adapts an arbitrary object to {@link ListableEntity} by delegating each accessor to a
+ * caller-supplied function that is applied to the wrapped object on every call.
+ *
+ * @param <T> type of the wrapped object
+ */
+public class ListableEntityWrapper<T> implements ListableEntity {
+
+    private final T rawEntity;
+    private final Function<T, String> toName;
+    private final Function<T, String> toIdentifier;
+    private final Function<T, Long> toTimestamp;
+    private final Function<T, Long> toSize;
+
+    /**
+     * @param rawEntity the object to wrap
+     * @param toName derives the entity name from the wrapped object
+     * @param toIdentifier derives the entity identifier from the wrapped object
+     * @param toTimestamp derives the entity timestamp; the result is unboxed, so it must not be {@code null}
+     * @param toSize derives the entity size; the result is unboxed, so it must not be {@code null}
+     */
+    public ListableEntityWrapper(
+        T rawEntity,
+        Function<T, String> toName,
+        Function<T, String> toIdentifier,
+        Function<T, Long> toTimestamp,
+        Function<T, Long> toSize
+    ) {
+        this.rawEntity = rawEntity;
+        this.toName = toName;
+        this.toIdentifier = toIdentifier;
+        this.toTimestamp = toTimestamp;
+        this.toSize = toSize;
+    }
+
+    /**
+     * @return the wrapped object exactly as passed to the constructor
+     */
+    public T getRawEntity() {
+        return rawEntity;
+    }
+
+    @Override
+    public String getName() {
+        return toName.apply(rawEntity);
+    }
+
+    @Override
+    public String getIdentifier() {
+        return toIdentifier.apply(rawEntity);
+    }
+
+    @Override
+    public long getTimestamp() {
+        return toTimestamp.apply(rawEntity);
+    }
+
+    @Override
+    public long getSize() {
+        return toSize.apply(rawEntity);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
index 1a70ecb..13f1f16 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/ListedEntityTracker.java
@@ -63,7 +63,7 @@ import static org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCC
 public class ListedEntityTracker<T extends ListableEntity> {
 
     private final ObjectMapper objectMapper = new ObjectMapper();
-    private volatile Map<String, ListedEntity> alreadyListedEntities;
+    protected volatile Map<String, ListedEntity> alreadyListedEntities;
 
     private static final String NOTE = "Used by 'Tracking Entities' strategy.";
     public static final PropertyDescriptor TRACKING_STATE_CACHE = new PropertyDescriptor.Builder()
@@ -100,9 +100,9 @@ public class ListedEntityTracker<T extends ListableEntity> {
             .defaultValue("3 hours")
             .build();
 
-    private static final AllowableValue INITIAL_LISTING_TARGET_ALL = new AllowableValue("all", "All Available",
+    public static final AllowableValue INITIAL_LISTING_TARGET_ALL = new AllowableValue("all", "All Available",
             "Regardless of entities timestamp, all existing entities will be listed at the initial listing activity.");
-    private static final AllowableValue INITIAL_LISTING_TARGET_WINDOW = new AllowableValue("window", "Tracking Time Window",
+    public static final AllowableValue INITIAL_LISTING_TARGET_WINDOW = new AllowableValue("window", "Tracking Time Window",
             "Ignore entities having timestamp older than the specified 'Tracking Time Window' at the initial listing activity.");
 
     public static final PropertyDescriptor INITIAL_LISTING_TARGET = new PropertyDescriptor.Builder()
@@ -158,7 +158,7 @@ public class ListedEntityTracker<T extends ListableEntity> {
     private String nodeId;
     private DistributedMapCacheClient mapCacheClient;
 
-    ListedEntityTracker(final String componentId, final ComponentLog logger, final RecordSchema recordSchema) {
+    public ListedEntityTracker(final String componentId, final ComponentLog logger, final RecordSchema recordSchema) {
         this(componentId, logger, DEFAULT_CURRENT_TIMESTAMP_SUPPLIER, recordSchema);
     }
 
@@ -166,7 +166,7 @@ public class ListedEntityTracker<T extends ListableEntity> {
      * This constructor is used by unit test code so that it can produce the consistent result by controlling current timestamp.
      * @param currentTimestampSupplier a function to return current timestamp.
      */
-    ListedEntityTracker(final String componentId, final ComponentLog logger, final Supplier<Long> currentTimestampSupplier, final RecordSchema recordSchema) {
+    public ListedEntityTracker(final String componentId, final ComponentLog logger, final Supplier<Long> currentTimestampSupplier, final RecordSchema recordSchema) {
         this.componentId = componentId;
         this.logger = logger;
         this.currentTimestampSupplier = currentTimestampSupplier;
@@ -222,7 +222,7 @@ public class ListedEntityTracker<T extends ListableEntity> {
         return listedEntities;
     }
 
-    void clearListedEntities() throws IOException {
+    public void clearListedEntities() throws IOException {
         alreadyListedEntities = null;
         if (mapCacheClient != null) {
             final String cacheKey = getCacheKey();
@@ -327,7 +327,7 @@ public class ListedEntityTracker<T extends ListableEntity> {
                 logger.error("Failed to create records for listed entities", e);
             }
         } else {
-            createFlowFilesForEntities(session, updatedEntities, createAttributes);
+            createFlowFilesForEntities(context, session, updatedEntities, createAttributes);
         }
 
         // Commit ProcessSession before persisting listed entities.
@@ -344,7 +344,7 @@ public class ListedEntityTracker<T extends ListableEntity> {
         });
     }
 
-    private void createRecordsForEntities(final ProcessContext context, final ProcessSession session, final List<T> updatedEntities) throws IOException, SchemaNotFoundException {
+    protected void createRecordsForEntities(final ProcessContext context, final ProcessSession session, final List<T> updatedEntities) throws IOException, SchemaNotFoundException {
         if (updatedEntities.isEmpty()) {
             logger.debug("No entities to write records for");
             return;
@@ -376,7 +376,7 @@ public class ListedEntityTracker<T extends ListableEntity> {
         session.transfer(flowFile, REL_SUCCESS);
     }
 
-    private void createFlowFilesForEntities(final ProcessSession session, final List<T> updatedEntities, final Function<T, Map<String, String>> createAttributes) {
+    protected void createFlowFilesForEntities(ProcessContext context, final ProcessSession session, final List<T> updatedEntities, final Function<T, Map<String, String>> createAttributes) {
         for (T updatedEntity : updatedEntities) {
             FlowFile flowFile = session.create();
             flowFile = session.putAllAttributes(flowFile, createAttributes.apply(updatedEntity));
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
index 890cb0b..5ae7453 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
@@ -52,6 +52,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-processor-utils</artifactId>
             <version>1.16.0-SNAPSHOT</version>
         </dependency>
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
index dd43e78..8656327 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
@@ -35,6 +35,10 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.ConfigVerificationResult.Outcome;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
@@ -46,6 +50,9 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.list.ListableEntityWrapper;
+import org.apache.nifi.processor.util.list.ListedEntity;
+import org.apache.nifi.processor.util.list.ListedEntityTracker;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
@@ -68,6 +75,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.nifi.processors.gcp.storage.StorageAttributes.BUCKET_ATTR;
@@ -159,6 +167,45 @@ import static org.apache.nifi.processors.gcp.storage.StorageAttributes.URI_DESC;
         @WritesAttribute(attribute = URI_ATTR, description = URI_DESC)
 })
 public class ListGCSBucket extends AbstractGCSProcessor {
+    public static final AllowableValue BY_TIMESTAMPS = new AllowableValue("timestamps", "Tracking Timestamps",
+        "This strategy tracks the latest timestamp of listed entity to determine new/updated entities." +
+            " Since it only tracks few timestamps, it can manage listing state efficiently." +
+            " This strategy will not pick up any newly added or modified entity if their timestamps are older than the tracked latest timestamp." +
+            " Also may miss files when multiple subdirectories are being written at the same time while listing is running.");
+
+    public static final AllowableValue BY_ENTITIES = new AllowableValue("entities", "Tracking Entities",
+        "This strategy tracks information of all the listed entities within the latest 'Entity Tracking Time Window' to determine new/updated entities." +
+            " This strategy can pick entities having old timestamp that can be missed with 'Tracking Timestamps'." +
+            " Works even when multiple subdirectories are being written at the same time while listing is running." +
+            " However an additional DistributedMapCache controller service is required and more JVM heap memory is used." +
+            " For more information on how the 'Entity Tracking Time Window' property works, see the description.");
+
+    public static final PropertyDescriptor LISTING_STRATEGY = new PropertyDescriptor.Builder()
+        .name("listing-strategy")
+        .displayName("Listing Strategy")
+        .description("Specify how to determine new/updated entities. See each strategy descriptions for detail.")
+        .required(true)
+        .allowableValues(BY_TIMESTAMPS, BY_ENTITIES)
+        .defaultValue(BY_TIMESTAMPS.getValue())
+        .build();
+
+    public static final PropertyDescriptor TRACKING_STATE_CACHE = new PropertyDescriptor.Builder()
+        .fromPropertyDescriptor(ListedEntityTracker.TRACKING_STATE_CACHE)
+        .dependsOn(LISTING_STRATEGY, BY_ENTITIES)
+        .required(true)
+        .build();
+
+    public static final PropertyDescriptor INITIAL_LISTING_TARGET = new PropertyDescriptor.Builder()
+        .fromPropertyDescriptor(ListedEntityTracker.INITIAL_LISTING_TARGET)
+        .dependsOn(LISTING_STRATEGY, BY_ENTITIES)
+        .build();
+
+    public static final PropertyDescriptor TRACKING_TIME_WINDOW = new PropertyDescriptor.Builder()
+        .fromPropertyDescriptor(ListedEntityTracker.TRACKING_TIME_WINDOW)
+        .dependsOn(INITIAL_LISTING_TARGET, ListedEntityTracker.INITIAL_LISTING_TARGET_WINDOW)
+        .required(true)
+        .build();
+
     public static final PropertyDescriptor BUCKET = new PropertyDescriptor
             .Builder().name("gcs-bucket")
             .displayName("Bucket")
@@ -201,6 +248,10 @@ public class ListGCSBucket extends AbstractGCSProcessor {
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return ImmutableList.<PropertyDescriptor>builder()
+            .add(LISTING_STRATEGY)
+            .add(TRACKING_STATE_CACHE)
+            .add(INITIAL_LISTING_TARGET)
+            .add(TRACKING_TIME_WINDOW)
             .add(BUCKET)
             .add(RECORD_WRITER)
             .addAll(super.getSupportedPropertyDescriptors())
@@ -222,6 +273,39 @@ public class ListGCSBucket extends AbstractGCSProcessor {
     private volatile long currentTimestamp = 0L;
     private final Set<String> currentKeys = Collections.synchronizedSet(new HashSet<>());
 
+    private volatile boolean justElectedPrimaryNode = false;
+    private volatile boolean resetEntityTrackingState = false;
+    private volatile ListedEntityTracker<ListableBlob> listedEntityTracker;
+
+    @OnPrimaryNodeStateChange
+    public void onPrimaryNodeChange(final PrimaryNodeState newState) {
+        justElectedPrimaryNode = PrimaryNodeState.ELECTED_PRIMARY_NODE == newState; // any other state clears the flag
+    }
+
+    @OnScheduled
+    public void initListedEntityTracker(ProcessContext context) {
+        final String configuredStrategy = context.getProperty(LISTING_STRATEGY).getValue();
+        final boolean trackingEntities = BY_ENTITIES.getValue().equals(configuredStrategy);
+        // Wipe previously tracked entities when a reset was requested or the strategy no longer uses them.
+        if (listedEntityTracker != null && (resetEntityTrackingState || !trackingEntities)) {
+            try {
+                listedEntityTracker.clearListedEntities();
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to reset previously listed entities due to " + e, e);
+            }
+        }
+        resetEntityTrackingState = false;
+        // Keep an existing tracker, create one on first use, or drop it when entities are not tracked.
+        if (!trackingEntities) {
+            listedEntityTracker = null;
+        } else if (listedEntityTracker == null) {
+            listedEntityTracker = createListedEntityTracker();
+        }
+    }
+
+    protected ListedEntityTracker<ListableBlob> createListedEntityTracker() {
+        return new ListedBlobTracker(); // default tracker; the method is protected, so a subclass can return another one
+    }
 
     private Set<String> extractKeys(final StateMap stateMap) {
         return stateMap.toMap().entrySet().parallelStream()
@@ -303,7 +387,19 @@ public class ListGCSBucket extends AbstractGCSProcessor {
     }
 
     @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+    public void onTrigger(final ProcessContext context, final ProcessSession session) {
+        // Dispatch on the raw property value, compared String-to-String as in initListedEntityTracker.
+        final String listingStrategy = context.getProperty(LISTING_STRATEGY).getValue();
+        if (BY_TIMESTAMPS.getValue().equals(listingStrategy)) {
+            listByTrackingTimestamps(context, session);
+        } else if (BY_ENTITIES.getValue().equals(listingStrategy)) {
+            listByTrackingEntities(context, session);
+        } else {
+            throw new ProcessException("Unknown listing strategy: " + listingStrategy);
+        }
+    }
+
+    private void listByTrackingTimestamps(ProcessContext context, ProcessSession session) {
         try {
             restoreState(session);
         } catch (IOException e) {
@@ -331,16 +427,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
 
     private void listBucket(final ProcessContext context, final ListingAction listingAction) throws IOException, SchemaNotFoundException {
         final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
-        final String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
-        final boolean useGenerations = context.getProperty(USE_GENERATIONS).asBoolean();
-
-        final List<Storage.BlobListOption> listOptions = new ArrayList<>();
-        if (prefix != null) {
-            listOptions.add(Storage.BlobListOption.prefix(prefix));
-        }
-        if (useGenerations) {
-            listOptions.add(Storage.BlobListOption.versions(true));
-        }
+        final List<Storage.BlobListOption> listOptions = getBlobListOptions(context);
 
         final Storage storage = listingAction.getCloudService();
 
@@ -388,6 +475,51 @@ public class ListGCSBucket extends AbstractGCSProcessor {
         listingAction.finishListing(listCount, maxTimestamp, keysMatchingTimestamp);
     }
 
+    private List<Storage.BlobListOption> getBlobListOptions(ProcessContext context) {
+        // Read both settings up front: PREFIX is evaluated for expression language, USE_GENERATIONS is a flag.
+        final String configuredPrefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
+        final boolean includeGenerations = context.getProperty(USE_GENERATIONS).asBoolean();
+        final List<Storage.BlobListOption> options = new ArrayList<>();
+
+        // Each option is added only when configured; with neither set the list stays empty.
+        if (configuredPrefix != null) {
+            options.add(Storage.BlobListOption.prefix(configuredPrefix));
+        }
+        if (includeGenerations) {
+            options.add(Storage.BlobListOption.versions(true));
+        }
+        return options;
+    }
+
+    private void listByTrackingEntities(ProcessContext context, ProcessSession session) {
+        // The lambda lists the bucket page by page and hands back every blob updated at or after
+        // minTimestampToList, each tagged with the index of the page it was found on.
+        listedEntityTracker.trackEntities(context, session, justElectedPrimaryNode, Scope.CLUSTER, minTimestampToList -> {
+            final List<ListableBlob> recentBlobs = new ArrayList<>();
+
+            final Storage gcs = getCloudService();
+            final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+            final Storage.BlobListOption[] options = getBlobListOptions(context).toArray(new Storage.BlobListOption[0]);
+
+            int pageIndex = 0;
+            Page<Blob> page = gcs.list(bucketName, options);
+            do {
+                for (final Blob candidate : page.getValues()) {
+                    if (candidate.getUpdateTime() < minTimestampToList) {
+                        continue;
+                    }
+                    recentBlobs.add(new ListableBlob(candidate, pageIndex));
+                }
+                page = page.getNextPage();
+                pageIndex++;
+            } while (page != null);
+
+            return recentBlobs;
+        }, null); // NOTE(review): presumably the createAttributes argument, unused by ListedBlobTracker — confirm
+
+        justElectedPrimaryNode = false;
+    }
+
     private void commit(final ProcessSession session, final int listCount) {
         if (listCount > 0) {
             getLogger().info("Successfully listed {} new files from GCS; routing to success", new Object[] {listCount});
@@ -461,6 +593,87 @@ public class ListGCSBucket extends AbstractGCSProcessor {
         }
     }
 
+    protected class ListedBlobTracker extends ListedEntityTracker<ListableBlob> {
+        public ListedBlobTracker() {
+            super(getIdentifier(), getLogger(), RecordBlobWriter.RECORD_SCHEMA);
+        }
+
+        // Both output hooks are redirected to publishListing, which picks the writer from RECORD_WRITER itself.
+        @Override
+        protected void createRecordsForEntities(ProcessContext context, ProcessSession session, List<ListableBlob> updatedEntities) throws IOException, SchemaNotFoundException {
+            publishListing(context, session, updatedEntities);
+        }
+
+        @Override
+        protected void createFlowFilesForEntities(ProcessContext context, ProcessSession session, List<ListableBlob> updatedEntities, Function<ListableBlob, Map<String, String>> createAttributes) {
+            publishListing(context, session, updatedEntities);
+        }
+
+        /**
+         * Writes the given blobs through a record writer when RECORD_WRITER is set, otherwise through an
+         * attribute writer, and registers each blob in alreadyListedEntities. Any exception is logged and
+         * answered with finishListingExceptionally, a session rollback and a yield instead of being rethrown.
+         */
+        private void publishListing(ProcessContext context, ProcessSession session, List<ListableBlob> updatedEntities) {
+            final RecordSetWriterFactory recordWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+            final BlobWriter blobWriter = recordWriterFactory == null
+                ? new AttributeBlobWriter(session)
+                : new RecordBlobWriter(session, recordWriterFactory, getLogger());
+
+            try {
+                blobWriter.beginListing();
+                int pendingCount = 0;
+                int previousPage = -1;
+                for (final ListableBlob entity : updatedEntities) {
+                    final Blob rawBlob = entity.getRawEntity();
+                    final int entityPage = entity.getPageNr();
+                    blobWriter.addToListing(rawBlob);
+                    pendingCount++;
+
+                    // Commit when a page boundary has been crossed and the writer reports a checkpoint.
+                    final boolean crossedPage = previousPage != -1 && previousPage != entityPage;
+                    if (crossedPage && blobWriter.isCheckpoint()) {
+                        commit(session, pendingCount);
+                        pendingCount = 0;
+                    }
+                    previousPage = entityPage;
+
+                    final ListedEntity trackedEntry = new ListedEntity(entity.getTimestamp(), entity.getSize());
+                    alreadyListedEntities.put(entity.getIdentifier(), trackedEntry);
+                }
+
+                blobWriter.finishListing();
+            } catch (final Exception e) {
+                getLogger().error("Failed to list contents of bucket due to {}", new Object[] {e}, e);
+                blobWriter.finishListingExceptionally(e);
+                session.rollback();
+                context.yield();
+            }
+        }
+    }
+
+    private static class ListableBlob extends ListableEntityWrapper<Blob> {
+        private final int pageNr;
+
+        /**
+         * Wraps a blob for entity tracking and remembers the listing page it came from.
+         * The four Blob accessors are handed to ListableEntityWrapper (presumably as name, identifier,
+         * timestamp and size, in that order — confirm against its constructor).
+         *
+         * @param blob the blob to wrap
+         * @param pageNr number of the listing page the blob was found on
+         */
+        public ListableBlob(Blob blob, int pageNr) {
+            super(blob, Blob::getName, Blob::getGeneratedId, Blob::getUpdateTime, Blob::getSize);
+            this.pageNr = pageNr;
+        }
+
+        /** @return the page number given at construction */
+        public int getPageNr() {
+            return pageNr;
+        }
+    }
+
     private class VerifyListingAction implements ListingAction<CountingBlobWriter> {
         final ProcessContext context;
         final CountingBlobWriter blobWriter;