You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/08/22 16:16:09 UTC

[73/95] [abbrv] git commit: added code for parallel processing of multiple files

Added code for parallel processing of multiple files.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/16a22ff9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/16a22ff9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/16a22ff9

Branch: refs/heads/import-feature
Commit: 16a22ff95e2b037baa780f27211b66d76c63ed84
Parents: e0e7536
Author: Pooja Jain <pj...@apigee.com>
Authored: Mon Aug 4 09:23:23 2014 -0700
Committer: Pooja Jain <pj...@apigee.com>
Committed: Mon Aug 4 09:23:23 2014 -0700

----------------------------------------------------------------------
 .../management/importUG/ImportServiceImpl.java  | 32 ++++++++++++--------
 1 file changed, 20 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16a22ff9/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
index ed7516d..78c47c6 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importUG/ImportServiceImpl.java
@@ -137,7 +137,7 @@ public class ImportServiceImpl implements ImportService {
      * @return it returns the UUID of the scheduled job
      * @throws Exception
      */
-    public UUID scheduleFile(File file, EntityRef importRef) throws Exception {
+    public UUID scheduleFile(String file, EntityRef importRef) throws Exception {
 
         ApplicationInfo defaultImportApp = null;
 
@@ -153,9 +153,9 @@ public class ImportServiceImpl implements ImportService {
 
         FileImport fileImport = new FileImport();
 
-        fileImport.setFileName(file.getName());
+        fileImport.setFileName(file);
         fileImport.setCompleted(false);
-        fileImport.setLastUpdatedUUID("");
+        fileImport.setLastUpdatedUUID(" ");
         fileImport.setErrorMessage("");
         fileImport.setState(FileImport.State.CREATED);
         fileImport = em.create(fileImport);
@@ -176,8 +176,7 @@ public class ImportServiceImpl implements ImportService {
         //set data to be transferred to importInfo
         JobData jobData = new JobData();
         jobData.setProperty( "File", file );
-        jobData.setProperty( "fileImportId", fileImport.getUuid() );
-        jobData.setProperty(FILE_IMPORT_ID,importRef);
+        jobData.setProperty( FILE_IMPORT_ID , fileImport.getUuid() );
 
         long soonestPossible = System.currentTimeMillis() + 250; //sch grace period
 
@@ -369,7 +368,7 @@ public class ImportServiceImpl implements ImportService {
 
                 for(File eachfile: files) {
 
-                    UUID jobID = scheduleFile(eachfile, em.getRef(importId));
+                    UUID jobID = scheduleFile(eachfile.getPath(), em.getRef(importId));
                     Map<String,Object> fileJobID = new HashMap<String,Object>();
                     fileJobID.put("FileName",eachfile.getName());
                     fileJobID.put("JobID", jobID.toString());
@@ -519,8 +518,12 @@ public class ImportServiceImpl implements ImportService {
 
         fileImport.setState(FileImport.State.STARTED);
 
-        File file = (File)jobExecution.getJobData().getProperty("file");
+        File file = new File(jobExecution.getJobData().getProperty("File").toString());
+
+        logger.error(file.getName());
+
         EntityManager rootEm = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+        rootEm.update(fileImport);
 
         boolean completed = fileImport.getCompleted();
 
@@ -531,13 +534,18 @@ public class ImportServiceImpl implements ImportService {
 
                 String applicationName = file.getPath().split("\\.")[0];
 
+                logger.error(applicationName);
+
                 ApplicationInfo application = managementService.getApplicationInfo(applicationName);
 
+
+
                 JsonParser jp = getJsonParserForFile(file);
                 String lastUpdatedUUID = fileImport.getLastUpdatedUUID();
 
+                logger.error(lastUpdatedUUID);
                 // this handles partially completed files by updating entities from the point of failure
-                if (!lastUpdatedUUID.equals("")) {
+                if (!lastUpdatedUUID.equals(" ")) {
                     // go till the last updated entity
                     while (!jp.getText().equals(lastUpdatedUUID)) {
                         jp.nextToken();
@@ -577,6 +585,7 @@ public class ImportServiceImpl implements ImportService {
                     // the case where job will be retried i.e. resumed from the failed point
                     fileImport.setErrorMessage(e.getMessage());
                     em.update(fileImport);
+                    //TODO : check this.
                     throw e;
                 }
 
@@ -587,7 +596,7 @@ public class ImportServiceImpl implements ImportService {
                     rootEm.update(fileImport);
 
                     //check other files status and mark the status of import Job.
-
+                    //TODO: need to fix this
                     Results ImportJobResults = em.getConnectingEntities(fileImport.getUuid(), "includes", null, Results.Level.ALL_PROPERTIES);
                     List<Entity> importEntity = ImportJobResults.getEntities();
                     UUID importId = importEntity.get(0).getUuid();
@@ -612,8 +621,8 @@ public class ImportServiceImpl implements ImportService {
                         }
                     }
                     if(count == entities.size()) {
-                       importUG.setState(Import.State.FINISHED);
-                       rootEm.update(importUG);
+                        importUG.setState(Import.State.FINISHED);
+                        rootEm.update(importUG);
                     }
                 }
             }
@@ -759,4 +768,3 @@ class ApplicationNotFoundException extends Exception {
         super(s);
     }
 }
-