You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/02/12 04:47:06 UTC

incubator-usergrid git commit: Updated tests

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-import fcc173997 -> 650864929


Updated tests

Refactored finish logic so that it can be invoked on error as well


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/65086492
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/65086492
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/65086492

Branch: refs/heads/two-dot-o-import
Commit: 6508649290f4214ba52f16b560ec8d92923c9b1e
Parents: fcc1739
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Feb 11 20:47:04 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Feb 11 20:47:04 2015 -0700

----------------------------------------------------------------------
 .../rest/management/ImportResourceIT.java       |   4 +-
 .../management/importer/ImportServiceImpl.java  | 121 ++++++++++++-------
 2 files changed, 82 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/65086492/stack/rest/src/test/java/org/apache/usergrid/rest/management/ImportResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/management/ImportResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/management/ImportResourceIT.java
index 9b81b40..2448f6d 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/management/ImportResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/management/ImportResourceIT.java
@@ -453,7 +453,7 @@ public class ImportResourceIT extends AbstractRestIT {
 
         assertEquals( "testimport-bad-connection.json", includesEntity.getString( "fileName" ) );
         assertEquals(1, includesEntity.get( "importedConnectionCount" ));
-        assertEquals(1, includesEntity.getString( "importedEntityCount" ));
+        assertEquals(1, includesEntity.get( "importedEntityCount" ));
 
         assertEquals("FINISHED", importGet.get("state"));
         assertEquals(1, importGet.get("fileCount"));
@@ -642,7 +642,7 @@ public class ImportResourceIT extends AbstractRestIT {
                 .addToPath(importEntity.getUuid().toString())
                 .get();
 
-            if (importGet.get("state").equals("FINISHED")) {
+            if (importGet.get("state").equals("FINISHED") || importGet.get( "state" ).equals( "FAILED" )) {
                 break;
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/65086492/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index c517ce4..43769aa 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@ -726,7 +726,11 @@ public class ImportServiceImpl implements ImportService {
             } catch (Exception e1) {
                 logger.error("Error updating fileImport to set state of file import", e1);
             }
+
+            checkIfComplete( emManagementApp, fileImport );
+            return;
         }
+
         EntityManager targetEm = emf.getEntityManager( targetAppId );
 
         // download file from S3, if no S3 importer was passed in then create one
@@ -741,14 +745,16 @@ public class ImportServiceImpl implements ImportService {
                 s3Import = new S3ImportImpl();
             }
         } catch (Exception e) {
-            logger.error("doImport(): Error creating S3Import", e);
-            fileImport.setErrorMessage(e.getMessage());
+            logger.error( "doImport(): Error creating S3Import", e );
+            fileImport.setErrorMessage( e.getMessage() );
             fileImport.setState( FileImport.State.FAILED );
             try {
                 emManagementApp.update(fileImport);
             } catch (Exception e1) {
                 logger.error("Error updating file import with error information", e1);
             }
+            checkIfComplete( emManagementApp, fileImport );
+
             return;
         }
 
@@ -765,6 +771,7 @@ public class ImportServiceImpl implements ImportService {
                 logger.error("Error updating file import with error information", e1);
             }
 
+            checkIfComplete( emManagementApp, fileImport );
             return;
         }
 
@@ -784,46 +791,73 @@ public class ImportServiceImpl implements ImportService {
                 logger.error("Error updating file import with error information", e1);
             }
 
-            return;
         }
 
-        // mark ImportJob FINISHED but only if all other FileImportJobs are complete
+        checkIfComplete( emManagementApp, fileImport );
 
-        // get parent import job of this file import job
 
-        String randTag = RandomStringUtils.randomAlphanumeric(4); // for logging
+    }
 
