You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/08/22 16:16:24 UTC

[88/95] [abbrv] git commit: merged distrbuted and multithreading code

merged distrbuted and multithreading code


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c4950911
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c4950911
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c4950911

Branch: refs/heads/import-feature
Commit: c49509119492bde4ed8fcb3000d73316f8ef6dc8
Parents: b266015 b0e333f
Author: Pooja Jain <pj...@apigee.com>
Authored: Tue Aug 5 16:20:22 2014 -0700
Committer: Pooja Jain <pj...@apigee.com>
Committed: Tue Aug 5 16:20:22 2014 -0700

----------------------------------------------------------------------
 .../cassandra/ManagementServiceImpl.java        |   7 +-
 .../management/importUG/ImportServiceImpl.java  | 571 +++++++++++--------
 .../management/cassandra/ImportServiceIT.java   |   5 +-
 3 files changed, 350 insertions(+), 233 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c4950911/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
index fa2221b,100a9a6..69805e9
--- a/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
@@@ -16,22 -16,8 +16,21 @@@
   */
  package org.apache.usergrid.management.cassandra;
  
 -import com.google.common.collect.BiMap;
 -import com.google.common.collect.HashBiMap;
 +
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Properties;
 +import java.util.Set;
 +import java.util.UUID;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import org.springframework.beans.factory.annotation.Autowired;
- 
  import org.apache.commons.codec.digest.DigestUtils;
  import org.apache.commons.lang.text.StrSubstitutor;
  import org.apache.shiro.UnavailableSecurityManagerException;
@@@ -62,20 -48,16 +61,19 @@@ import org.apache.usergrid.security.tok
  import org.apache.usergrid.security.tokens.TokenInfo;
  import org.apache.usergrid.security.tokens.TokenService;
  import org.apache.usergrid.security.tokens.exceptions.TokenException;
 -import org.apache.usergrid.services.*;
 -import org.apache.usergrid.utils.*;
 -import org.slf4j.Logger;
 -import org.slf4j.LoggerFactory;
 -import org.springframework.beans.factory.annotation.Autowired;
 -
 -import java.nio.ByteBuffer;
 -import java.util.*;
 -import java.util.Map.Entry;
 +import org.apache.usergrid.services.ServiceAction;
 +import org.apache.usergrid.services.ServiceManager;
 +import org.apache.usergrid.services.ServiceManagerFactory;
 +import org.apache.usergrid.services.ServiceRequest;
 +import org.apache.usergrid.services.ServiceResults;
 +import org.apache.usergrid.utils.ConversionUtils;
 +import org.apache.usergrid.utils.JsonUtils;
 +import org.apache.usergrid.utils.MailUtils;
 +import org.apache.usergrid.utils.StringUtils;
 +import org.apache.usergrid.utils.UUIDUtils;
  
 +import com.google.common.collect.BiMap;
 +import com.google.common.collect.HashBiMap;
- 
  import static java.lang.Boolean.parseBoolean;
  import static org.apache.commons.codec.binary.Base64.encodeBase64URLSafeString;
  import static org.apache.commons.codec.digest.DigestUtils.sha;
@@@ -1952,7 -1934,9 +1950,8 @@@ public class ManagementServiceImpl impl
          return !Boolean.TRUE.equals( em.getProperty( new SimpleEntityRef( User.ENTITY_TYPE, userId ), "disabled" ) );
      }
  
-     protected String emailMsg( Map<String, String> values, String propertyName ) {
+ 
 -
+     public String emailMsg( Map<String, String> values, String propertyName ) {
          return new StrSubstitutor( values ).replace( properties.getProperty( propertyName ) );
      }
  
@@@ -2460,7 -2444,8 +2459,7 @@@
      }
  
  
