You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jw...@apache.org on 2016/09/03 22:13:50 UTC
nifi git commit: NIFI-2631: Adding page commits and 'Use Versions' to ListS3
Repository: nifi
Updated Branches:
refs/heads/0.x 732bf6ed3 -> 3e70fbde3
NIFI-2631: Adding page commits and 'Use Versions' to ListS3
This closes #916.
Signed-off-by: James Wing <jv...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3e70fbde
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3e70fbde
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3e70fbde
Branch: refs/heads/0.x
Commit: 3e70fbde3688c18330d07bddab5c8e9e75de51dd
Parents: 732bf6e
Author: Joe Gresock <jo...@lmco.com>
Authored: Tue Aug 23 12:46:12 2016 +0000
Committer: James Wing <jv...@gmail.com>
Committed: Sat Sep 3 15:11:35 2016 -0700
----------------------------------------------------------------------
.../apache/nifi/processors/aws/s3/ListS3.java | 223 ++++++++++++++++---
.../nifi/processors/aws/s3/AbstractS3IT.java | 6 +-
.../apache/nifi/processors/aws/s3/ITListS3.java | 24 +-
3 files changed, 213 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/3e70fbde/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index bc14925..29f771b 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -16,10 +16,16 @@
*/
package org.apache.nifi.processors.aws.s3;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.ListObjectsRequest;
-import com.amazonaws.services.s3.model.ObjectListing;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful;
@@ -40,15 +46,13 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ListVersionsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.model.S3VersionSummary;
+import com.amazonaws.services.s3.model.VersionListing;
@TriggerSerially
@TriggerWhenEmpty
@@ -66,9 +70,11 @@ import java.util.concurrent.TimeUnit;
@WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"),
@WritesAttribute(attribute = "filename", description = "The name of the file"),
@WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"),
+ @WritesAttribute(attribute = "s3.isLatest", description = "A boolean indicating if this is the latest version of the object"),
@WritesAttribute(attribute = "s3.lastModified", description = "The last modified time in milliseconds since epoch in UTC time"),
@WritesAttribute(attribute = "s3.length", description = "The size of the object in bytes"),
- @WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"),})
+ @WritesAttribute(attribute = "s3.storeClass", description = "The storage class of the object"),
+ @WritesAttribute(attribute = "s3.version", description = "The version of the object, if applicable")})
@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
public class ListS3 extends AbstractS3Processor {
@@ -91,10 +97,21 @@ public class ListS3 extends AbstractS3Processor {
.description("The prefix used to filter the object list. In most cases, it should end with a forward slash ('/').")
.build();
+    /**
+     * Chooses the listing strategy: "true" lists through the S3 version-listing API,
+     * "false" (the default) lists plain objects and reports each one as the latest version.
+     */
+    public static final PropertyDescriptor USE_VERSIONS = new PropertyDescriptor.Builder()
+            .name("use-versions")
+            .displayName("Use Versions")
+            .description("Specifies whether to use S3 versions, if applicable. If false, only the latest version of each object will be returned.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+
+ // Every property descriptor ListS3 supports, exposed as an unmodifiable list.
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
- PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX));
+ PROXY_HOST, PROXY_HOST_PORT, DELIMITER, PREFIX, USE_VERSIONS));
+ // The only relationship exposed: listed objects are routed to REL_SUCCESS.
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
new HashSet<>(Collections.singletonList(REL_SUCCESS)));
@@ -171,35 +188,46 @@ public class ListS3 extends AbstractS3Processor {
String delimiter = context.getProperty(DELIMITER).getValue();
String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
- ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(bucket);
+ boolean useVersions = context.getProperty(USE_VERSIONS).asBoolean();
+
+ S3BucketLister bucketLister = useVersions
+ ? new S3VersionBucketLister(client)
+ : new S3ObjectBucketLister(client);
+
+ bucketLister.setBucketName(bucket);
+
if (delimiter != null && !delimiter.isEmpty()) {
- listObjectsRequest.setDelimiter(delimiter);
+ bucketLister.setDelimiter(delimiter);
}
if (prefix != null && !prefix.isEmpty()) {
- listObjectsRequest.setPrefix(prefix);
+ bucketLister.setPrefix(prefix);
}
- ObjectListing objectListing;
+ VersionListing versionListing;
do {
- objectListing = client.listObjects(listObjectsRequest);
- for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
- long lastModified = objectSummary.getLastModified().getTime();
+ versionListing = bucketLister.listVersions();
+ for (S3VersionSummary versionSummary : versionListing.getVersionSummaries()) {
+ long lastModified = versionSummary.getLastModified().getTime();
if (lastModified < currentTimestamp
- || lastModified == currentTimestamp && currentKeys.contains(objectSummary.getKey())) {
+ || lastModified == currentTimestamp && currentKeys.contains(versionSummary.getKey())) {
continue;
}
// Create the attributes
final Map<String, String> attributes = new HashMap<>();
- attributes.put(CoreAttributes.FILENAME.key(), objectSummary.getKey());
- attributes.put("s3.bucket", objectSummary.getBucketName());
- if (objectSummary.getOwner() != null) { // We may not have permission to read the owner
- attributes.put("s3.owner", objectSummary.getOwner().getId());
+ attributes.put(CoreAttributes.FILENAME.key(), versionSummary.getKey());
+ attributes.put("s3.bucket", versionSummary.getBucketName());
+ if (versionSummary.getOwner() != null) { // We may not have permission to read the owner
+ attributes.put("s3.owner", versionSummary.getOwner().getId());
}
- attributes.put("s3.etag", objectSummary.getETag());
+ attributes.put("s3.etag", versionSummary.getETag());
attributes.put("s3.lastModified", String.valueOf(lastModified));
- attributes.put("s3.length", String.valueOf(objectSummary.getSize()));
- attributes.put("s3.storeClass", objectSummary.getStorageClass());
+ attributes.put("s3.length", String.valueOf(versionSummary.getSize()));
+ attributes.put("s3.storeClass", versionSummary.getStorageClass());
+ attributes.put("s3.isLatest", String.valueOf(versionSummary.isLatest()));
+ if (versionSummary.getVersionId() != null) {
+ attributes.put("s3.version", versionSummary.getVersionId());
+ }
// Create the flowfile
FlowFile flowFile = session.create();
@@ -212,24 +240,145 @@ public class ListS3 extends AbstractS3Processor {
currentKeys.clear();
}
if (lastModified == maxTimestamp) {
- currentKeys.add(objectSummary.getKey());
+ currentKeys.add(versionSummary.getKey());
}
listCount++;
}
- listObjectsRequest.setMarker(objectListing.getNextMarker());
- } while (objectListing.isTruncated());
+ bucketLister.setNextMarker();
+
+ commit(context, session, listCount);
+ listCount = 0;
+ } while (bucketLister.isTruncated());
currentTimestamp = maxTimestamp;
final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
- if (listCount > 0) {
+ if (!commit(context, session, listCount)) {
+ if (currentTimestamp > 0) {
+ persistState(context);
+ }
+ getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
+ context.yield();
+ }
+ }
+
+ /**
+ * Commits the session and calls persistState(context), but only when at least one
+ * FlowFile was created since the previous commit.
+ *
+ * NOTE(review): onTrigger calls this once per listing page and then resets listCount
+ * to 0, so the call made after the paging loop always receives 0, returns false, and
+ * the processor yields even when objects were listed during this run. Confirm intended.
+ *
+ * @param context process context passed to persistState
+ * @param session session holding the FlowFiles created for newly listed objects
+ * @param listCount number of FlowFiles created since the previous commit
+ * @return true if the session was committed and state persisted; false if listCount was not positive
+ */
+ private boolean commit(final ProcessContext context, final ProcessSession session, int listCount) {
+ boolean willCommit = listCount > 0;
+ if (willCommit) {
getLogger().info("Successfully listed {} new files from S3; routing to success", new Object[] {listCount});
session.commit();
persistState(context);
- } else {
- getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
- context.yield();
+ }
+ return willCommit;
+ }
+
+    /**
+     * Common view of the two S3 listing strategies used by ListS3. Results are always
+     * reported as a VersionListing because version summaries carry a superset of the
+     * fields that object summaries have.
+     */
+    private interface S3BucketLister {
+        void setBucketName(String bucketName);
+
+        void setPrefix(String prefix);
+
+        void setDelimiter(String delimiter);
+
+        /** Issues a listing request with the current request state and returns the page as a VersionListing. */
+        VersionListing listVersions();
+
+        /** Moves the request past the page most recently returned by listVersions(). */
+        void setNextMarker();
+
+        /** Returns true when the most recently returned page reported that more results remain. */
+        boolean isTruncated();
+    }
+
+    /**
+     * Lists a bucket through AmazonS3.listObjects and converts every S3ObjectSummary into an
+     * S3VersionSummary flagged as the latest version, leaving the version id unset.
+     *
+     * Declared static so instances do not keep a hidden reference to the enclosing processor.
+     */
+    public static class S3ObjectBucketLister implements S3BucketLister {
+        private final AmazonS3 client;
+        // Created eagerly so prefix/delimiter can be set without depending on call order.
+        private final ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
+        // Page most recently returned by listVersions(); null until the first call.
+        private ObjectListing objectListing;
+
+        public S3ObjectBucketLister(AmazonS3 client) {
+            this.client = client;
+        }
+
+        @Override
+        public void setBucketName(String bucketName) {
+            listObjectsRequest.setBucketName(bucketName);
+        }
+
+        @Override
+        public void setPrefix(String prefix) {
+            listObjectsRequest.setPrefix(prefix);
+        }
+
+        @Override
+        public void setDelimiter(String delimiter) {
+            listObjectsRequest.setDelimiter(delimiter);
+        }
+
+        @Override
+        public VersionListing listVersions() {
+            objectListing = client.listObjects(listObjectsRequest);
+            VersionListing versionListing = new VersionListing();
+            for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
+                versionListing.getVersionSummaries().add(toVersionSummary(objectSummary));
+            }
+            return versionListing;
+        }
+
+        /**
+         * Copies the fields shared by both summary types. Without version listing only the
+         * latest version of each object is reported, so isLatest is always true.
+         */
+        private static S3VersionSummary toVersionSummary(S3ObjectSummary objectSummary) {
+            S3VersionSummary versionSummary = new S3VersionSummary();
+            versionSummary.setBucketName(objectSummary.getBucketName());
+            versionSummary.setETag(objectSummary.getETag());
+            versionSummary.setKey(objectSummary.getKey());
+            versionSummary.setLastModified(objectSummary.getLastModified());
+            versionSummary.setOwner(objectSummary.getOwner());
+            versionSummary.setSize(objectSummary.getSize());
+            versionSummary.setStorageClass(objectSummary.getStorageClass());
+            versionSummary.setIsLatest(true);
+            return versionSummary;
+        }
+
+        @Override
+        public void setNextMarker() {
+            // Nothing has been listed yet, so there is no page to advance past.
+            if (objectListing != null) {
+                listObjectsRequest.setMarker(objectListing.getNextMarker());
+            }
+        }
+
+        @Override
+        public boolean isTruncated() {
+            return objectListing != null && objectListing.isTruncated();
+        }
+    }
+
+    /**
+     * Lists a bucket through AmazonS3.listVersions and returns each VersionListing page as-is.
+     * Paging advances with both the key marker and the version-id marker.
+     *
+     * Declared static so instances do not keep a hidden reference to the enclosing processor.
+     */
+    public static class S3VersionBucketLister implements S3BucketLister {
+        private final AmazonS3 client;
+        // Created eagerly so prefix/delimiter can be set without depending on call order.
+        private final ListVersionsRequest listVersionsRequest = new ListVersionsRequest();
+        // Page most recently returned by listVersions(); null until the first call.
+        private VersionListing versionListing;
+
+        public S3VersionBucketLister(AmazonS3 client) {
+            this.client = client;
+        }
+
+        @Override
+        public void setBucketName(String bucketName) {
+            listVersionsRequest.setBucketName(bucketName);
+        }
+
+        @Override
+        public void setPrefix(String prefix) {
+            listVersionsRequest.setPrefix(prefix);
+        }
+
+        @Override
+        public void setDelimiter(String delimiter) {
+            listVersionsRequest.setDelimiter(delimiter);
+        }
+
+        @Override
+        public VersionListing listVersions() {
+            versionListing = client.listVersions(listVersionsRequest);
+            return versionListing;
+        }
+
+        @Override
+        public void setNextMarker() {
+            // Nothing has been listed yet, so there is no page to advance past.
+            if (versionListing != null) {
+                // Version listings page on a (key, version id) pair; both markers advance together.
+                listVersionsRequest.setKeyMarker(versionListing.getNextKeyMarker());
+                listVersionsRequest.setVersionIdMarker(versionListing.getNextVersionIdMarker());
+            }
+        }
+
+        @Override
+        public boolean isTruncated() {
+            return versionListing != null && versionListing.isTruncated();
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/3e70fbde/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
index 95b4ba8..0ecc33e 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
@@ -52,7 +52,7 @@ import static org.junit.Assert.fail;
public abstract class AbstractS3IT {
protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
protected final static String SAMPLE_FILE_RESOURCE_NAME = "/hello.txt";
- protected final static String REGION = "us-west-1";
+ // Region used by the integration tests; override with -Dit.aws.region=<region> (default us-west-1).
+ // NOTE(review): the bucket-creation code in this class omits the region for any REGION containing
+ // "east", which also matches us-east-2 and ap-northeast-1/ap-southeast-*; likely should be "us-east-1" only.
+ protected final static String REGION = System.getProperty("it.aws.region", "us-west-1");
// Adding REGION to bucket prevents errors of
// "A conflicting conditional operation is currently in progress against this resource."
// when bucket is rapidly added/deleted and consistency propogation causes this error.
@@ -82,7 +82,9 @@ public abstract class AbstractS3IT {
fail("Bucket " + BUCKET_NAME + " exists. Choose a different bucket name to continue test");
}
- CreateBucketRequest request = new CreateBucketRequest(BUCKET_NAME, REGION);
+ CreateBucketRequest request = REGION.contains("east")
+ ? new CreateBucketRequest(BUCKET_NAME) // See https://github.com/boto/boto3/issues/125
+ : new CreateBucketRequest(BUCKET_NAME, REGION);
client.createBucket(request);
} catch (final AmazonS3Exception e) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/3e70fbde/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java
index 6d77eb6..9370022 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java
@@ -125,10 +125,31 @@ public class ITListS3 extends AbstractS3IT {
}
+    /**
+     * Lists with Use Versions enabled and a prefix; checks that only the key under the
+     * prefix is returned and that the version-specific attributes are populated.
+     */
@Test
+    public void testSimpleListWithPrefixAndVersions() throws Throwable {
+        putTestFile("a", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        putTestFile("b/c", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+        putTestFile("d/e", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+
+        final TestRunner runner = TestRunners.newTestRunner(new ListS3());
+
+        runner.setProperty(ListS3.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(ListS3.REGION, REGION);
+        runner.setProperty(ListS3.BUCKET, BUCKET_NAME);
+        runner.setProperty(ListS3.PREFIX, "b/");
+        runner.setProperty(ListS3.USE_VERSIONS, "true");
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
+        MockFlowFile flowFile = flowFiles.get(0);
+        flowFile.assertAttributeEquals("filename", "b/c");
+        // Only one version of "b/c" was written, so it must be reported as the latest.
+        flowFile.assertAttributeEquals("s3.isLatest", "true");
+        // The plain object listing never sets a version id, so s3.version only appears when
+        // the version-listing API was actually used.
+        flowFile.assertAttributeExists("s3.version");
+    }
+
+ @Test
public void testGetPropertyDescriptors() throws Exception {
ListS3 processor = new ListS3();
List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors();
- assertEquals("size should be eq", 13, pd.size());
+ assertEquals("size should be eq", 14, pd.size());
assertTrue(pd.contains(ListS3.ACCESS_KEY));
assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(ListS3.BUCKET));
@@ -142,5 +163,6 @@ public class ITListS3 extends AbstractS3IT {
assertTrue(pd.contains(ListS3.PROXY_HOST_PORT));
assertTrue(pd.contains(ListS3.DELIMITER));
assertTrue(pd.contains(ListS3.PREFIX));
+ assertTrue(pd.contains(ListS3.USE_VERSIONS));
}
}