You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/02/09 17:28:46 UTC

incubator-usergrid git commit: Files are now downloaded by the workers rather than the master; see the new S3ImportImpl and ImportServiceImpl.downloadAndImportFile().

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-import 2e05f2496 -> 18671e7a9


Files are now downloaded by the workers rather than the master; see the new S3ImportImpl and ImportServiceImpl.downloadAndImportFile().


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/18671e7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/18671e7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/18671e7a

Branch: refs/heads/two-dot-o-import
Commit: 18671e7a9227babc4b868fc5dee0bbef0d4076ac
Parents: 2e05f24
Author: Dave Johnson <dm...@apigee.com>
Authored: Mon Feb 9 11:09:17 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Mon Feb 9 11:09:17 2015 -0500

----------------------------------------------------------------------
 .../management/importer/FileImportJob.java      |   2 +-
 .../management/importer/ImportService.java      |   2 +-
 .../management/importer/ImportServiceImpl.java  | 458 +++++++------------
 .../usergrid/management/importer/S3Import.java  |   8 +-
 .../management/importer/S3ImportImpl.java       | 224 ++-------
 .../services/queues/ImportQueueListener.java    |   2 +-
 .../management/importer/ImportCollectionIT.java |  12 +-
 .../management/importer/MockS3ImportImpl.java   |  12 +-
 8 files changed, 239 insertions(+), 481 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/18671e7a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
