You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/02/12 16:39:49 UTC
[1/2] incubator-usergrid git commit: More improvements to error
handling and reporting and using the tracker correctly,
plus improvements to ImportCollectionIT.testImportBadJson().
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-import 650864929 -> c65918dd6
More improvements to error handling and reporting and using the tracker correctly, plus improvements to ImportCollectionIT.testImportBadJson().
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ee864092
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ee864092
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ee864092
Branch: refs/heads/two-dot-o-import
Commit: ee8640928dc05c473bd31c286280d4aa4af194a5
Parents: a00af86
Author: Dave Johnson <dm...@apigee.com>
Authored: Thu Feb 12 10:25:01 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Thu Feb 12 10:25:01 2015 -0500
----------------------------------------------------------------------
.../management/importer/ImportServiceImpl.java | 94 ++++++++++----------
.../management/importer/ImportCollectionIT.java | 61 +++++++++----
2 files changed, 88 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ee864092/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index 434d62f..bba12f1 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@ -684,7 +684,7 @@ public class ImportServiceImpl implements ImportService {
EntityManager emManagementApp = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
- // get the file import entity
+ // get the file import entity
FileImport fileImport;
try {
@@ -694,6 +694,9 @@ public class ImportServiceImpl implements ImportService {
return;
}
+ // tracker flushes every 100 entities
+ final FileImportTracker tracker = new FileImportTracker( emf, fileImport, 100 );
+
String fileName = jobExecution.getJobData().getProperty("File").toString();
UUID targetAppId = (UUID) jobExecution.getJobData().getProperty("applicationId");
@@ -712,20 +715,12 @@ public class ImportServiceImpl implements ImportService {
emManagementApp.update(fileImport);
if ( emManagementApp.get( targetAppId ) == null ) {
- fileImport.setState( FileImport.State.FAILED );
- fileImport.setErrorMessage("Application " + targetAppId + " does not exist");
- emManagementApp.update(fileImport);
+ tracker.fatal("Application " + targetAppId + " does not exist");
return;
}
} catch (Exception e) {
- fileImport.setState( FileImport.State.FAILED );
- fileImport.setErrorMessage("Application " + targetAppId + " does not exist");
- try {
- emManagementApp.update( fileImport );
- } catch (Exception e1) {
- logger.error("Error updating fileImport to set state of file import", e1);
- }
+ tracker.fatal("Application " + targetAppId + " does not exist");
}
EntityManager targetEm = emf.getEntityManager( targetAppId );
@@ -741,14 +736,7 @@ public class ImportServiceImpl implements ImportService {
s3Import = new S3ImportImpl();
}
} catch (Exception e) {
- logger.error("doImport(): Error creating S3Import", e);
- fileImport.setErrorMessage(e.getMessage());
- fileImport.setState( FileImport.State.FAILED );
- try {
- emManagementApp.update(fileImport);
- } catch (Exception e1) {
- logger.error("Error updating file import with error information", e1);
- }
+ tracker.fatal("Unable to connect to S3 bucket, error: " + e.getMessage());
return;
}
@@ -756,31 +744,17 @@ public class ImportServiceImpl implements ImportService {
downloadedFile = s3Import.copyFileFromBucket(
fileName, bucketName, accessId, secretKey );
} catch (Exception e) {
- logger.error("Error downloading file " + fileName, e);
- fileImport.setErrorMessage("Error downloading file " + fileName + ": " + e.getMessage());
- fileImport.setState(FileImport.State.FAILED);
- try {
- emManagementApp.update(fileImport);
- } catch (Exception e1) {
- logger.error("Error updating file import with error information", e1);
- }
+ tracker.fatal("Error downloading file " + fileName + ": " + e.getMessage());
}
// parse JSON data, create Entities and Connections from import data
try {
parseEntitiesAndConnectionsFromJson(
- jobExecution, downloadedFile, targetEm, emManagementApp, fileImport);
+ jobExecution, downloadedFile, targetEm, emManagementApp, fileImport, tracker);
} catch (Exception e) {
- logger.error("Error importing file " + fileName, e);
- fileImport.setErrorMessage("Error downloading file " + fileName + ": " + e.getMessage());
- fileImport.setState( FileImport.State.FAILED );
- try {
- emManagementApp.update(fileImport);
- } catch (Exception e1) {
- logger.error("Error updating file import with error information", e1);
- }
+ tracker.fatal("Error importing file " + fileName + ": " + e.getMessage());
}
// mark ImportJob FINISHED but only if all other FileImportJobs are complete
@@ -804,12 +778,14 @@ public class ImportServiceImpl implements ImportService {
EntityManager emMgmtApp = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
-
Query query = new Query();
query.setEntityType(Schema.getDefaultSchema().getEntityType( FileImport.class ));
query.setConnectionType( IMPORT_FILE_INCLUDES_CONNECTION );
query.setLimit(MAX_FILE_IMPORTS);
+ // TODO: better way to wait for ES indexes to catch up
+ try { Thread.sleep(5000); } catch ( Exception intentionallyIgnored ) {}
+
Results entities = emMgmtApp.searchConnectedEntities(importEntity, query);
PagingResultsIterator itr = new PagingResultsIterator(entities);
@@ -885,11 +861,12 @@ public class ImportServiceImpl implements ImportService {
final File file,
final EntityManager em,
final EntityManager rootEm,
- final FileImport fileImport) throws Exception {
+ final FileImport fileImport,
+ final FileImportTracker tracker) throws Exception {
// tracker flushes every 100 entities
- final FileImportTracker tracker = new FileImportTracker( emf, fileImport, 100 );
+ //final FileImportTracker tracker = new FileImportTracker( emf, fileImport, 100 );
// function to execute for each write event
final Action1<WriteEvent> doWork = new Action1<WriteEvent>() {
@@ -945,7 +922,12 @@ public class ImportServiceImpl implements ImportService {
jp.close();
- logger.debug("\n\nparseEntitiesAndConnectionsFromJson(): Wrote entities\n");
+ if ( FileImport.State.FAILED.equals( fileImport.getState() ) ) {
+ logger.debug("\n\nFailed to completely write entities, skipping second phase. File: {}\n",
+ fileImport.getFileName());
+ return;
+ }
+ logger.debug("\n\nWrote entities. File: {}\n", fileImport.getFileName() );
// SECOND PASS: import all connections and dictionaries
@@ -984,9 +966,20 @@ public class ImportServiceImpl implements ImportService {
logger.debug("\n\nparseEntitiesAndConnectionsFromJson(): Wrote others for file {}\n",
fileImport.getFileName());
+ if ( FileImport.State.FAILED.equals( fileImport.getState() ) ) {
+ logger.debug("\n\nparseEntitiesAndConnectionsFromJson(): failed to completely write entities\n");
+ return;
+ }
// flush the job statistics
tracker.complete();
+
+ if ( FileImport.State.FAILED.equals( fileImport.getState() ) ) {
+ logger.debug("\n\nFailed to completely wrote connections and dictionaries. File: {}\n",
+ fileImport.getFileName());
+ return;
+ }
+ logger.debug("\n\nWrote connections and dictionaries. File: {}\n", fileImport.getFileName() );
}
@@ -1249,15 +1242,17 @@ public class ImportServiceImpl implements ImportService {
logger.debug("process(): done parsing JSON");
} catch (Exception e) {
- // skip illegal entity UUID and go to next one
- fileImport.setErrorMessage(e.getMessage());
- try {
- rootEm.update(fileImport);
- } catch (Exception ex) {
- logger.error("Error updating file import record", ex);
- }
+
+ tracker.fatal("Failed to import file" + fileImport.getFileName() + " error " + e.getMessage());
+
if ( subscriber != null ) {
- subscriber.onError(e);
+
+ // don't need to blow up here, we handled the problem
+ // if we blow up we may prevent in-flight entities from being written
+ // subscriber.onError(e);
+
+ // but we are done reading entities
+ subscriber.onCompleted();
}
}
}
@@ -1265,8 +1260,11 @@ public class ImportServiceImpl implements ImportService {
private void processWriteEvent( final Subscriber<? super WriteEvent> subscriber, WriteEvent writeEvent ) {
if ( subscriber == null ) {
+
+ // this logic makes it easy to remove Rx for debugging purposes
// no Rx, just do it
writeEvent.doWrite(em, fileImport, tracker);
+
} else {
subscriber.onNext( writeEvent );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ee864092/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
index 41d264c..fde0d8b 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
@@ -35,6 +35,7 @@ import org.apache.usergrid.management.OrganizationInfo;
import org.apache.usergrid.management.UserInfo;
import org.apache.usergrid.management.export.ExportService;
import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.entities.FileImport;
import org.apache.usergrid.persistence.entities.Import;
import org.apache.usergrid.persistence.index.impl.ElasticSearchResource;
import org.apache.usergrid.persistence.index.query.Query;
@@ -342,37 +343,49 @@ public class ImportCollectionIT {
* TODO: Test that importing bad JSON will result in an informative error message.
*/
@Test
- public void testImportBadJson() throws Exception{
- // import from a bad JSON file
+ public void testImportBadJson() throws Exception {
deleteBucket();
- //list out all the files in the resource directory you want uploaded
- List<String> filenames = new ArrayList<>( 1 );
+ // export and upload a bad JSON file to the S3 bucket
+ List<String> filenames = new ArrayList<>( 1 );
filenames.add( "testimport-bad-json.json");
- // create 10 applications each with collection of 10 things, export all to S3
S3Upload s3Upload = new S3Upload();
s3Upload.copyToS3(
System.getProperty(SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR),
System.getProperty(SDKGlobalConfiguration.SECRET_KEY_ENV_VAR),
bucketName, filenames );
- // import all those exports from S3 into the default test application
+ // import bad JSON from from the S3 bucket
final EntityManager emDefaultApp = setup.getEmf().getEntityManager( applicationId );
- importCollection( emDefaultApp );
+ UUID importId = importCollection( emDefaultApp );
- // we should now have 100 Entities in the default app
+
+ // check that we got an informative error message back
List<Entity> importedThings = emDefaultApp.getCollection(
emDefaultApp.getApplicationId(), "things", null, Level.ALL_PROPERTIES).getEntities();
- assertTrue( importedThings.isEmpty() );
- // assertEquals( , importedThings.size() );
+ assertTrue("No entities should have been imported", importedThings.isEmpty());
+
+ ImportService importService = setup.getImportService();
+ Results results = importService.getFileImports( applicationId, importId, null, null );
+
+ assertEquals( "There is one", 1, results.size() );
+
+ assertEquals( "Entity is FileImport object",
+ FileImport.class, results.getEntity().getClass() );
- // check that error message indicates JSON parsing error
+ FileImport fileImport = (FileImport)results.getEntity();
+
+ assertEquals( "File name is correct",
+ "testimport-bad-json.json", fileImport.getFileName());
+
+ assertTrue( "Error message is correct",
+ fileImport.getErrorMessage().startsWith("Unexpected character ('<' (code 60))"));
}
@Test
@@ -432,7 +445,7 @@ public class ImportCollectionIT {
//---------------------------------------------------------------------------------------------
- private void importCollection(final EntityManager em ) throws Exception {
+ private UUID importCollection(final EntityManager em ) throws Exception {
logger.debug("\n\nImport into new app {}\n", em.getApplication().getName() );
@@ -454,19 +467,25 @@ public class ImportCollectionIT {
}} );
}} );
-
-
- int maxRetries = 60;
+ int maxRetries = 30;
int retries = 0;
- while ( !importService.getState( importEntity.getUuid() ).equals( "FINISHED" )
- && !importService.getState( importEntity.getUuid() ).equals( "FAILED" )
+ Import.State state = importService.getState(importEntity.getUuid());
+ while ( !state.equals( Import.State.FINISHED )
+ && !state.equals( Import.State.FAILED )
&& retries++ < maxRetries ) {
- logger.debug("Waiting for import...");
+ logger.debug("Waiting for import ({}) ...", state.toString());
Thread.sleep(1000);
+
+ state = importService.getState(importEntity.getUuid());
}
+ if ( retries >= maxRetries ) {
+ throw new RuntimeException("Max retries reached");
+ }
em.refreshIndex();
+
+ return importEntity.getUuid();
}
@@ -499,12 +518,16 @@ public class ImportCollectionIT {
}});
}});
- int maxRetries = 60;
+ int maxRetries = 30;
int retries = 0;
while ( !exportService.getState( exportUUID ).equals( "FINISHED" ) && retries++ < maxRetries ) {
logger.debug("Waiting for export...");
Thread.sleep(1000);
}
+
+ if ( retries >= maxRetries ) {
+ throw new RuntimeException("Max retries reached");
+ }
}
[2/2] incubator-usergrid git commit: Merge branch 'two-dot-o-import'
of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into
two-dot-o-import
Posted by sn...@apache.org.
Merge branch 'two-dot-o-import' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-import
Conflicts:
stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c65918dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c65918dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c65918dd
Branch: refs/heads/two-dot-o-import
Commit: c65918dd6f352feaf5e55a5047f81abb45e315e6
Parents: ee86409 6508649
Author: Dave Johnson <dm...@apigee.com>
Authored: Thu Feb 12 10:39:38 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Thu Feb 12 10:39:38 2015 -0500
----------------------------------------------------------------------
.../rest/management/ImportResourceIT.java | 43 +++---
.../management/importer/ImportServiceImpl.java | 131 ++++++++++++-------
2 files changed, 107 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c65918dd/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index bba12f1,43769aa..39cd480
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@@ -18,7 -18,7 +18,6 @@@
package org.apache.usergrid.management.importer;
import com.google.common.base.Preconditions;
--import org.apache.commons.lang.RandomStringUtils;
import org.apache.usergrid.batch.JobExecution;
import org.apache.usergrid.batch.service.SchedulerService;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
@@@ -389,7 -389,7 +388,8 @@@ public class ImportServiceImpl implemen
return entities.size();
// see ImportConnectsTest()
--// Results entities = emMgmtApp.getConnectedEntities( importRoot, "includes", null, Level.ALL_PROPERTIES );
++// Results entities = emMgmtApp.getConnectedEntities(
++// importRoot, "includes", null, Level.ALL_PROPERTIES );
// PagingResultsIterator itr = new PagingResultsIterator( entities );
// int count = 0;
// while ( itr.hasNext() ) {
@@@ -404,9 -404,9 +404,10 @@@
}
}
++
/**
-- * Schedule the file tasks. This must happen in 2 phases. The first is linking the sub files to the master the
-- * second is scheduling them to run.
++ * Schedule the file tasks. This must happen in 2 phases. The first is linking the
++ * sub files to the master the second is scheduling them to run.
*/
private JobData scheduleFileTasks( final JobData jobData ) {
@@@ -416,6 -416,6 +417,7 @@@
return sch.createJob(FILE_IMPORT_JOB_NAME, soonestPossible, jobData);
}
++
/**
* Query Entity Manager for the state of the Import Entity. This corresponds to the GET /import
*/
@@@ -437,6 -437,6 +439,7 @@@
return importUG.getState();
}
++
/**
* Query Entity Manager for the error message generated for an import job.
*/
@@@ -462,6 -462,6 +465,7 @@@
return importUG.getErrorMessage();
}
++
/**
* Returns the Import Entity that stores all meta-data for the particular import Job
*
@@@ -484,11 -484,11 +488,8 @@@
*/
@Override
public FileImport getFileImportEntity(final JobExecution jobExecution) throws Exception {
--
UUID fileImportId = (UUID) jobExecution.getJobData().getProperty(FILE_IMPORT_ID);
--
EntityManager em = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
--
return em.get(fileImportId, FileImport.class);
}
@@@ -720,8 -719,18 +721,11 @@@
}
} catch (Exception e) {
- fileImport.setState( FileImport.State.FAILED );
- fileImport.setErrorMessage("Application " + targetAppId + " does not exist");
- try {
- emManagementApp.update( fileImport );
- } catch (Exception e1) {
- logger.error("Error updating fileImport to set state of file import", e1);
- }
-
+ tracker.fatal("Application " + targetAppId + " does not exist");
+ checkIfComplete( emManagementApp, fileImport );
+ return;
}
+
EntityManager targetEm = emf.getEntityManager( targetAppId );
// download file from S3, if no S3 importer was passed in then create one
@@@ -736,7 -745,16 +740,8 @@@
s3Import = new S3ImportImpl();
}
} catch (Exception e) {
- logger.error( "doImport(): Error creating S3Import", e );
- fileImport.setErrorMessage( e.getMessage() );
- fileImport.setState( FileImport.State.FAILED );
- try {
- emManagementApp.update(fileImport);
- } catch (Exception e1) {
- logger.error("Error updating file import with error information", e1);
- }
+ tracker.fatal("Unable to connect to S3 bucket, error: " + e.getMessage());
+ checkIfComplete( emManagementApp, fileImport );
-
return;
}
@@@ -744,58 -762,102 +749,80 @@@
downloadedFile = s3Import.copyFileFromBucket(
fileName, bucketName, accessId, secretKey );
} catch (Exception e) {
- logger.error("Error downloading file " + fileName, e);
- fileImport.setErrorMessage("Error downloading file " + fileName + ": " + e.getMessage());
- fileImport.setState(FileImport.State.FAILED);
- try {
- emManagementApp.update(fileImport);
- } catch (Exception e1) {
- logger.error("Error updating file import with error information", e1);
- }
-
+ tracker.fatal("Error downloading file " + fileName + ": " + e.getMessage());
+ checkIfComplete( emManagementApp, fileImport );
+ return;
}
// parse JSON data, create Entities and Connections from import data
try {
parseEntitiesAndConnectionsFromJson(
- jobExecution, downloadedFile, targetEm, emManagementApp, fileImport);
+ jobExecution, downloadedFile, targetEm, emManagementApp, fileImport, tracker);
} catch (Exception e) {
- logger.error("Error importing file " + fileName, e);
- fileImport.setErrorMessage("Error downloading file " + fileName + ": " + e.getMessage());
- fileImport.setState( FileImport.State.FAILED );
- try {
- emManagementApp.update(fileImport);
- } catch (Exception e1) {
- logger.error("Error updating file import with error information", e1);
- }
-
+ tracker.fatal("Error importing file " + fileName + ": " + e.getMessage());
}
- // mark ImportJob FINISHED but only if all other FileImportJobs are complete
+ checkIfComplete( emManagementApp, fileImport );
-
-
+ }
+
- // get parent import job of this file import job
+ private Import getImportEntity( final EntityManager rootEm, final FileImport fileImport ) {
+ try {
+ Results importJobResults =
- rootEm.getConnectingEntities( fileImport, IMPORT_FILE_INCLUDES_CONNECTION, null, Level.ALL_PROPERTIES );
++ rootEm.getConnectingEntities( fileImport, IMPORT_FILE_INCLUDES_CONNECTION,
++ null, Level.ALL_PROPERTIES );
- String randTag = RandomStringUtils.randomAlphanumeric(4); // for logging
+ List<Entity> importEntities = importJobResults.getEntities();
- UUID importId = importEntities.get( 0 ).getUuid();
-
+ final Import importEntity = ( Import ) importEntities.get( 0 ).toTypedEntity();
-
- // final Import importEntity = rootEm.get(importId, Import.class);
-
+ return importEntity;
+ }
+ catch ( Exception e ) {
+ throw new RuntimeException( "Unable to import entity" );
+ }
+ }
+ /**
+ * Check if we're the last job on failure
- * @param rootEm
- * @param fileImport
+ */
- private void checkIfComplete( final EntityManager rootEm, final FileImport fileImport ) {
++ private void checkIfComplete( final EntityManager emMgmtApp, final FileImport fileImport ) {
int failCount = 0;
int successCount = 0;
- Import importEntity = null;
+
- final Import importEntity = getImportEntity( rootEm, fileImport );
++ final Import importEntity = getImportEntity( emMgmtApp, fileImport );
+
try {
- Results importJobResults =
- emManagementApp.getConnectingEntities( fileImport, IMPORT_FILE_INCLUDES_CONNECTION, null, Level.ALL_PROPERTIES );
- List<Entity> importEntities = importJobResults.getEntities();
- UUID importId = importEntities.get(0).getUuid();
- importEntity = emManagementApp.get(importId, Import.class);
- logger.debug( "Got importEntity {}", importEntity.getUuid() );
++ // wait for query index to catch up
- logger.debug("{} Got importEntity {}", randTag, importEntity.getUuid());
- EntityManager emMgmtApp = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
++ // TODO: better way to wait for indexes to catch up
++ try { Thread.sleep(5000); } catch ( Exception intentionallyIgnored ) {}
- EntityManager emMgmtApp = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
++ // get file import entities for this import job
Query query = new Query();
- query.setEntityType(Schema.getDefaultSchema().getEntityType( FileImport.class ));
+ query.setEntityType( Schema.getDefaultSchema().getEntityType( FileImport.class ) );
query.setConnectionType( IMPORT_FILE_INCLUDES_CONNECTION );
- query.setLimit(MAX_FILE_IMPORTS);
+ query.setLimit( MAX_FILE_IMPORTS );
- // TODO: better way to wait for ES indexes to catch up
- try { Thread.sleep(5000); } catch ( Exception intentionallyIgnored ) {}
+ Results entities = emMgmtApp.searchConnectedEntities( importEntity, query );
-
+ PagingResultsIterator itr = new PagingResultsIterator( entities );
- Results entities = emMgmtApp.searchConnectedEntities(importEntity, query);
- //TODO Dave, I found a race condition here with the query module and ES indexing latency. Aside from wait, I'm not sure what else we can do.
- if(!itr.hasNext()){
- logger.warn( "Just processed a file, but we were unable to get it back in a connection, this is a bug" );
++ if ( !itr.hasNext() ) {
++ logger.warn("Found no FileImport entities for import {}, " +
++ "unable to check if complete", importEntity.getUuid());
+ return;
+ }
- PagingResultsIterator itr = new PagingResultsIterator(entities);
- logger.debug( "Check {} jobs to see if we are done for file {}",
++ logger.debug( "Checking {} file import jobs to see if we are done for file {}",
+ new Object[] { entities.size(), fileImport.getFileName() } );
- logger.debug("{} Check {} jobs to see if we are done for file {}",
- new Object[]{randTag, entities.size(), fileImport.getFileName()});
++ // loop through entities, count different types of status
- while (itr.hasNext()) {
- FileImport fi = (FileImport) itr.next();
- switch (fi.getState()) {
+ while ( itr.hasNext() ) {
+ FileImport fi = ( FileImport ) itr.next();
+ switch ( fi.getState() ) {
case FAILED: // failed, but we may not be complete so continue checking
failCount++;
break;
@@@ -803,35 -865,40 +830,40 @@@
successCount++;
continue;
default: // not something we recognize as complete, short circuit
- logger.debug("{} not done yet, bail out...", randTag);
- return;
+ logger.debug( "not done yet, bail out..." ); return;
}
}
-
- } catch ( Exception e ) {
+ }
+ catch ( Exception e ) {
failCount++;
if ( importEntity != null ) {
- importEntity.setErrorMessage("Error determining status of file import jobs");
+ importEntity.setErrorMessage( "Error determining status of file import jobs" );
}
- logger.debug("Error determining status of file import jobs", e);
+ logger.debug( "Error determining status of file import jobs", e );
}
- logger.debug("{} successCount = {} failCount = {}", new Object[] { randTag, successCount, failCount } );
+ logger.debug( "successCount = {} failCount = {}", new Object[] { successCount, failCount } );
- if ( importEntity != null && failCount == 0 ) {
- logger.debug("{} FINISHED", randTag);
- importEntity.setState(Import.State.FINISHED);
+ if ( importEntity != null ) {
+ logger.debug( "FINISHED" );
- } else if ( importEntity != null ) {
- // we had failures, set it to failed
- importEntity.setState(Import.State.FAILED);
- }
+ if ( failCount == 0 ) {
+ importEntity.setState( Import.State.FINISHED );
+ }
+ else {
+ // we had failures, set it to failed
+ importEntity.setState( Import.State.FAILED );
+ }
- try {
- emManagementApp.update( importEntity );
- } catch (Exception e) {
- logger.error("Error updating import entity", e);
+ try {
- rootEm.update( importEntity );
++ emMgmtApp.update( importEntity );
+ }
+ catch ( Exception e ) {
+ logger.error( "Error updating import entity", e );
+ }
}
+
+
}
@@@ -973,13 -1030,6 +1005,13 @@@
// flush the job statistics
tracker.complete();
+
+ if ( FileImport.State.FAILED.equals( fileImport.getState() ) ) {
+ logger.debug("\n\nFailed to completely wrote connections and dictionaries. File: {}\n",
+ fileImport.getFileName());
+ return;
+ }
- logger.debug("\n\nWrote connections and dictionaries. File: {}\n", fileImport.getFileName() );
++ logger.debug("\n\nWrote connections and dictionaries. File: {}\n", fileImport.getFileName());
}
@@@ -1226,7 -1276,7 +1258,8 @@@
}
} else if (token.equals( JsonToken.START_ARRAY )) {
-- if ( objectNameStack.size() == 1 && COLLECTION_OBJECT_NAME.equals( objectNameStack.peek() )) {
++ if ( objectNameStack.size() == 1
++ && COLLECTION_OBJECT_NAME.equals( objectNameStack.peek() )) {
entityType = InflectionUtils.singularize( name );
}