You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/08/22 16:16:24 UTC
[88/95] [abbrv] git commit: merged distributed and multithreading code
merged distributed and multithreading code
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c4950911
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c4950911
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c4950911
Branch: refs/heads/import-feature
Commit: c49509119492bde4ed8fcb3000d73316f8ef6dc8
Parents: b266015 b0e333f
Author: Pooja Jain <pj...@apigee.com>
Authored: Tue Aug 5 16:20:22 2014 -0700
Committer: Pooja Jain <pj...@apigee.com>
Committed: Tue Aug 5 16:20:22 2014 -0700
----------------------------------------------------------------------
.../cassandra/ManagementServiceImpl.java | 7 +-
.../management/importUG/ImportServiceImpl.java | 571 +++++++++++--------
.../management/cassandra/ImportServiceIT.java | 5 +-
3 files changed, 350 insertions(+), 233 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c4950911/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
index fa2221b,100a9a6..69805e9
--- a/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
@@@ -16,22 -16,8 +16,21 @@@
*/
package org.apache.usergrid.management.cassandra;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
-
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.text.StrSubstitutor;
import org.apache.shiro.UnavailableSecurityManagerException;
@@@ -62,20 -48,16 +61,19 @@@ import org.apache.usergrid.security.tok
import org.apache.usergrid.security.tokens.TokenInfo;
import org.apache.usergrid.security.tokens.TokenService;
import org.apache.usergrid.security.tokens.exceptions.TokenException;
-import org.apache.usergrid.services.*;
-import org.apache.usergrid.utils.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.Map.Entry;
+import org.apache.usergrid.services.ServiceAction;
+import org.apache.usergrid.services.ServiceManager;
+import org.apache.usergrid.services.ServiceManagerFactory;
+import org.apache.usergrid.services.ServiceRequest;
+import org.apache.usergrid.services.ServiceResults;
+import org.apache.usergrid.utils.ConversionUtils;
+import org.apache.usergrid.utils.JsonUtils;
+import org.apache.usergrid.utils.MailUtils;
+import org.apache.usergrid.utils.StringUtils;
+import org.apache.usergrid.utils.UUIDUtils;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
-
import static java.lang.Boolean.parseBoolean;
import static org.apache.commons.codec.binary.Base64.encodeBase64URLSafeString;
import static org.apache.commons.codec.digest.DigestUtils.sha;
@@@ -1952,7 -1934,9 +1950,8 @@@ public class ManagementServiceImpl impl
return !Boolean.TRUE.equals( em.getProperty( new SimpleEntityRef( User.ENTITY_TYPE, userId ), "disabled" ) );
}
- protected String emailMsg( Map<String, String> values, String propertyName ) {
+
-
+ public String emailMsg( Map<String, String> values, String propertyName ) {
return new StrSubstitutor( values ).replace( properties.getProperty( propertyName ) );
}
@@@ -2460,7 -2444,8 +2459,7 @@@
}
- protected String buildUserAppUrl( UUID applicationId, String url, User user, String token ) throws Exception {
-
+ public String buildUserAppUrl( UUID applicationId, String url, User user, String token ) throws Exception {
ApplicationInfo ai = getApplicationInfo( applicationId );
OrganizationInfo oi = getOrganizationForApplication( applicationId );
return String.format( url, oi.getName(), StringUtils.stringOrSubstringAfterFirst( ai.getName(), '/' ),
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c4950911/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
index 14e5b18,c76420e..eec77a2
--- a/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
@@@ -67,11 -68,10 +72,10 @@@ public class ImportServiceImpl implemen
private JsonFactory jsonFactory = new JsonFactory();
-- private int entityCount=0;
++ private int entityCount = 0;
+
/**
-- *
* @param config configuration of the job to be scheduled
* @return it returns the UUID of the scheduled job
* @throws Exception
@@@ -79,21 -79,23 +83,20 @@@
@Override
public UUID schedule(Map<String, Object> config) throws Exception {
- if ( config == null ) {
- logger.error( "import information cannot be null" );
- ApplicationInfo defaultImportApp = null;
-
- if ( config == null ) {
- logger.error( "import information cannot be null" );
++ if (config == null) {
++ logger.error("import information cannot be null");
return null;
}
EntityManager em = null;
try {
-- em = emf.getEntityManager( MANAGEMENT_APPLICATION_ID );
++ em = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
Set<String> collections = em.getApplicationCollections();
-- if ( !collections.contains( "imports" ) ) {
-- em.createApplicationCollection( "imports" );
++ if (!collections.contains("imports")) {
++ em.createApplicationCollection("imports");
}
-- }
-- catch ( Exception e ) {
-- logger.error( "application doesn't exist within the current context" );
++ } catch (Exception e) {
++ logger.error("application doesn't exist within the current context");
return null;
}
@@@ -101,90 -103,34 +104,85 @@@
//update state
try {
-- importUG = em.create( importUG );
-- }
-- catch ( Exception e ) {
-- logger.error( "Import entity creation failed" );
++ importUG = em.create(importUG);
++ } catch (Exception e) {
++ logger.error("Import entity creation failed");
return null;
}
-- importUG.setState( Import.State.CREATED );
-- em.update( importUG );
++ importUG.setState(Import.State.CREATED);
++ em.update(importUG);
//set data to be transferred to importInfo
JobData jobData = new JobData();
-- jobData.setProperty( "importInfo", config );
-- jobData.setProperty( IMPORT_ID, importUG.getUuid() );
++ jobData.setProperty("importInfo", config);
++ jobData.setProperty(IMPORT_ID, importUG.getUuid());
long soonestPossible = System.currentTimeMillis() + 250; //sch grace period
//schedule job
-- sch.createJob( IMPORT_JOB_NAME, soonestPossible, jobData );
++ sch.createJob(IMPORT_JOB_NAME, soonestPossible, jobData);
//update state
-- importUG.setState( Import.State.SCHEDULED );
-- em.update( importUG );
++ importUG.setState(Import.State.SCHEDULED);
++ em.update(importUG);
return importUG.getUuid();
}
+
+ /**
- *
- * @param file file to be scheduled
++ * @param file file to be scheduled
+ * @return it returns the UUID of the scheduled job
+ * @throws Exception
+ */
+ public UUID scheduleFile(String file, EntityRef importRef) throws Exception {
+
- ApplicationInfo defaultImportApp = null;
-
+ EntityManager em = null;
+
+ try {
- em = emf.getEntityManager( MANAGEMENT_APPLICATION_ID );
- }
- catch ( Exception e ) {
- logger.error( "application doesn't exist within the current context" );
++ em = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
++ } catch (Exception e) {
++ logger.error("application doesn't exist within the current context");
+ return null;
+ }
+
+ FileImport fileImport = new FileImport();
+
+ fileImport.setFileName(file);
+ fileImport.setCompleted(false);
+ fileImport.setLastUpdatedUUID(" ");
+ fileImport.setErrorMessage(" ");
+ fileImport.setState(FileImport.State.CREATED);
+ fileImport = em.create(fileImport);
+
- Import importUG = em.get(importRef,Import.class);
++ Import importUG = em.get(importRef, Import.class);
+
+ try {
- em.createConnection(importUG,"includes",fileImport);
- }
- catch ( Exception e ) {
++ em.createConnection(importUG, "includes", fileImport);
++ } catch (Exception e) {
+ logger.error(e.getMessage());
+ return null;
+ }
- fileImport.setState( FileImport.State.CREATED );
- em.update( fileImport );
++ fileImport.setState(FileImport.State.CREATED);
++ em.update(fileImport);
+
+ //set data to be transferred to importInfo
+ JobData jobData = new JobData();
- jobData.setProperty( "File", file );
- jobData.setProperty( FILE_IMPORT_ID , fileImport.getUuid() );
++ jobData.setProperty("File", file);
++ jobData.setProperty(FILE_IMPORT_ID, fileImport.getUuid());
+
+ long soonestPossible = System.currentTimeMillis() + 250; //sch grace period
+
+ //schedule job
- sch.createJob(FILE_IMPORT_JOB_NAME, soonestPossible, jobData );
++ sch.createJob(FILE_IMPORT_JOB_NAME, soonestPossible, jobData);
+
+ //update state
- fileImport.setState( FileImport.State.SCHEDULED );
- em.update( fileImport );
++ fileImport.setState(FileImport.State.SCHEDULED);
++ em.update(fileImport);
+
+ return fileImport.getUuid();
+ }
/**
* Query Entity Manager for the string state of the Import Entity. This corresponds to the GET /import
@@@ -193,18 -139,18 +191,18 @@@
*/
@Override
public String getState(UUID uuid) throws Exception {
-- if ( uuid == null ) {
-- logger.error( "UUID passed in cannot be null." );
++ if (uuid == null) {
++ logger.error("UUID passed in cannot be null.");
return "UUID passed in cannot be null";
}
-- EntityManager rootEm = emf.getEntityManager( MANAGEMENT_APPLICATION_ID );
++ EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
//retrieve the import entity.
-- Import importUG = rootEm.get( uuid, Import.class );
++ Import importUG = rootEm.get(uuid, Import.class);
-- if ( importUG == null ) {
-- logger.error( "no entity with that uuid was found" );
++ if (importUG == null) {
++ logger.error("no entity with that uuid was found");
return "No Such Element found";
}
return importUG.getState().toString();
@@@ -216,46 -162,39 +214,47 @@@
* @return String
*/
@Override
-- public String getErrorMessage(final UUID uuid ) throws Exception {
++ public String getErrorMessage(final UUID uuid) throws Exception {
//get application entity manager
-- if ( uuid == null ) {
-- logger.error( "UUID passed in cannot be null." );
++ if (uuid == null) {
++ logger.error("UUID passed in cannot be null.");
return "UUID passed in cannot be null";
}
-- EntityManager rootEm = emf.getEntityManager( MANAGEMENT_APPLICATION_ID );
++ EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
//retrieve the import entity.
-- Import importUG = rootEm.get( uuid, Import.class );
++ Import importUG = rootEm.get(uuid, Import.class);
-- if ( importUG == null ) {
-- logger.error( "no entity with that uuid was found" );
++ if (importUG == null) {
++ logger.error("no entity with that uuid was found");
return "No Such Element found";
}
return importUG.getErrorMessage().toString();
}
+ @Override
- public Import getImportEntity( final JobExecution jobExecution ) throws Exception {
++ public Import getImportEntity(final JobExecution jobExecution) throws Exception {
- UUID importId = ( UUID ) jobExecution.getJobData().getProperty( IMPORT_ID );
- //@Override
- public Import getImportEntity( final JobExecution jobExecution ) throws Exception {
-
- UUID importId = ( UUID ) jobExecution.getJobData().getProperty( IMPORT_ID );
++ UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
EntityManager importManager = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
-- return importManager.get( importId, Import.class );
++ return importManager.get(importId, Import.class);
}
+
@Override
- public FileImport getFileImportEntity( final JobExecution jobExecution ) throws Exception {
++ public FileImport getFileImportEntity(final JobExecution jobExecution) throws Exception {
+
- UUID fileImportId = ( UUID ) jobExecution.getJobData().getProperty(FILE_IMPORT_ID);
++ UUID fileImportId = (UUID) jobExecution.getJobData().getProperty(FILE_IMPORT_ID);
+ EntityManager em = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+
- return em.get( fileImportId, FileImport.class );
++ return em.get(fileImportId, FileImport.class);
+ }
+
+ @Override
public ArrayList<File> getEphemeralFile() {
return files;
}
@@@ -265,7 -204,7 +264,7 @@@
}
-- public void setSch( final SchedulerService sch ) {
++ public void setSch(final SchedulerService sch) {
this.sch = sch;
}
@@@ -275,7 -214,7 +274,7 @@@
}
-- public void setEmf( final EntityManagerFactory emf ) {
++ public void setEmf(final EntityManagerFactory emf) {
this.emf = emf;
}
@@@ -286,12 -225,12 +285,11 @@@
}
-- public void setManagementService( final ManagementService managementService ) {
++ public void setManagementService(final ManagementService managementService) {
this.managementService = managementService;
}
/**
-- *
* @param jobExecution the job created by the scheduler with all the required config data
* @throws Exception
*/
@@@ -332,128 -269,126 +330,136 @@@
return;
}
+ try {
+
- if (config.get("organizationId") == null) {
- logger.error("No organization could be found");
- importUG.setErrorMessage("No organization could be found");
- importUG.setState(Import.State.FAILED);
+ if (config.get("organizationId") == null) {
+ logger.error("No organization could be found");
++ importUG.setErrorMessage("No organization could be found");
+ importUG.setState(Import.State.FAILED);
+ em.update(importUG);
+ return;
+ } else if (config.get("applicationId") == null) {
+ //import All the applications from an organization
+ importApplicationsFromOrg((UUID) config.get("organizationId"), config, jobExecution, s3Import);
+ } else if (config.get("collectionName") == null) {
+ //imports an Application from a single organization
+ importApplicationFromOrg((UUID) config.get("organizationId"), (UUID) config.get("applicationId"), config, jobExecution, s3Import);
+ } else {
+ //imports a single collection from an app org combo
+ importCollectionFromOrgApp((UUID) config.get("applicationId"), config, jobExecution, s3Import);
+ }
+ }
+ catch (OrganizationNotFoundException e) {
+ importUG.setErrorMessage(e.getMessage());
- importUG.setState(Import.State.FAILED);
++ importUG.setState(Import.State.FINISHED);
+ em.update(importUG);
+ return;
- }
- catch (ApplicationNotFoundException e) {
++ } catch (ApplicationNotFoundException e) {
+ importUG.setErrorMessage(e.getMessage());
- importUG.setState(Import.State.FAILED);
++ importUG.setState(Import.State.FINISHED);
em.update(importUG);
return;
- } else if (config.get("applicationId") == null) {
- //import All the applications from an organization
- importApplicationsFromOrg((UUID) config.get("organizationId"), config, jobExecution, s3Import);
- } else if (config.get("collectionName") == null) {
- //imports an Application from a single organization
- importApplicationFromOrg((UUID) config.get("organizationId"), (UUID) config.get("applicationId"), config, jobExecution, s3Import);
- } else {
- //imports a single collection from an app org combo
- importCollectionFromOrgApp((UUID) config.get("applicationId"), config, jobExecution, s3Import);
}
- catch (Exception e) {
- // the case where job will be retried i.e. resumed from the failed point
- importUG.setErrorMessage(e.getMessage());
- importUG.setState(Import.State.FAILED);
+
- if(files.size() == 0)
- {
++ if (files.size() == 0) {
+ importUG.setState(Import.State.FINISHED);
+ importUG.setErrorMessage("no files found in the bucket with the relevant context");
em.update(importUG);
- }
- else
- {
- Map<String,Object> fileMetadata = new HashMap<String, Object>();
- throw e;
- }
++ } else {
++ Map<String, Object> fileMetadata = new HashMap<String, Object>();
+
- ArrayList<Map<String,Object>> value = new ArrayList<Map<String, Object>>();
++ ArrayList<Map<String, Object>> value = new ArrayList<Map<String, Object>>();
+
- for(File eachfile: files) {
++ for (File eachfile : files) {
+
+ UUID jobID = scheduleFile(eachfile.getPath(), em.getRef(importId));
- Map<String,Object> fileJobID = new HashMap<String,Object>();
- fileJobID.put("FileName",eachfile.getName());
++ Map<String, Object> fileJobID = new HashMap<String, Object>();
++ fileJobID.put("FileName", eachfile.getName());
+ fileJobID.put("JobID", jobID.toString());
+ value.add(fileJobID);
+ }
- importUG.setState( Import.State.FINISHED );
- em.update( importUG );
+ fileMetadata.put("files", value);
+ importUG.addProperties(fileMetadata);
+ em.update(importUG);
+ }
+ return;
}
/**
* Imports a specific collection from an org-app combo.
*/
-- private void importCollectionFromOrgApp( UUID applicationUUID, final Map<String, Object> config,
-- final JobExecution jobExecution, S3Import s3Import ) throws Exception {
++ private void importCollectionFromOrgApp(UUID applicationUUID, final Map<String, Object> config,
++ final JobExecution jobExecution, S3Import s3Import) throws Exception {
- //retrieves export entity
+ //retrieves import entity
Import importUG = getImportEntity(jobExecution);
ApplicationInfo application = managementService.getApplicationInfo(applicationUUID);
-- if(application == null) {
++ if (application == null) {
throw new ApplicationNotFoundException("Application Not Found");
}
String collectionName = config.get("collectionName").toString();
-- String appFileName = prepareInputFileName("application", application.getName(),collectionName);
++ String appFileName = prepareInputFileName("application", application.getName(), collectionName);
-- files = fileTransfer( importUG, appFileName, config, s3Import, 0 );
++ files = fileTransfer(importUG, appFileName, config, s3Import, 0);
- FileParser(jobExecution);
}
/**
* Imports a specific applications from an organization
*/
-- private void importApplicationFromOrg( UUID organizationUUID, UUID applicationId, final Map<String, Object> config,
-- final JobExecution jobExecution, S3Import s3Import ) throws Exception {
++ private void importApplicationFromOrg(UUID organizationUUID, UUID applicationId, final Map<String, Object> config,
++ final JobExecution jobExecution, S3Import s3Import) throws Exception {
//retrieves import entity
Import importUG = getImportEntity(jobExecution);
-- ApplicationInfo application = managementService.getApplicationInfo( applicationId );
++ ApplicationInfo application = managementService.getApplicationInfo(applicationId);
-- if(application == null) {
++ if (application == null) {
throw new ApplicationNotFoundException("Application Not Found");
}
String appFileName = prepareInputFileName("application", application.getName(), null);
-- files = fileTransfer( importUG, appFileName, config, s3Import, 1 );
++ files = fileTransfer(importUG, appFileName, config, s3Import, 1);
- FileParser(jobExecution);
}
/**
* Imports All Applications from an Organization
*/
-- private void importApplicationsFromOrg( UUID organizationUUID, final Map<String, Object> config,
-- final JobExecution jobExecution, S3Import s3Import ) throws Exception {
++ private void importApplicationsFromOrg(UUID organizationUUID, final Map<String, Object> config,
++ final JobExecution jobExecution, S3Import s3Import) throws Exception {
// retrieves import entity
Import importUG = getImportEntity(jobExecution);
String appFileName = null;
OrganizationInfo organizationInfo = managementService.getOrganizationByUuid(organizationUUID);
-- if(organizationInfo == null) {
++ if (organizationInfo == null) {
throw new OrganizationNotFoundException("Organization Not Found");
}
-- appFileName = prepareInputFileName( "organization", organizationInfo.getName() , null );
-- files = fileTransfer( importUG, appFileName, config, s3Import, 2 );
++ appFileName = prepareInputFileName("organization", organizationInfo.getName(), null);
++ files = fileTransfer(importUG, appFileName, config, s3Import, 2);
- FileParser(jobExecution);
}
/**
* @param type just a label such as: organization, application.
-- *
* @return the file name concatenated with the type and the name of the collection
*/
-- protected String prepareInputFileName( String type, String name, String CollectionName ) {
++ protected String prepareInputFileName(String type, String name, String CollectionName) {
StringBuilder str = new StringBuilder();
// in case of type organization --> the file name will be "<org_name>/"
-- if(type.equals("organization")) {
++ if (type.equals("organization")) {
str.append(name);
str.append("/");
-- }
-- else if(type.equals("application")) {
++ } else if (type.equals("application")) {
// in case of type application --> the file name will be "<org_name>/<app_name>."
str.append(name);
str.append(".");
@@@ -470,21 -405,21 +476,19 @@@
}
/**
-- *
-- * @param importUG Import instance
-- * @param appFileName the base file name for the files to be downloaded
-- * @param config the config information for the import job
-- * @param s3Import s3import instance
-- * @param type it indicates the type of import. 0 - Collection , 1 - Application and 2 - Organization
++ * @param importUG Import instance
++ * @param appFileName the base file name for the files to be downloaded
++ * @param config the config information for the import job
++ * @param s3Import s3import instance
++ * @param type it indicates the type of import. 0 - Collection , 1 - Application and 2 - Organization
* @return
*/
-- public ArrayList<File> fileTransfer( Import importUG, String appFileName, Map<String, Object> config,
-- S3Import s3Import , int type) {
++ public ArrayList<File> fileTransfer(Import importUG, String appFileName, Map<String, Object> config,
++ S3Import s3Import, int type) {
ArrayList<File> files = new ArrayList<File>();
try {
-- files = s3Import.copyFromS3(config, appFileName , type);
-- }
-- catch ( Exception e ) {
++ files = s3Import.copyFromS3(config, appFileName, type);
++ } catch (Exception e) {
importUG.setErrorMessage(e.getMessage());
importUG.setState(Import.State.FAILED);
}
@@@ -493,32 -428,50 +497,32 @@@
/**
* Loops through each temp file and parses it to store the entities from the json back into usergrid
++ *
* @throws Exception
*/
- private void FileParser(JobExecution jobExecution) throws Exception {
-
- // add properties to the import entity
- Import importUG = getImportEntity(jobExecution);
- EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
-
+ @Override
+ public void FileParser(JobExecution jobExecution) throws Exception {
-
- Map<String,Object> fileMetadata = new HashMap<String, Object>();
- ArrayList<Map<String,Object>> value = new ArrayList<Map<String, Object>>();
+ // add properties to the import entity
+ FileImport fileImport = getFileImportEntity(jobExecution);
- if (!((Map<String,Object>)importUG.getDynamicProperties()).containsKey("files")) {
- // create the structure for file metadata and initialize it
- for (File collectionFile : files) {
- Map<String, Object> singleFile = new HashMap<String, Object>();
- singleFile.put("name", collectionFile.getName());
- singleFile.put("completed", new Boolean(false));
- singleFile.put("lastUpdatedUUID", new String(""));
- value.add(singleFile);
- }
+ File file = new File(jobExecution.getJobData().getProperty("File").toString());
- fileMetadata.put("files", value);
- importUG.addProperties(fileMetadata);
- rootEm.update(importUG);
- }
+ EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+ rootEm.update(fileImport);
- ArrayList fileNames = (ArrayList)importUG.getDynamicProperties().get("files");
- int i=0;
+ boolean completed = fileImport.getCompleted();
- for(File collectionFile : files) {
+ // on resume, completed files will not be traversed again
- if(!completed) {
++ if (!completed) {
- Map<String,Object> fileInfo = (Map<String,Object>)fileNames.get(i);
- boolean completed = ((Boolean)fileInfo.get("completed")).booleanValue();
- // on resume, completed files will not be traversed again
- if(!completed) {
+ if (isValidJSON(file, rootEm, fileImport)) {
- if(!isValidJSON(collectionFile,rootEm,importUG,i)){
- i++;
- continue;
- }
+ fileImport.setState(FileImport.State.STARTED);
+ rootEm.update(fileImport);
- String applicationName = collectionFile.getPath().split("\\.")[0];
+ String applicationName = file.getPath().split("\\.")[0];
ApplicationInfo application = managementService.getApplicationInfo(applicationName);
@@@ -544,71 -498,23 +548,50 @@@
}
EntityManager em = emf.getEntityManager(application.getId());
- try {
- String uuid=" ";
- while (jp.nextToken() != JsonToken.END_ARRAY) {
- uuid = importEntityStuff(jp, em, rootEm, fileImport, jobExecution);
- }
- if(!uuid.equals(" "))
- {
- fileImport.setLastUpdatedUUID(uuid);
- rootEm.update(fileImport);
- }
- jp.close();
- }
- catch (OrganizationNotFoundException e) {
- fileImport.setErrorMessage(e.getMessage());
- fileImport.setState(FileImport.State.FINISHED);
- rootEm.update(fileImport);
- return;
- }
- catch (ApplicationNotFoundException e) {
- fileImport.setErrorMessage(e.getMessage());
- fileImport.setState(FileImport.State.FINISHED);
- rootEm.update(fileImport);
- return;
- //TODO: remove roles and take care of it later when importing applications
+ while (jp.nextToken() != JsonToken.END_ARRAY) {
- importEntityStuff(jp, em, rootEm, importUG, i);
++ importEntityStuff(jp, em, rootEm, fileImport, jobExecution);
}
+ jp.close();
+
- if(!fileImport.getState().equals("FAILED")) {
++ if (!fileImport.getState().equals("FAILED")) {
+
+ // mark file as completed
+ fileImport.setCompleted(true);
+ fileImport.setState(FileImport.State.FINISHED);
+ rootEm.update(fileImport);
+
+ //check other files status and mark the status of import Job.
+ Results ImportJobResults = rootEm.getConnectingEntities(fileImport.getUuid(), "includes", null, Results.Level.ALL_PROPERTIES);
+ List<Entity> importEntity = ImportJobResults.getEntities();
+ UUID importId = importEntity.get(0).getUuid();
+ Import importUG = rootEm.get(importId, Import.class);
+
- Results entities = rootEm.getConnectedEntities(importId,"includes",null,Results.Level.ALL_PROPERTIES);
++ Results entities = rootEm.getConnectedEntities(importId, "includes", null, Results.Level.ALL_PROPERTIES);
+ List<Entity> importFile = entities.getEntities();
+
+ int count = 0;
- for(Entity eachEntity: importFile) {
- FileImport fi = rootEm.get(eachEntity.getUuid(),FileImport.class);
- if(fi.getState().toString().equals("FINISHED")) {
++ for (Entity eachEntity : importFile) {
++ FileImport fi = rootEm.get(eachEntity.getUuid(), FileImport.class);
++ if (fi.getState().toString().equals("FINISHED")) {
+ count++;
- }
- else if(fi.getState().toString().equals("FAILED")) {
++ } else if (fi.getState().toString().equals("FAILED")) {
+ importUG.setState(Import.State.FAILED);
+ rootEm.update(importUG);
+ break;
+ }
+ }
- if(count == importFile.size()) {
++ if (count == importFile.size()) {
+ importUG.setState(Import.State.FINISHED);
+ rootEm.update(importUG);
+ }
+ }
}
}
}
- private boolean isValidJSON( File collectionFile, EntityManager rootEm, FileImport fileImport) throws Exception {
- private boolean isValidJSON( File collectionFile, EntityManager rootEm, Import importUG, int index) throws Exception {
++ private boolean isValidJSON(File collectionFile, EntityManager rootEm, FileImport fileImport) throws Exception {
- ArrayList fileNames = (ArrayList) importUG.getDynamicProperties().get("files");
boolean valid = false;
try {
final JsonParser jp = jsonFactory.createJsonParser(collectionFile);
@@@ -626,9 -533,10 +609,10 @@@
return valid;
}
- private JsonParser getJsonParserForFile( File collectionFile ) throws Exception {
- JsonParser jp = jsonFactory.createJsonParser( collectionFile );
- jp.setCodec( new ObjectMapper() );
+
- private JsonParser getJsonParserForFile( File collectionFile ) throws Exception {
- JsonParser jp = jsonFactory.createJsonParser( collectionFile );
- jp.setCodec( new ObjectMapper() );
++ private JsonParser getJsonParserForFile(File collectionFile) throws Exception {
++ JsonParser jp = jsonFactory.createJsonParser(collectionFile);
++ jp.setCodec(new ObjectMapper());
return jp;
}
@@@ -637,108 -546,229 +622,242 @@@
*
* @param jp JsonPrser pointing to the beginning of the object.
*/
- private String importEntityStuff( JsonParser jp, EntityManager em, EntityManager rootEm, FileImport fileImport, JobExecution jobExecution) throws Exception {
- private void importEntityStuff(final JsonParser jp, final EntityManager em, EntityManager rootEm, Import importUG, int index) throws Exception {
- Entity entity = null;
- EntityRef ownerEntityRef = null;
- String entityUuid = "";
- String entityType = "";
- final JsonParserObservable subscribe = new JsonParserObservable(jp,em,rootEm,importUG, index);
++ private void importEntityStuff(final JsonParser jp, final EntityManager em, EntityManager rootEm, final FileImport fileImport, final JobExecution jobExecution) throws Exception {
+
- // Go inside the value after getting the owner entity id.
- while (jp.nextToken() != JsonToken.END_OBJECT) {
++ final JsonParserObservable subscribe = new JsonParserObservable(jp, em, rootEm, fileImport);
- String collectionName = jp.getCurrentName();
+ final Observable<WriteEvent> observable = Observable.create(subscribe);
- try {
- // create the connections
- if (collectionName.equals("connections")) {
+ /**
+ * This is the action we want to perform for every UUID we receive
+ */
+ final Action1<WriteEvent> doWork = new Action1<WriteEvent>() {
+ @Override
+ public void call(WriteEvent writeEvent) {
- writeEvent.doWrite(em);
++ writeEvent.doWrite(em, jobExecution, fileImport);
+ }
- jp.nextToken(); // START_OBJECT
- while (jp.nextToken() != JsonToken.END_OBJECT) {
- String connectionType = jp.getCurrentName();
+ };
- jp.nextToken(); // START_ARRAY
- while (jp.nextToken() != JsonToken.END_ARRAY) {
- String entryId = jp.getText();
- EntityRef entryRef = em.getRef(UUID.fromString(entryId));
- // Store in DB
- em.createConnection(ownerEntityRef, connectionType, entryRef);
- }
+ /**
+ * This is boilerplate glue code. We have to follow this for the parallel operation. In the "call"
+ * method we want to simply return the input observable + the chain of operations we want to invoke
+ */
+ observable.parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+ @Override
- public Observable< WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
++ public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
+ return entityWrapperObservable.doOnNext(doWork);
+ }
- }, Schedulers.io() ).toBlocking().last();
++ }, Schedulers.io()).toBlocking().last();
+
+
+ }
+
- private interface WriteEvent{
- public void doWrite(EntityManager em);
++ private interface WriteEvent {
++ public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport);
+ }
+
- private final class EntityEvent implements WriteEvent{
++ private final class EntityEvent implements WriteEvent {
+ UUID entityUuid;
+ String entityType;
+ Map<String, Object> properties;
- EntityEvent(UUID entityUuid, String entityType,Map<String, Object> properties){
++
++ EntityEvent(UUID entityUuid, String entityType, Map<String, Object> properties) {
+ this.entityUuid = entityUuid;
+ this.entityType = entityType;
+ this.properties = properties;
+ }
++
+ @Override
- public void doWrite(EntityManager em) {
++ public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport) {
++ EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
++
+ try {
- em.create(entityUuid, entityType, properties);
- System.out.println("Emitting UUID " + entityUuid + " on thread " + Thread.currentThread().getName() );
- }catch (Exception e) {
- System.out.println("something went wrong while creating this - " + e);
++ Entity entity = em.create(entityUuid, entityType, properties);
++ // update the last updated entity
++ if (entity != null) {
++ entityCount++;
++ if ((entityCount % 10) == 0) {
++ jobExecution.heartbeat();
++ }
++ if (entityCount == 2000) {
++ fileImport.setLastUpdatedUUID(entityUuid.toString());
++ rootEm.update(fileImport);
++ entityCount = 0;
+ }
}
- // add dictionaries
- else if (collectionName.equals("dictionaries")) {
+
- jp.nextToken(); // START_OBJECT
- while (jp.nextToken() != JsonToken.END_OBJECT) {
++ } catch (Exception e) {
++ logger.error("something went wrong while creating this - " + e);
++ fileImport.setErrorMessage(e.getMessage());
++ try {
++ rootEm.update(fileImport);
++ } catch (Exception ex) { }
++ }
+ }
+ }
- private final class ConnectionEvent implements WriteEvent{
+
- String dictionaryName = jp.getCurrentName();
++ private final class ConnectionEvent implements WriteEvent {
+ EntityRef ownerEntityRef;
+ String connectionType;
+ EntityRef entryRef;
- jp.nextToken();
- ConnectionEvent(EntityRef ownerEntityRef, String connectionType, EntityRef entryRef){
++ ConnectionEvent(EntityRef ownerEntityRef, String connectionType, EntityRef entryRef) {
+ this.ownerEntityRef = ownerEntityRef;
+ this.connectionType = connectionType;
+ this.entryRef = entryRef;
- Map<String, Object> dictionary = jp.readValueAs(HashMap.class);
+ }
+
+ @Override
- public void doWrite(EntityManager em) {
++ public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport) {
++ EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
++
+ try {
+ em.createConnection(ownerEntityRef, connectionType, entryRef);
- System.out.println("creating connection " + connectionType + " on thread " + Thread.currentThread().getName() );
- }catch (Exception e) {
- System.out.println("something went wrong while creating this - " + e);
++
++ } catch (Exception e) {
++
++ logger.error("something went wrong while creating this - " + e);
++ fileImport.setErrorMessage(e.getMessage());
++ try {
++ rootEm.update(fileImport);
++ } catch (Exception ex) { }
+ }
+ }
+ }
- private final class DictionaryEvent implements WriteEvent{
++
++ private final class DictionaryEvent implements WriteEvent {
+
+ EntityRef ownerEntityRef;
+ String dictionaryName;
+ Map<String, Object> dictionary;
+
- DictionaryEvent(EntityRef ownerEntityRef, String dictionaryName, Map<String, Object> dictionary){
++ DictionaryEvent(EntityRef ownerEntityRef, String dictionaryName, Map<String, Object> dictionary) {
+ this.ownerEntityRef = ownerEntityRef;
+ this.dictionaryName = dictionaryName;
+ this.dictionary = dictionary;
+ }
+
+ @Override
- public void doWrite(EntityManager em) {
++ public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport) {
++ EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+ try {
++
+ em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary);
- System.out.println("creating dictionary " + dictionaryName + " on thread " + Thread.currentThread().getName() );
- }catch (Exception e) {
- System.out.println("something went wrong while creating this - " + e);
++
++ } catch (Exception e) {
++
++ logger.error("something went wrong while creating this - " + e);
++ fileImport.setErrorMessage(e.getMessage());
++ try {
++ rootEm.update(fileImport);
++ } catch (Exception ex) { }
+ }
+ }
+ }
- em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary);
- }
- } else {
- // Regular collections
- jp.nextToken(); // START_OBJECT
- Map<String, Object> properties = new HashMap<String, Object>();
-
+ private final class JsonParserObservable implements Observable.OnSubscribe<WriteEvent> {
+ private final JsonParser jp;
+ EntityManager em;
+ EntityManager rootEm;
- Import importUG;
- int index;
++ FileImport fileImport;
+
- JsonToken token = jp.nextToken();
- while (token != JsonToken.END_OBJECT) {
- if (token == JsonToken.VALUE_STRING || token == JsonToken.VALUE_NUMBER_INT) {
- String key = jp.getCurrentName();
- if (key.equals("uuid")) {
- entityUuid = jp.getText();
- private int entityCount = 0;
- } else if (key.equals("type")) {
- entityType = jp.getText();
- } else if (key.length() != 0 && jp.getText().length() != 0) {
- String value = jp.getText();
- properties.put(key, value);
- JsonParserObservable(JsonParser parser, EntityManager em, EntityManager rootEm, Import importUG, int index ) {
++ JsonParserObservable(JsonParser parser, EntityManager em, EntityManager rootEm, FileImport fileImport) {
+ this.jp = parser;
+ this.em = em;
+ this.rootEm = rootEm;
- this.importUG = importUG;
- this.index = index;
++ this.fileImport = fileImport;
+ }
+
+ @Override
+ public void call(final Subscriber<? super WriteEvent> subscriber) {
- ArrayList fileNames = (ArrayList) importUG.getDynamicProperties().get("files");
+
+ WriteEvent entityWrapper = null;
- Entity entity = null;
+ EntityRef ownerEntityRef = null;
+ String entityUuid = "";
+ String entityType = "";
+ try {
+ while (!subscriber.isUnsubscribed() && jp.nextToken() != JsonToken.END_OBJECT) {
+ String collectionName = jp.getCurrentName();
- try {
- // create the connections
- if (collectionName.equals("connections")) {
+
- jp.nextToken(); // START_OBJECT
- while (jp.nextToken() != JsonToken.END_OBJECT) {
- String connectionType = jp.getCurrentName();
++ // create the connections
++ if (collectionName.equals("connections")) {
+
- jp.nextToken(); // START_ARRAY
- while (jp.nextToken() != JsonToken.END_ARRAY) {
- String entryId = jp.getText();
++ jp.nextToken(); // START_OBJECT
++ while (jp.nextToken() != JsonToken.END_OBJECT) {
++ String connectionType = jp.getCurrentName();
+
- EntityRef entryRef = new SimpleEntityRef(UUID.fromString(entryId));
- entityWrapper = new ConnectionEvent(ownerEntityRef, connectionType, entryRef);
- subscriber.onNext(entityWrapper);
- }
++ jp.nextToken(); // START_ARRAY
++ while (jp.nextToken() != JsonToken.END_ARRAY) {
++ String entryId = jp.getText();
++
++ EntityRef entryRef = new SimpleEntityRef(UUID.fromString(entryId));
++ entityWrapper = new ConnectionEvent(ownerEntityRef, connectionType, entryRef);
++ subscriber.onNext(entityWrapper);
}
}
- token = jp.nextToken();
- // add dictionaries
- else if (collectionName.equals("dictionaries")) {
+ }
++ // add dictionaries
++ else if (collectionName.equals("dictionaries")) {
+
- jp.nextToken(); // START_OBJECT
- while (jp.nextToken() != JsonToken.END_OBJECT) {
++ jp.nextToken(); // START_OBJECT
++ while (jp.nextToken() != JsonToken.END_OBJECT) {
+
- String dictionaryName = jp.getCurrentName();
++ String dictionaryName = jp.getCurrentName();
- entity = em.create(UUID.fromString(entityUuid), entityType, properties);
- ownerEntityRef = em.getRef(UUID.fromString(entityUuid));
- jp.nextToken();
++ jp.nextToken();
+
- Map<String, Object> dictionary = jp.readValueAs(HashMap.class);
- entityWrapper = new DictionaryEvent(ownerEntityRef, dictionaryName, dictionary);
- subscriber.onNext(entityWrapper);
- }
- subscriber.onCompleted();
- } else {
- // Regular collections
- jp.nextToken(); // START_OBJECT
-
- Map<String, Object> properties = new HashMap<String, Object>();
-
- JsonToken token = jp.nextToken();
-
- while (token != JsonToken.END_OBJECT) {
- if (token == JsonToken.VALUE_STRING || token == JsonToken.VALUE_NUMBER_INT) {
- String key = jp.getCurrentName();
- if (key.equals("uuid")) {
- entityUuid = jp.getText();
-
- } else if (key.equals("type")) {
- entityType = jp.getText();
- } else if (key.length() != 0 && jp.getText().length() != 0) {
- String value = jp.getText();
- properties.put(key, value);
- }
++ Map<String, Object> dictionary = jp.readValueAs(HashMap.class);
++ entityWrapper = new DictionaryEvent(ownerEntityRef, dictionaryName, dictionary);
++ subscriber.onNext(entityWrapper);
++ }
++ subscriber.onCompleted();
++ } else {
++ // Regular collections
++ jp.nextToken(); // START_OBJECT
++
++ Map<String, Object> properties = new HashMap<String, Object>();
++
++ JsonToken token = jp.nextToken();
++
++ while (token != JsonToken.END_OBJECT) {
++ if (token == JsonToken.VALUE_STRING || token == JsonToken.VALUE_NUMBER_INT) {
++ String key = jp.getCurrentName();
++ if (key.equals("uuid")) {
++ entityUuid = jp.getText();
++
++ } else if (key.equals("type")) {
++ entityType = jp.getText();
++ } else if (key.length() != 0 && jp.getText().length() != 0) {
++ String value = jp.getText();
++ properties.put(key, value);
+ }
- token = jp.nextToken();
+ }
- entityWrapper = new EntityEvent(UUID.fromString(entityUuid), entityType, properties);
- subscriber.onNext(entityWrapper);
- ownerEntityRef = new SimpleEntityRef(entityType,UUID.fromString(entityUuid));
++ token = jp.nextToken();
+ }
- } catch (IllegalArgumentException e) {
- // skip illegal entity UUID and go to next one
- ((Map<String, Object>) fileNames.get(index)).put("Entity Creation Error", e.getMessage());
- rootEm.update(importUG);
- subscriber.onError( e );
- } catch (Exception e) {
- // skip illegal entity UUID and go to next one
- ((Map<String, Object>) fileNames.get(index)).put("Miscellaneous Error", e.getMessage());
- rootEm.update(importUG);
- subscriber.onError( e );
- }
- }
-
- // update the last updated entity
- if (entity != null) {
- entityCount++;
- if (entityCount == 2000) {
- ((Map<String, Object>) fileNames.get(index)).put("lastUpdatedUUID", entityUuid);
- rootEm.update(importUG);
- entityCount = 0;
++ entityWrapper = new EntityEvent(UUID.fromString(entityUuid), entityType, properties);
++ subscriber.onNext(entityWrapper);
++ ownerEntityRef = new SimpleEntityRef(entityType, UUID.fromString(entityUuid));
+ }
}
- } catch (IllegalArgumentException e) {
- } catch (Exception e) {
- System.out.println("something went wrong in observable json parser - " + e);
++ } catch (Exception e) {
+ // skip illegal entity UUID and go to next one
+ fileImport.setErrorMessage(e.getMessage());
- rootEm.update(fileImport);
- } catch (Exception e) {
- // skip illegal entity and go to next one
- fileImport.setErrorMessage(e.getMessage());
- rootEm.update(fileImport);
- }
- }
-
- // update the last updated entity
- if (entity != null) {
- entityCount++;
- if( (entityCount%10) == 0) {
- jobExecution.heartbeat();
- }
- if (entityCount == 1000) {
- fileImport.setLastUpdatedUUID(entityUuid);
- rootEm.update(fileIsmport);
- entityCount = 0;
++ try {
++ rootEm.update(fileImport);
++ } catch(Exception ex) {}
++ subscriber.onError(e);
}
- return entityUuid;
-
}
- return new String(" ");
}
-
}
- /**
- * custom exceptions
- */
-
class OrganizationNotFoundException extends Exception {
OrganizationNotFoundException(String s) {
super(s);