You are viewing a plain-text version of this content. The canonical link for it was not preserved in this copy.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/02/10 14:44:26 UTC

[3/4] incubator-usergrid git commit: After the Import->FileImport connections are made, wait for them to take effect before scheduling jobs.

After the Import->FileImport connections are made, wait for them to take effect before scheduling jobs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2428b862
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2428b862
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2428b862

Branch: refs/heads/two-dot-o-import
Commit: 2428b862a4b6993e89376e580dcd3dd2955182aa
Parents: 9e76c56
Author: Dave Johnson <dm...@apigee.com>
Authored: Mon Feb 9 19:57:33 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Mon Feb 9 19:57:33 2015 -0500

----------------------------------------------------------------------
 .../management/importer/ImportServiceImpl.java  | 122 ++++++++++++-------
 .../management/importer/ImportCollectionIT.java |   9 +-
 2 files changed, 78 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2428b862/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index a613fdb..2d611c5 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@ -183,6 +183,13 @@ public class ImportServiceImpl implements ImportService {
         try {
             // create a connection between the main import job and the sub FileImport Job
             emManagementApp.createConnection(importEntity, "includes", fileImport);
+
+            logger.debug("Created connection from {}:{} to {}:{}",
+                new Object[] {
+                    importEntity.getType(), importEntity.getUuid(),
+                    fileImport.getType(), fileImport.getUuid()
+                });
+
         } catch (Exception e) {
             logger.error(e.getMessage());
             return null;
@@ -202,8 +209,6 @@ public class ImportServiceImpl implements ImportService {
         fileImport.setState(FileImport.State.SCHEDULED);
         emManagementApp.update(fileImport);
 
-        emf.refreshIndex();
-
         return jobData;
     }
 
@@ -372,12 +377,12 @@ public class ImportServiceImpl implements ImportService {
 
         EntityManager emManagementApp = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
         UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
-        Import rootImportTask = emManagementApp.get(importId, Import.class);
+        Import importEntity = emManagementApp.get(importId, Import.class);
 
-        rootImportTask.setState(Import.State.STARTED);
-        rootImportTask.setStarted(System.currentTimeMillis());
-        rootImportTask.setErrorMessage( " " );
-        emManagementApp.update(rootImportTask);
+        importEntity.setState(Import.State.STARTED);
+        importEntity.setStarted(System.currentTimeMillis());
+        importEntity.setErrorMessage(" ");
+        emManagementApp.update(importEntity);
         logger.debug("doImport(): updated state");
 
         // if no S3 importer was passed in then create one
@@ -392,9 +397,9 @@ public class ImportServiceImpl implements ImportService {
             }
         } catch (Exception e) {
             logger.error("doImport(): Error creating S3Import", e);
-            rootImportTask.setErrorMessage(e.getMessage());
-            rootImportTask.setState( Import.State.FAILED );
-            emManagementApp.update(rootImportTask);
+            importEntity.setErrorMessage(e.getMessage());
+            importEntity.setState(Import.State.FAILED);
+            emManagementApp.update(importEntity);
             return;
         }
 
@@ -405,9 +410,9 @@ public class ImportServiceImpl implements ImportService {
 
             if (config.get("organizationId") == null) {
                 logger.error("doImport(): No organization could be found");
-                rootImportTask.setErrorMessage( "No organization could be found" );
-                rootImportTask.setState( Import.State.FAILED );
-                emManagementApp.update(rootImportTask);
+                importEntity.setErrorMessage("No organization could be found");
+                importEntity.setState(Import.State.FAILED);
+                emManagementApp.update(importEntity);
                 return;
 
             } else {
@@ -422,9 +427,9 @@ public class ImportServiceImpl implements ImportService {
             }
 
         } catch (OrganizationNotFoundException | ApplicationNotFoundException e) {
-            rootImportTask.setErrorMessage( e.getMessage() );
-            rootImportTask.setState( Import.State.FAILED );
-            emManagementApp.update(rootImportTask);
+            importEntity.setErrorMessage(e.getMessage());
+            importEntity.setState(Import.State.FAILED);
+            emManagementApp.update(importEntity);
             return;
         }
 
@@ -432,9 +437,9 @@ public class ImportServiceImpl implements ImportService {
         // schedule a FileImport job for each file found in the bucket
 
         if ( bucketFiles.isEmpty() )  {
-            rootImportTask.setState( Import.State.FINISHED );
-            rootImportTask.setErrorMessage( "No files found in the bucket: " + bucketName );
-            emManagementApp.update(rootImportTask);
+            importEntity.setState(Import.State.FINISHED);
+            importEntity.setErrorMessage("No files found in the bucket: " + bucketName);
+            emManagementApp.update(importEntity);
 
         } else {
 
@@ -445,10 +450,31 @@ public class ImportServiceImpl implements ImportService {
             // create the Entity Connection and set up metadata for each job
 
             for ( String bucketFile : bucketFiles ) {
-                final JobData jobData = createFileTask(config, bucketFile, rootImportTask);
+                final JobData jobData = createFileTask(config, bucketFile, importEntity);
                 fileJobs.add( jobData) ;
             }
 
+            int retries = 0;
+            int maxRetries = 60;
+            Results entities;
+            boolean done = false;
+            while ( !done && retries++ < maxRetries ) {
+
+                entities = emManagementApp.getConnectedEntities(
+                    importEntity, "includes", "file_import", Level.ALL_PROPERTIES);
+
+                logger.debug("Found {} jobs", entities.size());
+                
+                if ( entities.size() == fileJobs.size() ) {
+                    done = true;
+                } else {
+                    Thread.sleep(1000);
+                }
+            }
+            if ( retries >= maxRetries ) {
+                throw new RuntimeException("Max retries was reached");
+            }
+
             // schedule each job
 
             for ( JobData jobData: fileJobs ) {
@@ -462,9 +488,9 @@ public class ImportServiceImpl implements ImportService {
             }
 
             fileMetadata.put("files", value);
-            rootImportTask.addProperties(fileMetadata);
-            rootImportTask.setFileCount( fileJobs.size() );
-            emManagementApp.update(rootImportTask);
+            importEntity.addProperties(fileMetadata);
+            importEntity.setFileCount(fileJobs.size());
+            emManagementApp.update(importEntity);
         }
     }
 
@@ -546,31 +572,35 @@ public class ImportServiceImpl implements ImportService {
         String randTag = RandomStringUtils.randomAlphanumeric(4);
         logger.debug("{} Got importEntity {}", randTag, importEntity.getUuid() );
 
-        int retries = 0;
-        int maxRetries = 60;
-        Results entities = null;
-        boolean done = false;
-        while ( !done && retries++ < maxRetries ) {
-
-            // get all file import job siblings of the current job we're working now
-            entities = emManagementApp.getConnectedEntities(
-                importEntity, "includes", "file_import", Level.ALL_PROPERTIES);
+        Results entities = emManagementApp.getConnectedEntities(
+            importEntity, "includes", "file_import", Level.ALL_PROPERTIES);
 
-            if ( entities.size() == importEntity.getFileCount() ) {
-                logger.debug("{} got {} file_import entities, expected {} DONE!",
-                    new Object[] { randTag, entities.size(), importEntity.getFileCount() });
-                done = true;
 
-            } else {
-                logger.debug("{} got {} file_import entities, expected {} waiting... ",
-                    new Object[] { randTag, entities.size(), importEntity.getFileCount() });
-                Thread.sleep(1000);
-            }
-        }
-
-        if ( retries >= maxRetries ) {
-            throw new RuntimeException("Max retries was reached");
-        }
+//        int retries = 0;
+//        int maxRetries = 60;
+//        Results entities = null;
+//        boolean done = false;
+//        while ( !done && retries++ < maxRetries ) {
+//
+//            // get all file import job siblings of the current job we're working now
+//            entities = emManagementApp.getConnectedEntities(
+//                importEntity, "includes", "file_import", Level.ALL_PROPERTIES);
+//
+//            if ( entities.size() == importEntity.getFileCount() ) {
+//                logger.debug("{} got {} file_import entities, expected {} DONE!",
+//                    new Object[] { randTag, entities.size(), importEntity.getFileCount() });
+//                done = true;
+//
+//            } else {
+//                logger.debug("{} got {} file_import entities, expected {} waiting... ",
+//                    new Object[] { randTag, entities.size(), importEntity.getFileCount() });
+//                Thread.sleep(1000);
+//            }
+//        }
+//
+//        if ( retries >= maxRetries ) {
+//            throw new RuntimeException("Max retries was reached");
+//        }
 
 
         PagingResultsIterator itr = new PagingResultsIterator( entities );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2428b862/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
index 24ed292..a9fd7db 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
@@ -285,9 +285,6 @@ public class ImportCollectionIT {
     }
 
 
-
-
-
    /**
      * Simple import test but with multiple files.
      */
@@ -301,7 +298,7 @@ public class ImportCollectionIT {
             // create 4 applications each with collection of 10 things, export all to S3
             logger.debug("\n\nCreating 10 applications with 10 entities each\n");
 
-            for (int i = 0; i < 4; i++) {
+            for (int i = 0; i < 10; i++) {
 
                 String appName = "import-test-" + i + RandomStringUtils.randomAlphanumeric(10);
                 UUID appId = setup.getMgmtSvc().createApplication(organization.getUuid(), appName).getId();
@@ -324,14 +321,12 @@ public class ImportCollectionIT {
 
             logger.debug("\n\nQuery to see if we now have 100 entities\n");
 
-
             Query query = Query.fromQL("select *").withLimit(101);
             List<Entity> importedThings = emDefaultApp.getCollection(
                 emDefaultApp.getApplicationId(), "things", query, Level.ALL_PROPERTIES).getEntities();
 
-
             assertTrue(!importedThings.isEmpty());
-            assertEquals(40, importedThings.size());
+            assertEquals(100, importedThings.size());
 
         } finally {
             deleteBucket();