Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/07/10 15:37:17 UTC

[11/50] [abbrv] incubator-usergrid git commit: Use an ExecutorService to limit the number of threads used for uploads (default is 40) and add a property to set the maximum uploaded file size (default is 50 MB).

Use an ExecutorService to limit the number of threads used for uploads (default is 40) and add a property to set the maximum uploaded file size (default is 50 MB).


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/49ae4ac5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/49ae4ac5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/49ae4ac5

Branch: refs/heads/two-dot-o-import
Commit: 49ae4ac5b8d5d77e90e6e6c6e9d8b299a5423863
Parents: 6f90eba
Author: Dave Johnson <dm...@apigee.com>
Authored: Tue May 19 10:32:42 2015 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Tue May 19 10:32:42 2015 -0400

----------------------------------------------------------------------
 .../main/resources/usergrid-default.properties  |   2 +
 .../applications/assets/AssetResourceIT.java    |  68 +++++-
 .../src/test/resources/cat-larger-than-6mb.jpg  | Bin 0 -> 9799257 bytes
 .../services/assets/data/S3BinaryStore.java     | 215 +++++++++++++------
 4 files changed, 216 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
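
At a glance, the change replaces the ad-hoc per-upload thread with a bounded pool sized by the new usergrid.binary.upload-workers property, and rejects files above usergrid.binary.max-size-mb. A minimal standalone sketch of that pattern follows; the UploadPoolSketch class and the simulated 60 MB upload are illustrative only and not part of the commit:

    import java.util.Properties;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class UploadPoolSketch {

        public static void main(String[] args) {
            Properties props = new Properties();

            // defaults mirror the new entries in usergrid-default.properties
            int workers = Integer.parseInt(
                    props.getProperty("usergrid.binary.upload-workers", "40"));
            long maxSizeBytes = Long.parseLong(
                    props.getProperty("usergrid.binary.max-size-mb", "50")) * 1024 * 1024;

            // one bounded pool shared by all uploads, instead of a new Thread per upload
            ExecutorService executor = Executors.newFixedThreadPool(workers);

            final long uploadSize = 60L * 1024 * 1024; // pretend a 60 MB upload arrived
            executor.submit((Callable<Void>) () -> {
                if (uploadSize > maxSizeBytes) {
                    // the real worker records this message under the asset's "error" file-metadata key
                    System.out.println("Asset size " + uploadSize
                            + " is larger than max size of " + maxSizeBytes);
                    return null;
                }
                System.out.println("small enough; the S3 upload would happen here");
                return null;
            });
            executor.shutdown();
        }
    }
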


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/49ae4ac5/stack/config/src/main/resources/usergrid-default.properties
----------------------------------------------------------------------
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index d653b7e..0a4f218 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -160,6 +160,8 @@ swagger.basepath=http://localhost:8080
 AWS_ACCESS_KEY_ID=
 AWS_SECRET_KEY=
 usergrid.binary.bucketname=usergrid-test
+usergrid.binary.max-size-mb=50
+usergrid.binary.upload-workers=40
 
 usergrid.test.sample_data_url=
 
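
For reference, a deployment could tune these limits by overriding the new keys in its own properties file (the values below are illustrative, not the shipped defaults):

    usergrid.binary.max-size-mb=100
    usergrid.binary.upload-workers=20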

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/49ae4ac5/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java
index 666f95e..c3cab63 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/assets/AssetResourceIT.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.rest.applications.assets;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;
@@ -40,10 +41,9 @@ import org.apache.commons.io.IOUtils;
 
 import com.sun.jersey.multipart.FormDataMultiPart;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
+import static org.apache.usergrid.management.AccountCreationProps.PROPERTIES_ADMIN_USERS_REQUIRE_CONFIRMATION;
 import static org.apache.usergrid.utils.MapUtils.hashMap;
+import static org.junit.Assert.*;
 
 
 @Concurrent()
@@ -249,12 +249,11 @@ public class AssetResourceIT extends AbstractRestIT {
         node = resource().path( "/test-organization/test-app/foos/" + uuid ).queryParam( "access_token", access_token )
                 .accept( MediaType.APPLICATION_JSON ).type( MediaType.MULTIPART_FORM_DATA ).put( JsonNode.class, form );
         logNode( node );
-        Assert.assertTrue( lastModified != node.findValue( AssetUtils.LAST_MODIFIED ).getLongValue() );
+        assertTrue( lastModified != node.findValue( AssetUtils.LAST_MODIFIED ).getLongValue() );
     }
 
 
     @Test
-    @Ignore("Just enable and run when testing S3 large file upload specifically")
     public void largeFileInS3() throws Exception {
         UserRepo.INSTANCE.load( resource(), access_token );
 
@@ -302,6 +301,63 @@ public class AssetResourceIT extends AbstractRestIT {
                 .accept( MediaType.APPLICATION_JSON_TYPE ).delete( JsonNode.class );
     }
 
+    @Test
+    public void fileTooLargeShouldResultInError() throws Exception {
+
+        Map<String, String> props = new HashMap<String, String>();
+        props.put( "usergrid.binary.max-size-mb", "6" );
+        resource().path( "/testproperties" )
+                .queryParam( "access_token", access_token )
+                .accept( MediaType.APPLICATION_JSON )
+                .type( MediaType.APPLICATION_JSON_TYPE ).post( props );
+
+        try {
+
+            UserRepo.INSTANCE.load( resource(), access_token );
+
+            byte[] data = IOUtils.toByteArray( this.getClass().getResourceAsStream( "/cat-larger-than-6mb.jpg" ) );
+            FormDataMultiPart form = new FormDataMultiPart().field( "file", data, MediaType.MULTIPART_FORM_DATA_TYPE );
+
+            // send data
+            JsonNode node = resource().path( "/test-organization/test-app/bars" ).queryParam( "access_token", access_token )
+                    .accept( MediaType.APPLICATION_JSON ).type( MediaType.MULTIPART_FORM_DATA )
+                    .post( JsonNode.class, form );
+            //logNode( node );
+            JsonNode idNode = node.get( "entities" ).get( 0 ).get( "uuid" );
+            String uuid = idNode.getTextValue();
+
+            // get entity
+            String errorMessage = null;
+            long timeout = System.currentTimeMillis() + 60000;
+            while (true) {
+                LOG.info( "Waiting for upload to finish..." );
+                Thread.sleep( 2000 );
+                node = resource().path( "/test-organization/test-app/bars/" + uuid )
+                        .queryParam( "access_token", access_token ).accept( MediaType.APPLICATION_JSON_TYPE )
+                        .get( JsonNode.class );
+                //logNode( node );
+
+                // poll for the error to happen
+                if (node.findValue( "error" ) != null) {
+                    errorMessage = node.findValue("error").asText();
+                    break;
+                }
+                if (System.currentTimeMillis() > timeout) {
+                    throw new TimeoutException();
+                }
+            }
+
+            assertTrue( errorMessage.startsWith("Asset size "));
+
+        } finally {
+            props = new HashMap<String, String>();
+            props.put( "usergrid.binary.max-size-mb", "25" );
+            resource().path( "/testproperties" )
+                    .queryParam( "access_token", access_token )
+                    .accept( MediaType.APPLICATION_JSON )
+                    .type( MediaType.APPLICATION_JSON_TYPE ).post( props );
+        }
+    }
 
     /**
      * Deleting a connection to an asset should not delete the asset or the asset's data
@@ -317,7 +373,7 @@ public class AssetResourceIT extends AbstractRestIT {
 
         Map<String, String> payload = hashMap("name", "cassandra_eye.jpg");
 
-        JsonNode node = resource().path("/test-organization/test-app/foos")
+        JsonNode node = resource().path("/test-organization/test-app/bars")
                 .queryParam("access_token", access_token)
                 .accept(MediaType.APPLICATION_JSON)
                 .type(MediaType.APPLICATION_JSON_TYPE)

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/49ae4ac5/stack/rest/src/test/resources/cat-larger-than-6mb.jpg
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/resources/cat-larger-than-6mb.jpg b/stack/rest/src/test/resources/cat-larger-than-6mb.jpg
new file mode 100644
index 0000000..d45435a
Binary files /dev/null and b/stack/rest/src/test/resources/cat-larger-than-6mb.jpg differ

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/49ae4ac5/stack/services/src/main/java/org/apache/usergrid/services/assets/data/S3BinaryStore.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/assets/data/S3BinaryStore.java b/stack/services/src/main/java/org/apache/usergrid/services/assets/data/S3BinaryStore.java
index 29b5e47..f59e79c 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/assets/data/S3BinaryStore.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/assets/data/S3BinaryStore.java
@@ -25,10 +25,12 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Map;
+import java.util.Properties;
+import java.util.Stack;
 import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.*;
 
+import org.apache.usergrid.utils.StringUtils;
 import org.jclouds.ContextBuilder;
 import org.jclouds.blobstore.AsyncBlobStore;
 import org.jclouds.blobstore.BlobStore;
@@ -63,12 +65,16 @@ public class S3BinaryStore implements BinaryStore {
 
     private static final Logger LOG = LoggerFactory.getLogger( S3BinaryStore.class );
     private static final long FIVE_MB = ( FileUtils.ONE_MB * 5 );
+    private static String WORKERS_PROP_NAME = "usergrid.binary.upload-workers";
 
     private BlobStoreContext context;
     private String accessId;
     private String secretKey;
     private String bucketName;
-    private ExecutorService executor = Executors.newFixedThreadPool( 10 );
+    private ExecutorService executorService;
+
+    @Autowired
+    private Properties properties;
 
     @Autowired
     private EntityManagerFactory emf;
@@ -104,17 +110,19 @@ public class S3BinaryStore implements BinaryStore {
     @Override
     public void write( final UUID appId, final Entity entity, InputStream inputStream ) throws IOException {
 
-        final String uploadFileName = AssetUtils.buildAssetKey( appId, entity );
+        // write up to 5mb of data to a byte array
+
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         long written = IOUtils.copyLarge( inputStream, baos, 0, FIVE_MB );
         byte[] data = baos.toByteArray();
 
-        final Map<String, Object> fileMetadata = AssetUtils.getFileMetadata( entity );
-        fileMetadata.put( AssetUtils.LAST_MODIFIED, System.currentTimeMillis() );
+        if ( written < FIVE_MB ) { // total smaller than 5mb
 
-        final String mimeType = AssetMimeHandler.get().getMimeType( entity, data );
+            final String uploadFileName = AssetUtils.buildAssetKey( appId, entity );
+            final String mimeType = AssetMimeHandler.get().getMimeType( entity, data );
 
-        if ( written < FIVE_MB ) { // total smaller than 5mb
+            final Map<String, Object> fileMetadata = AssetUtils.getFileMetadata( entity );
+            fileMetadata.put( AssetUtils.LAST_MODIFIED, System.currentTimeMillis() );
 
             BlobStore blobStore = getContext().getBlobStore();
             BlobBuilder.PayloadBlobBuilder bb = blobStore.blobBuilder( uploadFileName )
@@ -134,69 +142,31 @@ public class S3BinaryStore implements BinaryStore {
         }
         else { // bigger than 5mb... dump 5 mb tmp files and upload from them
 
-            // create temp file and copy entire file to that temp file
-
-            LOG.debug( "Writing temp file for S3 upload" );
-
-            final File tempFile = File.createTempFile( entity.getUuid().toString(), "tmp" );
-            tempFile.deleteOnExit();
-            OutputStream os = null;
-            try {
-                os = new BufferedOutputStream( new FileOutputStream( tempFile.getAbsolutePath() ) );
-                os.write( data );
-                written += IOUtils.copyLarge( inputStream, os, 0, ( FileUtils.ONE_GB * 5 ) );
-            }
-            finally {
-                IOUtils.closeQuietly( os );
-            }
-
-            fileMetadata.put( AssetUtils.CONTENT_LENGTH, written );
-
-            // JClouds no longer supports async blob store, so we have to do this fun stuff
-
-            LOG.debug( "Starting upload thread" );
-
-            Thread uploadThread = new Thread( new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        LOG.debug( "S3 upload thread started" );
+            ExecutorService executors = getExecutorService();
 
-                        BlobStore blobStore = getContext().getBlobStore();
-
-                        BlobBuilder.PayloadBlobBuilder bb =  blobStore.blobBuilder( uploadFileName )
-                                .payload( tempFile ).calculateMD5().contentType( mimeType );
-
-                        if ( fileMetadata.get( AssetUtils.CONTENT_DISPOSITION ) != null ) {
-                            bb.contentDisposition( fileMetadata.get( AssetUtils.CONTENT_DISPOSITION ).toString() );
-                        }
-                        final Blob blob = bb.build();
+            executors.submit( new UploadWorker( appId, entity, inputStream, data, written ) );
+        }
+    }
 
-                        String md5sum = Hex.encodeHexString( blob.getMetadata().getContentMetadata().getContentMD5() );
-                        fileMetadata.put( AssetUtils.CHECKSUM, md5sum );
 
-                        LOG.debug( "S3 upload starting" );
+    private ExecutorService getExecutorService() {
 
-                        String eTag = blobStore.putBlob( bucketName, blob );
-                        fileMetadata.put( AssetUtils.E_TAG, eTag );
+        if ( executorService == null ) {
+            synchronized (this) {
 
-                        LOG.debug( "S3 upload complete eTag=" + eTag);
+                int workers = 40;
+                String workersString = properties.getProperty( WORKERS_PROP_NAME, "40");
 
-                        EntityManager em = emf.getEntityManager( appId );
-                        em.update( entity );
-                        tempFile.delete();
-                    }
-                    catch ( Exception e ) {
-                        LOG.error( "error uploading", e );
-                    }
-                    if ( tempFile != null && tempFile.exists() ) {
-                        tempFile.delete();
-                    }
+                if ( StringUtils.isNumeric( workersString ) ) {
+                    workers = Integer.parseInt( workersString );
+                } else if ( !StringUtils.isEmpty( workersString )) {
+                    LOG.error("Ignoring invalid setting for {}", WORKERS_PROP_NAME);
                 }
-            });
-
-            uploadThread.start();
+                executorService = Executors.newFixedThreadPool( workers );
+            }
         }
+
+        return executorService;
     }
 
 
@@ -229,5 +199,124 @@ public class S3BinaryStore implements BinaryStore {
         BlobStore blobStore = getContext().getBlobStore();
         blobStore.removeBlob( bucketName, AssetUtils.buildAssetKey( appId, entity ) );
     }
+
+    class UploadWorker implements Callable<Void> {
+
+        private UUID appId;
+        private Entity entity;
+        private InputStream inputStream;
+        private byte[] data;
+        private long written;
+
+
+        public UploadWorker( UUID appId, Entity entity, InputStream is, byte[] data, long written ) {
+            this.appId = appId;
+            this.entity = entity;
+            this.inputStream = is;
+            this.data = data;
+            this.written = written;
+        }
+
+        @Override
+        public Void call() {
+
+            LOG.debug( "Writing temp file for S3 upload" );
+
+            // determine max size file allowed, default to 50mb
+            long maxSizeBytes = 50 * FileUtils.ONE_MB;
+            String maxSizeMbString = properties.getProperty( "usergrid.binary.max-size-mb", "50" );
+            if (StringUtils.isNumeric( maxSizeMbString )) {
+                maxSizeBytes = Long.parseLong( maxSizeMbString ) * FileUtils.ONE_MB;
+            }
+
+            // always allow files up to 5mb
+            if (maxSizeBytes < 5 * FileUtils.ONE_MB ) {
+                maxSizeBytes = 5 * FileUtils.ONE_MB;
+            }
+
+            // write temporary file, slightly larger than our size limit
+            OutputStream os = null;
+            File tempFile;
+            try {
+                tempFile = File.createTempFile( entity.getUuid().toString(), "tmp" );
+                tempFile.deleteOnExit();
+                os = new BufferedOutputStream( new FileOutputStream( tempFile.getAbsolutePath() ) );
+                os.write( data );
+                written += IOUtils.copyLarge( inputStream, os, 0, maxSizeBytes + 1 );
+
+            } catch ( IOException e ) {
+                throw new RuntimeException( "Error creating temp file", e );
+
+            } finally {
+                if ( os != null ) {
+                    IOUtils.closeQuietly( os );
+                }
+            }
+
+            // if tempFile is too large, delete it, add error to entity file metadata and abort
+
+            Map<String, Object> fileMetadata = AssetUtils.getFileMetadata( entity );
+
+            if ( tempFile.length() > maxSizeBytes ) {
+                LOG.debug("File too large. Temp file size (bytes) = {}, " +
+                        "Max file size (bytes) = {} ", tempFile.length(), maxSizeBytes);
+                try {
+                    EntityManager em = emf.getEntityManager( appId );
+                    fileMetadata.put( "error", "Asset size " + tempFile.length()
+                                    + " is larger than max size of " + maxSizeBytes );
+                    em.update( entity );
+                    tempFile.delete();
+
+                } catch ( Exception e ) {
+                    LOG.error( "Error updating entity with error message", e);
+                }
+                return null;
+            }
+
+            String uploadFileName = AssetUtils.buildAssetKey( appId, entity );
+            String mimeType = AssetMimeHandler.get().getMimeType( entity, data );
+
+            try {  // start the upload
+
+                LOG.debug( "S3 upload thread started" );
+
+                BlobStore blobStore = getContext().getBlobStore();
+
+                BlobBuilder.PayloadBlobBuilder bb =  blobStore.blobBuilder( uploadFileName )
+                        .payload( tempFile ).calculateMD5().contentType( mimeType );
+
+                if ( fileMetadata.get( AssetUtils.CONTENT_DISPOSITION ) != null ) {
+                    bb.contentDisposition( fileMetadata.get( AssetUtils.CONTENT_DISPOSITION ).toString() );
+                }
+                final Blob blob = bb.build();
+
+                String md5sum = Hex.encodeHexString( blob.getMetadata().getContentMetadata().getContentMD5() );
+                fileMetadata.put( AssetUtils.CHECKSUM, md5sum );
+
+                LOG.debug( "S3 upload starting" );
+
+                String eTag = blobStore.putBlob( bucketName, blob );
+
+                LOG.debug( "S3 upload complete eTag=" + eTag);
+
+                // update entity with information about uploaded asset
+
+                EntityManager em = emf.getEntityManager( appId );
+                fileMetadata.put( AssetUtils.E_TAG, eTag );
+                fileMetadata.put( AssetUtils.LAST_MODIFIED, System.currentTimeMillis() );
+                fileMetadata.put( AssetUtils.CONTENT_LENGTH, written );
+                em.update( entity );
+            }
+            catch ( Exception e ) {
+                LOG.error( "error uploading", e );
+            }
+
+            if ( tempFile != null && tempFile.exists() ) {
+                tempFile.delete();
+            }
+
+            return null;
+        }
+    }
 }