You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/07/30 19:50:08 UTC
incubator-usergrid git commit: Added additional tests to verify that
the error handling works and that the request returns a 500 error as it should.
Added a fix for the infinite loop when aborting multipart uploads, since Amazon
won't always close those in a timely manner.
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-898 f02cf24f5 -> 101ae8566
Added additional tests to verify that the error handling works and that the request returns a 500 error as it should.
Added a fix for the infinite loop when aborting multipart uploads, since Amazon won't always close those in a timely manner.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/101ae856
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/101ae856
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/101ae856
Branch: refs/heads/USERGRID-898
Commit: 101ae8566706f2d40ebc4238c94d8867bad9e346
Parents: f02cf24
Author: GERey <gr...@apigee.com>
Authored: Thu Jul 30 10:50:07 2015 -0700
Committer: GERey <gr...@apigee.com>
Committed: Thu Jul 30 10:50:07 2015 -0700
----------------------------------------------------------------------
.../applications/assets/AwsAssetResourceIT.java | 114 ++++++++++++++++++-
.../assets/data/AwsSdkS3BinaryStore.java | 79 +++++++------
2 files changed, 158 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/101ae856/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AwsAssetResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AwsAssetResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AwsAssetResourceIT.java
index 2bdc5d5..9cf719b 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AwsAssetResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AwsAssetResourceIT.java
@@ -46,8 +46,11 @@ import org.apache.usergrid.rest.test.resource.model.ApiResponse;
import org.apache.usergrid.rest.test.resource.model.Entity;
import org.apache.usergrid.services.assets.data.AssetUtils;
import org.apache.usergrid.services.assets.data.BinaryStore;
+import org.apache.usergrid.services.exceptions.AwsPropertiesNotFoundException;
import org.apache.usergrid.setup.ConcurrentProcessSingleton;
+import com.amazonaws.SDKGlobalConfiguration;
+import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.multipart.FormDataMultiPart;
import net.jcip.annotations.NotThreadSafe;
@@ -58,6 +61,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
@NotThreadSafe
@@ -89,6 +93,114 @@ public class AwsAssetResourceIT extends AbstractRestIT {
}
@Test
+ public void errorCheckingMissingProperties() throws Exception {
+ Map<String, Object> errorTestProperties;
+ errorTestProperties = getRemoteTestProperties();
+ //test that we fail gracefully if we have missing properties
+ setTestProperty( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR, "xxx" );
+
+ try {
+
+ Map<String, String> payload = hashMap( "name", "assetname" );
+ ApiResponse postResponse = pathResource( getOrgAppPath( "foos" ) ).post( payload );
+ UUID assetId = postResponse.getEntities().get( 0 ).getUuid();
+ assertNotNull( assetId );
+
+ // post a binary asset to that entity
+
+ byte[] data = IOUtils.toByteArray( getClass().getResourceAsStream( "/cassandra_eye.jpg" ) );
+ ApiResponse putResponse =
+ pathResource( getOrgAppPath( "foos/" + assetId ) ).put( data, MediaType.APPLICATION_OCTET_STREAM_TYPE );
+
+
+ }catch ( AwsPropertiesNotFoundException e ){
+ fail("Shouldn't interrupt runtime if access key isnt found.");
+ }
+ catch( UniformInterfaceException uie){
+ assertEquals(500,uie.getResponse().getStatus());
+ }
+ finally{
+ setTestProperties( errorTestProperties );
+ }
+ }
+
+ @Test
+ public void errorCheckingInvalidProperties() throws Exception {
+ Map<String, Object> errorTestProperties;
+ errorTestProperties = getRemoteTestProperties();
+ //test that we fail gracefully if we have invalid properties
+ setTestProperty( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR, "xxx");
+ setTestProperty( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR, "xxx" );
+ setTestProperty( "usergrid.binary.bucketname", "xxx" );
+
+ try {
+
+ Map<String, String> payload = hashMap( "name", "assetname" );
+ ApiResponse postResponse = pathResource( getOrgAppPath( "foos" ) ).post( payload );
+ UUID assetId = postResponse.getEntities().get( 0 ).getUuid();
+ assertNotNull( assetId );
+
+ // post a binary asset to that entity
+
+ byte[] data = IOUtils.toByteArray( getClass().getResourceAsStream( "/cassandra_eye.jpg" ) );
+ ApiResponse putResponse =
+ pathResource( getOrgAppPath( "foos/" + assetId ) ).put( data, MediaType.APPLICATION_OCTET_STREAM_TYPE );
+
+
+ }catch ( AwsPropertiesNotFoundException e ){
+ fail("Shouldn't interrupt runtime if access key isnt found.");
+ }
+ catch( UniformInterfaceException uie){
+ assertEquals( 500, uie.getResponse().getStatus() );
+ }
+ finally{
+ setTestProperties( errorTestProperties );
+ }
+ }
+
+ @Test
+ public void errorCheckingInvalidPropertiesMultipartUpload() throws Exception {
+ Map<String, Object> errorTestProperties;
+ errorTestProperties = getRemoteTestProperties();
+ //test that we fail gracefully if we have invalid properties
+ setTestProperty( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR, "xxx");
+ setTestProperty( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR, "xxx" );
+ setTestProperty( "usergrid.binary.bucketname", "xxx" );
+
+ try {
+
+ byte[] data = IOUtils.toByteArray( this.getClass().getResourceAsStream( "/file-bigger-than-5M" ) );
+ FormDataMultiPart form = new FormDataMultiPart().field( "file", data, MediaType.MULTIPART_FORM_DATA_TYPE );
+ ApiResponse postResponse = pathResource( getOrgAppPath( "foos" ) ).post( form );
+ UUID assetId = postResponse.getEntities().get(0).getUuid();
+ LOG.info( "Waiting for upload to finish..." );
+ Thread.sleep( 5000 );
+
+ // check that entire file was uploaded
+
+ ApiResponse getResponse = pathResource( getOrgAppPath( "foos/" +assetId ) ).get( ApiResponse.class );
+ LOG.info( "Upload complete!" );
+ InputStream is = pathResource( getOrgAppPath( "foos/" + assetId ) ).getAssetAsStream();
+ byte[] foundData = IOUtils.toByteArray( is );
+ assertEquals( data.length, foundData.length );
+
+ // delete file
+
+ pathResource( getOrgAppPath( "foos/" + assetId ) ).delete();
+
+
+ }catch ( AwsPropertiesNotFoundException e ){
+ fail("Shouldn't interrupt runtime if access key isnt found.");
+ }
+ catch( UniformInterfaceException uie){
+ assertEquals( 500, uie.getResponse().getStatus() );
+ }
+ finally{
+ setTestProperties( errorTestProperties );
+ }
+ }
+
+ @Test
public void octetStreamOnDynamicEntity() throws Exception {
this.refreshIndex();
@@ -258,7 +370,7 @@ public class AwsAssetResourceIT extends AbstractRestIT {
// upload a file larger than 6mb
- byte[] data = IOUtils.toByteArray( this.getClass().getResourceAsStream( "/cat-larger-than-6mb.jpg" ) );
+ byte[] data = IOUtils.toByteArray( this.getClass().getResourceAsStream( "/ship-larger-than-6mb.gif" ) );
FormDataMultiPart form = new FormDataMultiPart().field( "file", data, MediaType.MULTIPART_FORM_DATA_TYPE );
ApiResponse postResponse = pathResource( getOrgAppPath( "bars" ) ).post( form );
UUID assetId = postResponse.getEntities().get(0).getUuid();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/101ae856/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java b/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java
index c69d44e..a0fd361 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/assets/data/AwsSdkS3BinaryStore.java
@@ -24,6 +24,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
@@ -44,6 +45,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.SDKGlobalConfiguration;
@@ -61,6 +63,7 @@ import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
+import com.amazonaws.services.s3.model.MultipartUpload;
import com.amazonaws.services.s3.model.MultipartUploadListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
@@ -76,7 +79,7 @@ import java.util.List;
import org.apache.commons.codec.binary.Base64;
-public class AwsSdkS3BinaryStore implements BinaryStore {
+public class AwsSdkS3BinaryStore implements BinaryStore {
private static final Logger logger = LoggerFactory.getLogger(AwsSdkS3BinaryStore.class );
private static final long FIVE_MB = ( FileUtils.ONE_MB * 5 );
@@ -93,38 +96,32 @@ public class AwsSdkS3BinaryStore implements BinaryStore {
@Autowired
private Properties properties;
-
- public AwsSdkS3BinaryStore( String accessId, String secretKey, String bucketName, String regionName ) {
- this.accessId = accessId;
- this.secretKey = secretKey;
- this.bucketName = bucketName;
- this.regionName = regionName;
- }
-
- public AwsSdkS3BinaryStore( String accessId, String secretKey, String bucketName ) {
-// this.accessId = accessId;
-// this.secretKey = secretKey;
-// this.bucketName = bucketName;
- }
-
public AwsSdkS3BinaryStore( ) {
-
- //need to add bucket name here
}
- private AmazonS3 getS3Client() {
- if ( s3Client == null ) {
-
+ //TODO: GREY rework how the s3 client works because currently it handles both initialization and returning of the client.
+ //Ideally it should only do one, and the client should be initialized at the beginning of the run.
+ private AmazonS3 getS3Client() throws Exception{
this.accessId = properties.getProperty( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR );
+ if(accessId == null){
+ logger.error( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR + " not properly set so amazon access key is null" );
+ throw new AwsPropertiesNotFoundException( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR );
+
+ }
this.secretKey = properties.getProperty( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR );
- this.bucketName = properties.getProperty( "usergrid.binary.bucketname" );
- if(accessId==null||secretKey==null||bucketName==null){
- throw new AwsPropertiesNotFoundException( "Access Keys" );
+ if(secretKey == null){
+ logger.error( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR + " not properly set so amazon secret key is null" );
+ throw new AwsPropertiesNotFoundException( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR );
}
+ this.bucketName = properties.getProperty( "usergrid.binary.bucketname" );
+ if(bucketName == null){
+ logger.error( "usergrid.binary.bucketname not properly set so amazon bucket is null" );
+ throw new AwsPropertiesNotFoundException( "usergrid.binary.bucketname" );
+ }
AWSCredentials credentials = new BasicAWSCredentials(accessId, secretKey);
ClientConfiguration clientConfig = new ClientConfiguration();
@@ -133,14 +130,13 @@ public class AwsSdkS3BinaryStore implements BinaryStore {
s3Client = new AmazonS3Client(credentials, clientConfig);
if(regionName != null)
s3Client.setRegion( Region.getRegion(Regions.fromName(regionName)) );
- }
return s3Client;
}
@Override
- public void write( final UUID appId, final Entity entity, InputStream inputStream ) throws IOException {
+ public void write( final UUID appId, final Entity entity, InputStream inputStream ) throws Exception {
String uploadFileName = AssetUtils.buildAssetKey( appId, entity );
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -161,8 +157,9 @@ public class AwsSdkS3BinaryStore implements BinaryStore {
ObjectMetadata om = new ObjectMetadata();
om.setContentLength(written);
- om.setContentType(mimeType);
- PutObjectResult result = getS3Client().putObject(bucketName, uploadFileName, awsInputStream, om);
+ om.setContentType( mimeType );
+ PutObjectResult result = null;
+ result = getS3Client().putObject( bucketName, uploadFileName, awsInputStream, om );
String md5sum = Hex.encodeHexString( Base64.decodeBase64(result.getContentMd5()) );
String eTag = result.getETag();
@@ -180,10 +177,12 @@ public class AwsSdkS3BinaryStore implements BinaryStore {
Boolean isFirstChunck = true;
List<PartETag> partETags = new ArrayList<PartETag>();
+
//get the s3 client in order to initialize the multipart request
getS3Client();
- InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, uploadFileName);
- InitiateMultipartUploadResult initResponse = getS3Client().initiateMultipartUpload(initRequest);
+ InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest( bucketName, uploadFileName );
+ InitiateMultipartUploadResult initResponse = getS3Client().initiateMultipartUpload( initRequest );
+
InputStream firstChunck = new ByteArrayInputStream(data);
PushbackInputStream chunckableInputStream = new PushbackInputStream(inputStream, 1);
@@ -268,13 +267,24 @@ public class AwsSdkS3BinaryStore implements BinaryStore {
logger.error( "Error updating entity with error message", e );
}
+ int timesIterated = 20;
//loop and abort all the multipart uploads
- while ( listResult.getMultipartUploads().size()!=0 ) {
+ while ( listResult.getMultipartUploads().size()!=0 && timesIterated > 0) {
getS3Client().abortMultipartUpload( abortRequest );
+ Thread.sleep( 1000 );
+ timesIterated--;
listResult = getS3Client().listMultipartUploads( listRequest );
+ logger.debug( "Files that haven't been aborted are: ",listResult.getMultipartUploads().listIterator().toString() );
}
+ if ( timesIterated == 0 ){
+ logger.error( "Files parts that couldn't be aborted in 20 seconds are:" );
+ Iterator<MultipartUpload> multipartUploadIterator = listResult.getMultipartUploads().iterator();
+ while(multipartUploadIterator.hasNext()){
+ logger.error( multipartUploadIterator.next().getKey() );
+ }
+ }
}
else {
CompleteMultipartUploadRequest request =
@@ -289,9 +299,10 @@ public class AwsSdkS3BinaryStore implements BinaryStore {
@Override
- public InputStream read( UUID appId, Entity entity, long offset, long length ) throws IOException {
+ public InputStream read( UUID appId, Entity entity, long offset, long length ) throws Exception {
+
+ S3Object object = getS3Client().getObject( bucketName, AssetUtils.buildAssetKey( appId, entity ) );
- S3Object object = getS3Client().getObject(bucketName, AssetUtils.buildAssetKey( appId, entity ));
byte data[] = null;
if ( offset == 0 && length == FIVE_MB ) {
@@ -306,13 +317,13 @@ public class AwsSdkS3BinaryStore implements BinaryStore {
@Override
- public InputStream read( UUID appId, Entity entity ) throws IOException {
+ public InputStream read( UUID appId, Entity entity ) throws Exception {
return read( appId, entity, 0, FIVE_MB );
}
@Override
- public void delete( UUID appId, Entity entity ) {
+ public void delete( UUID appId, Entity entity ) throws Exception {
getS3Client().deleteObject(new DeleteObjectRequest(bucketName, AssetUtils.buildAssetKey( appId, entity )));
}
}