-        int failCount = 0;
-        int successCount = 0;
-        Import importEntity = null;
-        try {
 
+    private Import getImportEntity( final EntityManager rootEm, final FileImport fileImport ) {
+        try {
             Results importJobResults =
-                emManagementApp.getConnectingEntities( fileImport, IMPORT_FILE_INCLUDES_CONNECTION, null, Level.ALL_PROPERTIES );
+                rootEm.getConnectingEntities( fileImport, IMPORT_FILE_INCLUDES_CONNECTION, null, Level.ALL_PROPERTIES );
+
             List<Entity> importEntities = importJobResults.getEntities();
-            UUID importId = importEntities.get(0).getUuid();
-            importEntity = emManagementApp.get(importId, Import.class);
+            UUID importId = importEntities.get( 0 ).getUuid();
+
+            final Import importEntity = ( Import ) importEntities.get( 0 ).toTypedEntity();
+
+            //        final Import importEntity = rootEm.get(importId, Import.class);
+
+            return importEntity;
+        }
+        catch ( Exception e ) {
+            throw new RuntimeException( "Unable to import entity" );
+        }
+    }
+
+    /**
+     * Check if we're the last job on failure
+     * @param rootEm
+     * @param fileImport
+     */
+    private void checkIfComplete( final EntityManager rootEm, final FileImport fileImport ) {
+        int failCount = 0;
+        int successCount = 0;
 
-            logger.debug("{} Got importEntity {}", randTag, importEntity.getUuid());
+        final Import importEntity = getImportEntity( rootEm, fileImport );
+
+        try {
+
+            logger.debug( "Got importEntity {}", importEntity.getUuid() );
 
             EntityManager emMgmtApp = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
 
 
             Query query = new Query();
-            query.setEntityType(Schema.getDefaultSchema().getEntityType( FileImport.class ));
+            query.setEntityType( Schema.getDefaultSchema().getEntityType( FileImport.class ) );
             query.setConnectionType( IMPORT_FILE_INCLUDES_CONNECTION );
-            query.setLimit(MAX_FILE_IMPORTS);
+            query.setLimit( MAX_FILE_IMPORTS );
+
+            Results entities = emMgmtApp.searchConnectedEntities( importEntity, query );
 
-            Results entities = emMgmtApp.searchConnectedEntities(importEntity, query);
+            PagingResultsIterator itr = new PagingResultsIterator( entities );
 
-            PagingResultsIterator itr = new PagingResultsIterator(entities);
+            //TODO Dave, I found a race condition here with the query module and ES indexing latency.  Aside from wait, I'm not sure what else we can do.
+            if(!itr.hasNext()){
+                logger.warn( "Just processed a file, but we were unable to get it back in a connection, this is a bug" );
+                return;
+            }
+
+            logger.debug( "Check {} jobs to see if we are done for file {}",
+                new Object[] { entities.size(), fileImport.getFileName() } );
 
-            logger.debug("{} Check {} jobs to see if we are done for file {}",
-                new Object[]{randTag, entities.size(), fileImport.getFileName()});
 
-            while (itr.hasNext()) {
-                FileImport fi = (FileImport) itr.next();
-                switch (fi.getState()) {
+            while ( itr.hasNext() ) {
+                FileImport fi = ( FileImport ) itr.next();
+                switch ( fi.getState() ) {
                     case FAILED:     // failed, but we may not be complete so continue checking
                         failCount++;
                         break;
@@ -831,35 +865,40 @@ public class ImportServiceImpl implements ImportService {
                         successCount++;
                         continue;
                     default:         // not something we recognize as complete, short circuit
-                        logger.debug("{} not done yet, bail out...", randTag);
-                        return;
+                        logger.debug( "not done yet, bail out..." ); return;
                 }
             }
-
-        } catch ( Exception e ) {
+        }
+        catch ( Exception e ) {
             failCount++;
             if ( importEntity != null ) {
-                importEntity.setErrorMessage("Error determining status of file import jobs");
+                importEntity.setErrorMessage( "Error determining status of file import jobs" );
             }
-            logger.debug("Error determining status of file import jobs", e);
+            logger.debug( "Error determining status of file import jobs", e );
         }
 
-        logger.debug("{} successCount = {} failCount = {}", new Object[] { randTag, successCount, failCount } );
+        logger.debug( "successCount = {} failCount = {}", new Object[] { successCount, failCount } );
 
-        if ( importEntity != null && failCount == 0 ) {
-            logger.debug("{} FINISHED", randTag);
-            importEntity.setState(Import.State.FINISHED);
+        if ( importEntity != null ) {
+            logger.debug( "FINISHED" );
 
-        }  else if ( importEntity != null ) {
-            // we had failures, set it to failed
-            importEntity.setState(Import.State.FAILED);
-        }
+            if ( failCount == 0 ) {
+                importEntity.setState( Import.State.FINISHED );
+            }
+            else {
+                // we had failures, set it to failed
+                importEntity.setState( Import.State.FAILED );
+            }
 
-        try {
-            emManagementApp.update( importEntity );
-        } catch (Exception e) {
-            logger.error("Error updating import entity", e);
+            try {
+                rootEm.update( importEntity );
+            }
+            catch ( Exception e ) {
+                logger.error( "Error updating import entity", e );
+            }
         }
+
+
     }