You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/02/12 04:47:06 UTC
incubator-usergrid git commit: Updated tests
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-import fcc173997 -> 650864929
Updated tests
Refactored finish logic so that it can be invoked on error as well
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/65086492
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/65086492
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/65086492
Branch: refs/heads/two-dot-o-import
Commit: 6508649290f4214ba52f16b560ec8d92923c9b1e
Parents: fcc1739
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Feb 11 20:47:04 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Feb 11 20:47:04 2015 -0700
----------------------------------------------------------------------
.../rest/management/ImportResourceIT.java | 4 +-
.../management/importer/ImportServiceImpl.java | 121 ++++++++++++-------
2 files changed, 82 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/65086492/stack/rest/src/test/java/org/apache/usergrid/rest/management/ImportResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/management/ImportResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/management/ImportResourceIT.java
index 9b81b40..2448f6d 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/management/ImportResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/management/ImportResourceIT.java
@@ -453,7 +453,7 @@ public class ImportResourceIT extends AbstractRestIT {
assertEquals( "testimport-bad-connection.json", includesEntity.getString( "fileName" ) );
assertEquals(1, includesEntity.get( "importedConnectionCount" ));
- assertEquals(1, includesEntity.getString( "importedEntityCount" ));
+ assertEquals(1, includesEntity.get( "importedEntityCount" ));
assertEquals("FINISHED", importGet.get("state"));
assertEquals(1, importGet.get("fileCount"));
@@ -642,7 +642,7 @@ public class ImportResourceIT extends AbstractRestIT {
.addToPath(importEntity.getUuid().toString())
.get();
- if (importGet.get("state").equals("FINISHED")) {
+ if (importGet.get("state").equals("FINISHED") || importGet.get( "state" ).equals( "FAILED" )) {
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/65086492/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index c517ce4..43769aa 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@ -726,7 +726,11 @@ public class ImportServiceImpl implements ImportService {
} catch (Exception e1) {
logger.error("Error updating fileImport to set state of file import", e1);
}
+
+ checkIfComplete( emManagementApp, fileImport );
+ return;
}
+
EntityManager targetEm = emf.getEntityManager( targetAppId );
// download file from S3, if no S3 importer was passed in then create one
@@ -741,14 +745,16 @@ public class ImportServiceImpl implements ImportService {
s3Import = new S3ImportImpl();
}
} catch (Exception e) {
- logger.error("doImport(): Error creating S3Import", e);
- fileImport.setErrorMessage(e.getMessage());
+ logger.error( "doImport(): Error creating S3Import", e );
+ fileImport.setErrorMessage( e.getMessage() );
fileImport.setState( FileImport.State.FAILED );
try {
emManagementApp.update(fileImport);
} catch (Exception e1) {
logger.error("Error updating file import with error information", e1);
}
+ checkIfComplete( emManagementApp, fileImport );
+
return;
}
@@ -765,6 +771,7 @@ public class ImportServiceImpl implements ImportService {
logger.error("Error updating file import with error information", e1);
}
+ checkIfComplete( emManagementApp, fileImport );
return;
}
@@ -784,46 +791,73 @@ public class ImportServiceImpl implements ImportService {
logger.error("Error updating file import with error information", e1);
}
- return;
}
- // mark ImportJob FINISHED but only if all other FileImportJobs are complete
+ checkIfComplete( emManagementApp, fileImport );
- // get parent import job of this file import job
- String randTag = RandomStringUtils.randomAlphanumeric(4); // for logging
+ }
- int failCount = 0;
- int successCount = 0;
- Import importEntity = null;
- try {
+ private Import getImportEntity( final EntityManager rootEm, final FileImport fileImport ) {
+ try {
Results importJobResults =
- emManagementApp.getConnectingEntities( fileImport, IMPORT_FILE_INCLUDES_CONNECTION, null, Level.ALL_PROPERTIES );
+ rootEm.getConnectingEntities( fileImport, IMPORT_FILE_INCLUDES_CONNECTION, null, Level.ALL_PROPERTIES );
+
List<Entity> importEntities = importJobResults.getEntities();
- UUID importId = importEntities.get(0).getUuid();
- importEntity = emManagementApp.get(importId, Import.class);
+ UUID importId = importEntities.get( 0 ).getUuid();
+
+ final Import importEntity = ( Import ) importEntities.get( 0 ).toTypedEntity();
+
+ // final Import importEntity = rootEm.get(importId, Import.class);
+
+ return importEntity;
+ }
+ catch ( Exception e ) {
+ throw new RuntimeException( "Unable to import entity" );
+ }
+ }
+
+ /**
+ * Check if we're the last job on failure
+ * @param rootEm
+ * @param fileImport
+ */
+ private void checkIfComplete( final EntityManager rootEm, final FileImport fileImport ) {
+ int failCount = 0;
+ int successCount = 0;
- logger.debug("{} Got importEntity {}", randTag, importEntity.getUuid());
+ final Import importEntity = getImportEntity( rootEm, fileImport );
+
+ try {
+
+ logger.debug( "Got importEntity {}", importEntity.getUuid() );
EntityManager emMgmtApp = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
Query query = new Query();
- query.setEntityType(Schema.getDefaultSchema().getEntityType( FileImport.class ));
+ query.setEntityType( Schema.getDefaultSchema().getEntityType( FileImport.class ) );
query.setConnectionType( IMPORT_FILE_INCLUDES_CONNECTION );
- query.setLimit(MAX_FILE_IMPORTS);
+ query.setLimit( MAX_FILE_IMPORTS );
+
+ Results entities = emMgmtApp.searchConnectedEntities( importEntity, query );
- Results entities = emMgmtApp.searchConnectedEntities(importEntity, query);
+ PagingResultsIterator itr = new PagingResultsIterator( entities );
- PagingResultsIterator itr = new PagingResultsIterator(entities);
+ //TODO Dave, I found a race condition here with the query module and ES indexing latency. Aside from wait, I'm not sure what else we can do.
+ if(!itr.hasNext()){
+ logger.warn( "Just processed a file, but we were unable to get it back in a connection, this is a bug" );
+ return;
+ }
+
+ logger.debug( "Check {} jobs to see if we are done for file {}",
+ new Object[] { entities.size(), fileImport.getFileName() } );
- logger.debug("{} Check {} jobs to see if we are done for file {}",
- new Object[]{randTag, entities.size(), fileImport.getFileName()});
- while (itr.hasNext()) {
- FileImport fi = (FileImport) itr.next();
- switch (fi.getState()) {
+ while ( itr.hasNext() ) {
+ FileImport fi = ( FileImport ) itr.next();
+ switch ( fi.getState() ) {
case FAILED: // failed, but we may not be complete so continue checking
failCount++;
break;
@@ -831,35 +865,40 @@ public class ImportServiceImpl implements ImportService {
successCount++;
continue;
default: // not something we recognize as complete, short circuit
- logger.debug("{} not done yet, bail out...", randTag);
- return;
+ logger.debug( "not done yet, bail out..." ); return;
}
}
-
- } catch ( Exception e ) {
+ }
+ catch ( Exception e ) {
failCount++;
if ( importEntity != null ) {
- importEntity.setErrorMessage("Error determining status of file import jobs");
+ importEntity.setErrorMessage( "Error determining status of file import jobs" );
}
- logger.debug("Error determining status of file import jobs", e);
+ logger.debug( "Error determining status of file import jobs", e );
}
- logger.debug("{} successCount = {} failCount = {}", new Object[] { randTag, successCount, failCount } );
+ logger.debug( "successCount = {} failCount = {}", new Object[] { successCount, failCount } );
- if ( importEntity != null && failCount == 0 ) {
- logger.debug("{} FINISHED", randTag);
- importEntity.setState(Import.State.FINISHED);
+ if ( importEntity != null ) {
+ logger.debug( "FINISHED" );
- } else if ( importEntity != null ) {
- // we had failures, set it to failed
- importEntity.setState(Import.State.FAILED);
- }
+ if ( failCount == 0 ) {
+ importEntity.setState( Import.State.FINISHED );
+ }
+ else {
+ // we had failures, set it to failed
+ importEntity.setState( Import.State.FAILED );
+ }
- try {
- emManagementApp.update( importEntity );
- } catch (Exception e) {
- logger.error("Error updating import entity", e);
+ try {
+ rootEm.update( importEntity );
+ }
+ catch ( Exception e ) {
+ logger.error( "Error updating import entity", e );
+ }
}
+
+
}