Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/26 05:56:37 UTC

[1/2] nifi git commit: NIFI-1107: Integrate Multipart uploads into the PutS3Object processor * Add nifi-ssl-context-service-nar to nifi-aws-nar pom. * Add nifi-ssl-context-service-api to nifi-aws-processors pom. * Add SSL context service and endpoint ove

Repository: nifi
Updated Branches:
  refs/heads/NIFI-1107 [created] 08e65abd2


NIFI-1107: Integrate Multipart uploads into the PutS3Object processor
* Add nifi-ssl-context-service-nar to nifi-aws-nar pom.
* Add nifi-ssl-context-service-api to nifi-aws-processors pom.
* Add SSL context service and endpoint override properties to AbstractAWSProcessor to support
 non-Amazon S3-compatible endpoints.
* Added Multipart upload API support, multipart threshold and size properties, state tracking for multipart upload,
 and expanded flow file attributes to PutS3Object.
* Updated annotations and property list for FetchS3Object and DeleteS3Object.
* Added region to bucket in AbstractS3Test to minimize AWS propagation issues during testing.
* Expanded tests in TestPutS3Object to validate new multipart logic.
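
For readers unfamiliar with the S3 multipart API, the three-step flow described above looks roughly
like the following with the AWS SDK for Java v1 client classes this commit imports. This is a minimal,
stand-alone sketch with placeholder bucket, key, and part-size values; the processor itself layers
state persistence and resume on top of this sequence.

    import java.io.InputStream;
    import java.util.ArrayList;
    import java.util.List;

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
    import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
    import com.amazonaws.services.s3.model.PartETag;
    import com.amazonaws.services.s3.model.UploadPartRequest;

    public class MultipartUploadSketch {

        // Streams contentLength bytes from 'in' to bucket/key in fixed-size parts.
        static void upload(AmazonS3 s3, String bucket, String key, InputStream in,
                           long contentLength, long partSize) {
            // 1) initiate the upload and remember the uploadId S3 hands back
            final String uploadId = s3.initiateMultipartUpload(
                    new InitiateMultipartUploadRequest(bucket, key)).getUploadId();

            // 2) upload the parts, collecting the ETag returned for each one;
            //    every part except the last must be at least 5 MB
            final List<PartETag> partETags = new ArrayList<>();
            long position = 0L;
            for (int part = 1; position < contentLength; part++) {
                final long thisPartSize = Math.min(partSize, contentLength - position);
                partETags.add(s3.uploadPart(new UploadPartRequest()
                        .withBucketName(bucket)
                        .withKey(key)
                        .withUploadId(uploadId)
                        .withInputStream(in)
                        .withPartNumber(part)
                        .withPartSize(thisPartSize)).getPartETag());
                position += thisPartSize;
            }

            // 3) complete the upload by handing back the uploadId and all part ETags
            s3.completeMultipartUpload(
                    new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags));
        }
    }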


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8917df88
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8917df88
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8917df88

Branch: refs/heads/NIFI-1107
Commit: 8917df88a40fcf6ac5b142d490c915a365858d48
Parents: a3cb803
Author: Joe Skora <js...@gmail.com>
Authored: Wed Nov 25 23:24:56 2015 -0500
Committer: Tony Kurc <tr...@gmail.com>
Committed: Wed Nov 25 23:24:56 2015 -0500

----------------------------------------------------------------------
 .../nifi-aws-bundle/nifi-aws-nar/pom.xml        |   5 +
 .../nifi-aws-bundle/nifi-aws-processors/pom.xml |   4 +
 .../processors/aws/AbstractAWSProcessor.java    |  38 +-
 .../nifi/processors/aws/s3/DeleteS3Object.java  |   7 +-
 .../nifi/processors/aws/s3/FetchS3Object.java   |   6 +-
 .../nifi/processors/aws/s3/PutS3Object.java     | 538 ++++++++++++++++++-
 .../nifi/processors/aws/s3/AbstractS3Test.java  |  13 +-
 .../nifi/processors/aws/s3/TestPutS3Object.java | 410 +++++++++++++-
 8 files changed, 962 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8917df88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml
index 13bca24..fb066f0 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml
@@ -31,6 +31,11 @@
             <artifactId>nifi-aws-processors</artifactId>
             <version>0.4.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-nar</artifactId>
+            <type>nar</type>
+        </dependency>
     </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/8917df88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
index 1e16614..a86af8a 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
@@ -50,6 +50,10 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/nifi/blob/8917df88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
index 61c3c85..896fa7b 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import com.amazonaws.http.conn.ssl.SdkTLSSocketFactory;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -47,6 +48,11 @@ import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.auth.PropertiesCredentials;
 import com.amazonaws.regions.Region;
 import com.amazonaws.regions.Regions;
+import org.apache.nifi.ssl.SSLContextService;
+
+import javax.net.ssl.SSLContext;
+
+import static org.apache.commons.lang3.StringUtils.trimToEmpty;
 
 public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceClient> extends AbstractProcessor {
 
@@ -92,6 +98,22 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
             .defaultValue("30 secs")
             .build();
 
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description("Specifies an optional SSL Context Service that, if provided, will be used to create connections")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+
+    public static final PropertyDescriptor ENDPOINT_OVERRIDE = new PropertyDescriptor.Builder()
+            .name("Endpoint Override URL")
+            .description("Endpoint URL to use instead of the AWS default including scheme, host, port, and path. " +
+                    "The AWS libraries select an endpoint URL based on the AWS region, but this property overrides " +
+                    "the selected endpoint URL, allowing use with other S3-compatible endpoints.")
+            .required(false)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
     private volatile ClientType client;
     private volatile Region region;
 
@@ -146,6 +168,13 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
         config.setConnectionTimeout(commsTimeout);
         config.setSocketTimeout(commsTimeout);
 
+        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).evaluateAttributeExpressions().asControllerService(SSLContextService.class);
+        if (sslContextService != null) {
+            final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.NONE);
+            SdkTLSSocketFactory sdkTLSSocketFactory = new SdkTLSSocketFactory(sslContext, null);
+            config.getApacheHttpClientConfig().setSslSocketFactory(sdkTLSSocketFactory);
+        }
+
         return config;
     }
 
@@ -160,10 +189,17 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
             if (region != null) {
                 this.region = Region.getRegion(Regions.fromName(region));
                 client.setRegion(this.region);
-            } else{
+            } else {
                 this.region = null;
             }
         }
+
+        // if the endpoint override has been configured, set the endpoint.
+        // (per Amazon docs this should only be configured at client creation)
+        final String urlstr = trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).evaluateAttributeExpressions().getValue());
+        if (!urlstr.isEmpty()) {
+            this.client.setEndpoint(urlstr);
+        }
     }
 
     protected abstract ClientType createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config);
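
The new SSL Context Service and Endpoint Override URL properties added above can be exercised from a
flow or a test. Below is a minimal sketch using NiFi's TestRunner; the endpoint URL, bucket, key, and
credential values are placeholders, and the SSL context line is commented out because registering a
controller service with the runner is omitted here.

    import org.apache.nifi.processors.aws.s3.PutS3Object;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class EndpointOverrideSketch {
        public static void main(String[] args) {
            final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
            runner.setProperty(PutS3Object.ACCESS_KEY, "test-access-key");   // placeholder
            runner.setProperty(PutS3Object.SECRET_KEY, "test-secret-key");   // placeholder
            runner.setProperty(PutS3Object.BUCKET, "my-bucket");             // placeholder
            runner.setProperty(PutS3Object.KEY, "my-key");                   // placeholder
            // Hypothetical non-AWS endpoint; any URL accepted by the URL validator works.
            runner.setProperty(PutS3Object.ENDPOINT_OVERRIDE, "https://s3.example.internal:9020");
            // To supply TLS settings, register an SSLContextService with
            // runner.addControllerService(...) and reference its id here:
            // runner.setProperty(PutS3Object.SSL_CONTEXT_SERVICE, "ssl-context");
            // runner.enqueue(...) / runner.run() would then exercise the processor.
        }
    }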

http://git-wip-us.apache.org/repos/asf/nifi/blob/8917df88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
index 6b078d1..fc43492 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
@@ -40,7 +40,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 
 
 @SupportsBatching
-@SeeAlso({PutS3Object.class})
+@SeeAlso({PutS3Object.class, FetchS3Object.class})
 @Tags({"Amazon", "S3", "AWS", "Archive", "Delete"})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Deletes FlowFiles on an Amazon S3 Bucket. " +
@@ -56,8 +56,9 @@ public class DeleteS3Object extends AbstractS3Processor {
             .build();
 
     public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
-            Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, TIMEOUT, VERSION_ID,
-                    FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
+            Arrays.asList(BUCKET, KEY, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
+                    REGION, TIMEOUT, VERSION_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST,
+                    WRITE_ACL_LIST, OWNER));
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/8917df88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index d7ec88a..fda194a 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -46,10 +46,10 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.S3Object;
 
 @SupportsBatching
-@SeeAlso({PutS3Object.class})
+@SeeAlso({PutS3Object.class, DeleteS3Object.class})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"Amazon", "S3", "AWS", "Get", "Fetch"})
-@CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile")
+@CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile.")
 @WritesAttributes({
     @WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"),
     @WritesAttribute(attribute = "path", description = "The path of the file"),
@@ -73,7 +73,7 @@ public class FetchS3Object extends AbstractS3Processor {
             .build();
 
     public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
-            Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, TIMEOUT, VERSION_ID));
+            Arrays.asList(BUCKET, KEY, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, REGION, TIMEOUT, VERSION_ID));
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/8917df88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index b03d85d..19bd881 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -17,16 +17,29 @@
 package org.apache.nifi.processors.aws.s3;
 
 import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.amazonaws.services.s3.model.UploadPartResult;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -40,6 +53,7 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
@@ -55,20 +69,48 @@ import com.amazonaws.services.s3.model.PutObjectResult;
 import com.amazonaws.services.s3.model.StorageClass;
 
 @SupportsBatching
-@SeeAlso({FetchS3Object.class})
+@SeeAlso({FetchS3Object.class, DeleteS3Object.class})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
-@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket")
-@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object", value = "The value of a User-Defined Metadata field to add to the S3 Object",
-    description = "Allows user-defined metadata to be added to the S3 object as key/value pairs", supportsExpressionLanguage = true)
+@CapabilityDescription("Uploads FlowFiles to an Amazon S3 Bucket.\n" +
+        "The upload uses either the PutS3Object method or PutS3MultipartUpload methods.  The PutS3Object method " +
+        "send the file in a single synchronous call, but it has a 5GB size limit.  Larger files are sent using the " +
+        "multipart upload methods that initiate, transfer the parts, and complete an upload.  This multipart process " +
+        "saves state after each step so that a large upload can be resumed with minimal loss if the processor or " +
+        "cluster is stopped and restarted.\n" +
+        "A multipart upload consists of three steps\n" +
+        "  1) initiate upload,\n" +
+        "  2) upload the parts, and\n" +
+        "  3) complete the upload.\n" +
+        "For multipart uploads, the processor saves state locally tracking the upload ID and parts uploaded, which " +
+        "must both be provided to complete the upload.\n" +
+        "The AWS libraries select an endpoint URL based on the AWS region, but this can be overridden with the " +
+        "'Endpoint Override URL' property for use with other S3-compatible endpoints.\n" +
+        "The S3 API specifies that the maximum file size for a PutS3Object upload is 5GB. It also requires that " +
+        "parts in a multipart upload must be at least 5MB in size, except for the last part.  These limits are " +
+        "establish the bounds for the Multipart Upload Threshold and Part Size properties.")
+@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object",
+        value = "The value of a User-Defined Metadata field to add to the S3 Object",
+    description = "Allows user-defined metadata to be added to the S3 object as key/value pairs",
+        supportsExpressionLanguage = true)
 @ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object")
 @WritesAttributes({
+    @WritesAttribute(attribute = "s3.bucket", description = "The S3 bucket where the Object was put in S3"),
+    @WritesAttribute(attribute = "s3.key", description = "The S3 key within where the Object was put in S3"),
     @WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
     @WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"),
-    @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of the S3 object, if one is set")
+    @WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"),
+    @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of " +
+            "the S3 object, if one is set"),
+    @WritesAttribute(attribute = "s3.usermetadata", description = "A human-readable form of the User Metadata of " +
+            "the S3 object, if any was set")
 })
 public class PutS3Object extends AbstractS3Processor {
 
+    public static final long MIN_S3_PART_SIZE = 50L * 1024L * 1024L;
+    public static final long MAX_S3_PUTOBJECT_SIZE = 5L * 1024L * 1024L * 1024L;
+    public static final String PERSISTENCE_ROOT = "conf/state/";
+
     public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
         .name("Expiration Time Rule")
         .required(false)
@@ -83,9 +125,42 @@ public class PutS3Object extends AbstractS3Processor {
         .defaultValue(StorageClass.Standard.name())
         .build();
 
+    public static final PropertyDescriptor MULTIPART_THRESHOLD = new PropertyDescriptor.Builder()
+            .name("Multipart Threshold")
+            .description("Specifies the file size threshold for switch from the PutS3Object API to the " +
+                    "PutS3MultipartUpload API.  Flow files bigger than this limit will be sent using the stateful " +
+                    "multipart process.\n" +
+                    "The valid range is 50MB to 5GB.")
+            .required(true)
+            .defaultValue("5 GB")
+            .addValidator(StandardValidators.createDataSizeBoundsValidator(MIN_S3_PART_SIZE, MAX_S3_PUTOBJECT_SIZE))
+            .build();
+
+    public static final PropertyDescriptor MULTIPART_PART_SIZE = new PropertyDescriptor.Builder()
+            .name("Multipart Part Size")
+            .description("Specifies the part size for use when the PutS3Multipart Upload API is used.\n" +
+                    "Flow files will be broken into chunks of this size for the upload process, but the last part " +
+                    "sent can be smaller since it is not padded.\n" +
+                    "The valid range is 50MB to 5GB.")
+            .required(true)
+            .defaultValue("5 GB")
+            .addValidator(StandardValidators.createDataSizeBoundsValidator(MIN_S3_PART_SIZE, MAX_S3_PUTOBJECT_SIZE))
+            .build();
+
     public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
-        Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
-            FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
+        Arrays.asList(BUCKET, KEY, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
+                MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
+                FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
+
+    final static String S3_BUCKET_KEY = "s3.bucket";
+    final static String S3_OBJECT_KEY = "s3.key";
+    final static String S3_UPLOAD_ID_ATTR_KEY = "s3.uploadId";
+    final static String S3_VERSION_ATTR_KEY = "s3.version";
+    final static String S3_ETAG_ATTR_KEY = "s3.etag";
+    final static String S3_EXPIRATION_ATTR_KEY = "s3.expiration";
+    final static String S3_STORAGECLASS_ATTR_KEY = "s3.storeClass";
+    final static String S3_STORAGECLASS_META_KEY = "x-amz-storage-class";
+    final static String S3_USERMETA_ATTR_KEY = "s3.usermetadata";
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -102,6 +177,94 @@ public class PutS3Object extends AbstractS3Processor {
             .build();
     }
 
+    protected File getPersistenceFile() {
+        return new File(PERSISTENCE_ROOT + getIdentifier());
+    }
+
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+        if (descriptor.equals(KEY)
+                || descriptor.equals(BUCKET)
+                || descriptor.equals(ENDPOINT_OVERRIDE)
+                || descriptor.equals(STORAGE_CLASS)
+                || descriptor.equals(REGION)) {
+            destroyState();
+        }
+    }
+
+    protected MultipartState getState(final String s3ObjectKey) throws IOException {
+        // get local state if it exists
+        MultipartState currState = null;
+        final File persistenceFile = getPersistenceFile();
+        if (persistenceFile.exists()) {
+            try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
+                final Properties props = new Properties();
+                props.load(fis);
+                if (props.containsKey(s3ObjectKey)) {
+                    final String localSerialState = props.getProperty(s3ObjectKey);
+                    if (localSerialState != null) {
+                        currState = new MultipartState(localSerialState);
+                        getLogger().info("Local state for {} loaded with uploadId {} and {} partETags",
+                                new Object[]{s3ObjectKey, currState.getUploadId(), currState.getPartETags().size()});
+                    }
+                }
+            } catch (IOException ioe) {
+                getLogger().warn("Failed to recover local state for {} due to {}. Assuming no local state and " +
+                        "restarting upload.", new Object[]{s3ObjectKey, ioe.getMessage()});
+            }
+        }
+        return currState;
+    }
+
+    protected void persistState(final String s3ObjectKey, final MultipartState currState) throws IOException {
+        final String currStateStr = (currState == null) ? null : currState.toString();
+        final File persistenceFile = getPersistenceFile();
+        final File parentDir = persistenceFile.getParentFile();
+        if (!parentDir.exists() && !parentDir.mkdirs()) {
+            throw new IOException("Persistence directory (" + parentDir.getAbsolutePath() + ") does not exist and " +
+                    "could not be created.");
+        }
+        final Properties props = new Properties();
+        if (persistenceFile.exists()) {
+            try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
+                props.load(fis);
+            }
+        }
+        if (currStateStr != null) {
+            props.setProperty(s3ObjectKey, currStateStr);
+        } else {
+            props.remove(s3ObjectKey);
+        }
+
+        try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
+            props.store(fos, null);
+        } catch (IOException ioe) {
+            getLogger().error("Could not store state {} due to {}.",
+                    new Object[]{persistenceFile.getAbsolutePath(), ioe.getMessage()});
+        }
+    }
+
+    protected void removeState(final String s3ObjectKey) throws IOException {
+        persistState(s3ObjectKey, null);
+    }
+
+    protected void destroyState() {
+        final File persistenceFile = getPersistenceFile();
+        if (persistenceFile.exists()) {
+            if (!persistenceFile.delete()) {
+                getLogger().warn("Could not delete state file {}, attempting to delete contents.",
+                        new Object[]{persistenceFile.getAbsolutePath()});
+            } else {
+                try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
+                    new Properties().store(fos, null);
+                } catch (IOException ioe) {
+                    getLogger().error("Could not store empty state file {} due to {}.",
+                            new Object[]{persistenceFile.getAbsolutePath(), ioe.getMessage()});
+                }
+            }
+        }
+    }
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) {
         FlowFile flowFile = session.get();
@@ -113,10 +276,18 @@ public class PutS3Object extends AbstractS3Processor {
 
         final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final String cacheKey = getIdentifier() + "/" + bucket + "/" + key;
 
         final AmazonS3 s3 = getClient();
         final FlowFile ff = flowFile;
         final Map<String, String> attributes = new HashMap<>();
+        final String ffFilename = ff.getAttributes().get(CoreAttributes.FILENAME.key());
+        attributes.put(S3_BUCKET_KEY, bucket);
+        attributes.put(S3_OBJECT_KEY, key);
+
+        final Long multipartThreshold = context.getProperty(MULTIPART_THRESHOLD).asDataSize(DataUnit.B).longValue();
+        final Long multipartPartSize = context.getProperty(MULTIPART_PART_SIZE).asDataSize(DataUnit.B).longValue();
+
         try {
             session.read(flowFile, new InputStreamCallback() {
                 @Override
@@ -126,7 +297,8 @@ public class PutS3Object extends AbstractS3Processor {
                         objectMetadata.setContentDisposition(ff.getAttribute(CoreAttributes.FILENAME.key()));
                         objectMetadata.setContentLength(ff.getSize());
 
-                        final String expirationRule = context.getProperty(EXPIRATION_RULE_ID).evaluateAttributeExpressions(ff).getValue();
+                        final String expirationRule = context.getProperty(EXPIRATION_RULE_ID)
+                                .evaluateAttributeExpressions(ff).getValue();
                         if (expirationRule != null) {
                             objectMetadata.setExpirationTimeRuleId(expirationRule);
                         }
@@ -134,7 +306,8 @@ public class PutS3Object extends AbstractS3Processor {
                         final Map<String, String> userMetadata = new HashMap<>();
                         for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
                             if (entry.getKey().isDynamic()) {
-                                final String value = context.getProperty(entry.getKey()).evaluateAttributeExpressions(ff).getValue();
+                                final String value = context.getProperty(
+                                        entry.getKey()).evaluateAttributeExpressions(ff).getValue();
                                 userMetadata.put(entry.getKey().getName(), value);
                             }
                         }
@@ -143,23 +316,223 @@ public class PutS3Object extends AbstractS3Processor {
                             objectMetadata.setUserMetadata(userMetadata);
                         }
 
-                        final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
-                        request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
-                        final AccessControlList acl = createACL(context, ff);
-                        if (acl != null) {
-                            request.setAccessControlList(acl);
-                        }
+                        if (ff.getSize() <= multipartThreshold) {
+                            //----------------------------------------
+                            // single part upload
+                            //----------------------------------------
+                            final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
+                            request.setStorageClass(
+                                    StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
+                            final AccessControlList acl = createACL(context, ff);
+                            if (acl != null) {
+                                request.setAccessControlList(acl);
+                            }
 
-                        final PutObjectResult result = s3.putObject(request);
-                        if (result.getVersionId() != null) {
-                            attributes.put("s3.version", result.getVersionId());
-                        }
+                            try {
+                                final PutObjectResult result = s3.putObject(request);
+                                if (result.getVersionId() != null) {
+                                    attributes.put(S3_VERSION_ATTR_KEY, result.getVersionId());
+                                }
+                                if (result.getETag() != null) {
+                                    attributes.put(S3_ETAG_ATTR_KEY, result.getETag());
+                                }
+                                if (result.getExpirationTime() != null) {
+                                    attributes.put(S3_EXPIRATION_ATTR_KEY, result.getExpirationTime().toString());
+                                }
+                                if (result.getMetadata().getRawMetadata().keySet().contains(S3_STORAGECLASS_META_KEY)) {
+                                    attributes.put(S3_STORAGECLASS_ATTR_KEY,
+                                            result.getMetadata().getRawMetadataValue(S3_STORAGECLASS_META_KEY).toString());
+                                }
+                                if (userMetadata.size() > 0) {
+                                    StringBuilder userMetaBldr = new StringBuilder();
+                                    for (String userKey : userMetadata.keySet()) {
+                                        userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
+                                    }
+                                    attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString());
+                                }
+                            } catch (AmazonClientException e) {
+                                getLogger().info("Failure completing upload flowfile={} bucket={} key={} reason={}",
+                                        new Object[]{ffFilename, bucket, key, e.getMessage()});
+                                throw (e);
+                            }
+                        } else {
+                            //----------------------------------------
+                            // multipart upload
+                            //----------------------------------------
 
-                        attributes.put("s3.etag", result.getETag());
+                            // load or create persistent state
+                            //------------------------------------------------------------
+                            MultipartState currentState;
+                            try {
+                                currentState = getState(cacheKey);
+                                if (currentState != null) {
+                                    if (currentState.getPartETags().size() > 0) {
+                                        final PartETag lastETag = currentState.getPartETags().get(
+                                                currentState.getPartETags().size() - 1);
+                                        getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " +
+                                                "uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " +
+                                                "contentLength='{}' partsLoaded={} lastPart={}/{}",
+                                                new Object[]{ffFilename, bucket, key, currentState.getUploadId(),
+                                                        currentState.getFilePosition(), currentState.getPartSize(),
+                                                        currentState.getStorageClass().toString(),
+                                                        currentState.getContentLength(),
+                                                        currentState.getPartETags().size(),
+                                                        Integer.toString(lastETag.getPartNumber()),
+                                                        lastETag.getETag()});
+                                    } else {
+                                        getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " +
+                                                "uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " +
+                                                "contentLength='{}' no partsLoaded",
+                                                new Object[]{ffFilename, bucket, key, currentState.getUploadId(),
+                                                        currentState.getFilePosition(), currentState.getPartSize(),
+                                                        currentState.getStorageClass().toString(),
+                                                        currentState.getContentLength()});
+                                    }
+                                } else {
+                                    currentState = new MultipartState();
+                                    currentState.setPartSize(multipartPartSize);
+                                    currentState.setStorageClass(
+                                            StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
+                                    currentState.setContentLength(ff.getSize());
+                                    persistState(cacheKey, currentState);
+                                    getLogger().info("Starting new upload for flowfile='{}' bucket='{}' key='{}'",
+                                            new Object[]{ffFilename, bucket, key});
+                                }
+                            } catch (IOException e) {
+                                getLogger().error("IOException initiating cache state while processing flow files: " +
+                                        e.getMessage());
+                                throw (e);
+                            }
+
+                            // initiate multipart upload or find position in file
+                            //------------------------------------------------------------
+                            if (currentState.getUploadId().isEmpty()) {
+                                final InitiateMultipartUploadRequest initiateRequest =
+                                        new InitiateMultipartUploadRequest(bucket, key, objectMetadata);
+                                initiateRequest.setStorageClass(currentState.getStorageClass());
+                                final AccessControlList acl = createACL(context, ff);
+                                if (acl != null) {
+                                    initiateRequest.setAccessControlList(acl);
+                                }
+                                try {
+                                    final InitiateMultipartUploadResult initiateResult =
+                                            s3.initiateMultipartUpload(initiateRequest);
+                                    currentState.setUploadId(initiateResult.getUploadId());
+                                    currentState.getPartETags().clear();
+                                    try {
+                                        persistState(cacheKey, currentState);
+                                    } catch (Exception e) {
+                                        getLogger().info("Exception saving cache state while processing flow file: " +
+                                                e.getMessage());
+                                        throw(new ProcessException("Exception saving cache state", e));
+                                    }
+                                    getLogger().info("Success initiating upload flowfile={} available={} position={} " +
+                                            "length={} bucket={} key={} uploadId={}",
+                                            new Object[]{ffFilename, in.available(), currentState.getFilePosition(),
+                                                    currentState.getContentLength(), bucket, key,
+                                                    currentState.getUploadId()});
+                                    if (initiateResult.getUploadId() != null) {
+                                        attributes.put(S3_UPLOAD_ID_ATTR_KEY, initiateResult.getUploadId());
+                                    }
+                                } catch (AmazonClientException e) {
+                                    getLogger().info("Failure initiating upload flowfile={} bucket={} key={} reason={}",
+                                            new Object[]{ffFilename, bucket, key, e.getMessage()});
+                                    throw(e);
+                                }
+                            } else {
+                                if (currentState.getFilePosition() > 0) {
+                                    try {
+                                        final long skipped = in.skip(currentState.getFilePosition());
+                                        if (skipped != currentState.getFilePosition()) {
+                                            getLogger().info("Failure skipping to resume upload flowfile={} " +
+                                                    "bucket={} key={} position={} skipped={}",
+                                                    new Object[]{ffFilename, bucket, key,
+                                                            currentState.getFilePosition(), skipped});
+                                        }
+                                    } catch (Exception e) {
+                                        getLogger().info("Failure skipping to resume upload flowfile={} bucket={} " +
+                                                "key={} position={} reason={}",
+                                                new Object[]{ffFilename, bucket, key, currentState.getFilePosition(),
+                                                        e.getMessage()});
+                                        throw(new ProcessException(e));
+                                    }
+                                }
+                            }
+
+                            // upload parts
+                            //------------------------------------------------------------
+                            long thisPartSize;
+                            for (int part = currentState.getPartETags().size() + 1;
+                                 currentState.getFilePosition() < currentState.getContentLength(); part++) {
+                                if (!PutS3Object.this.isScheduled()) {
+                                    getLogger().info("Processor unscheduled, stopping upload flowfile={} part={} " +
+                                            "uploadId={}", new Object[]{ffFilename, part, currentState.getUploadId()});
+                                    session.rollback();
+                                    return;
+                                }
+                                thisPartSize = Math.min(currentState.getPartSize(),
+                                        (currentState.getContentLength() - currentState.getFilePosition()));
+                                UploadPartRequest uploadRequest = new UploadPartRequest()
+                                        .withBucketName(bucket)
+                                        .withKey(key)
+                                        .withUploadId(currentState.getUploadId())
+                                        .withInputStream(in)
+                                        .withPartNumber(part)
+                                        .withPartSize(thisPartSize);
+                                try {
+                                    UploadPartResult uploadPartResult = s3.uploadPart(uploadRequest);
+                                    currentState.addPartETag(uploadPartResult.getPartETag());
+                                    currentState.setFilePosition(currentState.getFilePosition() + thisPartSize);
+                                    try {
+                                        persistState(cacheKey, currentState);
+                                    } catch (Exception e) {
+                                        getLogger().info("Exception saving cache state processing flow file: " +
+                                                e.getMessage());
+                                    }
+                                    getLogger().info("Success uploading part flowfile={} part={} available={} " +
+                                            "etag={} uploadId={}", new Object[]{ffFilename, part, in.available(),
+                                            uploadPartResult.getETag(), currentState.getUploadId()});
+                                } catch (AmazonClientException e) {
+                                    getLogger().info("Failure uploading part flowfile={} part={} bucket={} key={} " +
+                                            "reason={}", new Object[]{ffFilename, part, bucket, key, e.getMessage()});
+                                    throw (e);
+                                }
+                            }
 
-                        final Date expiration = result.getExpirationTime();
-                        if (expiration != null) {
-                            attributes.put("s3.expiration", expiration.toString());
+                            // complete multipart upload
+                            //------------------------------------------------------------
+                            CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(
+                                    bucket, key, currentState.getUploadId(), currentState.getPartETags());
+                            try {
+                                CompleteMultipartUploadResult completeResult =
+                                        s3.completeMultipartUpload(completeRequest);
+                                getLogger().info("Success completing upload flowfile={} etag={} uploadId={}",
+                                        new Object[]{ffFilename, completeResult.getETag(), currentState.getUploadId()});
+                                if (completeResult.getVersionId() != null) {
+                                    attributes.put(S3_VERSION_ATTR_KEY, completeResult.getVersionId());
+                                }
+                                if (completeResult.getETag() != null) {
+                                    attributes.put(S3_ETAG_ATTR_KEY, completeResult.getETag());
+                                }
+                                if (completeResult.getExpirationTime() != null) {
+                                    attributes.put(S3_EXPIRATION_ATTR_KEY,
+                                            completeResult.getExpirationTime().toString());
+                                }
+                                if (currentState.getStorageClass() != null) {
+                                    attributes.put(S3_STORAGECLASS_ATTR_KEY, currentState.getStorageClass().toString());
+                                }
+                                if (userMetadata.size() > 0) {
+                                    StringBuilder userMetaBldr = new StringBuilder();
+                                    for (String userKey : userMetadata.keySet()) {
+                                        userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
+                                    }
+                                    attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString());
+                                }
+                            } catch (AmazonClientException e) {
+                                getLogger().info("Failure completing upload flowfile={} bucket={} key={} reason={}",
+                                        new Object[]{ffFilename, bucket, key, e.getMessage()});
+                                throw (e);
+                            }
                         }
                     }
                 }
@@ -170,7 +543,7 @@ public class PutS3Object extends AbstractS3Processor {
             }
             session.transfer(flowFile, REL_SUCCESS);
 
-            final String url = getUrlForObject(bucket, key);
+            final String url = ((AmazonS3Client) s3).getResourceUrl(bucket, key);
             final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
             session.getProvenanceReporter().send(flowFile, url, millis);
 
@@ -180,5 +553,120 @@ public class PutS3Object extends AbstractS3Processor {
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
         }
+
+        try {
+            removeState(cacheKey);
+        } catch (IOException e) {
+            getLogger().info("Error trying to delete key {} from cache: {}",
+                    new Object[]{cacheKey, e.getMessage()});
+        }
+    }
+
+    protected static class MultipartState implements Serializable {
+
+        private static final String SEPARATOR = "#";
+
+        private String _uploadId;
+        private Long _filePosition;
+        private List<PartETag> _partETags;
+        private Long _partSize;
+        private StorageClass _storageClass;
+        private Long _contentLength;
+
+        public MultipartState() {
+            _uploadId = "";
+            _filePosition = 0L;
+            _partETags = new ArrayList<>();
+            _partSize = 0L;
+            _storageClass = StorageClass.Standard;
+            _contentLength = 0L;
+        }
+
+        // create from a previous toString() result
+        public MultipartState(String buf) {
+            String[] fields = buf.split(SEPARATOR);
+            _uploadId = fields[0];
+            _filePosition = Long.parseLong(fields[1]);
+            _partETags = new ArrayList<>();
+            for (String part : fields[2].split(",")) {
+                if (part != null && !part.isEmpty()) {
+                    String[] partFields = part.split("/");
+                    _partETags.add(new PartETag(Integer.parseInt(partFields[0]), partFields[1]));
+                }
+            }
+            _partSize = Long.parseLong(fields[3]);
+            _storageClass = StorageClass.fromValue(fields[4]);
+            _contentLength = Long.parseLong(fields[5]);
+        }
+
+        public String getUploadId() {
+            return _uploadId;
+        }
+
+        public void setUploadId(String id) {
+            _uploadId = id;
+        }
+
+        public Long getFilePosition() {
+            return _filePosition;
+        }
+
+        public void setFilePosition(Long pos) {
+            _filePosition = pos;
+        }
+
+        public List<PartETag> getPartETags() {
+            return _partETags;
+        }
+
+        public void addPartETag(PartETag tag) {
+            _partETags.add(tag);
+        }
+
+        public Long getPartSize() {
+            return _partSize;
+        }
+
+        public void setPartSize(Long size) {
+            _partSize = size;
+        }
+
+        public StorageClass getStorageClass() {
+            return _storageClass;
+        }
+
+        public void setStorageClass(StorageClass aClass) {
+            _storageClass = aClass;
+        }
+
+        public Long getContentLength() {
+            return _contentLength;
+        }
+
+        public void setContentLength(Long length) {
+            _contentLength = length;
+        }
+
+        public String toString() {
+            StringBuilder buf = new StringBuilder();
+            buf.append(_uploadId).append(SEPARATOR)
+                    .append(_filePosition.toString()).append(SEPARATOR);
+            if (_partETags.size() > 0) {
+                boolean first = true;
+                for (PartETag tag : _partETags) {
+                    if (!first) {
+                        buf.append(",");
+                    } else {
+                        first = false;
+                    }
+                    buf.append(String.format("%d/%s", tag.getPartNumber(), tag.getETag()));
+                }
+            }
+            buf.append(SEPARATOR)
+                    .append(_partSize.toString()).append(SEPARATOR)
+                    .append(_storageClass.toString()).append(SEPARATOR)
+                    .append(_contentLength.toString());
+            return buf.toString();
+        }
     }
 }
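
As a concrete illustration of the state format above: each entry persisted under conf/state/&lt;processor id&gt;
is keyed by processorId/bucket/key and stores the '#'-separated string built by MultipartState.toString().
Below is a minimal sketch (assumed class name, made-up upload id and ETag values, 50 MB part size) that
round-trips one such string; it would need to live in the same package as PutS3Object to reach the
nested class.

    package org.apache.nifi.processors.aws.s3;

    // Illustrative values only: uploadId and ETags are made up; part size is 50 MB,
    // two parts already uploaded, so filePosition is 104857600 of a 734003200 byte file.
    public class MultipartStateSketch {
        public static void main(String[] args) {
            final String serialized = "exampleUploadId#104857600"
                    + "#1/11aa22bb33cc44dd55ee66ff77889900,2/99887766554433221100ffeeddccbbaa"
                    + "#52428800#STANDARD#734003200";
            final PutS3Object.MultipartState state = new PutS3Object.MultipartState(serialized);
            System.out.println(state.getUploadId());          // exampleUploadId
            System.out.println(state.getPartETags().size());  // 2
            System.out.println(state.getFilePosition());      // 104857600
            System.out.println(serialized.equals(state.toString()));  // expected: true
        }
    }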

http://git-wip-us.apache.org/repos/asf/nifi/blob/8917df88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3Test.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3Test.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3Test.java
index 167c16b..737ee8c 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3Test.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3Test.java
@@ -24,7 +24,6 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.Iterator;
 
 import org.apache.nifi.util.file.FileUtils;
 import org.junit.AfterClass;
@@ -47,9 +46,14 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
  */
 public abstract class AbstractS3Test {
     protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
-    protected final static String BUCKET_NAME = "test-bucket-00000000-0000-0000-0000-123456789021";
     protected final static String SAMPLE_FILE_RESOURCE_NAME = "/hello.txt";
-    protected final static String REGION = "eu-west-1";
+    protected final static String REGION = "us-west-1";
+    // Adding REGION to bucket prevents errors of
+    //      "A conflicting conditional operation is currently in progress against this resource."
+    // when bucket is rapidly added/deleted and consistency propagation causes this error.
+    // (Should not be necessary if REGION remains static, but added to prevent future frustration.)
+    // [see http://stackoverflow.com/questions/13898057/aws-error-message-a-conflicting-conditional-operation-is-currently-in-progress]
+    protected final static String BUCKET_NAME = "test-bucket-00000000-0000-0000-0000-123456789021-" + REGION;
 
     // Static so multiple Tests can use same client
     protected static AmazonS3Client client;
@@ -96,8 +100,7 @@ public abstract class AbstractS3Test {
             ObjectListing objectListing = client.listObjects(BUCKET_NAME);
 
             while (true) {
-                for (Iterator<?> iterator = objectListing.getObjectSummaries().iterator(); iterator.hasNext(); ) {
-                    S3ObjectSummary objectSummary = (S3ObjectSummary) iterator.next();
+                for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
                     client.deleteObject(BUCKET_NAME, objectSummary.getKey());
                 }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/8917df88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
index 1755e1d..8b0524e 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
@@ -17,29 +17,73 @@
 package org.apache.nifi.processors.aws.s3;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.Region;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.amazonaws.services.s3.model.UploadPartResult;
+import com.sun.org.apache.xerces.internal.impl.xpath.regex.RegularExpression;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
 import com.amazonaws.services.s3.model.StorageClass;
 
-@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+@Ignore("For local testing only - interacts with S3 so the credentials file must be configured")
 public class TestPutS3Object extends AbstractS3Test {
 
-    @Test
-    public void testSimplePut() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
+    final PutS3Object processor = new TestablePutS3Object();
+    final TestRunner runner = TestRunners.newTestRunner(processor);
+    final ProcessContext context = runner.getProcessContext();
 
+    final static String TEST_ENDPOINT = "https://endpoint.com";
+    final static String TEST_TRANSIT_URI = "https://" + BUCKET_NAME + "/bucket.endpoint.com";
+    final static String TEST_PARTSIZE_STRING = "50 mb";
+    final static Long   TEST_PARTSIZE_LONG = 50L * 1024L * 1024L;
+    final static String LINESEP = System.getProperty("line.separator");
+
+    final Pattern reS3ETag = Pattern.compile("[0-9a-fA-F]{32}");
+
+    @Before
+    public void before() {
         runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(PutS3Object.KEY, SAMPLE_FILE_RESOURCE_NAME);
         runner.setProperty(PutS3Object.REGION, REGION);
         runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
+    }
 
-        Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
+    @Test
+    public void testSimplePut() throws IOException {
+        assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
 
         for (int i = 0; i < 3; i++) {
             final Map<String, String> attrs = new HashMap<>();
@@ -53,13 +97,9 @@ public class TestPutS3Object extends AbstractS3Test {
 
     @Test
     public void testPutInFolder() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
-
-        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
         runner.setProperty(PutS3Object.REGION, REGION);
-        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
 
-        Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
+        assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
 
         final Map<String, String> attrs = new HashMap<>();
         attrs.put("filename", "folder/1.txt");
@@ -72,30 +112,38 @@ public class TestPutS3Object extends AbstractS3Test {
 
     @Test
     public void testStorageClass() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
+        runner.setProperty(PutS3Object.KEY, "${filename}");
+
+        int bytesNeeded = 55 * 1024 * 1024;
+        StringBuilder bldr = new StringBuilder(bytesNeeded + 1000);
+        for (int line = 0; line < bytesNeeded / 64; line++) {
+            bldr.append(String.format("line %06d This is sixty-three characters plus the EOL marker!\n", line));
+        }
+        String data55mb = bldr.toString();
 
-        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
-        runner.setProperty(PutS3Object.REGION, REGION);
-        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
         runner.setProperty(PutS3Object.STORAGE_CLASS, StorageClass.ReducedRedundancy.name());
 
-        Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
+        assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
 
         final Map<String, String> attrs = new HashMap<>();
         attrs.put("filename", "folder/2.txt");
         runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+        attrs.put("filename", "folder/3.txt");
+        runner.enqueue(data55mb.getBytes(), attrs);
 
-        runner.run();
+        runner.run(2);
 
-        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 2);
+        FlowFile file1 = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).get(0);
+        assertEquals(StorageClass.ReducedRedundancy.toString(),
+                file1.getAttribute(PutS3Object.S3_STORAGECLASS_ATTR_KEY));
+        FlowFile file2 = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).get(1);
+        assertEquals(StorageClass.ReducedRedundancy.toString(),
+                file2.getAttribute(PutS3Object.S3_STORAGECLASS_ATTR_KEY));
     }
 
     @Test
     public void testPermissions() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
-
-        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
-        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
         runner.setProperty(PutS3Object.FULL_CONTROL_USER_LIST,"28545acd76c35c7e91f8409b95fd1aa0c0914bfa1ac60975d9f48bc3c5e090b5");
         runner.setProperty(PutS3Object.REGION, REGION);
 
@@ -107,4 +155,322 @@ public class TestPutS3Object extends AbstractS3Test {
 
         runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
     }
+
+    @Test
+    public void testStateDefaults() {
+        PutS3Object.MultipartState state1 = new PutS3Object.MultipartState();
+        assertEquals(state1.getUploadId(), "");
+        assertEquals(state1.getFilePosition(), (Long) 0L);
+        assertEquals(state1.getPartETags().size(), 0L);
+        assertEquals(state1.getPartSize(), (Long) 0L);
+        assertEquals(state1.getStorageClass().toString(), StorageClass.Standard.toString());
+        assertEquals(state1.getContentLength(), (Long) 0L);
+    }
+
+    @Test
+    public void testStateToString() throws IOException, InitializationException {
+        final String target = "UID-test1234567890#10001#1/PartETag-1,2/PartETag-2,3/PartETag-3,4/PartETag-4#20002#REDUCED_REDUNDANCY#30003";
+        PutS3Object.MultipartState state2 = new PutS3Object.MultipartState();
+        state2.setUploadId("UID-test1234567890");
+        state2.setFilePosition(10001L);
+        for (Integer partNum = 1; partNum < 5; partNum++) {
+            state2.addPartETag(new PartETag(partNum, "PartETag-" + partNum.toString()));
+        }
+        state2.setPartSize(20002L);
+        state2.setStorageClass(StorageClass.ReducedRedundancy);
+        state2.setContentLength(30003L);
+        assertEquals(target, state2.toString());
+    }
+
+    @Test
+    public void testProperties() throws IOException {
+        runner.setProperty(PutS3Object.FULL_CONTROL_USER_LIST,
+                "28545acd76c35c7e91f8409b95fd1aa0c0914bfa1ac60975d9f48bc3c5e090b5");
+        runner.setProperty(PutS3Object.REGION, REGION);
+        runner.setProperty(PutS3Object.ENDPOINT_OVERRIDE, TEST_ENDPOINT);
+        runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING);
+
+
+        assertEquals(BUCKET_NAME, context.getProperty(PutS3Object.BUCKET).toString());
+        assertEquals(SAMPLE_FILE_RESOURCE_NAME, context.getProperty(PutS3Object.KEY).evaluateAttributeExpressions().toString());
+        assertEquals(TEST_ENDPOINT, context.getProperty(PutS3Object.ENDPOINT_OVERRIDE).toString());
+        assertEquals(CREDENTIALS_FILE, context.getProperty(PutS3Object.CREDENTIALS_FILE).toString());
+        assertEquals(TEST_PARTSIZE_LONG.longValue(),
+                context.getProperty(PutS3Object.MULTIPART_PART_SIZE).asDataSize(DataUnit.B).longValue());
+    }
+
+    @Test
+    public void testPersistence() throws IOException {
+        final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
+        final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
+        final String cacheKey1 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key;
+        final String cacheKey2 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-v2";
+        final String cacheKey3 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-v3";
+
+        /*
+         * store 3 versions of state
+         */
+        PutS3Object.MultipartState state1orig = new PutS3Object.MultipartState();
+        processor.persistState(cacheKey1, state1orig);
+
+        PutS3Object.MultipartState state2orig = new PutS3Object.MultipartState();
+        state2orig.setUploadId("1234");
+        state2orig.setContentLength(1234L);
+        processor.persistState(cacheKey2, state2orig);
+
+        PutS3Object.MultipartState state3orig = new PutS3Object.MultipartState();
+        state3orig.setUploadId("5678");
+        state3orig.setContentLength(5678L);
+        processor.persistState(cacheKey3, state3orig);
+
+        /*
+         * reload and validate stored state
+         */
+        final PutS3Object.MultipartState state1new = processor.getState(cacheKey1);
+        assertEquals("", state1new.getUploadId());
+        assertEquals(0L, state1new.getFilePosition().longValue());
+        assertEquals(new ArrayList<PartETag>(), state1new.getPartETags());
+        assertEquals(0L, state1new.getPartSize().longValue());
+        assertEquals(StorageClass.fromValue(StorageClass.Standard.toString()), state1new.getStorageClass());
+        assertEquals(0L, state1new.getContentLength().longValue());
+
+        final PutS3Object.MultipartState state2new = processor.getState(cacheKey2);
+        assertEquals("1234", state2new.getUploadId());
+        assertEquals(0L, state2new.getFilePosition().longValue());
+        assertEquals(new ArrayList<PartETag>(), state2new.getPartETags());
+        assertEquals(0L, state2new.getPartSize().longValue());
+        assertEquals(StorageClass.fromValue(StorageClass.Standard.toString()), state2new.getStorageClass());
+        assertEquals(1234L, state2new.getContentLength().longValue());
+
+        final PutS3Object.MultipartState state3new = processor.getState(cacheKey3);
+        assertEquals("5678", state3new.getUploadId());
+        assertEquals(0L, state3new.getFilePosition().longValue());
+        assertEquals(new ArrayList<PartETag>(), state3new.getPartETags());
+        assertEquals(0L, state3new.getPartSize().longValue());
+        assertEquals(StorageClass.fromValue(StorageClass.Standard.toString()), state3new.getStorageClass());
+        assertEquals(5678L, state3new.getContentLength().longValue());
+    }
+
+    @Test
+    public void testStatePersistsETags() throws IOException {
+        final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
+        final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
+        final String cacheKey1 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv1";
+        final String cacheKey2 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv2";
+        final String cacheKey3 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv3";
+
+        /*
+         * store 3 versions of state
+         */
+        PutS3Object.MultipartState state1orig = new PutS3Object.MultipartState();
+        processor.persistState(cacheKey1, state1orig);
+
+        PutS3Object.MultipartState state2orig = new PutS3Object.MultipartState();
+        state2orig.setUploadId("1234");
+        state2orig.setContentLength(1234L);
+        processor.persistState(cacheKey2, state2orig);
+
+        PutS3Object.MultipartState state3orig = new PutS3Object.MultipartState();
+        state3orig.setUploadId("5678");
+        state3orig.setContentLength(5678L);
+        processor.persistState(cacheKey3, state3orig);
+
+        /*
+         * persist state to caches so that
+         *      1. v2 has 2 and then 4 tags
+         *      2. v3 has 4 and then 2 tags
+         */
+        state2orig.getPartETags().add(new PartETag(1, "state 2 tag one"));
+        state2orig.getPartETags().add(new PartETag(2, "state 2 tag two"));
+        processor.persistState(cacheKey2, state2orig);
+        state2orig.getPartETags().add(new PartETag(3, "state 2 tag three"));
+        state2orig.getPartETags().add(new PartETag(4, "state 2 tag four"));
+        processor.persistState(cacheKey2, state2orig);
+
+        state3orig.getPartETags().add(new PartETag(1, "state 3 tag one"));
+        state3orig.getPartETags().add(new PartETag(2, "state 3 tag two"));
+        state3orig.getPartETags().add(new PartETag(3, "state 3 tag three"));
+        state3orig.getPartETags().add(new PartETag(4, "state 3 tag four"));
+        processor.persistState(cacheKey3, state3orig);
+        state3orig.getPartETags().remove(state3orig.getPartETags().size() - 1);
+        state3orig.getPartETags().remove(state3orig.getPartETags().size() - 1);
+        processor.persistState(cacheKey3, state3orig);
+
+        /*
+         * load state and validate that
+         *     1. v2 restore shows 4 tags
+         *     2. v3 restore shows 2 tags
+         */
+        final PutS3Object.MultipartState state2new = processor.getState(cacheKey2);
+        assertEquals("1234", state2new.getUploadId());
+        assertEquals(4, state2new.getPartETags().size());
+
+        final PutS3Object.MultipartState state3new = processor.getState(cacheKey3);
+        assertEquals("5678", state3new.getUploadId());
+        assertEquals(2, state3new.getPartETags().size());
+    }
+
+    @Test
+    public void testStateRemove() throws IOException {
+        final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
+        final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
+        final String cacheKey = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-sr";
+
+        /*
+         * store state, retrieve and validate, remove and validate
+         */
+        PutS3Object.MultipartState stateOrig = new PutS3Object.MultipartState();
+        stateOrig.setUploadId("1234");
+        stateOrig.setContentLength(1234L);
+        processor.persistState(cacheKey, stateOrig);
+
+        PutS3Object.MultipartState state1 = processor.getState(cacheKey);
+        assertEquals("1234", state1.getUploadId());
+        assertEquals(1234L, state1.getContentLength().longValue());
+
+        processor.persistState(cacheKey, null);
+        PutS3Object.MultipartState state2 = processor.getState(cacheKey);
+        assertNull(state2);
+    }
+
+    @Test
+    public void testApiInteractions() throws IOException {
+        final String FILE1_NAME = "file1";
+        final String ALPHA_LF = "abcdefghijklmnopqrstuvwxyz\n";
+        final String ALPHA_6x = ALPHA_LF + ALPHA_LF + ALPHA_LF + ALPHA_LF + ALPHA_LF + ALPHA_LF;
+        final String FILE1_CONTENTS = ALPHA_6x + ALPHA_6x + ALPHA_6x + ALPHA_6x + ALPHA_6x + ALPHA_6x;
+
+        runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.FILENAME.key(), FILE1_NAME);
+        runner.enqueue(FILE1_CONTENTS.getBytes(), attributes);
+
+        runner.assertValid();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
+        final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
+        assertEquals(1, successFiles.size());
+        final List<MockFlowFile> failureFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE);
+        assertEquals(0, failureFiles.size());
+        MockFlowFile ff1 = successFiles.get(0);
+        assertEquals(FILE1_NAME, ff1.getAttribute(CoreAttributes.FILENAME.key()));
+        assertEquals(BUCKET_NAME, ff1.getAttribute(PutS3Object.S3_BUCKET_KEY));
+        assertEquals(SAMPLE_FILE_RESOURCE_NAME, ff1.getAttribute(PutS3Object.S3_OBJECT_KEY));
+        assertTrue(reS3ETag.matcher(ff1.getAttribute(PutS3Object.S3_ETAG_ATTR_KEY)).matches());
+    }
+
+    @Test
+    public void testDynamicProperty() throws IOException {
+        final String DYNAMIC_ATTRIB_KEY = "fs.runTimestamp";
+        final String DYNAMIC_ATTRIB_VALUE = "${now():toNumber()}";
+
+        runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING);
+        PropertyDescriptor testAttrib = processor.getSupportedDynamicPropertyDescriptor(DYNAMIC_ATTRIB_KEY);
+        runner.setProperty(testAttrib, DYNAMIC_ATTRIB_VALUE);
+
+        final String FILE1_NAME = "file1";
+        Map<String, String> attribs = new HashMap<>();
+        attribs.put(CoreAttributes.FILENAME.key(), FILE1_NAME);
+        runner.enqueue("123".getBytes(), attribs);
+
+        runner.assertValid();
+        processor.getPropertyDescriptor(DYNAMIC_ATTRIB_KEY);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
+        final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
+        assertEquals(1, successFiles.size());
+        MockFlowFile ff1 = successFiles.get(0);
+
+        Long now = System.currentTimeMillis();
+        String millisNow = Long.toString(now);
+        String millisOneSecAgo = Long.toString(now - 1000L);
+        String usermeta = ff1.getAttribute(PutS3Object.S3_USERMETA_ATTR_KEY);
+        String[] usermetaLine0 = usermeta.split(LINESEP)[0].split("=");
+        String usermetaKey0 = usermetaLine0[0];
+        String usermetaValue0 = usermetaLine0[1];
+        assertEquals(DYNAMIC_ATTRIB_KEY, usermetaKey0);
+        assertTrue(usermetaValue0.compareTo(millisOneSecAgo) >= 0 && usermetaValue0.compareTo(millisNow) <= 0);
+    }
+
+    @Test
+    public void testProvenance() throws InitializationException {
+        final String PROV1_FILE = "provfile1";
+        runner.setProperty(PutS3Object.KEY, "${filename}");
+
+        Map<String, String> attribs = new HashMap<>();
+        attribs.put(CoreAttributes.FILENAME.key(), PROV1_FILE);
+        runner.enqueue("prov1 contents".getBytes(), attribs);
+
+        runner.assertValid();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
+        final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
+        assertEquals(1, successFiles.size());
+
+        final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+        ProvenanceEventRecord provRec1 = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.SEND, provRec1.getEventType());
+        assertEquals(processor.getIdentifier(), provRec1.getComponentId());
+        client.setRegion(Region.fromValue(REGION).toAWSRegion());
+        String targetUri = client.getUrl(BUCKET_NAME, PROV1_FILE).toString();
+        assertEquals(targetUri, provRec1.getTransitUri());
+        assertEquals(6, provRec1.getUpdatedAttributes().size());
+        assertEquals(BUCKET_NAME, provRec1.getUpdatedAttributes().get(PutS3Object.S3_BUCKET_KEY));
+    }
+
+    public class TestablePutS3Object extends PutS3Object {
+        public final String MOCK_CLIENT_PART_ETAG_PREFIX = UUID.nameUUIDFromBytes("PARTETAG".getBytes()).toString();
+        public final String MOCK_CLIENT_FILE_ETAG_PREFIX = UUID.nameUUIDFromBytes("FILEETAG".getBytes()).toString();
+        public final String MOCK_CLIENT_VERSION = "mock-version";
+
+        @Override
+        protected AmazonS3Client createClient(ProcessContext context, AWSCredentials credentials,
+                                              ClientConfiguration config) {
+            return new MockAWSClient(MOCK_CLIENT_PART_ETAG_PREFIX, MOCK_CLIENT_FILE_ETAG_PREFIX, MOCK_CLIENT_VERSION);
+        }
+    }
+
+    private class MockAWSClient extends AmazonS3Client {
+        private String partETag;
+        private String fileETag;
+        private String version;
+
+        public MockAWSClient(String partEtag, String fileEtag, String version) {
+            this.partETag = partEtag;
+            this.fileETag = fileEtag;
+            this.version = version;
+        }
+
+        @Override
+        public InitiateMultipartUploadResult initiateMultipartUpload(InitiateMultipartUploadRequest initiateMultipartUploadRequest) throws AmazonClientException {
+            InitiateMultipartUploadResult result = new InitiateMultipartUploadResult();
+            result.setBucketName(initiateMultipartUploadRequest.getBucketName());
+            result.setKey(initiateMultipartUploadRequest.getKey());
+            return result;
+        }
+
+        @Override
+        public UploadPartResult uploadPart(UploadPartRequest uploadPartRequest) throws AmazonClientException {
+            UploadPartResult result = new UploadPartResult();
+            result.setPartNumber(uploadPartRequest.getPartNumber());
+            result.setETag(partETag + Integer.toString(result.getPartNumber()) + "-" + uploadPartRequest.getUploadId());
+            return result;
+        }
+
+        @Override
+        public CompleteMultipartUploadResult completeMultipartUpload(CompleteMultipartUploadRequest completeMultipartUploadRequest) throws AmazonClientException {
+            CompleteMultipartUploadResult result = new CompleteMultipartUploadResult();
+            result.setBucketName(completeMultipartUploadRequest.getBucketName());
+            result.setKey(completeMultipartUploadRequest.getKey());
+            result.setETag(fileETag + completeMultipartUploadRequest.getUploadId());
+            Date date1 = new Date();
+            date1.setTime(System.currentTimeMillis());
+            result.setExpirationTime(date1);
+            result.setVersionId(version);
+            return result;
+        }
+    }
+
 }
\ No newline at end of file
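
Aside on the serialized state format pinned down by testStateToString above: MultipartState appears to flatten to uploadId#filePosition#partNumber/eTag,partNumber/eTag,...#partSize#storageClass#contentLength, with "#" as the field separator. Below is a minimal, hedged sketch of that layout; it is illustrative only, not the committed PutS3Object.MultipartState, and assumes nothing beyond what the test asserts.

    // Hedged sketch of the MultipartState wire format exercised by testStateToString;
    // the real class lives in PutS3Object and may differ in detail.
    import java.util.ArrayList;
    import java.util.List;

    import com.amazonaws.services.s3.model.PartETag;
    import com.amazonaws.services.s3.model.StorageClass;

    class MultipartStateSketch {
        private static final String SEPARATOR = "#";

        String uploadId = "";
        Long filePosition = 0L;
        List<PartETag> partETags = new ArrayList<>();
        Long partSize = 0L;
        StorageClass storageClass = StorageClass.Standard;
        Long contentLength = 0L;

        @Override
        public String toString() {
            // uploadId#filePosition#num/tag,num/tag,...#partSize#storageClass#contentLength
            final StringBuilder buf = new StringBuilder();
            buf.append(uploadId).append(SEPARATOR).append(filePosition).append(SEPARATOR);
            for (int i = 0; i < partETags.size(); i++) {
                if (i > 0) {
                    buf.append(",");
                }
                buf.append(partETags.get(i).getPartNumber()).append("/").append(partETags.get(i).getETag());
            }
            buf.append(SEPARATOR).append(partSize)
               .append(SEPARATOR).append(storageClass.toString())
               .append(SEPARATOR).append(contentLength);
            return buf.toString();
        }

        public static void main(String[] args) {
            final MultipartStateSketch state = new MultipartStateSketch();
            state.uploadId = "UID-test1234567890";
            state.filePosition = 10001L;
            for (int partNum = 1; partNum < 5; partNum++) {
                state.partETags.add(new PartETag(partNum, "PartETag-" + partNum));
            }
            state.partSize = 20002L;
            state.storageClass = StorageClass.ReducedRedundancy;
            state.contentLength = 30003L;
            // Prints the same string that testStateToString expects as its target.
            System.out.println(state);
        }
    }

Running main() reproduces the exact target string the test compares against.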


[2/2] nifi git commit: NIFI-1107: some code review fixes

Posted by tk...@apache.org.
NIFI-1107: some code review fixes


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/08e65abd
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/08e65abd
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/08e65abd

Branch: refs/heads/NIFI-1107
Commit: 08e65abd2b82d9fdf8b6ecd645e59dc2697adb6d
Parents: 8917df8
Author: Tony Kurc <tr...@gmail.com>
Authored: Wed Nov 25 23:55:35 2015 -0500
Committer: Tony Kurc <tr...@gmail.com>
Committed: Wed Nov 25 23:55:35 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/aws/s3/PutS3Object.java     | 132 +++++++++++--------
 1 file changed, 80 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
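
The main change below wraps getState(), persistState(), and destroyState() in a ReentrantReadWriteLock so that concurrent onTrigger threads cannot interleave reads and rewrites of the properties file that backs multipart-upload state. A minimal, hedged sketch of that guard pattern follows (assumed shape only; the committed methods in the diff carry the real logging and error handling).

    // Hedged sketch of the locking pattern introduced below: one
    // ReentrantReadWriteLock guards every read and rewrite of the
    // properties file that holds per-upload multipart state.
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.util.Properties;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class GuardedStateFileSketch {
        private final ReadWriteLock lock = new ReentrantReadWriteLock();
        private final Lock readLock = lock.readLock();
        private final Lock writeLock = lock.writeLock();
        private final File stateFile;

        GuardedStateFileSketch(File stateFile) {
            this.stateFile = stateFile;
        }

        /** Many threads may read serialized state concurrently. */
        String read(String key) throws IOException {
            readLock.lock();
            try {
                if (!stateFile.exists()) {
                    return null;
                }
                final Properties props = new Properties();
                try (FileInputStream fis = new FileInputStream(stateFile)) {
                    props.load(fis);
                }
                return props.getProperty(key);
            } finally {
                readLock.unlock();
            }
        }

        /** Rewrites are exclusive: load, mutate, store under the write lock. */
        void write(String key, String value) throws IOException {
            writeLock.lock();
            try {
                final Properties props = new Properties();
                if (stateFile.exists()) {
                    try (FileInputStream fis = new FileInputStream(stateFile)) {
                        props.load(fis);
                    }
                }
                if (value != null) {
                    props.setProperty(key, value);
                } else {
                    props.remove(key);
                }
                try (FileOutputStream fos = new FileOutputStream(stateFile)) {
                    props.store(fos, null);
                }
            } finally {
                writeLock.unlock();
            }
        }
    }

Reads share the lock; any rewrite, including removal signalled by a null value, takes the exclusive write lock, mirroring persistState(key, null) in the diff.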


http://git-wip-us.apache.org/repos/asf/nifi/blob/08e65abd/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 19bd881..398f4fa 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -31,6 +31,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
@@ -40,6 +43,7 @@ import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
 import com.amazonaws.services.s3.model.PartETag;
 import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
+
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -162,6 +166,10 @@ public class PutS3Object extends AbstractS3Processor {
     final static String S3_STORAGECLASS_META_KEY = "x-amz-storage-class";
     final static String S3_USERMETA_ATTR_KEY = "s3.usermetadata";
 
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    private final Lock readLock = lock.readLock();
+    private final Lock writeLock = lock.writeLock();
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return properties;
@@ -193,54 +201,65 @@ public class PutS3Object extends AbstractS3Processor {
     }
 
     protected MultipartState getState(final String s3ObjectKey) throws IOException {
-        // get local state if it exists
-        MultipartState currState = null;
-        final File persistenceFile = getPersistenceFile();
-        if (persistenceFile.exists()) {
-            try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
-                final Properties props = new Properties();
-                props.load(fis);
-                if (props.containsKey(s3ObjectKey)) {
-                    final String localSerialState = props.getProperty(s3ObjectKey);
-                    if (localSerialState != null) {
-                        currState = new MultipartState(localSerialState);
-                        getLogger().info("Local state for {} loaded with uploadId {} and {} partETags",
-                                new Object[]{s3ObjectKey, currState.getUploadId(), currState.getPartETags().size()});
+        readLock.lock();
+        try {
+            // get local state if it exists
+            MultipartState currState = null;
+            final File persistenceFile = getPersistenceFile();
+            if (persistenceFile.exists()) {
+                try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
+                    final Properties props = new Properties();
+                    props.load(fis);
+                    if (props.containsKey(s3ObjectKey)) {
+                        final String localSerialState = props.getProperty(s3ObjectKey);
+                        if (localSerialState != null) {
+                            currState = new MultipartState(localSerialState);
+                            getLogger().info("Local state for {} loaded with uploadId {} and {} partETags",
+                                    new Object[]{s3ObjectKey, currState.getUploadId(), currState.getPartETags().size()});
+                        }
                     }
+                } catch (IOException ioe) {
+                    getLogger().warn("Failed to recover local state for {} due to {}. Assuming no local state and " +
+                            "restarting upload.", new Object[]{s3ObjectKey, ioe.getMessage()});
                 }
-            } catch (IOException ioe) {
-                getLogger().warn("Failed to recover local state for {} due to {}. Assuming no local state and " +
-                        "restarting upload.", new Object[]{s3ObjectKey, ioe.getMessage()});
             }
+            return currState;
+        } finally {
+            readLock.unlock();
         }
-        return currState;
     }
 
     protected void persistState(final String s3ObjectKey, final MultipartState currState) throws IOException {
-        final String currStateStr = (currState == null) ? null : currState.toString();
-        final File persistenceFile = getPersistenceFile();
-        final File parentDir = persistenceFile.getParentFile();
-        if (!parentDir.exists() && !parentDir.mkdirs()) {
-            throw new IOException("Persistence directory (" + parentDir.getAbsolutePath() + ") does not exist and " +
-                    "could not be created.");
-        }
-        final Properties props = new Properties();
-        if (persistenceFile.exists()) {
-            try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
-                props.load(fis);
+        writeLock.lock();
+        try {
+
+            final String currStateStr = (currState == null) ? null : currState.toString();
+            final File persistenceFile = getPersistenceFile();
+            final File parentDir = persistenceFile.getParentFile();
+            if (!parentDir.exists() && !parentDir.mkdirs()) {
+                throw new IOException("Persistence directory (" + parentDir.getAbsolutePath() + ") does not exist and " +
+                        "could not be created.");
+            }
+            final Properties props = new Properties();
+            if (persistenceFile.exists()) {
+                try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
+                    props.load(fis);
+                }
+            }
+            if (currStateStr != null) {
+                props.setProperty(s3ObjectKey, currStateStr);
+            } else {
+                props.remove(s3ObjectKey);
             }
-        }
-        if (currStateStr != null) {
-            props.setProperty(s3ObjectKey, currStateStr);
-        } else {
-            props.remove(s3ObjectKey);
-        }
 
-        try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
-            props.store(fos, null);
-        } catch (IOException ioe) {
-            getLogger().error("Could not store state {} due to {}.",
-                    new Object[]{persistenceFile.getAbsolutePath(), ioe.getMessage()});
+            try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
+                props.store(fos, null);
+            } catch (IOException ioe) {
+                getLogger().error("Could not store state {} due to {}.",
+                        new Object[]{persistenceFile.getAbsolutePath(), ioe.getMessage()});
+            }
+        } finally {
+            writeLock.unlock();
         }
     }
 
@@ -249,19 +268,24 @@ public class PutS3Object extends AbstractS3Processor {
     }
 
     protected void destroyState() {
-        final File persistenceFile = getPersistenceFile();
-        if (persistenceFile.exists()) {
-            if (!persistenceFile.delete()) {
-                getLogger().warn("Could not delete state file {}, attempting to delete contents.",
-                        new Object[]{persistenceFile.getAbsolutePath()});
-            } else {
-                try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
-                    new Properties().store(fos, null);
-                } catch (IOException ioe) {
-                    getLogger().error("Could not store empty state file {} due to {}.",
-                            new Object[]{persistenceFile.getAbsolutePath(), ioe.getMessage()});
+        writeLock.lock();
+        try {
+            final File persistenceFile = getPersistenceFile();
+            if (persistenceFile.exists()) {
+                if (!persistenceFile.delete()) {
+                    getLogger().warn("Could not delete state file {}, attempting to delete contents.",
+                            new Object[]{persistenceFile.getAbsolutePath()});
+                } else {
+                    try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
+                        new Properties().store(fos, null);
+                    } catch (IOException ioe) {
+                        getLogger().error("Could not store empty state file {} due to {}.",
+                                new Object[]{persistenceFile.getAbsolutePath(), ioe.getMessage()});
+                    }
                 }
-            }
+            } 
+        } finally {
+            writeLock.unlock();
         }
     }
 
@@ -357,7 +381,7 @@ public class PutS3Object extends AbstractS3Processor {
                             }
                         } else {
                             //----------------------------------------
-                            // multippart upload
+                            // multipart upload
                             //----------------------------------------
 
                             // load or create persistent state
@@ -366,6 +390,7 @@ public class PutS3Object extends AbstractS3Processor {
                             try {
                                 currentState = getState(cacheKey);
                                 if (currentState != null) {
+                                    // Log the current state
                                     if (currentState.getPartETags().size() > 0) {
                                         final PartETag lastETag = currentState.getPartETags().get(
                                                 currentState.getPartETags().size() - 1);
@@ -389,6 +414,7 @@ public class PutS3Object extends AbstractS3Processor {
                                                         currentState.getContentLength()});
                                     }
                                 } else {
+                                    // create a new state object
                                     currentState = new MultipartState();
                                     currentState.setPartSize(multipartPartSize);
                                     currentState.setStorageClass(
@@ -564,6 +590,8 @@ public class PutS3Object extends AbstractS3Processor {
 
     protected static class MultipartState implements Serializable {
 
+        private static final long serialVersionUID = 7959345311775848703L;
+
         private static final String SEPARATOR = "#";
 
         private String _uploadId;