Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/02/12 16:39:49 UTC

[1/2] incubator-usergrid git commit: More improvements to error handling and reporting, using the tracker correctly, plus improvements to ImportCollectionIT.testImportBadJson().

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-import 650864929 -> c65918dd6


More improvements to error handling and reporting, using the tracker correctly, plus improvements to ImportCollectionIT.testImportBadJson().
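
For context on what "using the tracker correctly" means here: the failure paths in
doImport() previously set FileImport.State.FAILED and an error message by hand and
then updated the entity through the management-app EntityManager; the diff below
routes all of that through a single FileImportTracker. A minimal sketch of the
pattern, using only the calls that appear in the diff (the constructor, fatal() and
complete()); the surrounding names (emf, emManagementApp, fileImport, targetAppId)
are the fields and locals of doImport() as shown in the diff, not new API:

    // One tracker per file import; it flushes its counters every 100 entities.
    final FileImportTracker tracker = new FileImportTracker( emf, fileImport, 100 );

    // Old style, now removed from every failure path:
    //   fileImport.setState( FileImport.State.FAILED );
    //   fileImport.setErrorMessage( "Application " + targetAppId + " does not exist" );
    //   emManagementApp.update( fileImport );

    // New style: record the fatal error once on the tracker and bail out.
    if ( emManagementApp.get( targetAppId ) == null ) {
        tracker.fatal( "Application " + targetAppId + " does not exist" );
        return;
    }

    // ... download from S3, parse JSON, write entities and connections ...

    // Flush the job statistics once both import passes have finished.
    tracker.complete();

On the test side, ImportCollectionIT.testImportBadJson() now fetches the FileImport
entities via importService.getFileImports( applicationId, importId, null, null ) and
asserts that the stored error message starts with the JSON parser's "Unexpected
character" text, so a bad file yields an informative error rather than a silently
empty collection.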


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ee864092
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ee864092
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ee864092

Branch: refs/heads/two-dot-o-import
Commit: ee8640928dc05c473bd31c286280d4aa4af194a5
Parents: a00af86
Author: Dave Johnson <dm...@apigee.com>
Authored: Thu Feb 12 10:25:01 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Thu Feb 12 10:25:01 2015 -0500

----------------------------------------------------------------------
 .../management/importer/ImportServiceImpl.java  | 94 ++++++++++----------
 .../management/importer/ImportCollectionIT.java | 61 +++++++++----
 2 files changed, 88 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ee864092/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index 434d62f..bba12f1 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@ -684,7 +684,7 @@ public class ImportServiceImpl implements ImportService {
 
         EntityManager emManagementApp = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
 
-        // get the file import entity
+       // get the file import entity
 
         FileImport fileImport;
         try {
@@ -694,6 +694,9 @@ public class ImportServiceImpl implements ImportService {
             return;
         }
 
+        // tracker flushes every 100 entities
+        final FileImportTracker tracker = new FileImportTracker( emf, fileImport, 100 );
+
         String fileName = jobExecution.getJobData().getProperty("File").toString();
         UUID targetAppId = (UUID) jobExecution.getJobData().getProperty("applicationId");
 
@@ -712,20 +715,12 @@ public class ImportServiceImpl implements ImportService {
             emManagementApp.update(fileImport);
 
             if ( emManagementApp.get( targetAppId ) == null ) {
-                fileImport.setState( FileImport.State.FAILED );
-                fileImport.setErrorMessage("Application " + targetAppId + " does not exist");
-                emManagementApp.update(fileImport);
+                tracker.fatal("Application " + targetAppId + " does not exist");
                 return;
             }
 
         } catch (Exception e) {
-            fileImport.setState( FileImport.State.FAILED );
-            fileImport.setErrorMessage("Application " + targetAppId + " does not exist");
-            try {
-                emManagementApp.update( fileImport );
-            } catch (Exception e1) {
-                logger.error("Error updating fileImport to set state of file import", e1);
-            }
+            tracker.fatal("Application " + targetAppId + " does not exist");
         }
         EntityManager targetEm = emf.getEntityManager( targetAppId );
 
@@ -741,14 +736,7 @@ public class ImportServiceImpl implements ImportService {
                 s3Import = new S3ImportImpl();
             }
         } catch (Exception e) {
-            logger.error("doImport(): Error creating S3Import", e);
-            fileImport.setErrorMessage(e.getMessage());
-            fileImport.setState( FileImport.State.FAILED );
-            try {
-                emManagementApp.update(fileImport);
-            } catch (Exception e1) {
-                logger.error("Error updating file import with error information", e1);
-            }
+            tracker.fatal("Unable to connect to S3 bucket, error: " + e.getMessage());
             return;
         }
 
@@ -756,31 +744,17 @@ public class ImportServiceImpl implements ImportService {
             downloadedFile = s3Import.copyFileFromBucket(
                 fileName, bucketName, accessId, secretKey );
         } catch (Exception e) {
-            logger.error("Error downloading file " + fileName, e);
-            fileImport.setErrorMessage("Error downloading file " + fileName + ": " + e.getMessage());
-            fileImport.setState(FileImport.State.FAILED);
-            try {
-                emManagementApp.update(fileImport);
-            } catch (Exception e1) {
-                logger.error("Error updating file import with error information", e1);
-            }
+            tracker.fatal("Error downloading file " + fileName + ": " + e.getMessage());
         }
 
         // parse JSON data, create Entities and Connections from import data
 
         try {
             parseEntitiesAndConnectionsFromJson(
-                jobExecution, downloadedFile, targetEm, emManagementApp, fileImport);
+                jobExecution, downloadedFile, targetEm, emManagementApp, fileImport, tracker);
 
         } catch (Exception e) {
-            logger.error("Error importing file " + fileName, e);
-            fileImport.setErrorMessage("Error downloading file " + fileName + ": " + e.getMessage());
-            fileImport.setState( FileImport.State.FAILED );
-            try {
-                emManagementApp.update(fileImport);
-            } catch (Exception e1) {
-                logger.error("Error updating file import with error information", e1);
-            }
+            tracker.fatal("Error importing file " + fileName + ": " + e.getMessage());
         }
 
         // mark ImportJob FINISHED but only if all other FileImportJobs are complete
@@ -804,12 +778,14 @@ public class ImportServiceImpl implements ImportService {
 
             EntityManager emMgmtApp = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
 
-
             Query query = new Query();
             query.setEntityType(Schema.getDefaultSchema().getEntityType( FileImport.class ));
             query.setConnectionType( IMPORT_FILE_INCLUDES_CONNECTION );
             query.setLimit(MAX_FILE_IMPORTS);
 
+            // TODO: better way to wait for ES indexes to catch up
+            try { Thread.sleep(5000); } catch ( Exception intentionallyIgnored ) {}
+
             Results entities = emMgmtApp.searchConnectedEntities(importEntity, query);
 
             PagingResultsIterator itr = new PagingResultsIterator(entities);
@@ -885,11 +861,12 @@ public class ImportServiceImpl implements ImportService {
         final File file,
         final EntityManager em,
         final EntityManager rootEm,
-        final FileImport fileImport) throws Exception {
+        final FileImport fileImport,
+        final FileImportTracker tracker) throws Exception {
 
 
         // tracker flushes every 100 entities
-        final FileImportTracker tracker = new FileImportTracker( emf, fileImport, 100 );
+        //final FileImportTracker tracker = new FileImportTracker( emf, fileImport, 100 );
 
         // function to execute for each write event
         final Action1<WriteEvent> doWork = new Action1<WriteEvent>() {
@@ -945,7 +922,12 @@ public class ImportServiceImpl implements ImportService {
 
         jp.close();
 
-        logger.debug("\n\nparseEntitiesAndConnectionsFromJson(): Wrote entities\n");
+        if ( FileImport.State.FAILED.equals( fileImport.getState() ) ) {
+            logger.debug("\n\nFailed to completely write entities, skipping second phase. File: {}\n",
+                fileImport.getFileName());
+            return;
+        }
+        logger.debug("\n\nWrote entities. File: {}\n", fileImport.getFileName() );
 
 
         // SECOND PASS: import all connections and dictionaries
@@ -984,9 +966,20 @@ public class ImportServiceImpl implements ImportService {
         logger.debug("\n\nparseEntitiesAndConnectionsFromJson(): Wrote others for file {}\n",
             fileImport.getFileName());
 
+        if ( FileImport.State.FAILED.equals( fileImport.getState() ) ) {
+            logger.debug("\n\nparseEntitiesAndConnectionsFromJson(): failed to completely write entities\n");
+            return;
+        }
 
         // flush the job statistics
         tracker.complete();
+
+        if ( FileImport.State.FAILED.equals( fileImport.getState() ) ) {
+            logger.debug("\n\nFailed to completely wrote connections and dictionaries. File: {}\n",
+                fileImport.getFileName());
+            return;
+        }
+        logger.debug("\n\nWrote connections and dictionaries. File: {}\n", fileImport.getFileName() );
     }
 
 
@@ -1249,15 +1242,17 @@ public class ImportServiceImpl implements ImportService {
                 logger.debug("process(): done parsing JSON");
 
             } catch (Exception e) {
-                // skip illegal entity UUID and go to next one
-                fileImport.setErrorMessage(e.getMessage());
-                try {
-                    rootEm.update(fileImport);
-                } catch (Exception ex) {
-                    logger.error("Error updating file import record", ex);
-                }
+
+                tracker.fatal("Failed to import file" + fileImport.getFileName() + " error " + e.getMessage());
+
                 if ( subscriber != null ) {
-                    subscriber.onError(e);
+
+                    // don't need to blow up here, we handled the problem
+                    // if we blow up we may prevent in-flight entities from being written
+                    // subscriber.onError(e);
+
+                    // but we are done reading entities
+                    subscriber.onCompleted();
                 }
             }
         }
@@ -1265,8 +1260,11 @@ public class ImportServiceImpl implements ImportService {
         private void processWriteEvent( final Subscriber<? super WriteEvent> subscriber, WriteEvent writeEvent ) {
 
             if ( subscriber == null ) {
+
+                // this logic makes it easy to remove Rx for debugging purposes
                 // no Rx, just do it
                 writeEvent.doWrite(em, fileImport, tracker);
+
             } else {
                 subscriber.onNext( writeEvent );
             }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ee864092/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
index 41d264c..fde0d8b 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
@@ -35,6 +35,7 @@ import org.apache.usergrid.management.OrganizationInfo;
 import org.apache.usergrid.management.UserInfo;
 import org.apache.usergrid.management.export.ExportService;
 import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.entities.FileImport;
 import org.apache.usergrid.persistence.entities.Import;
 import org.apache.usergrid.persistence.index.impl.ElasticSearchResource;
 import org.apache.usergrid.persistence.index.query.Query;
@@ -342,37 +343,49 @@ public class ImportCollectionIT {
      * TODO: Test that importing bad JSON will result in an informative error message.
      */
     @Test
-    public void testImportBadJson() throws Exception{
-        // import from a bad JSON file
+    public void testImportBadJson() throws Exception {
 
         deleteBucket();
 
-        //list out all the files in the resource directory you want uploaded
-        List<String> filenames = new ArrayList<>( 1 );
+        // export and upload a bad JSON file to the S3 bucket
 
+        List<String> filenames = new ArrayList<>( 1 );
         filenames.add( "testimport-bad-json.json");
 
-        // create 10 applications each with collection of 10 things, export all to S3
         S3Upload s3Upload = new S3Upload();
         s3Upload.copyToS3(
             System.getProperty(SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR),
             System.getProperty(SDKGlobalConfiguration.SECRET_KEY_ENV_VAR),
             bucketName, filenames );
 
-        // import all those exports from S3 into the default test application
+        // import bad JSON from from the S3 bucket
 
         final EntityManager emDefaultApp = setup.getEmf().getEntityManager( applicationId );
-        importCollection( emDefaultApp );
+        UUID importId = importCollection( emDefaultApp );
 
-        // we should now have 100 Entities in the default app
+
+        // check that we got an informative error message back
 
         List<Entity> importedThings = emDefaultApp.getCollection(
             emDefaultApp.getApplicationId(), "things", null, Level.ALL_PROPERTIES).getEntities();
 
-        assertTrue( importedThings.isEmpty() );
-      //  assertEquals( , importedThings.size() );
+        assertTrue("No entities should have been imported", importedThings.isEmpty());
+
+        ImportService importService = setup.getImportService();
+        Results results = importService.getFileImports( applicationId, importId, null, null );
+
+        assertEquals( "There is one", 1, results.size() );
+
+        assertEquals( "Entity is FileImport object",
+            FileImport.class, results.getEntity().getClass() );
 
-        // check that error message indicates JSON parsing error
+        FileImport fileImport = (FileImport)results.getEntity();
+
+        assertEquals( "File name is correct",
+            "testimport-bad-json.json", fileImport.getFileName());
+
+        assertTrue( "Error message is correct",
+            fileImport.getErrorMessage().startsWith("Unexpected character ('<' (code 60))"));
     }
 
     @Test
@@ -432,7 +445,7 @@ public class ImportCollectionIT {
    //---------------------------------------------------------------------------------------------
 
 
-    private void importCollection(final EntityManager em ) throws Exception {
+    private UUID importCollection(final EntityManager em ) throws Exception {
 
         logger.debug("\n\nImport into new app {}\n", em.getApplication().getName() );
 
@@ -454,19 +467,25 @@ public class ImportCollectionIT {
             }} );
         }} );
 
-
-
-        int maxRetries = 60;
+        int maxRetries = 30;
         int retries = 0;
-        while (     !importService.getState( importEntity.getUuid() ).equals( "FINISHED" )
-                 && !importService.getState( importEntity.getUuid() ).equals( "FAILED" )
+        Import.State state = importService.getState(importEntity.getUuid());
+        while (     !state.equals( Import.State.FINISHED )
+                 && !state.equals( Import.State.FAILED )
                  && retries++ < maxRetries ) {
 
-            logger.debug("Waiting for import...");
+            logger.debug("Waiting for import ({}) ...", state.toString());
             Thread.sleep(1000);
+
+            state = importService.getState(importEntity.getUuid());
         }
 
+        if ( retries >= maxRetries ) {
+            throw new RuntimeException("Max retries reached");
+        }
         em.refreshIndex();
+
+        return importEntity.getUuid();
     }
 
 
@@ -499,12 +518,16 @@ public class ImportCollectionIT {
             }});
         }});
 
-        int maxRetries = 60;
+        int maxRetries = 30;
         int retries = 0;
         while ( !exportService.getState( exportUUID ).equals( "FINISHED" ) && retries++ < maxRetries ) {
             logger.debug("Waiting for export...");
             Thread.sleep(1000);
         }
+
+        if ( retries >= maxRetries ) {
+            throw new RuntimeException("Max retries reached");
+        }
     }
 
 


[2/2] incubator-usergrid git commit: Merge branch 'two-dot-o-import' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-import

Posted by sn...@apache.org.
Merge branch 'two-dot-o-import' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-import

Conflicts:
	stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c65918dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c65918dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c65918dd

Branch: refs/heads/two-dot-o-import
Commit: c65918dd6f352feaf5e55a5047f81abb45e315e6
Parents: ee86409 6508649
Author: Dave Johnson <dm...@apigee.com>
Authored: Thu Feb 12 10:39:38 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Thu Feb 12 10:39:38 2015 -0500

----------------------------------------------------------------------
 .../rest/management/ImportResourceIT.java       |  43 +++---
 .../management/importer/ImportServiceImpl.java  | 131 ++++++++++++-------
 2 files changed, 107 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c65918dd/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index bba12f1,43769aa..39cd480
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@@ -18,7 -18,7 +18,6 @@@
  package org.apache.usergrid.management.importer;
  
  import com.google.common.base.Preconditions;
--import org.apache.commons.lang.RandomStringUtils;
  import org.apache.usergrid.batch.JobExecution;
  import org.apache.usergrid.batch.service.SchedulerService;
  import org.apache.usergrid.corepersistence.util.CpNamingUtils;
@@@ -389,7 -389,7 +388,8 @@@ public class ImportServiceImpl implemen
              return entities.size();
  
              // see ImportConnectsTest()
--//            Results entities = emMgmtApp.getConnectedEntities( importRoot, "includes", null, Level.ALL_PROPERTIES );
++//            Results entities = emMgmtApp.getConnectedEntities(
++//              importRoot, "includes", null, Level.ALL_PROPERTIES );
  //            PagingResultsIterator itr = new PagingResultsIterator( entities );
  //            int count = 0;
  //            while ( itr.hasNext() ) {
@@@ -404,9 -404,9 +404,10 @@@
          }
      }
  
++
      /**
--     * Schedule the file tasks.  This must happen in 2 phases.  The first is linking the sub files to the master the
--     * second is scheduling them to run.
++     * Schedule the file tasks.  This must happen in 2 phases.  The first is linking the
++     * sub files to the master the second is scheduling them to run.
       */
      private JobData scheduleFileTasks( final JobData jobData ) {
  
@@@ -416,6 -416,6 +417,7 @@@
          return sch.createJob(FILE_IMPORT_JOB_NAME, soonestPossible, jobData);
      }
  
++
      /**
       * Query Entity Manager for the state of the Import Entity. This corresponds to the GET /import
       */
@@@ -437,6 -437,6 +439,7 @@@
          return importUG.getState();
      }
  
++
      /**
       * Query Entity Manager for the error message generated for an import job.
       */
@@@ -462,6 -462,6 +465,7 @@@
          return importUG.getErrorMessage();
      }
  
++
      /**
       * Returns the Import Entity that stores all meta-data for the particular import Job
       *
@@@ -484,11 -484,11 +488,8 @@@
       */
      @Override
      public FileImport getFileImportEntity(final JobExecution jobExecution) throws Exception {
--
          UUID fileImportId = (UUID) jobExecution.getJobData().getProperty(FILE_IMPORT_ID);
--
          EntityManager em = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
--
          return em.get(fileImportId, FileImport.class);
      }
  
@@@ -720,8 -719,18 +721,11 @@@
              }
  
          } catch (Exception e) {
 -            fileImport.setState( FileImport.State.FAILED );
 -            fileImport.setErrorMessage("Application " + targetAppId + " does not exist");
 -            try {
 -                emManagementApp.update( fileImport );
 -            } catch (Exception e1) {
 -                logger.error("Error updating fileImport to set state of file import", e1);
 -            }
 -
 +            tracker.fatal("Application " + targetAppId + " does not exist");
+             checkIfComplete( emManagementApp, fileImport );
+             return;
          }
+ 
          EntityManager targetEm = emf.getEntityManager( targetAppId );
  
          // download file from S3, if no S3 importer was passed in then create one
@@@ -736,7 -745,16 +740,8 @@@
                  s3Import = new S3ImportImpl();
              }
          } catch (Exception e) {
 -            logger.error( "doImport(): Error creating S3Import", e );
 -            fileImport.setErrorMessage( e.getMessage() );
 -            fileImport.setState( FileImport.State.FAILED );
 -            try {
 -                emManagementApp.update(fileImport);
 -            } catch (Exception e1) {
 -                logger.error("Error updating file import with error information", e1);
 -            }
 +            tracker.fatal("Unable to connect to S3 bucket, error: " + e.getMessage());
+             checkIfComplete( emManagementApp, fileImport );
 -
              return;
          }
  
@@@ -744,58 -762,102 +749,80 @@@
              downloadedFile = s3Import.copyFileFromBucket(
                  fileName, bucketName, accessId, secretKey );
          } catch (Exception e) {
 -            logger.error("Error downloading file " + fileName, e);
 -            fileImport.setErrorMessage("Error downloading file " + fileName + ": " + e.getMessage());
 -            fileImport.setState(FileImport.State.FAILED);
 -            try {
 -                emManagementApp.update(fileImport);
 -            } catch (Exception e1) {
 -                logger.error("Error updating file import with error information", e1);
 -            }
 -
 +            tracker.fatal("Error downloading file " + fileName + ": " + e.getMessage());
+             checkIfComplete( emManagementApp, fileImport );
+             return;
          }
  
          // parse JSON data, create Entities and Connections from import data
  
          try {
              parseEntitiesAndConnectionsFromJson(
 -                jobExecution, downloadedFile, targetEm, emManagementApp, fileImport);
 +                jobExecution, downloadedFile, targetEm, emManagementApp, fileImport, tracker);
  
          } catch (Exception e) {
 -            logger.error("Error importing file " + fileName, e);
 -            fileImport.setErrorMessage("Error downloading file " + fileName + ": " + e.getMessage());
 -            fileImport.setState( FileImport.State.FAILED );
 -            try {
 -                emManagementApp.update(fileImport);
 -            } catch (Exception e1) {
 -                logger.error("Error updating file import with error information", e1);
 -            }
 -
 +            tracker.fatal("Error importing file " + fileName + ": " + e.getMessage());
          }
  
-         // mark ImportJob FINISHED but only if all other FileImportJobs are complete
+         checkIfComplete( emManagementApp, fileImport );
 -
 -
+     }
+ 
  
-         // get parent import job of this file import job
+     private Import getImportEntity( final EntityManager rootEm, final FileImport fileImport ) {
+         try {
+             Results importJobResults =
 -                rootEm.getConnectingEntities( fileImport, IMPORT_FILE_INCLUDES_CONNECTION, null, Level.ALL_PROPERTIES );
++                rootEm.getConnectingEntities( fileImport, IMPORT_FILE_INCLUDES_CONNECTION,
++                    null, Level.ALL_PROPERTIES );
  
-         String randTag = RandomStringUtils.randomAlphanumeric(4); // for logging
+             List<Entity> importEntities = importJobResults.getEntities();
 -            UUID importId = importEntities.get( 0 ).getUuid();
 -
+             final Import importEntity = ( Import ) importEntities.get( 0 ).toTypedEntity();
 -
 -            //        final Import importEntity = rootEm.get(importId, Import.class);
 -
+             return importEntity;
+         }
+         catch ( Exception e ) {
+             throw new RuntimeException( "Unable to import entity" );
+         }
+     }
  
+     /**
+      * Check if we're the last job on failure
 -     * @param rootEm
 -     * @param fileImport
+      */
 -    private void checkIfComplete( final EntityManager rootEm, final FileImport fileImport ) {
++    private void checkIfComplete( final EntityManager emMgmtApp, final FileImport fileImport ) {
          int failCount = 0;
          int successCount = 0;
-         Import importEntity = null;
+ 
 -        final Import importEntity = getImportEntity( rootEm, fileImport );
++        final Import importEntity = getImportEntity( emMgmtApp, fileImport );
+ 
          try {
  
-             Results importJobResults =
-                 emManagementApp.getConnectingEntities( fileImport, IMPORT_FILE_INCLUDES_CONNECTION, null, Level.ALL_PROPERTIES );
-             List<Entity> importEntities = importJobResults.getEntities();
-             UUID importId = importEntities.get(0).getUuid();
-             importEntity = emManagementApp.get(importId, Import.class);
 -            logger.debug( "Got importEntity {}", importEntity.getUuid() );
++            // wait for query index to catch up
  
-             logger.debug("{} Got importEntity {}", randTag, importEntity.getUuid());
 -            EntityManager emMgmtApp = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
++            // TODO: better way to wait for indexes to catch up
++            try { Thread.sleep(5000); } catch ( Exception intentionallyIgnored ) {}
  
-             EntityManager emMgmtApp = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
++            // get file import entities for this import job
  
              Query query = new Query();
-             query.setEntityType(Schema.getDefaultSchema().getEntityType( FileImport.class ));
+             query.setEntityType( Schema.getDefaultSchema().getEntityType( FileImport.class ) );
              query.setConnectionType( IMPORT_FILE_INCLUDES_CONNECTION );
-             query.setLimit(MAX_FILE_IMPORTS);
+             query.setLimit( MAX_FILE_IMPORTS );
  
-             // TODO: better way to wait for ES indexes to catch up
-             try { Thread.sleep(5000); } catch ( Exception intentionallyIgnored ) {}
+             Results entities = emMgmtApp.searchConnectedEntities( importEntity, query );
 -
+             PagingResultsIterator itr = new PagingResultsIterator( entities );
  
-             Results entities = emMgmtApp.searchConnectedEntities(importEntity, query);
 -            //TODO Dave, I found a race condition here with the query module and ES indexing latency.  Aside from wait, I'm not sure what else we can do.
 -            if(!itr.hasNext()){
 -                logger.warn( "Just processed a file, but we were unable to get it back in a connection, this is a bug" );
++            if ( !itr.hasNext() ) {
++                logger.warn("Found no FileImport entities for import {}, " +
++                    "unable to check if complete", importEntity.getUuid());
+                 return;
+             }
  
-             PagingResultsIterator itr = new PagingResultsIterator(entities);
 -            logger.debug( "Check {} jobs to see if we are done for file {}",
++            logger.debug( "Checking {} file import jobs to see if we are done for file {}",
+                 new Object[] { entities.size(), fileImport.getFileName() } );
  
-             logger.debug("{} Check {} jobs to see if we are done for file {}",
-                 new Object[]{randTag, entities.size(), fileImport.getFileName()});
++            // loop through entities, count different types of status
  
-             while (itr.hasNext()) {
-                 FileImport fi = (FileImport) itr.next();
-                 switch (fi.getState()) {
+             while ( itr.hasNext() ) {
+                 FileImport fi = ( FileImport ) itr.next();
+                 switch ( fi.getState() ) {
                      case FAILED:     // failed, but we may not be complete so continue checking
                          failCount++;
                          break;
@@@ -803,35 -865,40 +830,40 @@@
                          successCount++;
                          continue;
                      default:         // not something we recognize as complete, short circuit
-                         logger.debug("{} not done yet, bail out...", randTag);
-                         return;
+                         logger.debug( "not done yet, bail out..." ); return;
                  }
              }
- 
-         } catch ( Exception e ) {
+         }
+         catch ( Exception e ) {
              failCount++;
              if ( importEntity != null ) {
-                 importEntity.setErrorMessage("Error determining status of file import jobs");
+                 importEntity.setErrorMessage( "Error determining status of file import jobs" );
              }
-             logger.debug("Error determining status of file import jobs", e);
+             logger.debug( "Error determining status of file import jobs", e );
          }
  
-         logger.debug("{} successCount = {} failCount = {}", new Object[] { randTag, successCount, failCount } );
+         logger.debug( "successCount = {} failCount = {}", new Object[] { successCount, failCount } );
  
-         if ( importEntity != null && failCount == 0 ) {
-             logger.debug("{} FINISHED", randTag);
-             importEntity.setState(Import.State.FINISHED);
+         if ( importEntity != null ) {
+             logger.debug( "FINISHED" );
  
-         }  else if ( importEntity != null ) {
-             // we had failures, set it to failed
-             importEntity.setState(Import.State.FAILED);
-         }
+             if ( failCount == 0 ) {
+                 importEntity.setState( Import.State.FINISHED );
+             }
+             else {
+                 // we had failures, set it to failed
+                 importEntity.setState( Import.State.FAILED );
+             }
  
-         try {
-             emManagementApp.update( importEntity );
-         } catch (Exception e) {
-             logger.error("Error updating import entity", e);
+             try {
 -                rootEm.update( importEntity );
++                emMgmtApp.update( importEntity );
+             }
+             catch ( Exception e ) {
+                 logger.error( "Error updating import entity", e );
+             }
          }
+ 
+ 
      }
  
  
@@@ -973,13 -1030,6 +1005,13 @@@
  
          // flush the job statistics
          tracker.complete();
 +
 +        if ( FileImport.State.FAILED.equals( fileImport.getState() ) ) {
 +            logger.debug("\n\nFailed to completely wrote connections and dictionaries. File: {}\n",
 +                fileImport.getFileName());
 +            return;
 +        }
-         logger.debug("\n\nWrote connections and dictionaries. File: {}\n", fileImport.getFileName() );
++        logger.debug("\n\nWrote connections and dictionaries. File: {}\n", fileImport.getFileName());
      }
  
  
@@@ -1226,7 -1276,7 +1258,8 @@@
                          }
  
                      }  else if (token.equals( JsonToken.START_ARRAY )) {
--                         if ( objectNameStack.size() == 1 && COLLECTION_OBJECT_NAME.equals( objectNameStack.peek() )) {
++                         if ( objectNameStack.size() == 1
++                                && COLLECTION_OBJECT_NAME.equals( objectNameStack.peek() )) {
                              entityType = InflectionUtils.singularize( name );
                           }