-     protected String buildUserAppUrl( UUID applicationId, String url, User user, String token ) throws Exception {
 -
+     public String buildUserAppUrl( UUID applicationId, String url, User user, String token ) throws Exception {
          ApplicationInfo ai = getApplicationInfo( applicationId );
          OrganizationInfo oi = getOrganizationForApplication( applicationId );
          return String.format( url, oi.getName(), StringUtils.stringOrSubstringAfterFirst( ai.getName(), '/' ),

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c4950911/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
index 14e5b18,c76420e..eec77a2
--- a/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
@@@ -67,11 -68,10 +72,10 @@@ public class ImportServiceImpl implemen
  
      private JsonFactory jsonFactory = new JsonFactory();
  
--    private int entityCount=0;
++    private int entityCount = 0;
 +
  
      /**
--     *
       * @param config configuration of the job to be scheduled
       * @return it returns the UUID of the scheduled job
       * @throws Exception
@@@ -79,21 -79,23 +83,20 @@@
      @Override
      public UUID schedule(Map<String, Object> config) throws Exception {
  
-         if ( config == null ) {
-             logger.error( "import information cannot be null" );
 -        ApplicationInfo defaultImportApp = null;
 -
 -        if ( config == null ) {
 -            logger.error( "import information cannot be null" );
++        if (config == null) {
++            logger.error("import information cannot be null");
              return null;
          }
  
          EntityManager em = null;
          try {
--            em = emf.getEntityManager( MANAGEMENT_APPLICATION_ID );
++            em = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
              Set<String> collections = em.getApplicationCollections();
--            if ( !collections.contains( "imports" ) ) {
--                em.createApplicationCollection( "imports" );
++            if (!collections.contains("imports")) {
++                em.createApplicationCollection("imports");
              }
--        }
--        catch ( Exception e ) {
--            logger.error( "application doesn't exist within the current context" );
++        } catch (Exception e) {
++            logger.error("application doesn't exist within the current context");
              return null;
          }
  
@@@ -101,90 -103,34 +104,85 @@@
  
          //update state
          try {
--            importUG = em.create( importUG );
--        }
--        catch ( Exception e ) {
--            logger.error( "Import entity creation failed" );
++            importUG = em.create(importUG);
++        } catch (Exception e) {
++            logger.error("Import entity creation failed");
              return null;
          }
  
--        importUG.setState( Import.State.CREATED );
--        em.update( importUG );
++        importUG.setState(Import.State.CREATED);
++        em.update(importUG);
  
          //set data to be transferred to importInfo
          JobData jobData = new JobData();
--        jobData.setProperty( "importInfo", config );
--        jobData.setProperty( IMPORT_ID, importUG.getUuid() );
++        jobData.setProperty("importInfo", config);
++        jobData.setProperty(IMPORT_ID, importUG.getUuid());
  
          long soonestPossible = System.currentTimeMillis() + 250; //sch grace period
  
          //schedule job
--        sch.createJob( IMPORT_JOB_NAME, soonestPossible, jobData );
++        sch.createJob(IMPORT_JOB_NAME, soonestPossible, jobData);
  
          //update state
--        importUG.setState( Import.State.SCHEDULED );
--        em.update( importUG );
++        importUG.setState(Import.State.SCHEDULED);
++        em.update(importUG);
  
          return importUG.getUuid();
      }
  
+ 
 +    /**
-      *
-      * @param file  file to be scheduled
++     * @param file file to be scheduled
 +     * @return it returns the UUID of the scheduled job
 +     * @throws Exception
 +     */
 +    public UUID scheduleFile(String file, EntityRef importRef) throws Exception {
 +
-         ApplicationInfo defaultImportApp = null;
- 
 +        EntityManager em = null;
 +
 +        try {
-             em = emf.getEntityManager( MANAGEMENT_APPLICATION_ID );
-         }
-         catch ( Exception e ) {
-             logger.error( "application doesn't exist within the current context" );
++            em = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
++        } catch (Exception e) {
++            logger.error("application doesn't exist within the current context");
 +            return null;
 +        }
 +
 +        FileImport fileImport = new FileImport();
 +
 +        fileImport.setFileName(file);
 +        fileImport.setCompleted(false);
 +        fileImport.setLastUpdatedUUID(" ");
 +        fileImport.setErrorMessage(" ");
 +        fileImport.setState(FileImport.State.CREATED);
 +        fileImport = em.create(fileImport);
 +
-         Import importUG = em.get(importRef,Import.class);
++        Import importUG = em.get(importRef, Import.class);
 +
 +        try {
-             em.createConnection(importUG,"includes",fileImport);
-         }
-         catch ( Exception e ) {
++            em.createConnection(importUG, "includes", fileImport);
++        } catch (Exception e) {
 +            logger.error(e.getMessage());
 +            return null;
 +        }
-         fileImport.setState( FileImport.State.CREATED );
-         em.update( fileImport );
++        fileImport.setState(FileImport.State.CREATED);
++        em.update(fileImport);
 +
 +        //set data to be transferred to importInfo
 +        JobData jobData = new JobData();
-         jobData.setProperty( "File", file );
-         jobData.setProperty( FILE_IMPORT_ID , fileImport.getUuid() );
++        jobData.setProperty("File", file);
++        jobData.setProperty(FILE_IMPORT_ID, fileImport.getUuid());
 +
 +        long soonestPossible = System.currentTimeMillis() + 250; //sch grace period
 +
 +        //schedule job
-         sch.createJob(FILE_IMPORT_JOB_NAME, soonestPossible, jobData );
++        sch.createJob(FILE_IMPORT_JOB_NAME, soonestPossible, jobData);
 +
 +        //update state
-         fileImport.setState( FileImport.State.SCHEDULED );
-         em.update( fileImport );
++        fileImport.setState(FileImport.State.SCHEDULED);
++        em.update(fileImport);
 +
 +        return fileImport.getUuid();
 +    }
  
      /**
       * Query Entity Manager for the string state of the Import Entity. This corresponds to the GET /import
@@@ -193,18 -139,18 +191,18 @@@
       */
      @Override
      public String getState(UUID uuid) throws Exception {
--        if ( uuid == null ) {
--            logger.error( "UUID passed in cannot be null." );
++        if (uuid == null) {
++            logger.error("UUID passed in cannot be null.");
              return "UUID passed in cannot be null";
          }
  
--        EntityManager rootEm = emf.getEntityManager( MANAGEMENT_APPLICATION_ID );
++        EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
  
          //retrieve the import entity.
--        Import importUG = rootEm.get( uuid, Import.class );
++        Import importUG = rootEm.get(uuid, Import.class);
  
--        if ( importUG == null ) {
--            logger.error( "no entity with that uuid was found" );
++        if (importUG == null) {
++            logger.error("no entity with that uuid was found");
              return "No Such Element found";
          }
          return importUG.getState().toString();
@@@ -216,46 -162,39 +214,47 @@@
       * @return String
       */
      @Override
--    public String getErrorMessage(final UUID uuid ) throws Exception {
++    public String getErrorMessage(final UUID uuid) throws Exception {
  
          //get application entity manager
  
--        if ( uuid == null ) {
--            logger.error( "UUID passed in cannot be null." );
++        if (uuid == null) {
++            logger.error("UUID passed in cannot be null.");
              return "UUID passed in cannot be null";
          }
  
--        EntityManager rootEm = emf.getEntityManager(  MANAGEMENT_APPLICATION_ID );
++        EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
  
          //retrieve the import entity.
--        Import importUG = rootEm.get( uuid, Import.class );
++        Import importUG = rootEm.get(uuid, Import.class);
  
--        if ( importUG == null ) {
--            logger.error( "no entity with that uuid was found" );
++        if (importUG == null) {
++            logger.error("no entity with that uuid was found");
              return "No Such Element found";
          }
          return importUG.getErrorMessage().toString();
      }
  
 +    @Override
-     public Import getImportEntity( final JobExecution jobExecution ) throws Exception {
++    public Import getImportEntity(final JobExecution jobExecution) throws Exception {
  
-         UUID importId = ( UUID ) jobExecution.getJobData().getProperty( IMPORT_ID );
 -    //@Override
 -    public Import getImportEntity( final JobExecution jobExecution ) throws Exception {
 -
 -        UUID importId = ( UUID ) jobExecution.getJobData().getProperty( IMPORT_ID );
++        UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
          EntityManager importManager = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
  
--        return importManager.get( importId, Import.class );
++        return importManager.get(importId, Import.class);
      }
  
+ 
      @Override
-     public FileImport getFileImportEntity( final JobExecution jobExecution ) throws Exception {
++    public FileImport getFileImportEntity(final JobExecution jobExecution) throws Exception {
 +
-         UUID fileImportId = ( UUID ) jobExecution.getJobData().getProperty(FILE_IMPORT_ID);
++        UUID fileImportId = (UUID) jobExecution.getJobData().getProperty(FILE_IMPORT_ID);
 +        EntityManager em = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
 +
-         return em.get( fileImportId, FileImport.class );
++        return em.get(fileImportId, FileImport.class);
 +    }
 +
 +    @Override
      public ArrayList<File> getEphemeralFile() {
          return files;
      }
@@@ -265,7 -204,7 +264,7 @@@
      }
  
  
--    public void setSch( final SchedulerService sch ) {
++    public void setSch(final SchedulerService sch) {
          this.sch = sch;
      }
  
@@@ -275,7 -214,7 +274,7 @@@
      }
  
  
--    public void setEmf( final EntityManagerFactory emf ) {
++    public void setEmf(final EntityManagerFactory emf) {
          this.emf = emf;
      }
  
@@@ -286,12 -225,12 +285,11 @@@
      }
  
  
--    public void setManagementService( final ManagementService managementService ) {
++    public void setManagementService(final ManagementService managementService) {
          this.managementService = managementService;
      }
  
      /**
--     *
       * @param jobExecution the job created by the scheduler with all the required config data
       * @throws Exception
       */
@@@ -332,128 -269,126 +330,136 @@@
              return;
          }
  
+         try {
 +
-         if (config.get("organizationId") == null) {
-             logger.error("No organization could be found");
-             importUG.setErrorMessage("No organization could be found");
-             importUG.setState(Import.State.FAILED);
+             if (config.get("organizationId") == null) {
+                 logger.error("No organization could be found");
++                importUG.setErrorMessage("No organization could be found");
+                 importUG.setState(Import.State.FAILED);
+                 em.update(importUG);
+                 return;
+             } else if (config.get("applicationId") == null) {
+                 //import All the applications from an organization
+                 importApplicationsFromOrg((UUID) config.get("organizationId"), config, jobExecution, s3Import);
+             } else if (config.get("collectionName") == null) {
+                 //imports an Application from a single organization
+                 importApplicationFromOrg((UUID) config.get("organizationId"), (UUID) config.get("applicationId"), config, jobExecution, s3Import);
+             } else {
+                 //imports a single collection from an app org combo
+                 importCollectionFromOrgApp((UUID) config.get("applicationId"), config, jobExecution, s3Import);
+             }
+         }
+         catch (OrganizationNotFoundException e) {
+             importUG.setErrorMessage(e.getMessage());
 -            importUG.setState(Import.State.FAILED);
++            importUG.setState(Import.State.FINISHED);
+             em.update(importUG);
+             return;
 -        }
 -        catch (ApplicationNotFoundException e) {
++        } catch (ApplicationNotFoundException e) {
+             importUG.setErrorMessage(e.getMessage());
 -            importUG.setState(Import.State.FAILED);
++            importUG.setState(Import.State.FINISHED);
              em.update(importUG);
              return;
-         } else if (config.get("applicationId") == null) {
-             //import All the applications from an organization
-             importApplicationsFromOrg((UUID) config.get("organizationId"), config, jobExecution, s3Import);
-         } else if (config.get("collectionName") == null) {
-             //imports an Application from a single organization
-             importApplicationFromOrg((UUID) config.get("organizationId"), (UUID) config.get("applicationId"), config, jobExecution, s3Import);
-         } else {
-             //imports a single collection from an app org combo
-             importCollectionFromOrgApp((UUID) config.get("applicationId"), config, jobExecution, s3Import);
          }
 -        catch (Exception e) {
 -            // the case where job will be retried i.e. resumed from the failed point
 -            importUG.setErrorMessage(e.getMessage());
 -            importUG.setState(Import.State.FAILED);
 +
-         if(files.size() == 0)
-         {
++        if (files.size() == 0) {
 +            importUG.setState(Import.State.FINISHED);
 +            importUG.setErrorMessage("no files found in the bucket with the relevant context");
              em.update(importUG);
-         }
-         else
-         {
-             Map<String,Object> fileMetadata = new HashMap<String, Object>();
 -            throw e;
 -        }
++        } else {
++            Map<String, Object> fileMetadata = new HashMap<String, Object>();
 +
-             ArrayList<Map<String,Object>> value = new ArrayList<Map<String, Object>>();
++            ArrayList<Map<String, Object>> value = new ArrayList<Map<String, Object>>();
 +
-             for(File eachfile: files) {
++            for (File eachfile : files) {
 +
 +                UUID jobID = scheduleFile(eachfile.getPath(), em.getRef(importId));
-                 Map<String,Object> fileJobID = new HashMap<String,Object>();
-                 fileJobID.put("FileName",eachfile.getName());
++                Map<String, Object> fileJobID = new HashMap<String, Object>();
++                fileJobID.put("FileName", eachfile.getName());
 +                fileJobID.put("JobID", jobID.toString());
 +                value.add(fileJobID);
 +            }
  
 -        importUG.setState( Import.State.FINISHED );
 -        em.update( importUG );
 +            fileMetadata.put("files", value);
 +            importUG.addProperties(fileMetadata);
 +            em.update(importUG);
 +        }
 +        return;
      }
  
      /**
       * Imports a specific collection from an org-app combo.
       */
--    private void importCollectionFromOrgApp( UUID applicationUUID, final Map<String, Object> config,
--                                             final JobExecution jobExecution, S3Import s3Import ) throws Exception {
++    private void importCollectionFromOrgApp(UUID applicationUUID, final Map<String, Object> config,
++                                            final JobExecution jobExecution, S3Import s3Import) throws Exception {
  
 -        //retrieves export entity
 +        //retrieves import entity
          Import importUG = getImportEntity(jobExecution);
          ApplicationInfo application = managementService.getApplicationInfo(applicationUUID);
  
--        if(application == null) {
++        if (application == null) {
              throw new ApplicationNotFoundException("Application Not Found");
          }
          String collectionName = config.get("collectionName").toString();
  
  
--        String appFileName = prepareInputFileName("application", application.getName(),collectionName);
++        String appFileName = prepareInputFileName("application", application.getName(), collectionName);
  
--        files = fileTransfer( importUG, appFileName, config, s3Import, 0 );
++        files = fileTransfer(importUG, appFileName, config, s3Import, 0);
  
 -        FileParser(jobExecution);
      }
  
      /**
       * Imports a specific applications from an organization
       */
--    private void importApplicationFromOrg( UUID organizationUUID, UUID applicationId, final Map<String, Object> config,
--                                           final JobExecution jobExecution, S3Import s3Import ) throws Exception {
++    private void importApplicationFromOrg(UUID organizationUUID, UUID applicationId, final Map<String, Object> config,
++                                          final JobExecution jobExecution, S3Import s3Import) throws Exception {
  
          //retrieves import entity
          Import importUG = getImportEntity(jobExecution);
  
--        ApplicationInfo application = managementService.getApplicationInfo( applicationId );
++        ApplicationInfo application = managementService.getApplicationInfo(applicationId);
  
--        if(application == null) {
++        if (application == null) {
              throw new ApplicationNotFoundException("Application Not Found");
          }
  
          String appFileName = prepareInputFileName("application", application.getName(), null);
  
--        files = fileTransfer( importUG, appFileName, config, s3Import, 1 );
++        files = fileTransfer(importUG, appFileName, config, s3Import, 1);
  
 -        FileParser(jobExecution);
      }
  
      /**
       * Imports All Applications from an Organization
       */
--    private void importApplicationsFromOrg( UUID organizationUUID, final Map<String, Object> config,
--                                            final JobExecution jobExecution, S3Import s3Import ) throws Exception {
++    private void importApplicationsFromOrg(UUID organizationUUID, final Map<String, Object> config,
++                                           final JobExecution jobExecution, S3Import s3Import) throws Exception {
  
          // retrieves import entity
          Import importUG = getImportEntity(jobExecution);
          String appFileName = null;
  
          OrganizationInfo organizationInfo = managementService.getOrganizationByUuid(organizationUUID);
--        if(organizationInfo == null) {
++        if (organizationInfo == null) {
              throw new OrganizationNotFoundException("Organization Not Found");
          }
  
--        appFileName = prepareInputFileName( "organization", organizationInfo.getName() , null );
--        files = fileTransfer( importUG, appFileName, config, s3Import, 2 );
++        appFileName = prepareInputFileName("organization", organizationInfo.getName(), null);
++        files = fileTransfer(importUG, appFileName, config, s3Import, 2);
  
 -        FileParser(jobExecution);
      }
  
      /**
       * @param type just a label such us: organization, application.
--     *
       * @return the file name concatenated with the type and the name of the collection
       */
--    protected String prepareInputFileName( String type, String name, String CollectionName ) {
++    protected String prepareInputFileName(String type, String name, String CollectionName) {
          StringBuilder str = new StringBuilder();
          // in case of type organization --> the file name will be "<org_name>/"
--        if(type.equals("organization")) {
++        if (type.equals("organization")) {
              str.append(name);
              str.append("/");
--        }
--        else if(type.equals("application")) {
++        } else if (type.equals("application")) {
              // in case of type application --> the file name will be "<org_name>/<app_name>."
              str.append(name);
              str.append(".");
@@@ -470,21 -405,21 +476,19 @@@
      }
  
      /**
--     *
--     * @param importUG Import instance
--     * @param appFileName   the base file name for the files to be downloaded
--     * @param config    the config information for the import job
--     * @param s3Import  s3import instance
--     * @param type  it indicates the type of import. 0 - Collection , 1 - Application and 2 - Organization
++     * @param importUG    Import instance
++     * @param appFileName the base file name for the files to be downloaded
++     * @param config      the config information for the import job
++     * @param s3Import    s3import instance
++     * @param type        it indicates the type of import. 0 - Collection , 1 - Application and 2 - Organization
       * @return
       */
--    public ArrayList<File> fileTransfer( Import importUG, String appFileName, Map<String, Object> config,
--                                         S3Import s3Import , int type) {
++    public ArrayList<File> fileTransfer(Import importUG, String appFileName, Map<String, Object> config,
++                                        S3Import s3Import, int type) {
          ArrayList<File> files = new ArrayList<File>();
          try {
--            files  =  s3Import.copyFromS3(config, appFileName , type);
--        }
--        catch ( Exception e ) {
++            files = s3Import.copyFromS3(config, appFileName, type);
++        } catch (Exception e) {
              importUG.setErrorMessage(e.getMessage());
              importUG.setState(Import.State.FAILED);
          }
@@@ -493,32 -428,50 +497,32 @@@
  
      /**
       * The loops through each temp file and parses it to store the entities from the json back into usergrid
++     *
       * @throws Exception
       */
 -    private void FileParser(JobExecution jobExecution) throws Exception {
 -
 -        // add properties to the import entity
 -        Import importUG = getImportEntity(jobExecution);
 -        EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
 -
 +    @Override
 +    public void FileParser(JobExecution jobExecution) throws Exception {
  
  
- 
 -        Map<String,Object> fileMetadata = new HashMap<String, Object>();
 -        ArrayList<Map<String,Object>> value = new ArrayList<Map<String, Object>>();
 +        // add properties to the import entity
 +        FileImport fileImport = getFileImportEntity(jobExecution);
  
 -        if (!((Map<String,Object>)importUG.getDynamicProperties()).containsKey("files")) {
 -            // create the structure for file metadata and initialize it
 -            for (File collectionFile : files) {
 -                Map<String, Object> singleFile = new HashMap<String, Object>();
 -                singleFile.put("name", collectionFile.getName());
 -                singleFile.put("completed", new Boolean(false));
 -                singleFile.put("lastUpdatedUUID", new String(""));
 -                value.add(singleFile);
 -            }
 +        File file = new File(jobExecution.getJobData().getProperty("File").toString());
  
 -            fileMetadata.put("files", value);
 -            importUG.addProperties(fileMetadata);
 -            rootEm.update(importUG);
 -        }
 +        EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
 +        rootEm.update(fileImport);
  
 -        ArrayList fileNames = (ArrayList)importUG.getDynamicProperties().get("files");
 -        int i=0;
 +        boolean completed = fileImport.getCompleted();
  
 -        for(File collectionFile : files) {
 +        // on resume, completed files will not be traversed again
-         if(!completed) {
++        if (!completed) {
  
 -            Map<String,Object> fileInfo = (Map<String,Object>)fileNames.get(i);
 -            boolean completed = ((Boolean)fileInfo.get("completed")).booleanValue();
 -            // on resume, completed files will not be traversed again
 -            if(!completed) {
 +            if (isValidJSON(file, rootEm, fileImport)) {
  
 -                if(!isValidJSON(collectionFile,rootEm,importUG,i)){
 -                    i++;
 -                    continue;
 -                }
 +                fileImport.setState(FileImport.State.STARTED);
 +                rootEm.update(fileImport);
  
 -                String applicationName = collectionFile.getPath().split("\\.")[0];
 +                String applicationName = file.getPath().split("\\.")[0];
  
                  ApplicationInfo application = managementService.getApplicationInfo(applicationName);
  
@@@ -544,71 -498,23 +548,50 @@@
                  }
  
                  EntityManager em = emf.getEntityManager(application.getId());
-                 try {
  
-                     String uuid=" ";
-                     while (jp.nextToken() != JsonToken.END_ARRAY) {
-                         uuid = importEntityStuff(jp, em, rootEm, fileImport, jobExecution);
-                     }
-                     if(!uuid.equals(" "))
-                     {
-                         fileImport.setLastUpdatedUUID(uuid);
-                         rootEm.update(fileImport);
-                     }
-                     jp.close();
-                 }
-                 catch (OrganizationNotFoundException e) {
-                     fileImport.setErrorMessage(e.getMessage());
-                     fileImport.setState(FileImport.State.FINISHED);
-                     rootEm.update(fileImport);
-                     return;
-                 }
-                 catch (ApplicationNotFoundException e) {
-                     fileImport.setErrorMessage(e.getMessage());
-                     fileImport.setState(FileImport.State.FINISHED);
-                     rootEm.update(fileImport);
-                     return;
 -                //TODO: remove roles and take care of it later when importing applications
+                 while (jp.nextToken() != JsonToken.END_ARRAY) {
 -                    importEntityStuff(jp, em, rootEm, importUG, i);
++                    importEntityStuff(jp, em, rootEm, fileImport, jobExecution);
                  }
+                 jp.close();
 +
-                 if(!fileImport.getState().equals("FAILED")) {
++                if (!fileImport.getState().equals("FAILED")) {
 +
 +                    // mark file as completed
 +                    fileImport.setCompleted(true);
 +                    fileImport.setState(FileImport.State.FINISHED);
 +                    rootEm.update(fileImport);
 +
 +                    //check other files status and mark the status of import Job.
 +                    Results ImportJobResults = rootEm.getConnectingEntities(fileImport.getUuid(), "includes", null, Results.Level.ALL_PROPERTIES);
 +                    List<Entity> importEntity = ImportJobResults.getEntities();
 +                    UUID importId = importEntity.get(0).getUuid();
 +                    Import importUG = rootEm.get(importId, Import.class);
 +
-                     Results entities = rootEm.getConnectedEntities(importId,"includes",null,Results.Level.ALL_PROPERTIES);
++                    Results entities = rootEm.getConnectedEntities(importId, "includes", null, Results.Level.ALL_PROPERTIES);
 +                    List<Entity> importFile = entities.getEntities();
 +
 +                    int count = 0;
-                     for(Entity eachEntity: importFile) {
-                         FileImport fi = rootEm.get(eachEntity.getUuid(),FileImport.class);
-                         if(fi.getState().toString().equals("FINISHED")) {
++                    for (Entity eachEntity : importFile) {
++                        FileImport fi = rootEm.get(eachEntity.getUuid(), FileImport.class);
++                        if (fi.getState().toString().equals("FINISHED")) {
 +                            count++;
-                         }
-                         else if(fi.getState().toString().equals("FAILED")) {
++                        } else if (fi.getState().toString().equals("FAILED")) {
 +                            importUG.setState(Import.State.FAILED);
 +                            rootEm.update(importUG);
 +                            break;
 +                        }
 +                    }
-                     if(count == importFile.size()) {
++                    if (count == importFile.size()) {
 +                        importUG.setState(Import.State.FINISHED);
 +                        rootEm.update(importUG);
 +                    }
 +                }
              }
          }
      }
  
-     private boolean isValidJSON( File collectionFile, EntityManager rootEm, FileImport fileImport) throws Exception  {
 -    private boolean isValidJSON( File collectionFile, EntityManager rootEm, Import importUG, int index) throws Exception  {
++    private boolean isValidJSON(File collectionFile, EntityManager rootEm, FileImport fileImport) throws Exception {
  
 -        ArrayList fileNames = (ArrayList) importUG.getDynamicProperties().get("files");
          boolean valid = false;
          try {
              final JsonParser jp = jsonFactory.createJsonParser(collectionFile);
@@@ -626,9 -533,10 +609,10 @@@
          return valid;
      }
  
-     private JsonParser getJsonParserForFile( File collectionFile ) throws Exception {
-         JsonParser jp = jsonFactory.createJsonParser( collectionFile );
-         jp.setCodec( new ObjectMapper() );
+ 
 -    private JsonParser getJsonParserForFile( File collectionFile ) throws Exception {
 -        JsonParser jp = jsonFactory.createJsonParser( collectionFile );
 -        jp.setCodec( new ObjectMapper() );
++    private JsonParser getJsonParserForFile(File collectionFile) throws Exception {
++        JsonParser jp = jsonFactory.createJsonParser(collectionFile);
++        jp.setCodec(new ObjectMapper());
          return jp;
      }
  
@@@ -637,108 -546,229 +622,242 @@@
       *
       * @param jp JsonPrser pointing to the beginning of the object.
       */
-     private String importEntityStuff( JsonParser jp, EntityManager em, EntityManager rootEm, FileImport fileImport,  JobExecution jobExecution) throws Exception {
 -    private void importEntityStuff(final JsonParser jp, final EntityManager em, EntityManager rootEm, Import importUG, int index) throws Exception {
  
-         Entity entity = null;
-         EntityRef ownerEntityRef = null;
-         String entityUuid = "";
-         String entityType = "";
 -        final JsonParserObservable subscribe = new JsonParserObservable(jp,em,rootEm,importUG, index);
++    private void importEntityStuff(final JsonParser jp, final EntityManager em, EntityManager rootEm, final FileImport fileImport, final JobExecution jobExecution) throws Exception {
 +
-         // Go inside the value after getting the owner entity id.
-         while (jp.nextToken() != JsonToken.END_OBJECT) {
++        final JsonParserObservable subscribe = new JsonParserObservable(jp, em, rootEm, fileImport);
  
-             String collectionName = jp.getCurrentName();
+         final Observable<WriteEvent> observable = Observable.create(subscribe);
  
-             try {
-                 // create the connections
-                 if (collectionName.equals("connections")) {
+         /**
+          * This is the action we want to perform for every UUID we receive
+          */
+         final Action1<WriteEvent> doWork = new Action1<WriteEvent>() {
+             @Override
+             public void call(WriteEvent writeEvent) {
 -                writeEvent.doWrite(em);
++                writeEvent.doWrite(em, jobExecution, fileImport);
+             }
  
-                     jp.nextToken(); // START_OBJECT
-                     while (jp.nextToken() != JsonToken.END_OBJECT) {
-                         String connectionType = jp.getCurrentName();
+         };
  
-                         jp.nextToken(); // START_ARRAY
-                         while (jp.nextToken() != JsonToken.END_ARRAY) {
-                             String entryId = jp.getText();
  
-                             EntityRef entryRef = em.getRef(UUID.fromString(entryId));
-                             // Store in DB
-                             em.createConnection(ownerEntityRef, connectionType, entryRef);
-                         }
+         /**
+          * This is boilerplate glue code.  We have to follow this for the parallel operation.  In the "call"
+          * method we want to simply return the input observable + the chain of operations we want to invoke
+          */
+         observable.parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+             @Override
 -            public Observable< WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
++            public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
+                 return entityWrapperObservable.doOnNext(doWork);
+             }
 -        }, Schedulers.io() ).toBlocking().last();
++        }, Schedulers.io()).toBlocking().last();
+ 
+ 
+     }
+ 
 -    private interface WriteEvent{
 -        public void doWrite(EntityManager em);
++    private interface WriteEvent {
++        public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport);
+     }
+ 
 -    private final class EntityEvent implements WriteEvent{
++    private final class EntityEvent implements WriteEvent {
+         UUID entityUuid;
+         String entityType;
+         Map<String, Object> properties;
 -        EntityEvent(UUID entityUuid, String entityType,Map<String, Object> properties){
++
++        EntityEvent(UUID entityUuid, String entityType, Map<String, Object> properties) {
+             this.entityUuid = entityUuid;
+             this.entityType = entityType;
+             this.properties = properties;
+         }
++
+         @Override
 -        public void doWrite(EntityManager em) {
++        public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport) {
++            EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
++
+             try {
 -                em.create(entityUuid, entityType, properties);
 -                System.out.println("Emitting UUID " + entityUuid + " on thread " + Thread.currentThread().getName() );
 -                }catch (Exception e) {
 -                    System.out.println("something went wrong while creating this - " + e);
++                Entity entity = em.create(entityUuid, entityType, properties);
++                // update the last updated entity
++                if (entity != null) {
++                    entityCount++;
++                    if ((entityCount % 10) == 0) {
++                        jobExecution.heartbeat();
++                    }
++                    if (entityCount == 2000) {
++                        fileImport.setLastUpdatedUUID(entityUuid.toString());
++                        rootEm.update(fileImport);
++                        entityCount = 0;
 +                    }
                  }
-                 // add dictionaries
-                 else if (collectionName.equals("dictionaries")) {
 +
-                     jp.nextToken(); // START_OBJECT
-                     while (jp.nextToken() != JsonToken.END_OBJECT) {
++            } catch (Exception e) {
++                logger.error("something went wrong while creating this - " + e);
++                fileImport.setErrorMessage(e.getMessage());
++                try {
++                    rootEm.update(fileImport);
++                } catch (Exception ex) { }
++            }
+         }
+     }
 -    private final class ConnectionEvent implements WriteEvent{
 +
-                         String dictionaryName = jp.getCurrentName();
++    private final class ConnectionEvent implements WriteEvent {
+         EntityRef ownerEntityRef;
+         String connectionType;
+         EntityRef entryRef;
  
-                         jp.nextToken();
 -        ConnectionEvent(EntityRef ownerEntityRef, String connectionType, EntityRef entryRef){
++        ConnectionEvent(EntityRef ownerEntityRef, String connectionType, EntityRef entryRef) {
+             this.ownerEntityRef = ownerEntityRef;
+             this.connectionType = connectionType;
+             this.entryRef = entryRef;
  
-                         Map<String, Object> dictionary = jp.readValueAs(HashMap.class);
+         }
+ 
+         @Override
 -        public void doWrite(EntityManager em) {
++        public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport) {
++            EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
++
+             try {
+                 em.createConnection(ownerEntityRef, connectionType, entryRef);
 -                System.out.println("creating connection " + connectionType + " on thread " + Thread.currentThread().getName() );
 -            }catch (Exception e) {
 -                System.out.println("something went wrong while creating this - " + e);
++
++            } catch (Exception e) {
++
++                logger.error("something went wrong while creating this - " + e);
++                fileImport.setErrorMessage(e.getMessage());
++                try {
++                    rootEm.update(fileImport);
++                } catch (Exception ex) { }
+             }
+         }
+     }
 -    private final class DictionaryEvent implements WriteEvent{
++
++    private final class DictionaryEvent implements WriteEvent {
+ 
+         EntityRef ownerEntityRef;
+         String dictionaryName;
+         Map<String, Object> dictionary;
+ 
 -        DictionaryEvent(EntityRef ownerEntityRef, String dictionaryName, Map<String, Object> dictionary){
++        DictionaryEvent(EntityRef ownerEntityRef, String dictionaryName, Map<String, Object> dictionary) {
+             this.ownerEntityRef = ownerEntityRef;
+             this.dictionaryName = dictionaryName;
+             this.dictionary = dictionary;
+         }
+ 
+         @Override
 -        public void doWrite(EntityManager em) {
++        public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport) {
++            EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+             try {
++
+                 em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary);
 -                System.out.println("creating dictionary  " + dictionaryName + " on thread " + Thread.currentThread().getName() );
 -            }catch (Exception e) {
 -                System.out.println("something went wrong while creating this - " + e);
++
++            } catch (Exception e) {
++
++                logger.error("something went wrong while creating this - " + e);
++                fileImport.setErrorMessage(e.getMessage());
++                try {
++                    rootEm.update(fileImport);
++                } catch (Exception ex) { }
+             }
+         }
+     }
  
-                         em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary);
-                     }
-                 } else {
-                     // Regular collections
-                     jp.nextToken(); // START_OBJECT
  
-                     Map<String, Object> properties = new HashMap<String, Object>();
 -
+     private final class JsonParserObservable implements Observable.OnSubscribe<WriteEvent> {
+         private final JsonParser jp;
+         EntityManager em;
+         EntityManager rootEm;
 -        Import importUG;
 -        int index;
++        FileImport fileImport;
 +
-                     JsonToken token = jp.nextToken();
  
-                     while (token != JsonToken.END_OBJECT) {
-                         if (token == JsonToken.VALUE_STRING || token == JsonToken.VALUE_NUMBER_INT) {
-                             String key = jp.getCurrentName();
-                             if (key.equals("uuid")) {
-                                 entityUuid = jp.getText();
 -        private int entityCount = 0;
  
-                             } else if (key.equals("type")) {
-                                 entityType = jp.getText();
-                             } else if (key.length() != 0 && jp.getText().length() != 0) {
-                                 String value = jp.getText();
-                                 properties.put(key, value);
 -        JsonParserObservable(JsonParser parser, EntityManager em, EntityManager rootEm, Import importUG, int index ) {
++        JsonParserObservable(JsonParser parser, EntityManager em, EntityManager rootEm, FileImport fileImport) {
+             this.jp = parser;
+             this.em = em;
+             this.rootEm = rootEm;
 -            this.importUG = importUG;
 -            this.index = index;
++            this.fileImport = fileImport;
+         }
+ 
+         @Override
+         public void call(final Subscriber<? super WriteEvent> subscriber) {
 -            ArrayList fileNames = (ArrayList) importUG.getDynamicProperties().get("files");
+ 
+             WriteEvent entityWrapper = null;
 -            Entity entity = null;
+             EntityRef ownerEntityRef = null;
+             String entityUuid = "";
+             String entityType = "";
+             try {
+                 while (!subscriber.isUnsubscribed() && jp.nextToken() != JsonToken.END_OBJECT) {
+                     String collectionName = jp.getCurrentName();
 -                    try {
 -                        // create the connections
 -                        if (collectionName.equals("connections")) {
+ 
 -                            jp.nextToken(); // START_OBJECT
 -                            while (jp.nextToken() != JsonToken.END_OBJECT) {
 -                                String connectionType = jp.getCurrentName();
++                    // create the connections
++                    if (collectionName.equals("connections")) {
+ 
 -                                jp.nextToken(); // START_ARRAY
 -                                while (jp.nextToken() != JsonToken.END_ARRAY) {
 -                                    String entryId = jp.getText();
++                        jp.nextToken(); // START_OBJECT
++                        while (jp.nextToken() != JsonToken.END_OBJECT) {
++                            String connectionType = jp.getCurrentName();
+ 
 -                                    EntityRef entryRef = new SimpleEntityRef(UUID.fromString(entryId));
 -                                    entityWrapper = new ConnectionEvent(ownerEntityRef, connectionType, entryRef);
 -                                    subscriber.onNext(entityWrapper);
 -                                }
++                            jp.nextToken(); // START_ARRAY
++                            while (jp.nextToken() != JsonToken.END_ARRAY) {
++                                String entryId = jp.getText();
++
++                                EntityRef entryRef = new SimpleEntityRef(UUID.fromString(entryId));
++                                entityWrapper = new ConnectionEvent(ownerEntityRef, connectionType, entryRef);
++                                subscriber.onNext(entityWrapper);
                              }
                          }
-                         token = jp.nextToken();
 -                        // add dictionaries
 -                        else if (collectionName.equals("dictionaries")) {
 +                    }
++                    // add dictionaries
++                    else if (collectionName.equals("dictionaries")) {
+ 
 -                            jp.nextToken(); // START_OBJECT
 -                            while (jp.nextToken() != JsonToken.END_OBJECT) {
++                        jp.nextToken(); // START_OBJECT
++                        while (jp.nextToken() != JsonToken.END_OBJECT) {
+ 
 -                                String dictionaryName = jp.getCurrentName();
++                            String dictionaryName = jp.getCurrentName();
  
-                     entity = em.create(UUID.fromString(entityUuid), entityType, properties);
-                     ownerEntityRef = em.getRef(UUID.fromString(entityUuid));
 -                                jp.nextToken();
++                            jp.nextToken();
+ 
 -                                Map<String, Object> dictionary = jp.readValueAs(HashMap.class);
 -                                entityWrapper = new DictionaryEvent(ownerEntityRef, dictionaryName, dictionary);
 -                                subscriber.onNext(entityWrapper);
 -                            }
 -                            subscriber.onCompleted();
 -                        } else {
 -                            // Regular collections
 -                            jp.nextToken(); // START_OBJECT
 -
 -                            Map<String, Object> properties = new HashMap<String, Object>();
 -
 -                            JsonToken token = jp.nextToken();
 -
 -                            while (token != JsonToken.END_OBJECT) {
 -                                if (token == JsonToken.VALUE_STRING || token == JsonToken.VALUE_NUMBER_INT) {
 -                                    String key = jp.getCurrentName();
 -                                    if (key.equals("uuid")) {
 -                                        entityUuid = jp.getText();
 -
 -                                    } else if (key.equals("type")) {
 -                                        entityType = jp.getText();
 -                                    } else if (key.length() != 0 && jp.getText().length() != 0) {
 -                                        String value = jp.getText();
 -                                        properties.put(key, value);
 -                                    }
++                            Map<String, Object> dictionary = jp.readValueAs(HashMap.class);
++                            entityWrapper = new DictionaryEvent(ownerEntityRef, dictionaryName, dictionary);
++                            subscriber.onNext(entityWrapper);
++                        }
++                        subscriber.onCompleted();
++                    } else {
++                        // Regular collections
++                        jp.nextToken(); // START_OBJECT
++
++                        Map<String, Object> properties = new HashMap<String, Object>();
++
++                        JsonToken token = jp.nextToken();
++
++                        while (token != JsonToken.END_OBJECT) {
++                            if (token == JsonToken.VALUE_STRING || token == JsonToken.VALUE_NUMBER_INT) {
++                                String key = jp.getCurrentName();
++                                if (key.equals("uuid")) {
++                                    entityUuid = jp.getText();
++
++                                } else if (key.equals("type")) {
++                                    entityType = jp.getText();
++                                } else if (key.length() != 0 && jp.getText().length() != 0) {
++                                    String value = jp.getText();
++                                    properties.put(key, value);
+                                 }
 -                                token = jp.nextToken();
+                             }
 -                            entityWrapper = new EntityEvent(UUID.fromString(entityUuid), entityType, properties);
 -                            subscriber.onNext(entityWrapper);
 -                            ownerEntityRef = new SimpleEntityRef(entityType,UUID.fromString(entityUuid));
++                            token = jp.nextToken();
+                         }
 -                    } catch (IllegalArgumentException e) {
 -                        // skip illegal entity UUID and go to next one
 -                        ((Map<String, Object>) fileNames.get(index)).put("Entity Creation Error", e.getMessage());
 -                        rootEm.update(importUG);
 -                        subscriber.onError( e );
 -                    } catch (Exception e) {
 -                        // skip illegal entity UUID and go to next one
 -                        ((Map<String, Object>) fileNames.get(index)).put("Miscellaneous Error", e.getMessage());
 -                        rootEm.update(importUG);
 -                        subscriber.onError( e );
 -                    }
 -                }
 -
 -                // update the last updated entity
 -                if (entity != null) {
 -                    entityCount++;
 -                    if (entityCount == 2000) {
 -                        ((Map<String, Object>) fileNames.get(index)).put("lastUpdatedUUID", entityUuid);
 -                        rootEm.update(importUG);
 -                        entityCount = 0;
++                        entityWrapper = new EntityEvent(UUID.fromString(entityUuid), entityType, properties);
++                        subscriber.onNext(entityWrapper);
++                        ownerEntityRef = new SimpleEntityRef(entityType, UUID.fromString(entityUuid));
+                     }
                  }
-             } catch (IllegalArgumentException e) {
 -            } catch (Exception e) {
 -                System.out.println("something went wrong in observable json parser - " + e);
++            }  catch (Exception e) {
 +                // skip illegal entity UUID and go to next one
 +                fileImport.setErrorMessage(e.getMessage());
-                 rootEm.update(fileImport);
-             } catch (Exception e) {
-                 // skip illegal entity and go to next one
-                 fileImport.setErrorMessage(e.getMessage());
-                 rootEm.update(fileImport);
-             }
-         }
- 
-         // update the last updated entity
-         if (entity != null) {
-             entityCount++;
-             if( (entityCount%10) == 0) {
-                 jobExecution.heartbeat();
-             }
-             if (entityCount == 1000) {
-                 fileImport.setLastUpdatedUUID(entityUuid);
-                 rootEm.update(fileIsmport);
-                 entityCount = 0;
++                try {
++                    rootEm.update(fileImport);
++                } catch(Exception ex) {}
++                subscriber.onError(e);
              }
-             return entityUuid;
 -
          }
-         return new String(" ");
      }
 -
  }
  
- /**
-  * custom exceptions
-  */
 -
  class OrganizationNotFoundException extends Exception {
      OrganizationNotFoundException(String s) {
          super(s);