You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jw...@apache.org on 2018/11/03 21:30:55 UTC

[1/2] nifi git commit: NIFI-4715: ListS3 produces duplicates in frequently updated buckets

Repository: nifi
Updated Branches:
  refs/heads/master 2812fe60a -> 37a0e1b30


NIFI-4715: ListS3 produces duplicates in frequently updated buckets

Keep totalListCount; reduce unnecessary persistState calls

This closes #2361.

Signed-off-by: Koji Kawamura <ij...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0a014dcd
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0a014dcd
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0a014dcd

Branch: refs/heads/master
Commit: 0a014dcdb13e30084e6378c14f8c8e5568493c33
Parents: 2812fe6
Author: Adam Lamar <ad...@gmail.com>
Authored: Sat Dec 23 20:29:02 2017 -0700
Committer: James Wing <jv...@gmail.com>
Committed: Sat Nov 3 14:24:21 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/nifi/processors/aws/s3/ListS3.java | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0a014dcd/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index f5a69ac..fc3260c 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -229,7 +229,8 @@ public class ListS3 extends AbstractS3Processor {
 
         final AmazonS3 client = getClient();
         int listCount = 0;
-        long maxTimestamp = 0L;
+        int totalListCount = 0;
+        long maxTimestamp = currentTimestamp;
         String delimiter = context.getProperty(DELIMITER).getValue();
         String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
 
@@ -298,18 +299,19 @@ public class ListS3 extends AbstractS3Processor {
             }
             bucketLister.setNextMarker();
 
+            totalListCount += listCount;
             commit(context, session, listCount);
             listCount = 0;
         } while (bucketLister.isTruncated());
+
+        // Update stateManger with the most recent timestamp
         currentTimestamp = maxTimestamp;
+        persistState(context);
 
         final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
         getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
 
-        if (!commit(context, session, listCount)) {
-            if (currentTimestamp > 0) {
-                persistState(context);
-            }
+        if (totalListCount == 0) {
             getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
             context.yield();
         }
@@ -320,7 +322,6 @@ public class ListS3 extends AbstractS3Processor {
         if (willCommit) {
             getLogger().info("Successfully listed {} new files from S3; routing to success", new Object[] {listCount});
             session.commit();
-            persistState(context);
         }
         return willCommit;
     }


[2/2] nifi git commit: NIFI-4715: Update currentKeys after listing loop

Posted by jw...@apache.org.
NIFI-4715: Update currentKeys after listing loop

    ListS3 used to update currentKeys inside the listing loop, which caused
    duplicates. S3 returns the object list in lexicographic order, not in
    timestamp order, so if currentKeys is cleared during the loop we can no
    longer tell whether an object has already been listed when a newer
    object has a lexicographically earlier name.

Signed-off-by: James Wing <jv...@gmail.com>

This closes #3116, closes #2361.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/37a0e1b3
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/37a0e1b3
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/37a0e1b3

Branch: refs/heads/master
Commit: 37a0e1b3048b5db067b6485bb437887cb0869888
Parents: 0a014dc
Author: Koji Kawamura <ij...@apache.org>
Authored: Wed Oct 31 16:01:36 2018 +0900
Committer: James Wing <jv...@gmail.com>
Committed: Sat Nov 3 14:26:00 2018 -0700

----------------------------------------------------------------------
 .../apache/nifi/processors/aws/s3/ListS3.java   | 32 ++++++++++++++------
 1 file changed, 23 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/37a0e1b3/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index fc3260c..d3bade9 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -230,7 +230,7 @@ public class ListS3 extends AbstractS3Processor {
         final AmazonS3 client = getClient();
         int listCount = 0;
         int totalListCount = 0;
-        long maxTimestamp = currentTimestamp;
+        long latestListedTimestampInThisCycle = currentTimestamp;
         String delimiter = context.getProperty(DELIMITER).getValue();
         String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
 
@@ -252,6 +252,9 @@ public class ListS3 extends AbstractS3Processor {
         }
 
         VersionListing versionListing;
+        final Set<String> listedKeys = new HashSet<>();
+        getLogger().trace("Start listing, listingTimestamp={}, currentTimestamp={}, currentKeys={}", new Object[]{listingTimestamp, currentTimestamp, currentKeys});
+
         do {
             versionListing = bucketLister.listVersions();
             for (S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
@@ -262,6 +265,8 @@ public class ListS3 extends AbstractS3Processor {
                     continue;
                 }
 
+                getLogger().trace("Listed key={}, lastModified={}, currentKeys={}", new Object[]{versionSummary.getKey(), lastModified, currentKeys});
+
                 // Create the attributes
                 final Map<String, String> attributes = new HashMap<>();
                 attributes.put(CoreAttributes.FILENAME.key(), versionSummary.getKey());
@@ -287,14 +292,17 @@ public class ListS3 extends AbstractS3Processor {
                 flowFile = session.putAllAttributes(flowFile, attributes);
                 session.transfer(flowFile, REL_SUCCESS);
 
-                // Update state
-                if (lastModified > maxTimestamp) {
-                    maxTimestamp = lastModified;
-                    currentKeys.clear();
-                }
-                if (lastModified == maxTimestamp) {
-                    currentKeys.add(versionSummary.getKey());
+                // Track the latest lastModified timestamp and keys having that timestamp.
+                // NOTE: Amazon S3 lists objects in UTF-8 character encoding in lexicographical order. Not ordered by timestamps.
+                if (lastModified > latestListedTimestampInThisCycle) {
+                    latestListedTimestampInThisCycle = lastModified;
+                    listedKeys.clear();
+                    listedKeys.add(versionSummary.getKey());
+
+                } else if (lastModified == latestListedTimestampInThisCycle) {
+                    listedKeys.add(versionSummary.getKey());
                 }
+
                 listCount++;
             }
             bucketLister.setNextMarker();
@@ -304,8 +312,14 @@ public class ListS3 extends AbstractS3Processor {
             listCount = 0;
         } while (bucketLister.isTruncated());
 
+        // Update currentKeys.
+        if (latestListedTimestampInThisCycle > currentTimestamp) {
+            currentKeys.clear();
+        }
+        currentKeys.addAll(listedKeys);
+
         // Update stateManger with the most recent timestamp
-        currentTimestamp = maxTimestamp;
+        currentTimestamp = latestListedTimestampInThisCycle;
         persistState(context);
 
         final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);