Posted to commits@nifi.apache.org by tk...@apache.org on 2015/11/26 05:56:37 UTC
[1/2] nifi git commit: NIFI-1107: Integrate Multipart uploads into the PutS3Object processor
Repository: nifi
Updated Branches:
refs/heads/NIFI-1107 [created] 08e65abd2
NIFI-1107: Integrate Multipart uploads into the PutS3Object processor
* Add nifi-ssl-context-service-nar to nifi-aws-nar pom.
* Add nifi-ssl-context-service-api to nifi-aws-processors pom.
* Add SSL context service and endpoint override properties to AbstractAWSProcessor to support
non-Amazon S3-compatible endpoints.
* Added Multipart upload API support, multipart threshold and size properties, state tracking for multipart upload,
and expanded flow file attributes to PutS3Object.
* Updated annotations and property list for FetchS3Object and DeleteS3Object.
* Added region to bucket in AbstractS3Test to minimize AWS propagation issues during testing.
* Expanded tests in TestPutS3Object to validate the new multipart logic (see the multipart flow sketch below).
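
For orientation, a minimal sketch of the three-step multipart flow that the new
PutS3Object code drives, using the same AWS SDK classes that appear in the diff
below. The bucket name, key, 5MB part size, and the multipartPut() wrapper are
illustrative placeholders, not code from this commit:

    import java.io.InputStream;
    import java.util.ArrayList;
    import java.util.List;
    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
    import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
    import com.amazonaws.services.s3.model.PartETag;
    import com.amazonaws.services.s3.model.UploadPartRequest;
    import com.amazonaws.services.s3.model.UploadPartResult;

    class MultipartSketch {
        static void multipartPut(final AmazonS3 s3, final InputStream in, final long contentLength) {
            // 1) initiate the upload; S3 returns an uploadId that identifies it
            final String uploadId = s3.initiateMultipartUpload(
                    new InitiateMultipartUploadRequest("my-bucket", "my-key")).getUploadId();

            // 2) upload the parts; every part except the last must be at least 5MB
            final List<PartETag> partETags = new ArrayList<>();
            long position = 0L;
            for (int part = 1; position < contentLength; part++) {
                final long partSize = Math.min(5L * 1024 * 1024, contentLength - position);
                final UploadPartResult result = s3.uploadPart(new UploadPartRequest()
                        .withBucketName("my-bucket").withKey("my-key")
                        .withUploadId(uploadId).withInputStream(in)
                        .withPartNumber(part).withPartSize(partSize));
                partETags.add(result.getPartETag());
                position += partSize;
            }

            // 3) complete the upload by presenting the uploadId and all part ETags
            s3.completeMultipartUpload(new CompleteMultipartUploadRequest(
                    "my-bucket", "my-key", uploadId, partETags));
        }
    }

PutS3Object persists the uploadId and accumulated PartETags between these steps,
which is what lets an interrupted large upload resume rather than restart.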
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8917df88
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8917df88
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8917df88
Branch: refs/heads/NIFI-1107
Commit: 8917df88a40fcf6ac5b142d490c915a365858d48
Parents: a3cb803
Author: Joe Skora <js...@gmail.com>
Authored: Wed Nov 25 23:24:56 2015 -0500
Committer: Tony Kurc <tr...@gmail.com>
Committed: Wed Nov 25 23:24:56 2015 -0500
----------------------------------------------------------------------
.../nifi-aws-bundle/nifi-aws-nar/pom.xml | 5 +
.../nifi-aws-bundle/nifi-aws-processors/pom.xml | 4 +
.../processors/aws/AbstractAWSProcessor.java | 38 +-
.../nifi/processors/aws/s3/DeleteS3Object.java | 7 +-
.../nifi/processors/aws/s3/FetchS3Object.java | 6 +-
.../nifi/processors/aws/s3/PutS3Object.java | 538 ++++++++++++++++++-
.../nifi/processors/aws/s3/AbstractS3Test.java | 13 +-
.../nifi/processors/aws/s3/TestPutS3Object.java | 410 +++++++++++++-
8 files changed, 962 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/8917df88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml
index 13bca24..fb066f0 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml
@@ -31,6 +31,11 @@
<artifactId>nifi-aws-processors</artifactId>
<version>0.4.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-ssl-context-service-nar</artifactId>
+ <type>nar</type>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/8917df88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
index 1e16614..a86af8a 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
@@ -50,6 +50,10 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-ssl-context-service-api</artifactId>
+ </dependency>
</dependencies>
<build>
<plugins>
http://git-wip-us.apache.org/repos/asf/nifi/blob/8917df88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
index 61c3c85..896fa7b 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import com.amazonaws.http.conn.ssl.SdkTLSSocketFactory;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
@@ -47,6 +48,11 @@ import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
+import org.apache.nifi.ssl.SSLContextService;
+
+import javax.net.ssl.SSLContext;
+
+import static org.apache.commons.lang3.StringUtils.trimToEmpty;
public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceClient> extends AbstractProcessor {
@@ -92,6 +98,22 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
.defaultValue("30 secs")
.build();
+ public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+ .name("SSL Context Service")
+ .description("Specifies an optional SSL Context Service that, if provided, will be used to create connections")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
+
+ public static final PropertyDescriptor ENDPOINT_OVERRIDE = new PropertyDescriptor.Builder()
+ .name("Endpoint Override URL")
+ .description("Endpoint URL to use instead of the AWS default including scheme, host, port, and path. " +
+ "The AWS libraries select an endpoint URL based on the AWS region, but this property overrides " +
+ "the selected endpoint URL, allowing use with other S3-compatible endpoints.")
+ .required(false)
+ .addValidator(StandardValidators.URL_VALIDATOR)
+ .build();
+
private volatile ClientType client;
private volatile Region region;
@@ -146,6 +168,13 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
config.setConnectionTimeout(commsTimeout);
config.setSocketTimeout(commsTimeout);
+ final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).evaluateAttributeExpressions().asControllerService(SSLContextService.class);
+ if (sslContextService != null) {
+ final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.NONE);
+ SdkTLSSocketFactory sdkTLSSocketFactory = new SdkTLSSocketFactory(sslContext, null);
+ config.getApacheHttpClientConfig().setSslSocketFactory(sdkTLSSocketFactory);
+ }
+
return config;
}
@@ -160,10 +189,17 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl
if (region != null) {
this.region = Region.getRegion(Regions.fromName(region));
client.setRegion(this.region);
- } else{
+ } else {
this.region = null;
}
}
+
+ // if the endpoint override has been configured, set the endpoint.
+ // (per Amazon docs this should only be configured at client creation)
+ final String urlstr = trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).evaluateAttributeExpressions().getValue());
+ if (!urlstr.isEmpty()) {
+ this.client.setEndpoint(urlstr);
+ }
}
protected abstract ClientType createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config);
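
The two new properties reduce to the SDK calls added above. A hedged sketch of
the equivalent direct usage; the SSLContext source, credentials handling, and
the override URL are illustrative stand-ins for the SSL Context Service and the
Endpoint Override URL property values:

    import javax.net.ssl.SSLContext;
    import com.amazonaws.ClientConfiguration;
    import com.amazonaws.auth.AWSCredentials;
    import com.amazonaws.http.conn.ssl.SdkTLSSocketFactory;
    import com.amazonaws.services.s3.AmazonS3Client;

    class ClientOverrideSketch {
        static AmazonS3Client buildClient(final AWSCredentials credentials) throws Exception {
            final ClientConfiguration config = new ClientConfiguration();
            // install a custom TLS socket factory, as createConfiguration() now does
            final SSLContext sslContext = SSLContext.getDefault(); // stand-in for the controller service's context
            config.getApacheHttpClientConfig().setSslSocketFactory(
                    new SdkTLSSocketFactory(sslContext, null));
            final AmazonS3Client client = new AmazonS3Client(credentials, config);
            // then point the client at a non-AWS, S3-compatible endpoint
            // (per Amazon docs, only at client creation time)
            client.setEndpoint("https://s3.example.internal:9443"); // hypothetical override URL
            return client;
        }
    }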
http://git-wip-us.apache.org/repos/asf/nifi/blob/8917df88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
index 6b078d1..fc43492 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
@@ -40,7 +40,7 @@ import org.apache.nifi.processor.util.StandardValidators;
@SupportsBatching
-@SeeAlso({PutS3Object.class})
+@SeeAlso({PutS3Object.class, FetchS3Object.class})
@Tags({"Amazon", "S3", "AWS", "Archive", "Delete"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Deletes FlowFiles on an Amazon S3 Bucket. " +
@@ -56,8 +56,9 @@ public class DeleteS3Object extends AbstractS3Processor {
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
- Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, TIMEOUT, VERSION_ID,
- FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
+ Arrays.asList(BUCKET, KEY, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
+ REGION, TIMEOUT, VERSION_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST,
+ WRITE_ACL_LIST, OWNER));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
http://git-wip-us.apache.org/repos/asf/nifi/blob/8917df88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index d7ec88a..fda194a 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -46,10 +46,10 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
@SupportsBatching
-@SeeAlso({PutS3Object.class})
+@SeeAlso({PutS3Object.class, DeleteS3Object.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Amazon", "S3", "AWS", "Get", "Fetch"})
-@CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile")
+@CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile.")
@WritesAttributes({
@WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"),
@WritesAttribute(attribute = "path", description = "The path of the file"),
@@ -73,7 +73,7 @@ public class FetchS3Object extends AbstractS3Processor {
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
- Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, TIMEOUT, VERSION_ID));
+ Arrays.asList(BUCKET, KEY, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, REGION, TIMEOUT, VERSION_ID));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
http://git-wip-us.apache.org/repos/asf/nifi/blob/8917df88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index b03d85d..19bd881 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -17,16 +17,29 @@
package org.apache.nifi.processors.aws.s3;
import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.amazonaws.services.s3.model.UploadPartResult;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -40,6 +53,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
@@ -55,20 +69,48 @@ import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.StorageClass;
@SupportsBatching
-@SeeAlso({FetchS3Object.class})
+@SeeAlso({FetchS3Object.class, DeleteS3Object.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
-@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket")
-@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object", value = "The value of a User-Defined Metadata field to add to the S3 Object",
- description = "Allows user-defined metadata to be added to the S3 object as key/value pairs", supportsExpressionLanguage = true)
+@CapabilityDescription("Uploads FlowFiles to an Amazon S3 Bucket.\n" +
+ "The upload uses either the PutS3Object method or PutS3MultipartUpload methods. The PutS3Object method " +
+ "send the file in a single synchronous call, but it has a 5GB size limit. Larger files are sent using the " +
+ "multipart upload methods that initiate, transfer the parts, and complete an upload. This multipart process " +
+ "saves state after each step so that a large upload can be resumed with minimal loss if the processor or " +
+ "cluster is stopped and restarted.\n" +
+ "A multipart upload consists of three steps\n" +
+ " 1) initiate upload,\n" +
+ " 2) upload the parts, and\n" +
+ " 3) complete the upload.\n" +
+ "For multipart uploads, the processor saves state locally tracking the upload ID and parts uploaded, which " +
+ "must both be provided to complete the upload.\n" +
+ "The AWS libraries select an endpoint URL based on the AWS region, but this can be overridden with the " +
+ "'Endpoint Override URL' property for use with other S3-compatible endpoints.\n" +
+ "The S3 API specifies that the maximum file size for a PutS3Object upload is 5GB. It also requires that " +
+ "parts in a multipart upload must be at least 5MB in size, except for the last part. These limits are " +
+ "establish the bounds for the Multipart Upload Threshold and Part Size properties.")
+@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object",
+ value = "The value of a User-Defined Metadata field to add to the S3 Object",
+ description = "Allows user-defined metadata to be added to the S3 object as key/value pairs",
+ supportsExpressionLanguage = true)
@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object")
@WritesAttributes({
+ @WritesAttribute(attribute = "s3.bucket", description = "The S3 bucket where the Object was put in S3"),
+ @WritesAttribute(attribute = "s3.key", description = "The S3 key within where the Object was put in S3"),
@WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"),
@WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"),
- @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of the S3 object, if one is set")
+ @WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"),
+ @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of " +
+ "the S3 object, if one is set"),
+ @WritesAttribute(attribute = "s3.usermetadata", description = "A human-readable form of the User Metadata of " +
+ "the S3 object, if any was set")
})
public class PutS3Object extends AbstractS3Processor {
+ public static final long MIN_S3_PART_SIZE = 50L * 1024L * 1024L;
+ public static final long MAX_S3_PUTOBJECT_SIZE = 5L * 1024L * 1024L * 1024L;
+ public static final String PERSISTENCE_ROOT = "conf/state/";
+
public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
.name("Expiration Time Rule")
.required(false)
@@ -83,9 +125,42 @@ public class PutS3Object extends AbstractS3Processor {
.defaultValue(StorageClass.Standard.name())
.build();
+ public static final PropertyDescriptor MULTIPART_THRESHOLD = new PropertyDescriptor.Builder()
+ .name("Multipart Threshold")
+ .description("Specifies the file size threshold for switch from the PutS3Object API to the " +
+ "PutS3MultipartUpload API. Flow files bigger than this limit will be sent using the stateful " +
+ "multipart process.\n" +
+ "The valid range is 50MB to 5GB.")
+ .required(true)
+ .defaultValue("5 GB")
+ .addValidator(StandardValidators.createDataSizeBoundsValidator(MIN_S3_PART_SIZE, MAX_S3_PUTOBJECT_SIZE))
+ .build();
+
+ public static final PropertyDescriptor MULTIPART_PART_SIZE = new PropertyDescriptor.Builder()
+ .name("Multipart Part Size")
+ .description("Specifies the part size for use when the PutS3Multipart Upload API is used.\n" +
+ "Flow files will be broken into chunks of this size for the upload process, but the last part " +
+ "sent can be smaller since it is not padded.\n" +
+ "The valid range is 50MB to 5GB.")
+ .required(true)
+ .defaultValue("5 GB")
+ .addValidator(StandardValidators.createDataSizeBoundsValidator(MIN_S3_PART_SIZE, MAX_S3_PUTOBJECT_SIZE))
+ .build();
+
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
- Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
- FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
+ Arrays.asList(BUCKET, KEY, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
+ MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
+ FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
+
+ final static String S3_BUCKET_KEY = "s3.bucket";
+ final static String S3_OBJECT_KEY = "s3.key";
+ final static String S3_UPLOAD_ID_ATTR_KEY = "s3.uploadId";
+ final static String S3_VERSION_ATTR_KEY = "s3.version";
+ final static String S3_ETAG_ATTR_KEY = "s3.etag";
+ final static String S3_EXPIRATION_ATTR_KEY = "s3.expiration";
+ final static String S3_STORAGECLASS_ATTR_KEY = "s3.storeClass";
+ final static String S3_STORAGECLASS_META_KEY = "x-amz-storage-class";
+ final static String S3_USERMETA_ATTR_KEY = "s3.usermetadata";
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -102,6 +177,94 @@ public class PutS3Object extends AbstractS3Processor {
.build();
}
+ protected File getPersistenceFile() {
+ return new File(PERSISTENCE_ROOT + getIdentifier());
+ }
+
+ @Override
+ public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+ if (descriptor.equals(KEY)
+ || descriptor.equals(BUCKET)
+ || descriptor.equals(ENDPOINT_OVERRIDE)
+ || descriptor.equals(STORAGE_CLASS)
+ || descriptor.equals(REGION)) {
+ destroyState();
+ }
+ }
+
+ protected MultipartState getState(final String s3ObjectKey) throws IOException {
+ // get local state if it exists
+ MultipartState currState = null;
+ final File persistenceFile = getPersistenceFile();
+ if (persistenceFile.exists()) {
+ try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
+ final Properties props = new Properties();
+ props.load(fis);
+ if (props.containsKey(s3ObjectKey)) {
+ final String localSerialState = props.getProperty(s3ObjectKey);
+ if (localSerialState != null) {
+ currState = new MultipartState(localSerialState);
+ getLogger().info("Local state for {} loaded with uploadId {} and {} partETags",
+ new Object[]{s3ObjectKey, currState.getUploadId(), currState.getPartETags().size()});
+ }
+ }
+ } catch (IOException ioe) {
+ getLogger().warn("Failed to recover local state for {} due to {}. Assuming no local state and " +
+ "restarting upload.", new Object[]{s3ObjectKey, ioe.getMessage()});
+ }
+ }
+ return currState;
+ }
+
+ protected void persistState(final String s3ObjectKey, final MultipartState currState) throws IOException {
+ final String currStateStr = (currState == null) ? null : currState.toString();
+ final File persistenceFile = getPersistenceFile();
+ final File parentDir = persistenceFile.getParentFile();
+ if (!parentDir.exists() && !parentDir.mkdirs()) {
+ throw new IOException("Persistence directory (" + parentDir.getAbsolutePath() + ") does not exist and " +
+ "could not be created.");
+ }
+ final Properties props = new Properties();
+ if (persistenceFile.exists()) {
+ try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
+ props.load(fis);
+ }
+ }
+ if (currStateStr != null) {
+ props.setProperty(s3ObjectKey, currStateStr);
+ } else {
+ props.remove(s3ObjectKey);
+ }
+
+ try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
+ props.store(fos, null);
+ } catch (IOException ioe) {
+ getLogger().error("Could not store state {} due to {}.",
+ new Object[]{persistenceFile.getAbsolutePath(), ioe.getMessage()});
+ }
+ }
+
+ protected void removeState(final String s3ObjectKey) throws IOException {
+ persistState(s3ObjectKey, null);
+ }
+
+ protected void destroyState() {
+ final File persistenceFile = getPersistenceFile();
+ if (persistenceFile.exists()) {
+ if (!persistenceFile.delete()) {
+ getLogger().warn("Could not delete state file {}, attempting to delete contents.",
+ new Object[]{persistenceFile.getAbsolutePath()});
+ } else {
+ try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
+ new Properties().store(fos, null);
+ } catch (IOException ioe) {
+ getLogger().error("Could not store empty state file {} due to {}.",
+ new Object[]{persistenceFile.getAbsolutePath(), ioe.getMessage()});
+ }
+ }
+ }
+ }
+
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
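
The persistence methods above amount to a plain java.util.Properties file kept
under conf/state/. A minimal sketch of the load-modify-store round trip that
getState() and persistState() perform; the file, cache key, and serialized
value are placeholder inputs:

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.util.Properties;

    class StateFileSketch {
        static void saveEntry(final File stateFile, final String cacheKey,
                final String serializedState) throws IOException {
            final Properties props = new Properties();
            if (stateFile.exists()) {
                try (FileInputStream fis = new FileInputStream(stateFile)) {
                    props.load(fis); // keep entries for other in-flight uploads
                }
            }
            if (serializedState != null) {
                props.setProperty(cacheKey, serializedState);
            } else {
                props.remove(cacheKey); // removeState() persists a null to drop one entry
            }
            try (FileOutputStream fos = new FileOutputStream(stateFile)) {
                props.store(fos, null); // rewrite the whole file
            }
        }
    }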
@@ -113,10 +276,18 @@ public class PutS3Object extends AbstractS3Processor {
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+ final String cacheKey = getIdentifier() + "/" + bucket + "/" + key;
final AmazonS3 s3 = getClient();
final FlowFile ff = flowFile;
final Map<String, String> attributes = new HashMap<>();
+ final String ffFilename = ff.getAttributes().get(CoreAttributes.FILENAME.key());
+ attributes.put(S3_BUCKET_KEY, bucket);
+ attributes.put(S3_OBJECT_KEY, key);
+
+ final Long multipartThreshold = context.getProperty(MULTIPART_THRESHOLD).asDataSize(DataUnit.B).longValue();
+ final Long multipartPartSize = context.getProperty(MULTIPART_PART_SIZE).asDataSize(DataUnit.B).longValue();
+
try {
session.read(flowFile, new InputStreamCallback() {
@Override
@@ -126,7 +297,8 @@ public class PutS3Object extends AbstractS3Processor {
objectMetadata.setContentDisposition(ff.getAttribute(CoreAttributes.FILENAME.key()));
objectMetadata.setContentLength(ff.getSize());
- final String expirationRule = context.getProperty(EXPIRATION_RULE_ID).evaluateAttributeExpressions(ff).getValue();
+ final String expirationRule = context.getProperty(EXPIRATION_RULE_ID)
+ .evaluateAttributeExpressions(ff).getValue();
if (expirationRule != null) {
objectMetadata.setExpirationTimeRuleId(expirationRule);
}
@@ -134,7 +306,8 @@ public class PutS3Object extends AbstractS3Processor {
final Map<String, String> userMetadata = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
if (entry.getKey().isDynamic()) {
- final String value = context.getProperty(entry.getKey()).evaluateAttributeExpressions(ff).getValue();
+ final String value = context.getProperty(
+ entry.getKey()).evaluateAttributeExpressions(ff).getValue();
userMetadata.put(entry.getKey().getName(), value);
}
}
@@ -143,23 +316,223 @@ public class PutS3Object extends AbstractS3Processor {
objectMetadata.setUserMetadata(userMetadata);
}
- final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
- request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
- final AccessControlList acl = createACL(context, ff);
- if (acl != null) {
- request.setAccessControlList(acl);
- }
+ if (ff.getSize() <= multipartThreshold) {
+ //----------------------------------------
+ // single part upload
+ //----------------------------------------
+ final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
+ request.setStorageClass(
+ StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
+ final AccessControlList acl = createACL(context, ff);
+ if (acl != null) {
+ request.setAccessControlList(acl);
+ }
- final PutObjectResult result = s3.putObject(request);
- if (result.getVersionId() != null) {
- attributes.put("s3.version", result.getVersionId());
- }
+ try {
+ final PutObjectResult result = s3.putObject(request);
+ if (result.getVersionId() != null) {
+ attributes.put(S3_VERSION_ATTR_KEY, result.getVersionId());
+ }
+ if (result.getETag() != null) {
+ attributes.put(S3_ETAG_ATTR_KEY, result.getETag());
+ }
+ if (result.getExpirationTime() != null) {
+ attributes.put(S3_EXPIRATION_ATTR_KEY, result.getExpirationTime().toString());
+ }
+ if (result.getMetadata().getRawMetadata().keySet().contains(S3_STORAGECLASS_META_KEY)) {
+ attributes.put(S3_STORAGECLASS_ATTR_KEY,
+ result.getMetadata().getRawMetadataValue(S3_STORAGECLASS_META_KEY).toString());
+ }
+ if (userMetadata.size() > 0) {
+ StringBuilder userMetaBldr = new StringBuilder();
+ for (String userKey : userMetadata.keySet()) {
+ userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
+ }
+ attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString());
+ }
+ } catch (AmazonClientException e) {
+ getLogger().info("Failure completing upload flowfile={} bucket={} key={} reason={}",
+ new Object[]{ffFilename, bucket, key, e.getMessage()});
+ throw (e);
+ }
+ } else {
+ //----------------------------------------
+ // multipart upload
+ //----------------------------------------
- attributes.put("s3.etag", result.getETag());
+ // load or create persistent state
+ //------------------------------------------------------------
+ MultipartState currentState;
+ try {
+ currentState = getState(cacheKey);
+ if (currentState != null) {
+ if (currentState.getPartETags().size() > 0) {
+ final PartETag lastETag = currentState.getPartETags().get(
+ currentState.getPartETags().size() - 1);
+ getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " +
+ "uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " +
+ "contentLength='{}' partsLoaded={} lastPart={}/{}",
+ new Object[]{ffFilename, bucket, key, currentState.getUploadId(),
+ currentState.getFilePosition(), currentState.getPartSize(),
+ currentState.getStorageClass().toString(),
+ currentState.getContentLength(),
+ currentState.getPartETags().size(),
+ Integer.toString(lastETag.getPartNumber()),
+ lastETag.getETag()});
+ } else {
+ getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " +
+ "uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " +
+ "contentLength='{}' no partsLoaded",
+ new Object[]{ffFilename, bucket, key, currentState.getUploadId(),
+ currentState.getFilePosition(), currentState.getPartSize(),
+ currentState.getStorageClass().toString(),
+ currentState.getContentLength()});
+ }
+ } else {
+ currentState = new MultipartState();
+ currentState.setPartSize(multipartPartSize);
+ currentState.setStorageClass(
+ StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
+ currentState.setContentLength(ff.getSize());
+ persistState(cacheKey, currentState);
+ getLogger().info("Starting new upload for flowfile='{}' bucket='{}' key='{}'",
+ new Object[]{ffFilename, bucket, key});
+ }
+ } catch (IOException e) {
+ getLogger().error("IOException initiating cache state while processing flow files: " +
+ e.getMessage());
+ throw (e);
+ }
+
+ // initiate multipart upload or find position in file
+ //------------------------------------------------------------
+ if (currentState.getUploadId().isEmpty()) {
+ final InitiateMultipartUploadRequest initiateRequest =
+ new InitiateMultipartUploadRequest(bucket, key, objectMetadata);
+ initiateRequest.setStorageClass(currentState.getStorageClass());
+ final AccessControlList acl = createACL(context, ff);
+ if (acl != null) {
+ initiateRequest.setAccessControlList(acl);
+ }
+ try {
+ final InitiateMultipartUploadResult initiateResult =
+ s3.initiateMultipartUpload(initiateRequest);
+ currentState.setUploadId(initiateResult.getUploadId());
+ currentState.getPartETags().clear();
+ try {
+ persistState(cacheKey, currentState);
+ } catch (Exception e) {
+ getLogger().info("Exception saving cache state while processing flow file: " +
+ e.getMessage());
+ throw(new ProcessException("Exception saving cache state", e));
+ }
+ getLogger().info("Success initiating upload flowfile={} available={} position={} " +
+ "length={} bucket={} key={} uploadId={}",
+ new Object[]{ffFilename, in.available(), currentState.getFilePosition(),
+ currentState.getContentLength(), bucket, key,
+ currentState.getUploadId()});
+ if (initiateResult.getUploadId() != null) {
+ attributes.put(S3_UPLOAD_ID_ATTR_KEY, initiateResult.getUploadId());
+ }
+ } catch (AmazonClientException e) {
+ getLogger().info("Failure initiating upload flowfile={} bucket={} key={} reason={}",
+ new Object[]{ffFilename, bucket, key, e.getMessage()});
+ throw(e);
+ }
+ } else {
+ if (currentState.getFilePosition() > 0) {
+ try {
+ final long skipped = in.skip(currentState.getFilePosition());
+ if (skipped != currentState.getFilePosition()) {
+ getLogger().info("Failure skipping to resume upload flowfile={} " +
+ "bucket={} key={} position={} skipped={}",
+ new Object[]{ffFilename, bucket, key,
+ currentState.getFilePosition(), skipped});
+ }
+ } catch (Exception e) {
+ getLogger().info("Failure skipping to resume upload flowfile={} bucket={} " +
+ "key={} position={} reason={}",
+ new Object[]{ffFilename, bucket, key, currentState.getFilePosition(),
+ e.getMessage()});
+ throw(new ProcessException(e));
+ }
+ }
+ }
+
+ // upload parts
+ //------------------------------------------------------------
+ long thisPartSize;
+ for (int part = currentState.getPartETags().size() + 1;
+ currentState.getFilePosition() < currentState.getContentLength(); part++) {
+ if (!PutS3Object.this.isScheduled()) {
+ getLogger().info("Processor unscheduled, stopping upload flowfile={} part={} " +
+ "uploadId={}", new Object[]{ffFilename, part, currentState.getUploadId()});
+ session.rollback();
+ return;
+ }
+ thisPartSize = Math.min(currentState.getPartSize(),
+ (currentState.getContentLength() - currentState.getFilePosition()));
+ UploadPartRequest uploadRequest = new UploadPartRequest()
+ .withBucketName(bucket)
+ .withKey(key)
+ .withUploadId(currentState.getUploadId())
+ .withInputStream(in)
+ .withPartNumber(part)
+ .withPartSize(thisPartSize);
+ try {
+ UploadPartResult uploadPartResult = s3.uploadPart(uploadRequest);
+ currentState.addPartETag(uploadPartResult.getPartETag());
+ currentState.setFilePosition(currentState.getFilePosition() + thisPartSize);
+ try {
+ persistState(cacheKey, currentState);
+ } catch (Exception e) {
+ getLogger().info("Exception saving cache state processing flow file: " +
+ e.getMessage());
+ }
+ getLogger().info("Success uploading part flowfile={} part={} available={} " +
+ "etag={} uploadId={}", new Object[]{ffFilename, part, in.available(),
+ uploadPartResult.getETag(), currentState.getUploadId()});
+ } catch (AmazonClientException e) {
+ getLogger().info("Failure uploading part flowfile={} part={} bucket={} key={} " +
+ "reason={}", new Object[]{ffFilename, part, bucket, key, e.getMessage()});
+ throw (e);
+ }
+ }
- final Date expiration = result.getExpirationTime();
- if (expiration != null) {
- attributes.put("s3.expiration", expiration.toString());
+ // complete multipart upload
+ //------------------------------------------------------------
+ CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(
+ bucket, key, currentState.getUploadId(), currentState.getPartETags());
+ try {
+ CompleteMultipartUploadResult completeResult =
+ s3.completeMultipartUpload(completeRequest);
+ getLogger().info("Success completing upload flowfile={} etag={} uploadId={}",
+ new Object[]{ffFilename, completeResult.getETag(), currentState.getUploadId()});
+ if (completeResult.getVersionId() != null) {
+ attributes.put(S3_VERSION_ATTR_KEY, completeResult.getVersionId());
+ }
+ if (completeResult.getETag() != null) {
+ attributes.put(S3_ETAG_ATTR_KEY, completeResult.getETag());
+ }
+ if (completeResult.getExpirationTime() != null) {
+ attributes.put(S3_EXPIRATION_ATTR_KEY,
+ completeResult.getExpirationTime().toString());
+ }
+ if (currentState.getStorageClass() != null) {
+ attributes.put(S3_STORAGECLASS_ATTR_KEY, currentState.getStorageClass().toString());
+ }
+ if (userMetadata.size() > 0) {
+ StringBuilder userMetaBldr = new StringBuilder();
+ for (String userKey : userMetadata.keySet()) {
+ userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
+ }
+ attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString());
+ }
+ } catch (AmazonClientException e) {
+ getLogger().info("Failure completing upload flowfile={} bucket={} key={} reason={}",
+ new Object[]{ffFilename, bucket, key, e.getMessage()});
+ throw (e);
+ }
}
}
}
@@ -170,7 +543,7 @@ public class PutS3Object extends AbstractS3Processor {
}
session.transfer(flowFile, REL_SUCCESS);
- final String url = getUrlForObject(bucket, key);
+ final String url = ((AmazonS3Client) s3).getResourceUrl(bucket, key);
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, url, millis);
@@ -180,5 +553,120 @@ public class PutS3Object extends AbstractS3Processor {
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}
+
+ try {
+ removeState(cacheKey);
+ } catch (IOException e) {
+ getLogger().info("Error trying to delete key {} from cache: {}",
+ new Object[]{cacheKey, e.getMessage()});
+ }
+ }
+
+ protected static class MultipartState implements Serializable {
+
+ private static final String SEPARATOR = "#";
+
+ private String _uploadId;
+ private Long _filePosition;
+ private List<PartETag> _partETags;
+ private Long _partSize;
+ private StorageClass _storageClass;
+ private Long _contentLength;
+
+ public MultipartState() {
+ _uploadId = "";
+ _filePosition = 0L;
+ _partETags = new ArrayList<>();
+ _partSize = 0L;
+ _storageClass = StorageClass.Standard;
+ _contentLength = 0L;
+ }
+
+ // create from a previous toString() result
+ public MultipartState(String buf) {
+ String[] fields = buf.split(SEPARATOR);
+ _uploadId = fields[0];
+ _filePosition = Long.parseLong(fields[1]);
+ _partETags = new ArrayList<>();
+ for (String part : fields[2].split(",")) {
+ if (part != null && !part.isEmpty()) {
+ String[] partFields = part.split("/");
+ _partETags.add(new PartETag(Integer.parseInt(partFields[0]), partFields[1]));
+ }
+ }
+ _partSize = Long.parseLong(fields[3]);
+ _storageClass = StorageClass.fromValue(fields[4]);
+ _contentLength = Long.parseLong(fields[5]);
+ }
+
+ public String getUploadId() {
+ return _uploadId;
+ }
+
+ public void setUploadId(String id) {
+ _uploadId = id;
+ }
+
+ public Long getFilePosition() {
+ return _filePosition;
+ }
+
+ public void setFilePosition(Long pos) {
+ _filePosition = pos;
+ }
+
+ public List<PartETag> getPartETags() {
+ return _partETags;
+ }
+
+ public void addPartETag(PartETag tag) {
+ _partETags.add(tag);
+ }
+
+ public Long getPartSize() {
+ return _partSize;
+ }
+
+ public void setPartSize(Long size) {
+ _partSize = size;
+ }
+
+ public StorageClass getStorageClass() {
+ return _storageClass;
+ }
+
+ public void setStorageClass(StorageClass aClass) {
+ _storageClass = aClass;
+ }
+
+ public Long getContentLength() {
+ return _contentLength;
+ }
+
+ public void setContentLength(Long length) {
+ _contentLength = length;
+ }
+
+ public String toString() {
+ StringBuilder buf = new StringBuilder();
+ buf.append(_uploadId).append(SEPARATOR)
+ .append(_filePosition.toString()).append(SEPARATOR);
+ if (_partETags.size() > 0) {
+ boolean first = true;
+ for (PartETag tag : _partETags) {
+ if (!first) {
+ buf.append(",");
+ } else {
+ first = false;
+ }
+ buf.append(String.format("%d/%s", tag.getPartNumber(), tag.getETag()));
+ }
+ }
+ buf.append(SEPARATOR)
+ .append(_partSize.toString()).append(SEPARATOR)
+ .append(_storageClass.toString()).append(SEPARATOR)
+ .append(_contentLength.toString());
+ return buf.toString();
+ }
}
}
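
To illustrate the '#'-separated layout that MultipartState.toString() emits and
the String constructor parses, a small hedged example; the sample value mirrors
the target string used in the tests below:

    // layout: uploadId#filePosition#partNum/eTag,partNum/eTag,...#partSize#storageClass#contentLength
    final String serialized =
            "UID-test1234567890#10001#1/PartETag-1,2/PartETag-2#20002#REDUCED_REDUNDANCY#30003";
    final PutS3Object.MultipartState state = new PutS3Object.MultipartState(serialized);
    assert "UID-test1234567890".equals(state.getUploadId());
    assert state.getPartETags().size() == 2;
    assert serialized.equals(state.toString()); // lossless round trip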
http://git-wip-us.apache.org/repos/asf/nifi/blob/8917df88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3Test.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3Test.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3Test.java
index 167c16b..737ee8c 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3Test.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3Test.java
@@ -24,7 +24,6 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.Iterator;
import org.apache.nifi.util.file.FileUtils;
import org.junit.AfterClass;
@@ -47,9 +46,14 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
*/
public abstract class AbstractS3Test {
protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
- protected final static String BUCKET_NAME = "test-bucket-00000000-0000-0000-0000-123456789021";
protected final static String SAMPLE_FILE_RESOURCE_NAME = "/hello.txt";
- protected final static String REGION = "eu-west-1";
+ protected final static String REGION = "us-west-1";
+ // Adding REGION to bucket prevents errors of
+ // "A conflicting conditional operation is currently in progress against this resource."
+ // when bucket is rapidly added/deleted and consistency propagation causes this error.
+ // (Should not be necessary if REGION remains static, but added to prevent future frustration.)
+ // [see http://stackoverflow.com/questions/13898057/aws-error-message-a-conflicting-conditional-operation-is-currently-in-progress]
+ protected final static String BUCKET_NAME = "test-bucket-00000000-0000-0000-0000-123456789021-" + REGION;
// Static so multiple Tests can use same client
protected static AmazonS3Client client;
@@ -96,8 +100,7 @@ public abstract class AbstractS3Test {
ObjectListing objectListing = client.listObjects(BUCKET_NAME);
while (true) {
- for (Iterator<?> iterator = objectListing.getObjectSummaries().iterator(); iterator.hasNext(); ) {
- S3ObjectSummary objectSummary = (S3ObjectSummary) iterator.next();
+ for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
client.deleteObject(BUCKET_NAME, objectSummary.getKey());
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/8917df88/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
index 1755e1d..8b0524e 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
@@ -17,29 +17,73 @@
package org.apache.nifi.processors.aws.s3;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.UUID;
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.Region;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.amazonaws.services.s3.model.UploadPartResult;
+import java.util.regex.Pattern;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
+import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import com.amazonaws.services.s3.model.StorageClass;
-@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+@Ignore("For local testing only - interacts with S3 so the credentials file must be configured")
public class TestPutS3Object extends AbstractS3Test {
- @Test
- public void testSimplePut() throws IOException {
- final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
+ final PutS3Object processor = new TestablePutS3Object();
+ final TestRunner runner = TestRunners.newTestRunner(processor);
+ final ProcessContext context = runner.getProcessContext();
+ final static String TEST_ENDPOINT = "https://endpoint.com";
+ final static String TEST_TRANSIT_URI = "https://" + BUCKET_NAME + "/bucket.endpoint.com";
+ final static String TEST_PARTSIZE_STRING = "50 mb";
+ final static Long TEST_PARTSIZE_LONG = 50L * 1024L * 1024L;
+ final static String LINESEP = System.getProperty("line.separator");
+
+ final Pattern reS3ETag = Pattern.compile("[0-9a-fA-F]{32}");
+
+ @Before
+ public void before() {
runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
+ runner.setProperty(PutS3Object.KEY, SAMPLE_FILE_RESOURCE_NAME);
runner.setProperty(PutS3Object.REGION, REGION);
runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
+ }
- Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
+ @Test
+ public void testSimplePut() throws IOException {
+ assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
for (int i = 0; i < 3; i++) {
final Map<String, String> attrs = new HashMap<>();
@@ -53,13 +97,9 @@ public class TestPutS3Object extends AbstractS3Test {
@Test
public void testPutInFolder() throws IOException {
- final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
-
- runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
runner.setProperty(PutS3Object.REGION, REGION);
- runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
- Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
+ assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "folder/1.txt");
@@ -72,30 +112,38 @@ public class TestPutS3Object extends AbstractS3Test {
@Test
public void testStorageClass() throws IOException {
- final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
+ runner.setProperty(PutS3Object.KEY, "${filename}");
+
+ int bytesNeeded = 55 * 1024 * 1024;
+ StringBuilder bldr = new StringBuilder(bytesNeeded + 1000);
+ for (int line = 0; line < bytesNeeded / 64; line++) {
+ bldr.append(String.format("line %06d This is sixty-three characters plus the EOL marker!\n", line));
+ }
+ String data55mb = bldr.toString();
- runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
- runner.setProperty(PutS3Object.REGION, REGION);
- runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(PutS3Object.STORAGE_CLASS, StorageClass.ReducedRedundancy.name());
- Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
+ assertTrue(runner.setProperty("x-custom-prop", "hello").isValid());
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "folder/2.txt");
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs);
+ attrs.put("filename", "folder/3.txt");
+ runner.enqueue(data55mb.getBytes(), attrs);
- runner.run();
+ runner.run(2);
- runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
+ runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 2);
+ FlowFile file1 = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).get(0);
+ assertEquals(StorageClass.ReducedRedundancy.toString(),
+ file1.getAttribute(PutS3Object.S3_STORAGECLASS_ATTR_KEY));
+ FlowFile file2 = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS).get(1);
+ assertEquals(StorageClass.ReducedRedundancy.toString(),
+ file2.getAttribute(PutS3Object.S3_STORAGECLASS_ATTR_KEY));
}
@Test
public void testPermissions() throws IOException {
- final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
-
- runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
- runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(PutS3Object.FULL_CONTROL_USER_LIST,"28545acd76c35c7e91f8409b95fd1aa0c0914bfa1ac60975d9f48bc3c5e090b5");
runner.setProperty(PutS3Object.REGION, REGION);
@@ -107,4 +155,322 @@ public class TestPutS3Object extends AbstractS3Test {
runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
}
+
+ @Test
+ public void testStateDefaults() {
+ PutS3Object.MultipartState state1 = new PutS3Object.MultipartState();
+ assertEquals(state1.getUploadId(), "");
+ assertEquals(state1.getFilePosition(), (Long) 0L);
+ assertEquals(state1.getPartETags().size(), 0L);
+ assertEquals(state1.getPartSize(), (Long) 0L);
+ assertEquals(state1.getStorageClass().toString(), StorageClass.Standard.toString());
+ assertEquals(state1.getContentLength(), (Long) 0L);
+ }
+
+ @Test
+ public void testStateToString() throws IOException, InitializationException {
+ final String target = "UID-test1234567890#10001#1/PartETag-1,2/PartETag-2,3/PartETag-3,4/PartETag-4#20002#REDUCED_REDUNDANCY#30003";
+ PutS3Object.MultipartState state2 = new PutS3Object.MultipartState();
+ state2.setUploadId("UID-test1234567890");
+ state2.setFilePosition(10001L);
+ for (Integer partNum = 1; partNum < 5; partNum++) {
+ state2.addPartETag(new PartETag(partNum, "PartETag-" + partNum.toString()));
+ }
+ state2.setPartSize(20002L);
+ state2.setStorageClass(StorageClass.ReducedRedundancy);
+ state2.setContentLength(30003L);
+ assertEquals(target, state2.toString());
+ }
+
+ @Test
+ public void testProperties() throws IOException {
+ runner.setProperty(PutS3Object.FULL_CONTROL_USER_LIST,
+ "28545acd76c35c7e91f8409b95fd1aa0c0914bfa1ac60975d9f48bc3c5e090b5");
+ runner.setProperty(PutS3Object.REGION, REGION);
+ runner.setProperty(PutS3Object.ENDPOINT_OVERRIDE, TEST_ENDPOINT);
+ runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING);
+
+ assertEquals(BUCKET_NAME, context.getProperty(PutS3Object.BUCKET).toString());
+ assertEquals(SAMPLE_FILE_RESOURCE_NAME, context.getProperty(PutS3Object.KEY).evaluateAttributeExpressions().toString());
+ assertEquals(TEST_ENDPOINT, context.getProperty(PutS3Object.ENDPOINT_OVERRIDE).toString());
+ assertEquals(CREDENTIALS_FILE, context.getProperty(PutS3Object.CREDENTIALS_FILE).toString());
+ assertEquals(TEST_PARTSIZE_LONG.longValue(),
+ context.getProperty(PutS3Object.MULTIPART_PART_SIZE).asDataSize(DataUnit.B).longValue());
+ }
+
+ @Test
+ public void testPersistence() throws IOException {
+ final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
+ final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
+ final String cacheKey1 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key;
+ final String cacheKey2 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-v2";
+ final String cacheKey3 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-v3";
+
+ /*
+ * store 3 versions of state
+ */
+ PutS3Object.MultipartState state1orig = new PutS3Object.MultipartState();
+ processor.persistState(cacheKey1, state1orig);
+
+ PutS3Object.MultipartState state2orig = new PutS3Object.MultipartState();
+ state2orig.setUploadId("1234");
+ state2orig.setContentLength(1234L);
+ processor.persistState(cacheKey2, state2orig);
+
+ PutS3Object.MultipartState state3orig = new PutS3Object.MultipartState();
+ state3orig.setUploadId("5678");
+ state3orig.setContentLength(5678L);
+ processor.persistState(cacheKey3, state3orig);
+
+ /*
+ * reload and validate stored state
+ */
+ final PutS3Object.MultipartState state1new = processor.getState(cacheKey1);
+ assertEquals("", state1new.getUploadId());
+ assertEquals(0L, state1new.getFilePosition().longValue());
+ assertEquals(new ArrayList<PartETag>(), state1new.getPartETags());
+ assertEquals(0L, state1new.getPartSize().longValue());
+ assertEquals(StorageClass.fromValue(StorageClass.Standard.toString()), state1new.getStorageClass());
+ assertEquals(0L, state1new.getContentLength().longValue());
+
+ final PutS3Object.MultipartState state2new = processor.getState(cacheKey2);
+ assertEquals("1234", state2new.getUploadId());
+ assertEquals(0L, state2new.getFilePosition().longValue());
+ assertEquals(new ArrayList<PartETag>(), state2new.getPartETags());
+ assertEquals(0L, state2new.getPartSize().longValue());
+ assertEquals(StorageClass.fromValue(StorageClass.Standard.toString()), state2new.getStorageClass());
+ assertEquals(1234L, state2new.getContentLength().longValue());
+
+ final PutS3Object.MultipartState state3new = processor.getState(cacheKey3);
+ assertEquals("5678", state3new.getUploadId());
+ assertEquals(0L, state3new.getFilePosition().longValue());
+ assertEquals(new ArrayList<PartETag>(), state3new.getPartETags());
+ assertEquals(0L, state3new.getPartSize().longValue());
+ assertEquals(StorageClass.fromValue(StorageClass.Standard.toString()), state3new.getStorageClass());
+ assertEquals(5678L, state3new.getContentLength().longValue());
+ }
+
+ @Test
+ public void testStatePersistsETags() throws IOException {
+ final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
+ final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
+ final String cacheKey1 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv1";
+ final String cacheKey2 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv2";
+ final String cacheKey3 = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-bv3";
+
+ /*
+ * store 3 versions of state
+ */
+ PutS3Object.MultipartState state1orig = new PutS3Object.MultipartState();
+ processor.persistState(cacheKey1, state1orig);
+
+ PutS3Object.MultipartState state2orig = new PutS3Object.MultipartState();
+ state2orig.setUploadId("1234");
+ state2orig.setContentLength(1234L);
+ processor.persistState(cacheKey2, state2orig);
+
+ PutS3Object.MultipartState state3orig = new PutS3Object.MultipartState();
+ state3orig.setUploadId("5678");
+ state3orig.setContentLength(5678L);
+ processor.persistState(cacheKey3, state3orig);
+
+ /*
+ * persist state to caches so that
+ * 1. v2 has 2 and then 4 tags
+ * 2. v3 has 4 and then 2 tags
+ */
+ state2orig.getPartETags().add(new PartETag(1, "state 2 tag one"));
+ state2orig.getPartETags().add(new PartETag(2, "state 2 tag two"));
+ processor.persistState(cacheKey2, state2orig);
+ state2orig.getPartETags().add(new PartETag(3, "state 2 tag three"));
+ state2orig.getPartETags().add(new PartETag(4, "state 2 tag four"));
+ processor.persistState(cacheKey2, state2orig);
+
+ state3orig.getPartETags().add(new PartETag(1, "state 3 tag one"));
+ state3orig.getPartETags().add(new PartETag(2, "state 3 tag two"));
+ state3orig.getPartETags().add(new PartETag(3, "state 3 tag three"));
+ state3orig.getPartETags().add(new PartETag(4, "state 3 tag four"));
+ processor.persistState(cacheKey3, state3orig);
+ state3orig.getPartETags().remove(state3orig.getPartETags().size() - 1);
+ state3orig.getPartETags().remove(state3orig.getPartETags().size() - 1);
+ processor.persistState(cacheKey3, state3orig);
+
+ /*
+ * load state and validate that
+ * 1. v2 restore shows 4 tags
+ * 2. v3 restore shows 2 tags
+ */
+ final PutS3Object.MultipartState state2new = processor.getState(cacheKey2);
+ assertEquals("1234", state2new.getUploadId());
+ assertEquals(4, state2new.getPartETags().size());
+
+ final PutS3Object.MultipartState state3new = processor.getState(cacheKey3);
+ assertEquals("5678", state3new.getUploadId());
+ assertEquals(2, state3new.getPartETags().size());
+ }
+
+ @Test
+ public void testStateRemove() throws IOException {
+ final String bucket = runner.getProcessContext().getProperty(PutS3Object.BUCKET).getValue();
+ final String key = runner.getProcessContext().getProperty(PutS3Object.KEY).getValue();
+ final String cacheKey = runner.getProcessor().getIdentifier() + "/" + bucket + "/" + key + "-sr";
+
+ /*
+ * store state, retrieve and validate, remove and validate
+ */
+ PutS3Object.MultipartState stateOrig = new PutS3Object.MultipartState();
+ stateOrig.setUploadId("1234");
+ stateOrig.setContentLength(1234L);
+ processor.persistState(cacheKey, stateOrig);
+
+ PutS3Object.MultipartState state1 = processor.getState(cacheKey);
+ assertEquals("1234", state1.getUploadId());
+ assertEquals(1234L, state1.getContentLength().longValue());
+
+ processor.persistState(cacheKey, null);
+ PutS3Object.MultipartState state2 = processor.getState(cacheKey);
+ assertNull(state2);
+ }
+
+ @Test
+ public void testApiInteractions() throws IOException {
+ final String FILE1_NAME = "file1";
+ final String ALPHA_LF = "abcdefghijklmnopqrstuvwxyz\n";
+ final String ALPHA_6x = ALPHA_LF + ALPHA_LF + ALPHA_LF + ALPHA_LF + ALPHA_LF + ALPHA_LF;
+ final String FILE1_CONTENTS = ALPHA_6x + ALPHA_6x + ALPHA_6x + ALPHA_6x + ALPHA_6x + ALPHA_6x;
+
+ runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING);
+
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), FILE1_NAME);
+ runner.enqueue(FILE1_CONTENTS.getBytes(), attributes);
+
+ runner.assertValid();
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
+ final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
+ assertEquals(1, successFiles.size());
+ final List<MockFlowFile> failureFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_FAILURE);
+ assertEquals(0, failureFiles.size());
+ MockFlowFile ff1 = successFiles.get(0);
+ assertEquals(FILE1_NAME, ff1.getAttribute(CoreAttributes.FILENAME.key()));
+ assertEquals(BUCKET_NAME, ff1.getAttribute(PutS3Object.S3_BUCKET_KEY));
+ assertEquals(SAMPLE_FILE_RESOURCE_NAME, ff1.getAttribute(PutS3Object.S3_OBJECT_KEY));
+ assertTrue(reS3ETag.matcher(ff1.getAttribute(PutS3Object.S3_ETAG_ATTR_KEY)).matches());
+ }
+
+ @Test
+ public void testDynamicProperty() throws IOException {
+ final String DYNAMIC_ATTRIB_KEY = "fs.runTimestamp";
+ final String DYNAMIC_ATTRIB_VALUE = "${now():toNumber()}";
+
+ runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, TEST_PARTSIZE_STRING);
+ PropertyDescriptor testAttrib = processor.getSupportedDynamicPropertyDescriptor(DYNAMIC_ATTRIB_KEY);
+ runner.setProperty(testAttrib, DYNAMIC_ATTRIB_VALUE);
+
+ final String FILE1_NAME = "file1";
+ Map<String, String> attribs = new HashMap<>();
+ attribs.put(CoreAttributes.FILENAME.key(), FILE1_NAME);
+ runner.enqueue("123".getBytes(), attribs);
+
+ runner.assertValid();
+ processor.getPropertyDescriptor(DYNAMIC_ATTRIB_KEY); // return value unused; exercises descriptor lookup for the dynamic property
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
+ final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
+ assertEquals(1, successFiles.size());
+ MockFlowFile ff1 = successFiles.get(0);
+
+ long now = System.currentTimeMillis();
+ String millisNow = Long.toString(now);
+ String millisOneSecAgo = Long.toString(now - 1000L);
+ String usermeta = ff1.getAttribute(PutS3Object.S3_USERMETA_ATTR_KEY);
+ String[] usermetaLine0 = usermeta.split(LINESEP)[0].split("=");
+ String usermetaKey0 = usermetaLine0[0];
+ String usermetaValue0 = usermetaLine0[1];
+ assertEquals(DYNAMIC_ATTRIB_KEY, usermetaKey0);
+ assertTrue(usermetaValue0.compareTo(millisOneSecAgo) >= 0 && usermetaValue0.compareTo(millisNow) <= 0);
+ }
+
+ @Test
+ public void testProvenance() throws InitializationException {
+ final String PROV1_FILE = "provfile1";
+ runner.setProperty(PutS3Object.KEY, "${filename}");
+
+ Map<String, String> attribs = new HashMap<>();
+ attribs.put(CoreAttributes.FILENAME.key(), PROV1_FILE);
+ runner.enqueue("prov1 contents".getBytes(), attribs);
+
+ runner.assertValid();
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS);
+ final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS);
+ assertEquals(1, successFiles.size());
+
+ final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
+ assertEquals(1, provenanceEvents.size());
+ ProvenanceEventRecord provRec1 = provenanceEvents.get(0);
+ assertEquals(ProvenanceEventType.SEND, provRec1.getEventType());
+ assertEquals(processor.getIdentifier(), provRec1.getComponentId());
+ client.setRegion(Region.fromValue(REGION).toAWSRegion());
+ String targetUri = client.getUrl(BUCKET_NAME, PROV1_FILE).toString();
+ assertEquals(targetUri, provRec1.getTransitUri());
+ assertEquals(6, provRec1.getUpdatedAttributes().size());
+ assertEquals(BUCKET_NAME, provRec1.getUpdatedAttributes().get(PutS3Object.S3_BUCKET_KEY));
+ }
+
+ public class TestablePutS3Object extends PutS3Object {
+ public final String MOCK_CLIENT_PART_ETAG_PREFIX = UUID.nameUUIDFromBytes("PARTETAG".getBytes()).toString();
+ public final String MOCK_CLIENT_FILE_ETAG_PREFIX = UUID.nameUUIDFromBytes("FILEETAG".getBytes()).toString();
+ public final String MOCK_CLIENT_VERSION = "mock-version";
+
+ @Override
+ protected AmazonS3Client createClient(ProcessContext context, AWSCredentials credentials,
+ ClientConfiguration config) {
+ return new MockAWSClient(MOCK_CLIENT_PART_ETAG_PREFIX, MOCK_CLIENT_FILE_ETAG_PREFIX, MOCK_CLIENT_VERSION);
+ }
+ }
+
+ private class MockAWSClient extends AmazonS3Client {
+ private String partETag;
+ private String fileETag;
+ private String version;
+
+ public MockAWSClient(String partEtag, String fileEtag, String version) {
+ this.partETag = partEtag;
+ this.fileETag = fileEtag;
+ this.version = version;
+ }
+
+ @Override
+ public InitiateMultipartUploadResult initiateMultipartUpload(InitiateMultipartUploadRequest initiateMultipartUploadRequest) throws AmazonClientException {
+ InitiateMultipartUploadResult result = new InitiateMultipartUploadResult();
+ result.setBucketName(initiateMultipartUploadRequest.getBucketName());
+ result.setKey(initiateMultipartUploadRequest.getKey());
+ return result;
+ }
+
+ @Override
+ public UploadPartResult uploadPart(UploadPartRequest uploadPartRequest) throws AmazonClientException {
+ UploadPartResult result = new UploadPartResult();
+ result.setPartNumber(uploadPartRequest.getPartNumber());
+ result.setETag(partETag + Integer.toString(result.getPartNumber()) + "-" + uploadPartRequest.getUploadId());
+ return result;
+ }
+
+ @Override
+ public CompleteMultipartUploadResult completeMultipartUpload(CompleteMultipartUploadRequest completeMultipartUploadRequest) throws AmazonClientException {
+ CompleteMultipartUploadResult result = new CompleteMultipartUploadResult();
+ result.setBucketName(completeMultipartUploadRequest.getBucketName());
+ result.setKey(completeMultipartUploadRequest.getKey());
+ result.setETag(fileETag + completeMultipartUploadRequest.getUploadId());
+ Date date1 = new Date(); // new Date() is already set to the current time
+ result.setExpirationTime(date1);
+ result.setVersionId(version);
+ return result;
+ }
+ }
+
}
\ No newline at end of file
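For context, the TestablePutS3Object subclass above overrides createClient() so that every S3 call in these tests is answered by MockAWSClient rather than a live endpoint. The actual runner setup lives earlier in TestPutS3Object and is not part of this excerpt; a minimal sketch of the wiring, under that assumption, might look like:

    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    // Sketch only: assumes TestablePutS3Object is instantiable outside the test class.
    final PutS3Object processor = new TestablePutS3Object();
    final TestRunner runner = TestRunners.newTestRunner(processor);
    runner.setProperty(PutS3Object.BUCKET, "test-bucket");
    runner.setProperty(PutS3Object.KEY, "test-key");
    // Enqueue content and run; uploads that cross the multipart threshold are
    // served entirely by MockAWSClient's initiate/uploadPart/complete overrides.
    runner.enqueue("payload".getBytes());
    runner.run();

Because the mock returns deterministic ETags built from fixed UUID prefixes, assertions like the reS3ETag check in testApiInteractions can validate the multipart code path without AWS credentials.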
[2/2] nifi git commit: NIFI-1107: some code review fixes
Posted by tk...@apache.org.
NIFI-1107: some code review fixes
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/08e65abd
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/08e65abd
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/08e65abd
Branch: refs/heads/NIFI-1107
Commit: 08e65abd2b82d9fdf8b6ecd645e59dc2697adb6d
Parents: 8917df8
Author: Tony Kurc <tr...@gmail.com>
Authored: Wed Nov 25 23:55:35 2015 -0500
Committer: Tony Kurc <tr...@gmail.com>
Committed: Wed Nov 25 23:55:35 2015 -0500
----------------------------------------------------------------------
.../nifi/processors/aws/s3/PutS3Object.java | 132 +++++++++++--------
1 file changed, 80 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
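The substance of these review fixes is concurrency control: getState(), persistState(), and destroyState() all touch one shared persistence file, so the change guards them with a ReentrantReadWriteLock, letting concurrent readers proceed together while writers get exclusive access. A stripped-down sketch of the same pattern (hypothetical names, not the processor's actual file handling):

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class GuardedStore {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        private final Lock readLock = lock.readLock();
        private final Lock writeLock = lock.writeLock();
        private String contents = ""; // stands in for the Properties-backed state file

        String load() {
            readLock.lock();       // shared: many readers may hold this at once
            try {
                return contents;
            } finally {
                readLock.unlock(); // always release in finally, as the commit does
            }
        }

        void store(String updated) {
            writeLock.lock();      // exclusive: blocks all readers and writers
            try {
                contents = updated;
            } finally {
                writeLock.unlock();
            }
        }
    }

The finally blocks mirror the commit's structure, guaranteeing the lock is released even when the underlying file I/O throws.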
http://git-wip-us.apache.org/repos/asf/nifi/blob/08e65abd/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 19bd881..398f4fa 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -31,6 +31,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
@@ -40,6 +43,7 @@ import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
+
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -162,6 +166,10 @@ public class PutS3Object extends AbstractS3Processor {
final static String S3_STORAGECLASS_META_KEY = "x-amz-storage-class";
final static String S3_USERMETA_ATTR_KEY = "s3.usermetadata";
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final Lock readLock = lock.readLock();
+ private final Lock writeLock = lock.writeLock();
+
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
@@ -193,54 +201,65 @@ public class PutS3Object extends AbstractS3Processor {
}
protected MultipartState getState(final String s3ObjectKey) throws IOException {
- // get local state if it exists
- MultipartState currState = null;
- final File persistenceFile = getPersistenceFile();
- if (persistenceFile.exists()) {
- try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
- final Properties props = new Properties();
- props.load(fis);
- if (props.containsKey(s3ObjectKey)) {
- final String localSerialState = props.getProperty(s3ObjectKey);
- if (localSerialState != null) {
- currState = new MultipartState(localSerialState);
- getLogger().info("Local state for {} loaded with uploadId {} and {} partETags",
- new Object[]{s3ObjectKey, currState.getUploadId(), currState.getPartETags().size()});
+ readLock.lock();
+ try {
+ // get local state if it exists
+ MultipartState currState = null;
+ final File persistenceFile = getPersistenceFile();
+ if (persistenceFile.exists()) {
+ try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
+ final Properties props = new Properties();
+ props.load(fis);
+ if (props.containsKey(s3ObjectKey)) {
+ final String localSerialState = props.getProperty(s3ObjectKey);
+ if (localSerialState != null) {
+ currState = new MultipartState(localSerialState);
+ getLogger().info("Local state for {} loaded with uploadId {} and {} partETags",
+ new Object[]{s3ObjectKey, currState.getUploadId(), currState.getPartETags().size()});
+ }
}
+ } catch (IOException ioe) {
+ getLogger().warn("Failed to recover local state for {} due to {}. Assuming no local state and " +
+ "restarting upload.", new Object[]{s3ObjectKey, ioe.getMessage()});
}
- } catch (IOException ioe) {
- getLogger().warn("Failed to recover local state for {} due to {}. Assuming no local state and " +
- "restarting upload.", new Object[]{s3ObjectKey, ioe.getMessage()});
}
+ return currState;
+ } finally {
+ readLock.unlock();
}
- return currState;
}
protected void persistState(final String s3ObjectKey, final MultipartState currState) throws IOException {
- final String currStateStr = (currState == null) ? null : currState.toString();
- final File persistenceFile = getPersistenceFile();
- final File parentDir = persistenceFile.getParentFile();
- if (!parentDir.exists() && !parentDir.mkdirs()) {
- throw new IOException("Persistence directory (" + parentDir.getAbsolutePath() + ") does not exist and " +
- "could not be created.");
- }
- final Properties props = new Properties();
- if (persistenceFile.exists()) {
- try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
- props.load(fis);
+ writeLock.lock();
+ try {
+ final String currStateStr = (currState == null) ? null : currState.toString();
+ final File persistenceFile = getPersistenceFile();
+ final File parentDir = persistenceFile.getParentFile();
+ if (!parentDir.exists() && !parentDir.mkdirs()) {
+ throw new IOException("Persistence directory (" + parentDir.getAbsolutePath() + ") does not exist and " +
+ "could not be created.");
+ }
+ final Properties props = new Properties();
+ if (persistenceFile.exists()) {
+ try (final FileInputStream fis = new FileInputStream(persistenceFile)) {
+ props.load(fis);
+ }
+ }
+ if (currStateStr != null) {
+ props.setProperty(s3ObjectKey, currStateStr);
+ } else {
+ props.remove(s3ObjectKey);
}
- }
- if (currStateStr != null) {
- props.setProperty(s3ObjectKey, currStateStr);
- } else {
- props.remove(s3ObjectKey);
- }
- try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
- props.store(fos, null);
- } catch (IOException ioe) {
- getLogger().error("Could not store state {} due to {}.",
- new Object[]{persistenceFile.getAbsolutePath(), ioe.getMessage()});
+ try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
+ props.store(fos, null);
+ } catch (IOException ioe) {
+ getLogger().error("Could not store state {} due to {}.",
+ new Object[]{persistenceFile.getAbsolutePath(), ioe.getMessage()});
+ }
+ } finally {
+ writeLock.unlock();
}
}
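Note that the write lock spans persistState()'s whole load-modify-store cycle, not just the final write: if it covered only props.store(), two concurrent writers could load the same snapshot of the file and silently drop each other's keys.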
@@ -249,19 +268,24 @@ public class PutS3Object extends AbstractS3Processor {
}
protected void destroyState() {
- final File persistenceFile = getPersistenceFile();
- if (persistenceFile.exists()) {
- if (!persistenceFile.delete()) {
- getLogger().warn("Could not delete state file {}, attempting to delete contents.",
- new Object[]{persistenceFile.getAbsolutePath()});
- } else {
- try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
- new Properties().store(fos, null);
- } catch (IOException ioe) {
- getLogger().error("Could not store empty state file {} due to {}.",
- new Object[]{persistenceFile.getAbsolutePath(), ioe.getMessage()});
+ writeLock.lock();
+ try {
+ final File persistenceFile = getPersistenceFile();
+ if (persistenceFile.exists()) {
+ if (!persistenceFile.delete()) {
+ getLogger().warn("Could not delete state file {}, attempting to delete contents.",
+ new Object[]{persistenceFile.getAbsolutePath()});
+ } else {
+ try (final FileOutputStream fos = new FileOutputStream(persistenceFile)) {
+ new Properties().store(fos, null);
+ } catch (IOException ioe) {
+ getLogger().error("Could not store empty state file {} due to {}.",
+ new Object[]{persistenceFile.getAbsolutePath(), ioe.getMessage()});
+ }
}
- }
+ }
+ } finally {
+ writeLock.unlock();
}
}
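Similarly, destroyState() takes the exclusive write lock even though it mostly deletes: removing or truncating the persistence file while a concurrent getState() is mid-read would otherwise hand the reader a partial or vanished file.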
@@ -357,7 +381,7 @@ public class PutS3Object extends AbstractS3Processor {
}
} else {
//----------------------------------------
- // multippart upload
+ // multipart upload
//----------------------------------------
// load or create persistent state
@@ -366,6 +390,7 @@ public class PutS3Object extends AbstractS3Processor {
try {
currentState = getState(cacheKey);
if (currentState != null) {
+ // Log the current state
if (currentState.getPartETags().size() > 0) {
final PartETag lastETag = currentState.getPartETags().get(
currentState.getPartETags().size() - 1);
@@ -389,6 +414,7 @@ public class PutS3Object extends AbstractS3Processor {
currentState.getContentLength()});
}
} else {
+ // create a new state object
currentState = new MultipartState();
currentState.setPartSize(multipartPartSize);
currentState.setStorageClass(
@@ -564,6 +590,8 @@ public class PutS3Object extends AbstractS3Processor {
protected static class MultipartState implements Serializable {
+ private static final long serialVersionUID = 7959345311775848703L;
+
private static final String SEPARATOR = "#";
private String _uploadId;