You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@nifi.apache.org by pv...@apache.org on 2021/12/04 22:49:38 UTC
[nifi] branch main updated: NIFI-9441: Ensure that we only update our member variable for the latest timestamp after processing all objects within the GCS Bucket
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 7a83c32 NIFI-9441: Ensure that we only update our member variable for the latest timestamp after processing all objects within the GCS Bucket
7a83c32 is described below
commit 7a83c32a01a55c62918efaad2597cdb281499422
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Dec 3 12:51:16 2021 -0500
NIFI-9441: Ensure that we only update our member variable for the latest timestamp after processing all objects within the GCS Bucket
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #5567.
---
.../nifi/processors/gcp/storage/ListGCSBucket.java | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
index f9975ef..ede0420 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
@@ -331,7 +331,7 @@ public class ListGCSBucket extends AbstractGCSProcessor {
}
if (writer.isCheckpoint()) {
- commit(session, listCount, maxTimestamp, keysMatchingTimestamp);
+ commit(session, listCount);
listCount = 0;
}
@@ -344,7 +344,12 @@ public class ListGCSBucket extends AbstractGCSProcessor {
getLogger().debug("No new objects in GCS bucket {} to list. Yielding.", bucket);
context.yield();
} else {
- commit(session, listCount, maxTimestamp, keysMatchingTimestamp);
+ commit(session, listCount);
+
+ currentTimestamp = maxTimestamp;
+ currentKeys.clear();
+ currentKeys.addAll(keysMatchingTimestamp);
+ persistState(session, currentTimestamp, currentKeys);
}
} catch (final Exception e) {
getLogger().error("Failed to list contents of GCS Bucket due to {}", new Object[] {e}, e);
@@ -358,13 +363,8 @@ public class ListGCSBucket extends AbstractGCSProcessor {
getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{bucket, listMillis});
}
- private void commit(final ProcessSession session, final int listCount, final long timestamp, final Set<String> keysMatchingTimestamp) {
+ private void commit(final ProcessSession session, final int listCount) {
if (listCount > 0) {
- currentTimestamp = timestamp;
- currentKeys.clear();
- currentKeys.addAll(keysMatchingTimestamp);
- persistState(session, currentTimestamp, currentKeys);
-
getLogger().info("Successfully listed {} new files from GCS; routing to success", new Object[] {listCount});
session.commitAsync();
}