You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/08/22 16:16:09 UTC
[73/95] [abbrv] git commit: added code for parallel processing of multiple files
added code for parallel processing of multiple files
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/16a22ff9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/16a22ff9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/16a22ff9
Branch: refs/heads/import-feature
Commit: 16a22ff95e2b037baa780f27211b66d76c63ed84
Parents: e0e7536
Author: Pooja Jain <pj...@apigee.com>
Authored: Mon Aug 4 09:23:23 2014 -0700
Committer: Pooja Jain <pj...@apigee.com>
Committed: Mon Aug 4 09:23:23 2014 -0700
----------------------------------------------------------------------
.../management/importUG/ImportServiceImpl.java | 32 ++++++++++++--------
1 file changed, 20 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16a22ff9/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
index ed7516d..78c47c6 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
@@ -137,7 +137,7 @@ public class ImportServiceImpl implements ImportService {
* @return it returns the UUID of the scheduled job
* @throws Exception
*/
- public UUID scheduleFile(File file, EntityRef importRef) throws Exception {
+ public UUID scheduleFile(String file, EntityRef importRef) throws Exception {
ApplicationInfo defaultImportApp = null;
@@ -153,9 +153,9 @@ public class ImportServiceImpl implements ImportService {
FileImport fileImport = new FileImport();
- fileImport.setFileName(file.getName());
+ fileImport.setFileName(file);
fileImport.setCompleted(false);
- fileImport.setLastUpdatedUUID("");
+ fileImport.setLastUpdatedUUID(" ");
fileImport.setErrorMessage("");
fileImport.setState(FileImport.State.CREATED);
fileImport = em.create(fileImport);
@@ -176,8 +176,7 @@ public class ImportServiceImpl implements ImportService {
//set data to be transferred to importInfo
JobData jobData = new JobData();
jobData.setProperty( "File", file );
- jobData.setProperty( "fileImportId", fileImport.getUuid() );
- jobData.setProperty(FILE_IMPORT_ID,importRef);
+ jobData.setProperty( FILE_IMPORT_ID , fileImport.getUuid() );
long soonestPossible = System.currentTimeMillis() + 250; //sch grace period
@@ -369,7 +368,7 @@ public class ImportServiceImpl implements ImportService {
for(File eachfile: files) {
- UUID jobID = scheduleFile(eachfile, em.getRef(importId));
+ UUID jobID = scheduleFile(eachfile.getPath(), em.getRef(importId));
Map<String,Object> fileJobID = new HashMap<String,Object>();
fileJobID.put("FileName",eachfile.getName());
fileJobID.put("JobID", jobID.toString());
@@ -519,8 +518,12 @@ public class ImportServiceImpl implements ImportService {
fileImport.setState(FileImport.State.STARTED);
- File file = (File)jobExecution.getJobData().getProperty("file");
+ File file = new File(jobExecution.getJobData().getProperty("File").toString());
+
+ logger.error(file.getName());
+
EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+ rootEm.update(fileImport);
boolean completed = fileImport.getCompleted();
@@ -531,13 +534,18 @@ public class ImportServiceImpl implements ImportService {
String applicationName = file.getPath().split("\\.")[0];
+ logger.error(applicationName);
+
ApplicationInfo application = managementService.getApplicationInfo(applicationName);
+
+
JsonParser jp = getJsonParserForFile(file);
String lastUpdatedUUID = fileImport.getLastUpdatedUUID();
+ logger.error(lastUpdatedUUID);
// this handles partially completed files by updating entities from the point of failure
- if (!lastUpdatedUUID.equals("")) {
+ if (!lastUpdatedUUID.equals(" ")) {
// go till the last updated entity
while (!jp.getText().equals(lastUpdatedUUID)) {
jp.nextToken();
@@ -577,6 +585,7 @@ public class ImportServiceImpl implements ImportService {
// the case where job will be retried i.e. resumed from the failed point
fileImport.setErrorMessage(e.getMessage());
em.update(fileImport);
+ //TODO : check this.
throw e;
}
@@ -587,7 +596,7 @@ public class ImportServiceImpl implements ImportService {
rootEm.update(fileImport);
//check other files status and mark the status of import Job.
-
+ //TODO: need to fix this
Results ImportJobResults = em.getConnectingEntities(fileImport.getUuid(), "includes", null, Results.Level.ALL_PROPERTIES);
List<Entity> importEntity = ImportJobResults.getEntities();
UUID importId = importEntity.get(0).getUuid();
@@ -612,8 +621,8 @@ public class ImportServiceImpl implements ImportService {
}
}
if(count == entities.size()) {
- importUG.setState(Import.State.FINISHED);
- rootEm.update(importUG);
+ importUG.setState(Import.State.FINISHED);
+ rootEm.update(importUG);
}
}
}
@@ -759,4 +768,3 @@ class ApplicationNotFoundException extends Exception {
super(s);
}
}
-