You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/12/13 18:43:07 UTC

[nifi] 03/22: NIFI-9441: Ensure that we only update our member variable for the latest timestamp after processing all objects within the GCS Bucket

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit eb108cd3830578b6aa6c3cb7e4a27c9eb4620b47
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Dec 3 12:51:16 2021 -0500

    NIFI-9441: Ensure that we only update our member variable for the latest timestamp after processing all objects within the GCS Bucket
---
 .../nifi/processors/gcp/storage/ListGCSBucket.java       | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
index f9975ef..ede0420 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
@@ -331,7 +331,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
                 }
 
                 if (writer.isCheckpoint()) {
-                    commit(session, listCount, maxTimestamp, keysMatchingTimestamp);
+                    commit(session, listCount);
                     listCount = 0;
                 }
 
@@ -344,7 +344,12 @@ public class ListGCSBucket extends AbstractGCSProcessor {
                 getLogger().debug("No new objects in GCS bucket {} to list. Yielding.", bucket);
                 context.yield();
             } else {
-                commit(session, listCount, maxTimestamp, keysMatchingTimestamp);
+                commit(session, listCount);
+
+                currentTimestamp = maxTimestamp;
+                currentKeys.clear();
+                currentKeys.addAll(keysMatchingTimestamp);
+                persistState(session, currentTimestamp, currentKeys);
             }
         } catch (final Exception e) {
             getLogger().error("Failed to list contents of GCS Bucket due to {}", new Object[] {e}, e);
@@ -358,13 +363,8 @@ public class ListGCSBucket extends AbstractGCSProcessor {
         getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{bucket, listMillis});
     }
 
-    private void commit(final ProcessSession session, final int listCount, final long timestamp, final Set<String> keysMatchingTimestamp) {
+    private void commit(final ProcessSession session, final int listCount) {
         if (listCount > 0) {
-            currentTimestamp = timestamp;
-            currentKeys.clear();
-            currentKeys.addAll(keysMatchingTimestamp);
-            persistState(session, currentTimestamp, currentKeys);
-
             getLogger().info("Successfully listed {} new files from GCS; routing to success", new Object[] {listCount});
             session.commitAsync();
         }