You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@nifi.apache.org by jw...@apache.org on 2016/09/03 22:13:50 UTC

nifi git commit: NIFI-2631: Adding page commits and 'Use Versions' to ListS3

Repository: nifi
Updated Branches:
  refs/heads/0.x 732bf6ed3 -> 3e70fbde3


NIFI-2631: Adding page commits and 'Use Versions' to ListS3

This closes #916.

Signed-off-by: James Wing <jv...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3e70fbde
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3e70fbde
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3e70fbde

Branch: refs/heads/0.x
Commit: 3e70fbde3688c18330d07bddab5c8e9e75de51dd
Parents: 732bf6e
Author: Joe Gresock <jo...@lmco.com>
Authored: Tue Aug 23 12:46:12 2016 +0000
Committer: James Wing <jv...@gmail.com>
Committed: Sat Sep 3 15:11:35 2016 -0700

----------------------------------------------------------------------
 .../apache/nifi/processors/aws/s3/ListS3.java   | 223 ++++++++++++++++---
 .../nifi/processors/aws/s3/AbstractS3IT.java    |   6 +-
 .../apache/nifi/processors/aws/s3/ITListS3.java |  24 +-
 3 files changed, 213 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/3e70fbde/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index bc14925..29f771b 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -16,10 +16,16 @@
  */
 package org.apache.nifi.processors.aws.s3;
 
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.ListObjectsRequest;
-import com.amazonaws.services.s3.model.ObjectListing;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.Stateful;
@@ -40,15 +46,13 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ListVersionsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.model.S3VersionSummary;
+import com.amazonaws.services.s3.model.VersionListing;
 
 @TriggerSerially
 @TriggerWhenEmpty
@@ -66,9 +70,11 @@ import java.util.concurrent.TimeUnit;
         @WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"),
         @WritesAttribute(attribute = "filename", description = "The name of the file"),
         @WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"),
+        @WritesAttribute(attribute = "s3.isLatest", description = "A boolean indicating if this is the latest version of the object"),
         @WritesAttribute(attribute = "s3.lastModified", description = "The last modified time in milliseconds since epoch in UTC time"),
         @WritesAttribute(attribute = "s3.length", description = "The size of the object in bytes"),
-        @WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"),})
+        @WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"),
+        @WritesAttribute(attribute = "s3.version", description = "The version of the object, if applicable")})
 @SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
 public class ListS3 extends AbstractS3Processor {
 
@@ -91,10 +97,21 @@ public class ListS3 extends AbstractS3Processor {
             .description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').")
             .build();
 
+    public static final PropertyDescriptor USE_VERSIONS = new PropertyDescriptor.Builder()
+            .name("use-versions")
+            .displayName("Use Versions")
+            .expressionLanguageSupported(false)
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .description("Specifies whether to use S3 versions, if applicable.  If false, only the latest version of each object will be returned.")
+            .build();
+
     public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
             Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
                     AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
-                    PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX));
+                    PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX, USE_VERSIONS));
 
     public static final Set<Relationship> relationships = Collections.unmodifiableSet(
             new HashSet<>(Collections.singletonList(REL_SUCCESS)));
