You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/02/11 19:14:39 UTC

[1/2] incubator-usergrid git commit: Improve error reporting during file import.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-import 25b3645f2 -> a4327e815


Improve error reporting during file import.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0234e2bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0234e2bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0234e2bd

Branch: refs/heads/two-dot-o-import
Commit: 0234e2bd61004908173b488abdbb9430340f7c82
Parents: ef2bdc0
Author: Dave Johnson <dm...@apigee.com>
Authored: Wed Feb 11 13:11:58 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Wed Feb 11 13:11:58 2015 -0500

----------------------------------------------------------------------
 .../management/importer/ImportServiceImpl.java  | 197 ++++++++++++-------
 .../management/importer/ImportCollectionIT.java |  14 +-
 2 files changed, 138 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0234e2bd/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index 6e24264..6f08ffb 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@ -87,7 +87,7 @@ public class ImportServiceImpl implements ImportService {
         this.sch = sch;
     }
 
-    
+
     /**
      * This schedules the main import Job.
      *
@@ -529,7 +529,9 @@ public class ImportServiceImpl implements ImportService {
 
 
     @Override
-    public void downloadAndImportFile(JobExecution jobExecution) throws Exception {
+    public void downloadAndImportFile(JobExecution jobExecution) {
+
+        // get values we need
 
         Map<String, Object> properties =
             (Map<String, Object>)jobExecution.getJobData().getProperty("properties");
@@ -544,12 +546,22 @@ public class ImportServiceImpl implements ImportService {
         String accessId = (String) storage_info.get( "s3_access_id");
         String secretKey = (String) storage_info.get( "s3_key" );
 
-        FileImport fileImport = getFileImportEntity(jobExecution);
+        EntityManager emManagementApp = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
+
+        // get the file import entity
+
+        FileImport fileImport;
+        try {
+            fileImport = getFileImportEntity(jobExecution);
+        } catch (Exception e) {
+            logger.error("Error updating fileImport to set state of file import", e);
+            return;
+        }
+
         String fileName = jobExecution.getJobData().getProperty("File").toString();
         UUID targetAppId = (UUID) jobExecution.getJobData().getProperty("applicationId");
 
-        logger.debug("downloadAndImportFile() for file {} ", fileName);
-
+        // is job already done?
         if (   FileImport.State.FAILED.equals( fileImport.getState() )
             || FileImport.State.FINISHED .equals(fileImport.getState()) ) {
             return;
@@ -557,18 +569,33 @@ public class ImportServiceImpl implements ImportService {
 
         // update FileImport Entity to indicate that we have started
 
-        EntityManager emManagementApp = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
-        emManagementApp.update( fileImport );
-        fileImport.setState( FileImport.State.STARTED );
-        emManagementApp.update( fileImport );
+        logger.debug("downloadAndImportFile() for file {} ", fileName);
+        try {
+            emManagementApp.update( fileImport );
+            fileImport.setState(FileImport.State.STARTED);
+            emManagementApp.update(fileImport);
+
+            if ( emManagementApp.get( targetAppId ) == null ) {
+                fileImport.setState( FileImport.State.FAILED );
+                fileImport.setErrorMessage("Application " + targetAppId + " does not exist");
+                emManagementApp.update(fileImport);
+                return;
+            }
 
-        if ( emManagementApp.get( targetAppId ) == null ) {
-            throw new IllegalArgumentException( "Application does not exist: " + targetAppId.toString() );
+        } catch (Exception e) {
+            fileImport.setState( FileImport.State.FAILED );
+            fileImport.setErrorMessage("Application " + targetAppId + " does not exist");
+            try {
+                emManagementApp.update( fileImport );
+            } catch (Exception e1) {
+                logger.error("Error updating fileImport to set state of file import", e1);
+            }
         }
         EntityManager targetEm = emf.getEntityManager( targetAppId );
 
         // download file from S3, if no S3 importer was passed in then create one
 
+        File downloadedFile = null;
         S3Import s3Import;
         Object s3PlaceHolder = jobExecution.getJobData().getProperty("s3Import");
         try {
@@ -581,70 +608,115 @@ public class ImportServiceImpl implements ImportService {
             logger.error("doImport(): Error creating S3Import", e);
             fileImport.setErrorMessage(e.getMessage());
             fileImport.setState( FileImport.State.FAILED );
-            emManagementApp.update(fileImport);
+            try {
+                emManagementApp.update(fileImport);
+            } catch (Exception e1) {
+                logger.error("Error updating file import with error information", e1);
+            }
             return;
         }
-        File downloadedFile = s3Import.copyFileFromBucket(
-            fileName, bucketName, accessId, secretKey );
+
+        try {
+            downloadedFile = s3Import.copyFileFromBucket(
+                fileName, bucketName, accessId, secretKey );
+        } catch (Exception e) {
+            logger.error("Error downloading file " + fileName, e);
+            fileImport.setErrorMessage("Error downloading file " + fileName + ": " + e.getMessage());
+            fileImport.setState(FileImport.State.FAILED);
+            try {
+                emManagementApp.update(fileImport);
+            } catch (Exception e1) {
+                logger.error("Error updating file import with error information", e1);
+            }
+        }
 
         // parse JSON data, create Entities and Connections from import data
 
-        parseEntitiesAndConnectionsFromJson(
-            jobExecution, downloadedFile, targetEm, emManagementApp, fileImport);
+        try {
+            parseEntitiesAndConnectionsFromJson(
+                jobExecution, downloadedFile, targetEm, emManagementApp, fileImport);
+
+        } catch (Exception e) {
+            logger.error("Error importing file " + fileName, e);
+            fileImport.setErrorMessage("Error downloading file " + fileName + ": " + e.getMessage());
+            fileImport.setState( FileImport.State.FAILED );
+            try {
+                emManagementApp.update(fileImport);
+            } catch (Exception e1) {
+                logger.error("Error updating file import with error information", e1);
+            }
+        }
 
         // mark ImportJob FINISHED but only if all other FileImportJobs are complete
 
         // get parent import job of this file import job
 
-        Results importJobResults =
-            emManagementApp.getConnectingEntities( fileImport, IMPORT_FILE_INCLUDES, null, Level.ALL_PROPERTIES );
-        List<Entity> importEntities = importJobResults.getEntities();
-        UUID importId = importEntities.get( 0 ).getUuid();
-        Import importEntity = emManagementApp.get( importId, Import.class );
+        String randTag = RandomStringUtils.randomAlphanumeric(4); // for logging
 
-        String randTag = RandomStringUtils.randomAlphanumeric(4);
-        logger.debug("{} Got importEntity {}", randTag, importEntity.getUuid() );
+        int failCount = 0;
+        int successCount = 0;
+        Import importEntity = null;
+        try {
 
-        EntityManager emMgmtApp = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
-        Query query = Query.fromQL("select *");
-        query.setEntityType("file_import");
-        query.setConnectionType( IMPORT_FILE_INCLUDES );
-        query.setLimit(MAX_FILE_IMPORTS);
-        Results entities = emMgmtApp.searchConnectedEntities(importEntity, query);
+            Results importJobResults =
+                emManagementApp.getConnectingEntities(fileImport, IMPORT_FILE_INCLUDES, null, Level.ALL_PROPERTIES);
+            List<Entity> importEntities = importJobResults.getEntities();
+            UUID importId = importEntities.get(0).getUuid();
+            importEntity = emManagementApp.get(importId, Import.class);
 
-        PagingResultsIterator itr = new PagingResultsIterator( entities );
+            logger.debug("{} Got importEntity {}", randTag, importEntity.getUuid());
 
-        logger.debug("{} Check {} jobs to see if we are done for file {}",
-            new Object[] { randTag, entities.size(), fileImport.getFileName() } );
+            EntityManager emMgmtApp = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
+            Query query = Query.fromQL("select *");
+            query.setEntityType("file_import");
+            query.setConnectionType(IMPORT_FILE_INCLUDES);
+            query.setLimit(MAX_FILE_IMPORTS);
+            Results entities = emMgmtApp.searchConnectedEntities(importEntity, query);
 
-        int failCount = 0;
-        int successCount = 0;
-        while ( itr.hasNext() ) {
-            FileImport fi = ( FileImport ) itr.next();
-            switch ( fi.getState() ) {
-                case FAILED:     // failed, but we may not be complete so continue checking
-                    failCount++;
-                    break;
-                case FINISHED:   // finished, we can continue checking
-                    successCount++;
-                    continue;
-                default:         // not something we recognize as complete, short circuit
-                    logger.debug("{} not done yet, bail out...", randTag);
-                    return;
+            PagingResultsIterator itr = new PagingResultsIterator(entities);
+
+            logger.debug("{} Check {} jobs to see if we are done for file {}",
+                new Object[]{randTag, entities.size(), fileImport.getFileName()});
+
+            while (itr.hasNext()) {
+                FileImport fi = (FileImport) itr.next();
+                switch (fi.getState()) {
+                    case FAILED:     // failed, but we may not be complete so continue checking
+                        failCount++;
+                        break;
+                    case FINISHED:   // finished, we can continue checking
+                        successCount++;
+                        continue;
+                    default:         // not something we recognize as complete, short circuit
+                        logger.debug("{} not done yet, bail out...", randTag);
+                        return;
+                }
+            }
+
+        } catch ( Exception e ) {
+            failCount++;
+            if ( importEntity != null ) {
+                importEntity.setErrorMessage("Error determining status of file import jobs");
             }
+            logger.debug("Error determining status of file import jobs", e);
         }
 
         logger.debug("{} successCount = {} failCount = {}", new Object[] { randTag, successCount, failCount } );
 
-        if ( failCount == 0 ) {
+        if ( importEntity != null && failCount == 0 ) {
             logger.debug("{} FINISHED", randTag);
             importEntity.setState(Import.State.FINISHED);
-        }  else {
+
+        }  else if ( importEntity != null ) {
             // we had failures, set it to failed
             importEntity.setState(Import.State.FAILED);
         }
 
-        emManagementApp.update( importEntity );
+        try {
+            emManagementApp.update( importEntity );
+        } catch (Exception e) {
+            logger.error("Error updating import entity", e);
+        }
     }
 
 
@@ -809,9 +881,9 @@ public class ImportServiceImpl implements ImportService {
                 tracker.entityWritten();
 
             } catch (Exception e) {
-                logger.error("Error writing entity", e);
+                logger.error("Error writing entity. From file:" + fileImport.getFileName(), e);
 
-                tracker.entityFailed( e.getMessage() );
+                tracker.entityFailed( e.getMessage() + " From file: " + fileImport.getFileName() );
             }
         }
     }
@@ -826,7 +898,6 @@ public class ImportServiceImpl implements ImportService {
             this.ownerEntityRef = ownerEntityRef;
             this.connectionType = connectionType;
             this.entityRef = entryRef;
-
         }
 
         // creates connections between entities
@@ -852,8 +923,9 @@ public class ImportServiceImpl implements ImportService {
                 tracker.connectionWritten();
 
             } catch (Exception e) {
-                logger.error("Error writing connection", e);
-                tracker.connectionFailed( e.getMessage() );
+                logger.error("Error writing connection. From file: " + fileImport.getFileName(), e);
+
+                tracker.connectionFailed( e.getMessage() + " From file: " + fileImport.getFileName() );
             }
         }
     }
@@ -882,20 +954,9 @@ public class ImportServiceImpl implements ImportService {
                 em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary);
 
             } catch (Exception e) {
-                logger.error("Error writing dictionary", e);
-
-                //TODO add statistics for dictionary writes and failures
-//                fileImport.setErrorMessage(e.getMessage());
-//                try {
-//
-//                    rootEm.update(fileImport);
-//
-//                } catch (Exception ex) {
-//
-//                    // TODO should we abort at this point?
-//                    logger.error("Error updating file import report with error message: "
-//                        + fileImport.getErrorMessage(), ex);
-//                }
+                logger.error("Error writing dictionary. From file: " + fileImport.getFileName(), e);
+
+                // TODO add statistics for dictionary writes and failures
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0234e2bd/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
index 9662e0c..0d20ea1 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
@@ -377,17 +377,20 @@ public class ImportCollectionIT {
 
         deleteBucket();
 
-        //list out all the files in the resource directory you want uploaded
+        // upload good and badly formatted files to our S3 bucket
+
         List<String> filenames = new ArrayList<>( 3 );
         filenames.add( "testImport.testCollection.1.json" );
         filenames.add( "testImport.testApplication.2.json" );
         filenames.add( "testImportInvalidJson.testApplication.3.json" );
-        // create 10 applications each with collection of 10 things, export all to S3
+
         S3Upload s3Upload = new S3Upload();
-        s3Upload.copyToS3( System.getProperty(SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR), System.getProperty(SDKGlobalConfiguration.SECRET_KEY_ENV_VAR),
+        s3Upload.copyToS3(
+            System.getProperty( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR),
+            System.getProperty( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR),
             bucketName, filenames );
 
-        // import all those exports from S3 into the default test application
+        // import all those files into the default test application
 
         final EntityManager emDefaultApp = setup.getEmf().getEntityManager( applicationId );
         importCollection( emDefaultApp, "things" );
@@ -399,7 +402,8 @@ public class ImportCollectionIT {
 
         assertTrue( !importedThings.isEmpty() );
         assertEquals( 7, importedThings.size() );
-        //TODO: have something that checks the exceptions and errors.
+
+        // TODO: have something that checks the exceptions and errors.
     }
 
 


[2/2] incubator-usergrid git commit: Merge branch 'two-dot-o-import' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-import

Posted by sn...@apache.org.
Merge branch 'two-dot-o-import' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-import

Conflicts:
	stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a4327e81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a4327e81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a4327e81

Branch: refs/heads/two-dot-o-import
Commit: a4327e8152162288e7881ecdb7a243b6ea7ae9fd
Parents: 0234e2b 25b3645
Author: Dave Johnson <dm...@apigee.com>
Authored: Wed Feb 11 13:14:28 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Wed Feb 11 13:14:28 2015 -0500

----------------------------------------------------------------------
 .../usergrid/persistence/index/query/Query.java |  47 +++--
 .../org/apache/usergrid/rest/ApiResponse.java   |  16 ++
 .../exceptions/NullArgumentExceptionMapper.java |  42 +++++
 .../imports/FileErrorsResource.java             | 143 ++++++++++++++++
 .../imports/FileIncludesResource.java           | 159 +++++++++++++++++
 .../applications/imports/ImportResource.java    |  98 -----------
 .../applications/imports/ImportsResource.java   | 126 ++++++++------
 .../rest/management/ImportResourceIT.java       | 104 +++++------
 .../management/importer/FileImportTracker.java  |   3 +
 .../management/importer/ImportService.java      |  52 +++++-
 .../management/importer/ImportServiceImpl.java  | 171 ++++++++++++++++---
 11 files changed, 717 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a4327e81/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index 6f08ffb,f35e6f2..d549dba
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@@ -78,16 -80,7 +80,6 @@@ public class ImportServiceImpl implemen
      }
  
  
-     public SchedulerService getSch() {
-         return sch;
-     }
- 
- 
-     public void setSch(final SchedulerService sch) {
-         this.sch = sch;
-     }
- 
--
      /**
       * This schedules the main import Job.
       *
@@@ -181,6 -195,118 +194,118 @@@
          return entity;
      }
  
+     @Override
+     public Results getFileImports(final UUID applicationId, final UUID importId, @Nullable  final String ql, @Nullable final String cursor ) {
+ 
+         Preconditions.checkNotNull( applicationId, "applicationId must be specified" );
+                Preconditions.checkNotNull( importId, "importId must be specified" );
+ 
+         try {
+             final EntityManager rootEm = emf.getEntityManager( emf.getManagementAppId() );
+ 
+ 
+             final Import importEntity = getImport( applicationId, importId );
+ 
+             Query query = Query.fromQLNullSafe( ql );
+             query.setCursor( cursor );
+ 
+             //set our entity type
+             query.setEntityType( Schema.getDefaultSchema().getEntityType( Import.class ) );
+ 
+             return rootEm.searchCollection( importEntity, IMPORT_FILE_INCLUDES_CONNECTION, query );
+         }
+         catch ( Exception e ) {
+             throw new RuntimeException( "Unable to get import entity", e );
+         }
+ 
+     }
+ 
+ 
+     @Override
+     public FileImport getFileImport(final UUID applicationId,  final UUID importId, final UUID fileImportId ) {
+         try {
+             final EntityManager rootEm = emf.getEntityManager( emf.getManagementAppId() );
+ 
+ 
+             final Import importEntity = getImport( applicationId, importId );
+ 
+             if ( importEntity == null ) {
+                 throw new EntityNotFoundException( "Import not found with id " + importId );
+             }
+ 
+ 
+             final FileImport fileImport = rootEm.get( importId, FileImport.class );
+ 
+ 
+             // check if it's on the path
+             if ( !rootEm.isConnectionMember( importEntity, APP_IMPORT_CONNECTION, fileImport ) ) {
+                 return null;
+             }
+ 
+             return fileImport;
+         }
+         catch ( Exception e ) {
+             throw new RuntimeException( "Unable to load file import", e );
+         }
+     }
+ 
+ 
+     @Override
+     public Results getFailedImportEntities(final UUID applicationId,  final UUID importId, final UUID fileImportId,  @Nullable  final String ql, @Nullable final String cursor ) {
+ 
+         Preconditions.checkNotNull( applicationId, "applicationId must be specified" );
+         Preconditions.checkNotNull( importId, "importId must be specified" );
+         Preconditions.checkNotNull( fileImportId, "fileImportId must be specified" );
+ 
+         try {
+             final EntityManager rootEm = emf.getEntityManager( emf.getManagementAppId() );
+ 
+ 
 -            final FileImport importEntity = getFileImport( applicationId, importId, fileImportId );
++            final FileImport importEntity = getFileImport(applicationId, importId, fileImportId);
+ 
+             Query query = Query.fromQLNullSafe( ql );
+             query.setCursor( cursor );
+ 
+             //set our entity type
+             query.setEntityType( Schema.getDefaultSchema().getEntityType( FailedImportEntity.class ) );
+ 
+             return rootEm.searchCollection( importEntity, FileImportTracker.ERRORS_CONNECTION_NAME, query );
+         }
+         catch ( Exception e ) {
+             throw new RuntimeException( "Unable to get import entity", e );
+         }
+     }
+ 
+ 
+     @Override
+     public FailedImportEntity getFailedImportEntity(final UUID applicationId, final UUID importId, final UUID fileImportId,
+                                                      final UUID failedImportId ) {
+         try {
+             final EntityManager rootEm = emf.getEntityManager( emf.getManagementAppId() );
+ 
+ 
+             final FileImport importEntity = getFileImport( applicationId, importId, fileImportId );
+ 
+             if ( importEntity == null ) {
+                 throw new EntityNotFoundException( "Import not found with id " + importId );
+             }
+ 
+ 
+             final FailedImportEntity fileImport = rootEm.get( importId, FailedImportEntity.class );
+ 
+ 
+             // check if it's on the path
+             if ( !rootEm.isConnectionMember( importEntity, FileImportTracker.ERRORS_CONNECTION_NAME, fileImport ) ) {
+                 return null;
+             }
+ 
+             return fileImport;
+         }
+         catch ( Exception e ) {
+             throw new RuntimeException( "Unable to load file import", e );
+         }
+     }
+ 
  
      /**
       * This schedules the sub  FileImport Job
@@@ -343,7 -470,7 +469,7 @@@
          UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
          EntityManager importManager = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
  
--        return importManager.get( importId, Import.class );
++        return importManager.get(importId, Import.class);
      }
  
  
@@@ -651,54 -728,42 +782,54 @@@
  
          // get parent import job of this file import job
  
 -        Results importJobResults =
 -            emManagementApp.getConnectingEntities( fileImport, IMPORT_FILE_INCLUDES_CONNECTION, null, Level.ALL_PROPERTIES );
 -        List<Entity> importEntities = importJobResults.getEntities();
 -        UUID importId = importEntities.get( 0 ).getUuid();
 -        Import importEntity = emManagementApp.get( importId, Import.class );
 +        String randTag = RandomStringUtils.randomAlphanumeric(4); // for logging
  
 -        String randTag = RandomStringUtils.randomAlphanumeric(4);
 -        logger.debug("{} Got importEntity {}", randTag, importEntity.getUuid() );
 +        int failCount = 0;
 +        int successCount = 0;
 +        Import importEntity = null;
 +        try {
  
 -        EntityManager emMgmtApp = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
 -        Query query = Query.fromQL("select *");
 -        query.setEntityType("file_import");
 -        query.setConnectionType( IMPORT_FILE_INCLUDES_CONNECTION );
 -        query.setLimit(MAX_FILE_IMPORTS);
 -        Results entities = emMgmtApp.searchConnectedEntities(importEntity, query);
 +            Results importJobResults =
-                 emManagementApp.getConnectingEntities(fileImport, IMPORT_FILE_INCLUDES, null, Level.ALL_PROPERTIES);
++                emManagementApp.getConnectingEntities( fileImport, IMPORT_FILE_INCLUDES_CONNECTION, null, Level.ALL_PROPERTIES );
 +            List<Entity> importEntities = importJobResults.getEntities();
 +            UUID importId = importEntities.get(0).getUuid();
 +            importEntity = emManagementApp.get(importId, Import.class);
  
 -        PagingResultsIterator itr = new PagingResultsIterator( entities );
 +            logger.debug("{} Got importEntity {}", randTag, importEntity.getUuid());
  
-             EntityManager emMgmtApp = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
 -        logger.debug("{} Check {} jobs to see if we are done for file {}",
 -            new Object[] { randTag, entities.size(), fileImport.getFileName() } );
++            EntityManager emMgmtApp = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
 +            Query query = Query.fromQL("select *");
 +            query.setEntityType("file_import");
-             query.setConnectionType(IMPORT_FILE_INCLUDES);
++            query.setConnectionType( IMPORT_FILE_INCLUDES_CONNECTION );
 +            query.setLimit(MAX_FILE_IMPORTS);
 +            Results entities = emMgmtApp.searchConnectedEntities(importEntity, query);
 +
 +            PagingResultsIterator itr = new PagingResultsIterator(entities);
 +
 +            logger.debug("{} Check {} jobs to see if we are done for file {}",
 +                new Object[]{randTag, entities.size(), fileImport.getFileName()});
 +
 +            while (itr.hasNext()) {
 +                FileImport fi = (FileImport) itr.next();
 +                switch (fi.getState()) {
 +                    case FAILED:     // failed, but we may not be complete so continue checking
 +                        failCount++;
 +                        break;
 +                    case FINISHED:   // finished, we can continue checking
 +                        successCount++;
 +                        continue;
 +                    default:         // not something we recognize as complete, short circuit
 +                        logger.debug("{} not done yet, bail out...", randTag);
 +                        return;
 +                }
 +            }
  
 -        int failCount = 0;
 -        int successCount = 0;
 -        while ( itr.hasNext() ) {
 -            FileImport fi = ( FileImport ) itr.next();
 -            switch ( fi.getState() ) {
 -                case FAILED:     // failed, but we may not be complete so continue checking
 -                    failCount++;
 -                    break;
 -                case FINISHED:   // finished, we can continue checking
 -                    successCount++;
 -                    continue;
 -                default:         // not something we recognize as complete, short circuit
 -                    logger.debug("{} not done yet, bail out...", randTag);
 -                    return;
 +        } catch ( Exception e ) {
 +            failCount++;
 +            if ( importEntity != null ) {
 +                importEntity.setErrorMessage("Error determining status of file import jobs");
              }
 +            logger.debug("Error determining status of file import jobs", e);
          }
  
          logger.debug("{} successCount = {} failCount = {}", new Object[] { randTag, successCount, failCount } );