index 089c8e5..a77bb99 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
@@ -67,7 +67,7 @@ public class FileImportJob extends OnlyOnceJob {
             jobExecution.heartbeat();
 
             // call the File Parser for the file set in job execution
-            importService.parseFileToEntities(jobExecution);
+            importService.downloadAndImportFile(jobExecution);
 
         } catch ( Throwable t ) {
             logger.debug("Error importing file", t);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/18671e7a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java
index 58334c9..c464a18 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java
@@ -48,7 +48,7 @@ public interface ImportService {
     /**
      * Parses the input file and creates entities
      */
-    void parseFileToEntities(JobExecution jobExecution) throws Exception;
+    void downloadAndImportFile(JobExecution jobExecution) throws Exception;
 
     /**
      * Get the state for the Job with UUID

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/18671e7a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index 51e9a09..f724536 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@ -17,18 +17,24 @@
 
 package org.apache.usergrid.management.importer;
 
+import com.amazonaws.SDKGlobalConfiguration;
 import org.apache.usergrid.batch.JobExecution;
 import org.apache.usergrid.batch.service.SchedulerService;
 import org.apache.usergrid.corepersistence.CpSetup;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.management.ApplicationInfo;
 import org.apache.usergrid.management.ManagementService;
-import org.apache.usergrid.management.OrganizationInfo;
 import org.apache.usergrid.persistence.*;
 import org.apache.usergrid.persistence.entities.FileImport;
 import org.apache.usergrid.persistence.entities.Import;
 import org.apache.usergrid.persistence.entities.JobData;
-
+import org.apache.usergrid.persistence.index.query.Query.Level;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.QueueScopeFactory;
+import org.apache.usergrid.services.ServiceManagerFactory;
+import org.apache.usergrid.services.queues.ImportQueueListener;
+import org.apache.usergrid.services.queues.ImportQueueMessage;
 import org.apache.usergrid.utils.InflectionUtils;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonParser;
@@ -43,21 +49,10 @@ import rx.functions.Func1;
 import rx.functions.Func2;
 import rx.schedulers.Schedulers;
 
+import javax.annotation.PostConstruct;
 import java.io.File;
 import java.util.*;
 
-import javax.annotation.PostConstruct;
-
-
-import org.apache.usergrid.persistence.index.query.Query.Level;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueScope;
-import org.apache.usergrid.persistence.queue.QueueScopeFactory;
-import org.apache.usergrid.services.ServiceManagerFactory;
-import org.apache.usergrid.services.queues.ImportQueueListener;
-import org.apache.usergrid.services.queues.ImportQueueMessage;
-
 
 public class ImportServiceImpl implements ImportService {
 
@@ -104,7 +99,7 @@ public class ImportServiceImpl implements ImportService {
     }
 
     /**
-     * This schedules the main import Job
+     * This schedules the main import Job.
      *
      * @param config configuration of the job to be scheduled
      * @return it returns the UUID of the scheduled job
@@ -153,7 +148,6 @@ public class ImportServiceImpl implements ImportService {
         // schedule import job
         sch.createJob(IMPORT_JOB_NAME, soonestPossible, jobData);
 
-
         // update state for import job to created
         importUG.setState(Import.State.SCHEDULED);
         rootEm.update(importUG);
@@ -161,6 +155,7 @@ public class ImportServiceImpl implements ImportService {
         return importUG.getUuid();
     }
 
+
     /**
      * This schedules the sub  FileImport Job
      *
@@ -202,18 +197,18 @@ public class ImportServiceImpl implements ImportService {
         fileImport.setState( FileImport.State.CREATED );
         rootEm.update( fileImport );
 
-        //set data to be transferred to the FileImport Job
+        // set data to be transferred to the FileImport Job
         JobData jobData = new JobData();
         jobData.setProperty( "File", file );
         jobData.setProperty(FILE_IMPORT_ID, fileImport.getUuid());
         jobData.addProperties(config);
 
-
-
-        //update state of the job to Scheduled
+        // update state of the job to Scheduled
         fileImport.setState(FileImport.State.SCHEDULED);
         rootEm.update(fileImport);
 
+        rootEm.refreshIndex();
+
         return jobData;
     }
 
@@ -224,18 +219,14 @@ public class ImportServiceImpl implements ImportService {
      */
     public JobData scheduleFileTasks( final JobData jobData ) {
 
-
         long soonestPossible = System.currentTimeMillis() + 250; //sch grace period
 
-        // TODO SQS: Tear this part out and set the new job to be taken in here
         // schedule file import job
-        return sch.createJob( FILE_IMPORT_JOB_NAME, soonestPossible, jobData );
+        return sch.createJob(FILE_IMPORT_JOB_NAME, soonestPossible, jobData);
     }
 
     /**
      * Query Entity Manager for the state of the Import Entity. This corresponds to the GET /import
-     *
-     * @return String
      */
     @Override
     public String getState(UUID uuid) throws Exception {
@@ -258,8 +249,6 @@ public class ImportServiceImpl implements ImportService {
 
     /**
      * Query Entity Manager for the error message generated for an import job.
-     *
-     * @return String
      */
     @Override
     public String getErrorMessage(final UUID uuid) throws Exception {
@@ -288,7 +277,6 @@ public class ImportServiceImpl implements ImportService {
      *
      * @param jobExecution the import job details
      * @return Import Entity
-     * @throws Exception
      */
     @Override
     public Import getImportEntity(final JobExecution jobExecution) throws Exception {
@@ -360,7 +348,7 @@ public class ImportServiceImpl implements ImportService {
 
 
     /**
-     * This method gets the files from s3 and also creates sub-jobs for each file i.e. File Import Jobs
+     * This method creates sub-jobs for each file i.e. File Import Jobs.
      *
      * @param jobExecution the job created by the scheduler with all the required config data
      */
@@ -369,26 +357,38 @@ public class ImportServiceImpl implements ImportService {
 
         logger.debug("doImport()");
 
-        Map<String, Object> config = (Map<String, Object>) jobExecution.getJobData().getProperty("importInfo");
-        Object s3PlaceHolder = jobExecution.getJobData().getProperty("s3Import");
-        S3Import s3Import = null;
-
+        Map<String, Object> config =
+            (Map<String, Object>) jobExecution.getJobData().getProperty("importInfo");
         if (config == null) {
             logger.error("doImport(): Import Information passed through is null");
             return;
         }
 
-        //get the entity manager for the application, and the entity that this Import corresponds to.
-        UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
+        Map<String, Object> properties =
+            (Map<String, Object>)config.get("properties");
+        Map<String, Object> storage_info =
+            (Map<String, Object>) properties.get("storage_info");
+
+        String bucketName = (String) storage_info.get("bucket_location");
+        String accessId = (String) storage_info.get( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR);
+        String secretKey = (String) storage_info.get( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR);
 
-        EntityManager rooteEm = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
-        Import rootImportTask = rooteEm.get(importId, Import.class);
+        // get Import Entity from the management app, update it to show that job has started
 
-        //update the entity state to show that the job has officially started.
-        rootImportTask.setState( Import.State.STARTED );
-        rootImportTask.setStarted( System.currentTimeMillis() );
+        EntityManager emManagementApp = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
+        UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
+        Import rootImportTask = emManagementApp.get(importId, Import.class);
+
+        rootImportTask.setState(Import.State.STARTED);
+        rootImportTask.setStarted(System.currentTimeMillis());
         rootImportTask.setErrorMessage( " " );
-        rooteEm.update(rootImportTask);
+        emManagementApp.update(rootImportTask);
+        logger.debug("doImport(): updated state");
+
+        // if no S3 importer was passed in then create one
+
+        S3Import s3Import;
+        Object s3PlaceHolder = jobExecution.getJobData().getProperty("s3Import");
         try {
             if (s3PlaceHolder != null) {
                 s3Import = (S3Import) s3PlaceHolder;
@@ -396,256 +396,117 @@ public class ImportServiceImpl implements ImportService {
                 s3Import = new S3ImportImpl();
             }
         } catch (Exception e) {
-            logger.error("doImport(): S3Import doesn't exist");
-            rootImportTask.setErrorMessage( e.getMessage() );
+            logger.error("doImport(): Error creating S3Import", e);
+            rootImportTask.setErrorMessage(e.getMessage());
             rootImportTask.setState( Import.State.FAILED );
-            rooteEm.update(rootImportTask);
+            emManagementApp.update(rootImportTask);
             return;
         }
 
-        logger.debug("doImport(): updated state");
-
-        final List<File> files;
+        // get list of all JSON files in S3 bucket
 
+        final List<String> bucketFiles;
         try {
 
             if (config.get("organizationId") == null) {
                 logger.error("doImport(): No organization could be found");
                 rootImportTask.setErrorMessage( "No organization could be found" );
                 rootImportTask.setState( Import.State.FAILED );
-                rooteEm.update(rootImportTask);
+                emManagementApp.update(rootImportTask);
                 return;
 
             } else {
 
 
                 if (config.get("applicationId") == null) {
-
                     throw new UnsupportedOperationException("Import applications not supported");
 
-                    // import All the applications from an organization
-                    //importApplicationsFromOrg(
-                    //(UUID) config.get("organizationId"), config, jobExecution, s3Import);
-
                 } else if (config.get("collectionName") == null) {
-
                     throw new UnsupportedOperationException("Import application not supported");
 
-                    // imports an Application from a single organization
-                    //importApplicationFromOrg( (UUID) config.get("organizationId"),
-                    // (UUID) config.get("applicationId"), config, jobExecution, s3Import);
-
                 } else {
-
-                    // imports a single collection from an app org combo
-                    files = importCollectionFromOrgApp(
-                        (UUID) config.get("organizationId"), (UUID) config.get("applicationId"),
-                        config, jobExecution, s3Import);
+                    bucketFiles = s3Import.getBucketFileNames( bucketName, ".json", accessId, secretKey );
                 }
             }
 
         } catch (OrganizationNotFoundException | ApplicationNotFoundException e) {
             rootImportTask.setErrorMessage( e.getMessage() );
             rootImportTask.setState( Import.State.FAILED );
-            rooteEm.update(rootImportTask);
+            emManagementApp.update(rootImportTask);
             return;
         }
 
-        if (files.size() == 0) {
-            rootImportTask.setState( Import.State.FINISHED );
-            rootImportTask.setErrorMessage( "no files found in the bucket with the relevant context" );
-            rooteEm.update(rootImportTask);
-
-        } else {
 
-            Map<String, Object> fileMetadata = new HashMap<String, Object>();
+        // schedule a FileImport job for each file found in the bucket
 
-            ArrayList<Map<String, Object>> value = new ArrayList<Map<String, Object>>();
+        if ( bucketFiles.isEmpty() )  {
+            rootImportTask.setState( Import.State.FINISHED );
+            rootImportTask.setErrorMessage( "No files found in the bucket: " + bucketName );
+            emManagementApp.update(rootImportTask);
 
-            final List<JobData> fileJobs = new ArrayList<>(files.size());
+        } else {
 
-            //create the connection and set up metadata
-            for (File file : files) {
+            Map<String, Object> fileMetadata = new HashMap<>();
+            ArrayList<Map<String, Object>> value = new ArrayList<>();
+            final List<JobData> fileJobs = new ArrayList<>(bucketFiles.size());
 
-                // TODO SQS: replace the method inside here so that it uses sqs instead of internal q
-                final JobData jobData = createFileTask( config, file.getPath(), rootImportTask );
+            // create the Entity Connection and set up metadata for each job
 
+            for ( String bucketFile : bucketFiles ) {
+                final JobData jobData = createFileTask( config, bucketFile, rootImportTask );
                 fileJobs.add( jobData) ;
-
             }
 
-            //schedule each job
-            for(JobData jobData: fileJobs){
+            // schedule each job
+
+            for ( JobData jobData: fileJobs ) {
 
                 final JobData scheduled = scheduleFileTasks( jobData );
 
-                Map<String, Object> fileJobID = new HashMap<String, Object>();
-                            fileJobID.put("FileName", scheduled.getProperty( "File" ));
-                            fileJobID.put("JobID", scheduled.getUuid());
+                Map<String, Object> fileJobID = new HashMap<>();
+                    fileJobID.put("FileName", scheduled.getProperty( "File" ));
+                    fileJobID.put("JobID", scheduled.getUuid());
                 value.add(fileJobID);
             }
 
-
-
             fileMetadata.put("files", value);
             rootImportTask.addProperties( fileMetadata );
-            rooteEm.update(rootImportTask);
-        }
-    }
-
-
-    /**
-     * Imports a specific collection from an org-app combo.
-     */
-    private List<File> importCollectionFromOrgApp(
-        UUID organizationUUID, UUID applicationUUID, final Map<String, Object> config,
-        final JobExecution jobExecution, S3Import s3Import) throws Exception {
-
-        logger.debug("importCollectionFromOrgApp()");
-
-        //retrieves import entity
-        Import importUG = getImportEntity( jobExecution );
-
-        ApplicationInfo application = managementService.getApplicationInfo(applicationUUID);
-        if (application == null) {
-            throw new ApplicationNotFoundException("Application Not Found");
-        }
-
-        OrganizationInfo organizationInfo = managementService.getOrganizationByUuid(organizationUUID);
-        if (organizationInfo == null) {
-            throw new OrganizationNotFoundException("Organization Not Found");
-        }
-
-        String collectionName = config.get("collectionName").toString();
-
-        String appFileName = prepareCollectionInputFileName( organizationInfo.getName(), application.getName(),
-            collectionName );
-
-        return copyFileFromS3(importUG, appFileName, config, s3Import, ImportType.COLLECTION);
-
-    }
-
-    /**
-     * Imports a specific applications from an organization
-     */
-    private List<File> importApplicationFromOrg(
-        UUID organizationUUID, UUID applicationId, final Map<String, Object> config,
-        final JobExecution jobExecution, S3Import s3Import) throws Exception {
-
-        //retrieves import entity
-        Import importUG = getImportEntity(jobExecution);
-
-        ApplicationInfo application = managementService.getApplicationInfo( applicationId );
-        if (application == null) {
-            throw new ApplicationNotFoundException("Application Not Found");
-        }
-
-        OrganizationInfo organizationInfo = managementService.getOrganizationByUuid(organizationUUID);
-        if (organizationInfo == null) {
-            throw new OrganizationNotFoundException("Organization Not Found");
-        }
-
-        String appFileName = prepareApplicationInputFileName( organizationInfo.getName(), application.getName() );
-
-        return copyFileFromS3(importUG, appFileName, config, s3Import, ImportType.APPLICATION);
-
-    }
-
-    /**
-     * Imports All Applications from an Organization
-     */
-    private List<File> importApplicationsFromOrg(
-        UUID organizationUUID, final Map<String, Object> config,
-        final JobExecution jobExecution, S3Import s3Import) throws Exception {
-
-        // retrieves import entity
-        Import importUG = getImportEntity(jobExecution);
-
-        OrganizationInfo organizationInfo = managementService.getOrganizationByUuid( organizationUUID );
-        if (organizationInfo == null) {
-            throw new OrganizationNotFoundException("Organization Not Found");
+            emManagementApp.update(rootImportTask);
         }
-
-        // prepares the prefix path for the files to be import depending on the endpoint being hit
-        String appFileName = prepareOrganizationInputFileName( organizationInfo.getName() );
-
-        return copyFileFromS3(importUG, appFileName, config, s3Import, ImportType.ORGANIZATION);
-
-    }
-
-
-    protected String prepareCollectionInputFileName(String orgName, String appName, String collectionName) {
-        return orgName + "/" + appName + "." + collectionName + ".";
-    }
-
-
-    protected String prepareApplicationInputFileName(String orgName, String appName) {
-        return orgName + "/" + appName + ".";
-    }
-
-
-    protected String prepareOrganizationInputFileName(String orgName) {
-        return orgName + "/";
     }
 
 
-    /**
-     * Copies file from S3.
-     *
-     * @param importUG    Import instance
-     * @param appFileName the base file name for the files to be downloaded
-     * @param config      the config information for the import job
-     * @param s3Import    s3import instance
-     * @param type        it indicates the type of import
-     */
-    public ArrayList<File> copyFileFromS3(Import importUG, String appFileName,
-        Map<String, Object> config, S3Import s3Import, ImportType type) throws Exception {
-
-        EntityManager rootEm = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
-        ArrayList<File> copyFiles = new ArrayList<>();
-
-        try {
-            copyFiles = s3Import.copyFromS3(config, appFileName, type);
+    @Override
+    public void downloadAndImportFile(JobExecution jobExecution) throws Exception {
 
-        } catch (Exception e) {
-            logger.debug("Error copying from S3, continuing...", e);
-            importUG.setErrorMessage(e.getMessage());
-            importUG.setState(Import.State.FAILED);
-            rootEm.update(importUG);
+        Map<String, Object> properties =
+            (Map<String, Object>)jobExecution.getJobData().getProperty("properties");
+        if (properties == null) {
+            logger.error("downloadAndImportFile(): Import Information passed through is null");
+            return;
         }
-        return copyFiles;
-    }
-
+        Map<String, Object> storage_info =
+            (Map<String, Object>) properties.get("storage_info");
 
+        String bucketName = (String) storage_info.get("bucket_location");
+        String accessId = (String) storage_info.get( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR);
+        String secretKey = (String) storage_info.get( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR);
 
-    @Override
-    // TODO: ImportService should not have to know about JobExecution
-    public void parseFileToEntities(JobExecution jobExecution) throws Exception {
-
-        FileImport fileImport = getFileImportEntity( jobExecution );
-        File file = new File(jobExecution.getJobData().getProperty("File").toString());
+        FileImport fileImport = getFileImportEntity(jobExecution);
+        String fileName = jobExecution.getJobData().getProperty("File").toString();
         UUID targetAppId = (UUID) jobExecution.getJobData().getProperty("applicationId");
 
-        parseFileToEntities( jobExecution, fileImport, file, targetAppId );
-    }
-
-
-    public void parseFileToEntities( final JobExecution execution, final FileImport fileImport, final File file,
-                                     final UUID targetAppId ) throws Exception {
-
-        logger.debug( "parseFileToEntities() for file {} ", file.getAbsolutePath() );
-
-        EntityManager emManagementApp = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
-        emManagementApp.update( fileImport );
+        logger.debug("downloadAndImportFile() for file {} ", fileName);
 
-        //already done, nothing to do
-        if ( FileImport.State.FAILED.equals( fileImport.getState() ) || FileImport.State.FINISHED
-            .equals( fileImport.getState() ) ) {
+        if (   FileImport.State.FAILED.equals( fileImport.getState() )
+            || FileImport.State.FINISHED .equals(fileImport.getState()) ) {
             return;
         }
 
+        // update FileImport Entity to indicate that we have started
 
-        // mark the File import job as started
+        EntityManager emManagementApp = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
+        emManagementApp.update( fileImport );
         fileImport.setState( FileImport.State.STARTED );
         emManagementApp.update( fileImport );
 
@@ -653,64 +514,70 @@ public class ImportServiceImpl implements ImportService {
             throw new IllegalArgumentException( "Application does not exist: " + targetAppId.toString() );
         }
         EntityManager targetEm = emf.getEntityManager( targetAppId );
-        logger.debug( "   importing into app {} file {}", targetAppId.toString(), file.getAbsolutePath() );
 
-        importEntitiesFromFile( execution, file, targetEm, emManagementApp, fileImport );
+        // download file from S3, if no S3 importer was passed in then create one
 
+        S3Import s3Import;
+        Object s3PlaceHolder = jobExecution.getJobData().getProperty("s3Import");
+        try {
+            if (s3PlaceHolder != null) {
+                s3Import = (S3Import) s3PlaceHolder;
+            } else {
+                s3Import = new S3ImportImpl();
+            }
+        } catch (Exception e) {
+            logger.error("doImport(): Error creating S3Import", e);
+            fileImport.setErrorMessage(e.getMessage());
+            fileImport.setState( FileImport.State.FAILED );
+            emManagementApp.update(fileImport);
+            return;
+        }
+        File downloadedFile = s3Import.copyFileFromBucket(
+            fileName, bucketName, accessId, secretKey );
 
-        // Updates the state of file import job
-
+        // parse JSON data, create Entities and Connections from import data
 
-        // check other files status and mark the status of
-        // import Job as Finished if all others are finished
-        Results ImportJobResults =
-            emManagementApp.getConnectingEntities( fileImport, "includes", null, Level.ALL_PROPERTIES );
+        parseEntitiesAndConnectionsFromJson(
+            jobExecution, downloadedFile, targetEm, emManagementApp, fileImport);
 
-        List<Entity> importEntity = ImportJobResults.getEntities();
-        UUID importId = importEntity.get( 0 ).getUuid();
-        Import importUG = emManagementApp.get( importId, Import.class );
+        // mark ImportJob FINISHED but only if all other FileImportJobs are complete
 
-        Results entities = emManagementApp.getConnectedEntities( importUG, "includes", null, Level.ALL_PROPERTIES );
+        // get parent import job of this file import job
+        Results importJobResults =
+            emManagementApp.getConnectingEntities( fileImport, "includes", null, Level.ALL_PROPERTIES );
+        List<Entity> importEntities = importJobResults.getEntities();
+        UUID importId = importEntities.get( 0 ).getUuid();
+        Import importEntity = emManagementApp.get( importId, Import.class );
 
+        // get all file import job siblings of the current job we're working now
+        Results entities = emManagementApp.getConnectedEntities( importEntity, "includes", null, Level.ALL_PROPERTIES );
         PagingResultsIterator itr = new PagingResultsIterator( entities );
 
-
         int failCount = 0;
-
         while ( itr.hasNext() ) {
             FileImport fi = ( FileImport ) itr.next();
-
-
             switch ( fi.getState() ) {
-                //failed, but we may not be complete so continue checking
-                case FAILED:
+                case FAILED:     // failed, but we may not be complete so continue checking
                     failCount++;
                     break;
-                //finished, we can continue checking
-                case FINISHED:
+                case FINISHED:   // finished, we can continue checking
                     break;
-                //not something we recognize as complete, short circuit
-                default:
+                default:         // not something we recognize as complete, short circuit
                     return;
             }
         }
 
-
         if ( failCount == 0 ) {
-            importUG.setState( Import.State.FINISHED );
-        }
-
-        //we had failures, set it to failed
-        else {
-            importUG.setState( Import.State.FAILED );
+            importEntity.setState(Import.State.FINISHED);
+        }  else {
+            // we had failures, set it to failed
+            importEntity.setState(Import.State.FAILED);
         }
 
-        emManagementApp.update( importUG );
+        emManagementApp.update( importEntity );
     }
 
 
-
-
     /**
      * Gets the JSON parser for given file
      *
@@ -726,13 +593,13 @@ public class ImportServiceImpl implements ImportService {
     /**
      * Imports the entity's connecting references (collections, connections and dictionaries)
      *
-     * @param execution     The job execution currently running
+     * @param execution     The job jobExecution currently running
      * @param file         The file to be imported
      * @param em           Entity Manager for the application being imported
      * @param rootEm       Entity manager for the root applicaition
      * @param fileImport   The file import entity
      */
-    private void importEntitiesFromFile(
+    private void parseEntitiesAndConnectionsFromJson(
         final JobExecution execution,
         final File file,
         final EntityManager em,
@@ -781,21 +648,30 @@ public class ImportServiceImpl implements ImportService {
         // potentially skip the first n if this is a resume operation
         final int entityNumSkip = (int)tracker.getTotalEntityCount();
 
-       final int entityCount =  entityEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
-            @Override
-            public Boolean call( final WriteEvent writeEvent ) {
-                return !tracker.shouldStopProcessingEntities();
-            }
-        } ).skip(entityNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
-            @Override
-            public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
-                return entityWrapperObservable.doOnNext(doWork);
-            }
-        }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
+        // with this code we get asynchronous behavior and testImportWithMultipleFiles will fail
+//       final int entityCount =  entityEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
+//            @Override
+//            public Boolean call( final WriteEvent writeEvent ) {
+//                return !tracker.shouldStopProcessingEntities();
+//            }
+//        } ).skip(entityNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+//            @Override
+//            public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
+//                return entityWrapperObservable.doOnNext(doWork);
+//            }
+//        }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
+
+        entityEventObservable.parallel(
+            new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+                @Override
+                public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
+                    return entityWrapperObservable.doOnNext(doWork);
+                }
+            }, Schedulers.io()).toBlocking().last();
 
         jp.close();
 
-        logger.debug("\n\nimportEntitiesFromFile(): Wrote entities\n");
+        logger.debug("\n\nparseEntitiesAndConnectionsFromJson(): Wrote entities\n");
 
 
         // SECOND PASS: import all connections and dictionaries
@@ -816,21 +692,31 @@ public class ImportServiceImpl implements ImportService {
         // potentially skip the first n if this is a resume operation
         final int connectionNumSkip = (int)tracker.getTotalConnectionCount();
 
-        final int connectionCount = otherEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
-            @Override
-            public Boolean call( final WriteEvent writeEvent ) {
-                return !tracker.shouldStopProcessingConnections();
-            }
-        } ).skip(connectionNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
-            @Override
-            public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
-                return entityWrapperObservable.doOnNext(doWork);
-            }
-        }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
+        // with this code we get asynchronous behavior and testImportWithMultipleFiles will fail
+//        final int connectionCount = otherEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
+//            @Override
+//            public Boolean call( final WriteEvent writeEvent ) {
+//                return !tracker.shouldStopProcessingConnections();
+//            }
+//        } ).skip(connectionNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+//            @Override
+//            public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
+//                return entityWrapperObservable.doOnNext(doWork);
+//            }
+//        }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
+
+       otherEventObservable.parallel(
+            new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+                @Override
+                public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
+                    return entityWrapperObservable.doOnNext(doWork);
+                }
+            }, Schedulers.io()).toBlocking().last();
 
         jp.close();
 
-        logger.debug("\n\nimportEntitiesFromFile(): Wrote others\n");
+        logger.debug("\n\nparseEntitiesAndConnectionsFromJson(): Wrote others for file {}\n",
+            fileImport.getFileName());
 
 
         // flush the job statistics

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/18671e7a/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java
index b0a0720..7e09051 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java
@@ -18,7 +18,9 @@
 package org.apache.usergrid.management.importer;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 
@@ -27,7 +29,9 @@ import java.util.Map;
  */
 public interface S3Import {
 
-    ArrayList<File> copyFromS3(
-        Map<String, Object> exportInfo, String filename, ImportService.ImportType type);
+    /**
+     * Lists the names of the files in an S3 bucket whose names end with the given suffix.
+     *
+     * @param bucketName name of the S3 bucket to list
+     * @param endsWith   only file names ending with this suffix are returned
+     * @param accessId   AWS access key id used to authenticate
+     * @param secretKey  AWS secret key used to authenticate
+     * @return names of the matching files; empty when nothing matches
+     * @throws Exception if the bucket cannot be listed
+     */
+    List<String> getBucketFileNames(
+        String bucketName, String endsWith, String accessId, String secretKey ) throws Exception;
 
+    /**
+     * Copies a single file from an S3 bucket into a local temporary file.
+     *
+     * @param blobFileName name of the file within the bucket
+     * @param bucketName   name of the S3 bucket
+     * @param accessId     AWS access key id used to authenticate
+     * @param secretKey    AWS secret key used to authenticate
+     * @return the local temporary copy of the file
+     * @throws Exception if the file does not exist in the bucket or cannot be copied
+     */
+    File copyFileFromBucket(
+        String blobFileName, String bucketName, String accessId, String secretKey ) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/18671e7a/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java
index 0fefeb0..f10eea9 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java
@@ -20,6 +20,8 @@ package org.apache.usergrid.management.importer;
 import com.amazonaws.SDKGlobalConfiguration;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Module;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.lucene.document.StringField;
 import org.jclouds.ContextBuilder;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.BlobStoreContext;
@@ -35,6 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.*;
@@ -44,29 +47,10 @@ public class S3ImportImpl implements S3Import {
     private static final Logger logger = LoggerFactory.getLogger(S3ImportImpl.class);
 
 
-    /**
-     * Downloads the files from s3 into temp local files.
-     *
-     * @param importInfo     the information entered by the user required to perform import from S3
-     * @param filenamePrefix generated based on the request URI
-     * @param type           indicates the type of import
-     * @return An ArrayList of files i.e. the files downloaded from s3
-     */
-    public ArrayList<File> copyFromS3(
-        final Map<String, Object> importInfo, String filenamePrefix, ImportService.ImportType type) {
+    public File copyFileFromBucket(
+        String blobFileName, String bucketName, String accessId, String secretKey ) throws Exception {
 
-        logger.debug("copyFileFromS3(): copying file={} type={}", filenamePrefix, type.toString());
-
-        ArrayList<File> files = new ArrayList<>();
-
-        Map<String, Object> properties = (Map<String, Object>) importInfo.get("properties");
-
-        Map<String, Object> storage_info = (Map<String, Object>) properties.get("storage_info");
-
-        String bucketName = (String) storage_info.get("bucket_location");
-        //TODO: have this support the alternate configurations as well
-        String accessId = (String) storage_info.get( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR);
-        String secretKey = (String) storage_info.get(SDKGlobalConfiguration.SECRET_KEY_ENV_VAR);
+        // setup to use JCloud BlobStore interface to AWS S3
 
         Properties overrides = new Properties();
         overrides.setProperty("s3" + ".identity", accessId);
@@ -82,182 +66,64 @@ public class S3ImportImpl implements S3Import {
             .modules(MODULES)
             .overrides(overrides)
             .buildView(BlobStoreContext.class);
+        BlobStore blobStore = context.getBlobStore();
 
-        try {
-
-            BlobStore blobStore = context.getBlobStore();
-
-            // gets all the files in the bucket recursively
-            PageSet<? extends StorageMetadata> pageSet =
-                blobStore.list(bucketName, new ListContainerOptions().recursive());
-
-            logger.debug("   Found {} files in bucket {}", pageSet.size(), bucketName);
-
-            Iterator itr = pageSet.iterator();
-
-            while (itr.hasNext()) {
-
-                String blobStoreFileName = ((MutableBlobMetadata) itr.next()).getName();
-                ParsedFileName pfn = new ParsedFileName(blobStoreFileName);
-
-                switch (type) {
-
-                    // collection file in format <org_name>/<app_name>.<collection_name>.[0-9]+.json
-                    case COLLECTION: {
-                        List<String> errors = new ArrayList<>();
-                        if (pfn.organizationName == null) {
-                            errors.add("Filename does not specify organization name");
-                        }
-                        if (pfn.applicationName == null) {
-                            errors.add("Filename does not specify application name");
-                        }
-                        if (pfn.collectionName == null) {
-                            errors.add("Filename does not specify collection name");
-
-                            // we shouldn't care what collection name is specified in the import file.
-//                        } else if (!pfn.collectionName.equals(importInfo.get("collectionName"))) {
-//                            errors.add("Collection name in input file should be " + pfn.collectionName);
-                        }
-                        if (!errors.isEmpty()) {
-                            throw new IllegalArgumentException("Input errors " + errors.toString());
-                        }
-                        files.add(copyFile(blobStore, bucketName, blobStoreFileName));
-                        break;
-                    }
-
-                    // application file in format <org_name>/<app_name>.[0-9]+.json
-                    case APPLICATION: {
-                        List<String> errors = new ArrayList<>();
-                        if (pfn.organizationName == null) {
-                            errors.add("Filename does not specify organization name");
-                        }
-                        if (pfn.applicationName == null) {
-                            errors.add("Filename does not specify application name");
-                        }
-                        if (!errors.isEmpty()) {
-                            throw new IllegalArgumentException("Input errors " + errors.toString());
-                        }
-
-                        files.add(copyFile(blobStore, bucketName, blobStoreFileName));
-                        break;
-                    }
-
-                    // is an application file in format <org_name>/[-a-zA-Z0-9]+.[0-9]+.json
-                    case ORGANIZATION: {
-                        List<String> errors = new ArrayList<>();
-                        if (pfn.organizationName == null) {
-                            errors.add("Filename does not specify organization name");
-                        }
-                        if (!errors.isEmpty()) {
-                            throw new IllegalArgumentException("Input errors " + errors.toString());
-                        }
-                        files.add(copyFile(blobStore, bucketName, blobStoreFileName));
-                        break;
-                    }
-
-                    default: {
-                        throw new IllegalArgumentException(
-                            "Unrecognized import type " + type.toString());
-                    }
-                }
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-
-        logger.debug("   Returning {} files", files.size());
-        return files;
-    }
-
-
-    /**
-     * Copy the file from s3 into a temp local file.
-     *
-     * @param bucketName the S3 bucket name from where files need to be imported
-     * @param fileName   the filename by which the temp file should be created
-     */
-    private File copyFile(BlobStore blobStore, String bucketName, String fileName) throws IOException {
-
-        Blob blob = blobStore.getBlob(bucketName, fileName);
+        // get file from configured bucket, copy it to local temp file
 
-        String[] fileOrg = fileName.split("/");
-        File organizationDirectory = new File(fileOrg[0]);
-
-        if (!organizationDirectory.exists()) {
-            try {
-                organizationDirectory.mkdir();
-
-            } catch (SecurityException se) {
-                logger.error(se.getMessage());
-            }
+        Blob blob = blobStore.getBlob(bucketName, blobFileName);
+        if ( blob == null) {
+            throw new RuntimeException(
+                "Blob file name " + blobFileName + " not found in bucket " + bucketName );
         }
-
-        File ephemeral = new File(fileName);
-        FileOutputStream fop = new FileOutputStream(ephemeral);
+        File tempFile = File.createTempFile( bucketName, RandomStringUtils.randomAlphabetic(10));
+        FileOutputStream fop = new FileOutputStream(tempFile);
         blob.getPayload().writeTo(fop);
         fop.close();
+        tempFile.deleteOnExit();
 
-        organizationDirectory.deleteOnExit();
-        ephemeral.deleteOnExit();
-
-        return ephemeral;
+        return tempFile;
     }
 
 
-    /**
-     * Break filename down into parts.
-     */
-    class ParsedFileName {
-        String fileName;
-        String applicationName;
-        String collectionName;
-        String organizationName;
-        long fileNumber = -1L;
-
-        public ParsedFileName(String fileName) {
+    /**
+     * Lists every file in the bucket (recursively) whose name ends with {@code endsWith}.
+     * All result pages are read, and the JClouds context is always closed before returning.
+     */
+    @Override
+    public List<String> getBucketFileNames(
+        String bucketName, String endsWith, String accessId, String secretKey ) {
 
-            this.fileName = fileName;
+        // set up the JClouds BlobStore interface to AWS S3
 
-            if (fileName.endsWith("\\.json")) {
-                logger.debug("Bad filename " + fileName);
-                throw new IllegalArgumentException("Import filenames must end with .json");
-            }
+        Properties overrides = new Properties();
+        overrides.setProperty("s3" + ".identity", accessId);
+        overrides.setProperty("s3" + ".credential", secretKey);
 
-            if (fileName.contains("/")) {
-                String[] parts = fileName.split("/");
-                organizationName = parts[0];
+        final Iterable<? extends Module> MODULES = ImmutableSet.of(
+            new JavaUrlHttpCommandExecutorServiceModule(),
+            new Log4JLoggingModule(),
+            new NettyPayloadModule());
 
-                if (parts.length > 1) {
-                    String[] secondParts = parts[1].split("\\.");
-                    applicationName = secondParts[0];
+        BlobStoreContext context = ContextBuilder.newBuilder("s3")
+            .credentials(accessId, secretKey)
+            .modules(MODULES)
+            .overrides(overrides)
+            .buildView(BlobStoreContext.class);
+        try {
+            BlobStore blobStore = context.getBlobStore();
 
-                    if (secondParts.length > 1) {
-                        collectionName = secondParts[1];
-                    }
+            // get all the files in the configured bucket recursively; one list() call returns
+            // only a single page of results, so keep following the next-page marker until null
 
-                    if (secondParts.length > 2) {
-                        fileNumber = Long.parseLong(secondParts[2]);
-                    }
-                }
-            }
+            List<String> blobFileNames = new ArrayList<>();
+            String marker = null;
+            do {
+                ListContainerOptions options = new ListContainerOptions().recursive();
+                if ( marker != null ) {
+                    options.afterMarker( marker );
+                }
+                PageSet<? extends StorageMetadata> pageSet = blobStore.list( bucketName, options );
+                logger.debug("   Found {} files in bucket {}", pageSet.size(), bucketName);
+                addBlobFileNamesEndingWith( pageSet, endsWith, blobFileNames );
+                marker = pageSet.getNextMarker();
+            } while ( marker != null );
+
+            return blobFileNames;
+
+        } finally {
+            // release the resources held by the JClouds context
+            context.close();
+        }
+    }
 
-            if (applicationName == null
-                && collectionName == null
-                && organizationName == null) {
-                throw new IllegalArgumentException("Unable to parse import filename " + fileName);
+    /**
+     * Appends to {@code blobFileNames} the name of each entry in {@code pageSet} that ends
+     * with {@code endsWith}. Uses StorageMetadata.getName() so no cast to a blob type is needed.
+     */
+    private static void addBlobFileNamesEndingWith(
+        PageSet<? extends StorageMetadata> pageSet, String endsWith, List<String> blobFileNames ) {
+        for ( StorageMetadata metadata : pageSet ) {
+            String blobFileName = metadata.getName();
+            if ( blobFileName.endsWith( endsWith )) {
+                blobFileNames.add(blobFileName);
             }
-
-            logger.debug("Parsed " + toString());
         }
 
-        public String toString() {
-            StringBuilder sb = new StringBuilder();
-            sb.append("org: ").append(organizationName);
-            sb.append(" app: ").append(applicationName);
-            sb.append(" col: ").append(collectionName);
-            sb.append(" num: ").append(fileNumber);
-            return sb.toString();
-        }
     }
+
+
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/18671e7a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
index 0757863..9223ac2 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
@@ -75,7 +75,7 @@ public class ImportQueueListener extends QueueListener {
         for (QueueMessage message : messages) {
             ImportQueueMessage queueMessage = ( ImportQueueMessage ) message.getBody();
 
-//        TODO   We still need to hide this queue behind the scheduler importService.parseFileToEntities( queueMessage );
+//        TODO   We still need to hide this queue behind the scheduler importService.downloadAndImportFile( queueMessage );
 
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/18671e7a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
index ef335c8..9a8c977 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
@@ -401,7 +401,6 @@ public class ImportCollectionIT {
 
         logger.debug("\n\nImport into new app {}\n", em.getApplication().getName() );
 
-
         ImportService importService = setup.getImportService();
         UUID importUUID = importService.schedule( new HashMap<String, Object>() {{
             put( "path", organization.getName() + em.getApplication().getName());
@@ -420,15 +419,13 @@ public class ImportCollectionIT {
             }});
         }});
 
-        //  listener.start();
-
-//        int maxRetries = 20;
-//        int retries = 0;
-        while ( !importService.getState( importUUID ).equals( "FINISHED" ) ) {
+        int maxRetries = 20;
+        int retries = 0;
+        while ( !importService.getState( importUUID ).equals( "FINISHED" ) && retries++ < maxRetries ) {
+            logger.debug("Waiting for import...");
             Thread.sleep(1000);
         }
 
-
         em.refreshIndex();
     }
 
@@ -465,6 +462,7 @@ public class ImportCollectionIT {
         int maxRetries = 20;
         int retries = 0;
         while ( !exportService.getState( exportUUID ).equals( "FINISHED" ) && retries++ < maxRetries ) {
+            logger.debug("Waiting for export...");
             Thread.sleep(1000);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/18671e7a/stack/services/src/test/java/org/apache/usergrid/management/importer/MockS3ImportImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/importer/MockS3ImportImpl.java b/stack/services/src/test/java/org/apache/usergrid/management/importer/MockS3ImportImpl.java
index caf7fd2..16096b6 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/MockS3ImportImpl.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/MockS3ImportImpl.java
@@ -21,18 +21,22 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Map;
+import java.util.List;
 
 
 public class MockS3ImportImpl implements S3Import{
     private static final Logger logger = LoggerFactory.getLogger(MockS3ImportImpl.class);
 
+    /**
+     * Returns an empty list: the mock bucket never contains any files.
+     */
     @Override
-    public ArrayList<File> copyFromS3(
-        final Map<String,Object> exportInfo, String filename , ImportService.ImportType type ) {
-        logger.info("Copying from S3 file {} with import type {}", filename, type.toString());
+    public List<String> getBucketFileNames(
+        String bucketName, String endsWith, String accessId, String secretKey ) {
+        logger.info("Listing files ending with {} in bucket {}", endsWith, bucketName);
         return new ArrayList<>();
     }
 
+    /**
+     * Returns a new, empty local temporary file instead of downloading from S3. The file is
+     * marked for deletion on JVM exit, matching what S3ImportImpl does with its downloads,
+     * so repeated test runs do not accumulate temp files.
+     */
+    @Override
+    public File copyFileFromBucket(
+        String blobFileName, String bucketName, String accessId, String secretKey ) throws IOException {
+        logger.info("Copying file {} from bucket {}", blobFileName, bucketName);
+        File tempFile = File.createTempFile("test","tmp");
+        tempFile.deleteOnExit();
+        return tempFile;
+    }
+
 }