@@ -171,35 +188,46 @@ public class ListS3 extends AbstractS3Processor {
         String delimiter = context.getProperty(DELIMITER).getValue();
         String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
 
-        ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(bucket);
+        boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean();
+
+        S3BucketLister bucketLister = useVersions
+                ? new S3VersionBucketLister(client)
+                : new S3ObjectBucketLister(client);
+
+        bucketLister.setBucketName(bucket);
+
         if (delimiter != null && !delimiter.isEmpty()) {
-            listObjectsRequest.setDelimiter(delimiter);
+            bucketLister.setDelimiter(delimiter);
         }
         if (prefix != null && !prefix.isEmpty()) {
-            listObjectsRequest.setPrefix(prefix);
+            bucketLister.setPrefix(prefix);
         }
 
-        ObjectListing objectListing;
+        VersionListing versionListing;
         do {
-            objectListing = client.listObjects(listObjectsRequest);
-            for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
-                long lastModified = objectSummary.getLastModified().getTime();
+            versionListing = bucketLister.listVersions();
+            for (S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
+                long lastModified = versionSummary.getLastModified().getTime();
                 if (lastModified < currentTimestamp
-                        || lastModified == currentTimestamp && currentKeys.contains(objectSummary.getKey())) {
+                        || lastModified == currentTimestamp && currentKeys.contains(versionSummary.getKey())) {
                     continue;
                 }
 
                 // Create the attributes
                 final Map<String, String> attributes = new HashMap<>();
-                attributes.put(CoreAttributes.FILENAME.key(), objectSummary.getKey());
-                attributes.put("s3.bucket", objectSummary.getBucketName());
-                if (objectSummary.getOwner() != null) { // We may not have permission to read the owner
-                    attributes.put("s3.owner", objectSummary.getOwner().getId());
+                attributes.put(CoreAttributes.FILENAME.key(), versionSummary.getKey());
+                attributes.put("s3.bucket", versionSummary.getBucketName());
+                if (versionSummary.getOwner() != null) { // We may not have permission to read the owner
+                    attributes.put("s3.owner", versionSummary.getOwner().getId());
                 }
-                attributes.put("s3.etag", objectSummary.getETag());
+                attributes.put("s3.etag", versionSummary.getETag());
                 attributes.put("s3.lastModified", String.valueOf(lastModified));
-                attributes.put("s3.length", String.valueOf(objectSummary.getSize()));
-                attributes.put("s3.storeClass", objectSummary.getStorageClass());
+                attributes.put("s3.length", String.valueOf(versionSummary.getSize()));
+                attributes.put("s3.storeClass", versionSummary.getStorageClass());
+                attributes.put("s3.isLatest", String.valueOf(versionSummary.isLatest()));
+                if (versionSummary.getVersionId() != null) {
+                    attributes.put("s3.version", versionSummary.getVersionId());
+                }
 
                 // Create the flowfile
                 FlowFile flowFile = session.create();
@@ -212,24 +240,145 @@ public class ListS3 extends AbstractS3Processor {
                     currentKeys.clear();
                 }
                 if (lastModified == maxTimestamp) {
-                    currentKeys.add(objectSummary.getKey());
+                    currentKeys.add(versionSummary.getKey());
                 }
                 listCount++;
             }
-            listObjectsRequest.setMarker(objectListing.getNextMarker());
-        } while (objectListing.isTruncated());
+            bucketLister.setNextMarker();
+
+            commit(context, session, listCount);
+            listCount = 0;
+        } while (bucketLister.isTruncated());
         currentTimestamp = maxTimestamp;
 
         final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
         getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
 
-        if (listCount > 0) {
+        if (!commit(context, session, listCount)) {
+            if (currentTimestamp > 0) {
+                persistState(context);
+            }
+            getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
+            context.yield();
+        }
+    }
+
+    private boolean commit(final ProcessContext context, final ProcessSession session, int listCount) {
+        boolean willCommit = listCount > 0;
+        if (willCommit) {
             getLogger().info("Successfully listed {} new files from S3; routing to success", new Object[] {listCount});
             session.commit();
             persistState(context);
-        } else {
-            getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
-            context.yield();
+        }
+        return willCommit;
+    }
+
+    private interface S3BucketLister {
+        public void setBucketName(String bucketName);
+        public void setPrefix(String prefix);
+        public void setDelimiter(String delimiter);
+        // Versions have a superset of the fields that Objects have, so we'll use
+        // them as a common interface
+        public VersionListing listVersions();
+        public void setNextMarker();
+        public boolean isTruncated();
+    }
+
+    public class S3ObjectBucketLister implements S3BucketLister {
+        private AmazonS3 client;
+        private ListObjectsRequest listObjectsRequest;
+        private ObjectListing objectListing;
+
+        public S3ObjectBucketLister(AmazonS3 client) {
+            this.client = client;
+        }
+
+        @Override
+        public void setBucketName(String bucketName) {
+            listObjectsRequest = new ListObjectsRequest().withBucketName(bucketName);
+        }
+
+        @Override
+        public void setPrefix(String prefix) {
+            listObjectsRequest.setPrefix(prefix);
+        }
+
+        @Override
+        public void setDelimiter(String delimiter) {
+            listObjectsRequest.setDelimiter(delimiter);
+        }
+
+        @Override
+        public VersionListing listVersions() {
+            VersionListing versionListing = new VersionListing();
+            this.objectListing = client.listObjects(listObjectsRequest);
+            for(S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
+                S3VersionSummary versionSummary = new S3VersionSummary();
+                versionSummary.setBucketName(objectSummary.getBucketName());
+                versionSummary.setETag(objectSummary.getETag());
+                versionSummary.setKey(objectSummary.getKey());
+                versionSummary.setLastModified(objectSummary.getLastModified());
+                versionSummary.setOwner(objectSummary.getOwner());
+                versionSummary.setSize(objectSummary.getSize());
+                versionSummary.setStorageClass(objectSummary.getStorageClass());
+                versionSummary.setIsLatest(true);
+
+                versionListing.getVersionSummaries().add(versionSummary);
+            }
+
+            return versionListing;
+        }
+
+        @Override
+        public void setNextMarker() {
+            listObjectsRequest.setMarker(objectListing.getNextMarker());
+        }
+
+        @Override
+        public boolean isTruncated() {
+            return (objectListing == null) ? false : objectListing.isTruncated();
+        }
+    }
+
+    public class S3VersionBucketLister implements S3BucketLister {
+        private AmazonS3 client;
+        private ListVersionsRequest listVersionsRequest;
+        private VersionListing versionListing;
+
+        public S3VersionBucketLister(AmazonS3 client) {
+            this.client = client;
+        }
+
+        @Override
+        public void setBucketName(String bucketName) {
+            listVersionsRequest = new ListVersionsRequest().withBucketName(bucketName);
+        }
+
+        @Override
+        public void setPrefix(String prefix) {
+            listVersionsRequest.setPrefix(prefix);
+        }
+
+        @Override
+        public void setDelimiter(String delimiter) {
+            listVersionsRequest.setDelimiter(delimiter);
+        }
+
+        @Override
+        public VersionListing listVersions() {
+            versionListing = client.listVersions(listVersionsRequest);
+            return versionListing;
+        }
+
+        @Override
+        public void setNextMarker() {
+            listVersionsRequest.setKeyMarker(versionListing.getNextKeyMarker());
+            listVersionsRequest.setVersionIdMarker(versionListing.getNextVersionIdMarker());
+        }
+
+        @Override
+        public boolean isTruncated() {
+            return (versionListing == null) ? false : versionListing.isTruncated();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/3e70fbde/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
index 95b4ba8..0ecc33e 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
@@ -52,7 +52,7 @@ import static org.junit.Assert.fail;
 public abstract class AbstractS3IT {
     protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
     protected final static String SAMPLE_FILE_RESOURCE_NAME = "/hello.txt";
-    protected final static String REGION = "us-west-1";
+    protected final static String REGION = System.getProperty("it.aws.region", "us-west-1");
     // Adding REGION to bucket prevents errors of
     //      "A conflicting conditional operation is currently in progress against this resource."
     // when bucket is rapidly added/deleted and consistency propogation causes this error.
@@ -82,7 +82,9 @@ public abstract class AbstractS3IT {
                 fail("Bucket " + BUCKET_NAME + " exists. Choose a different bucket name to continue test");
             }
 
-            CreateBucketRequest request = new CreateBucketRequest(BUCKET_NAME, REGION);
+            CreateBucketRequest request = REGION.contains("east")
+                    ? new CreateBucketRequest(BUCKET_NAME) // See https://github.com/boto/boto3/issues/125
+                    : new CreateBucketRequest(BUCKET_NAME, REGION);
             client.createBucket(request);
 
         } catch (final AmazonS3Exception e) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/3e70fbde/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java
index 6d77eb6..9370022 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java
@@ -125,10 +125,31 @@ public class ITListS3 extends AbstractS3IT {
     }
 
     @Test
+    public void testSimpleListWithPrefixAndVersions() throws Throwable {
+        putTestFile("a", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        putTestFile("b/c", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        putTestFile("d/e", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+
+        final TestRunner runner = TestRunners.newTestRunner(new ListS3());
+
+        runner.setProperty(ListS3.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(ListS3.REGION, REGION);
+        runner.setProperty(ListS3.BUCKET, BUCKET_NAME);
+        runner.setProperty(ListS3.PREFIX, "b/");
+        runner.setProperty(ListS3.USE_VERSIONS, "true");
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
+        flowFiles.get(0).assertAttributeEquals("filename", "b/c");
+    }
+
+    @Test
     public void testGetPropertyDescriptors() throws Exception {
         ListS3 processor = new ListS3();
         List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
-        assertEquals("size should be eq", 13, pd.size());
+        assertEquals("size should be eq", 14, pd.size());
         assertTrue(pd.contains(ListS3.ACCESS_KEY));
         assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE));
         assertTrue(pd.contains(ListS3.BUCKET));
@@ -142,5 +163,6 @@ public class ITListS3 extends AbstractS3IT {
         assertTrue(pd.contains(ListS3.PROXY_HOST_PORT));
         assertTrue(pd.contains(ListS3.DELIMITER));
         assertTrue(pd.contains(ListS3.PREFIX));
+        assertTrue(pd.contains(ListS3.USE_VERSIONS));
     }
 }