You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/02/06 20:04:07 UTC

incubator-usergrid git commit: Fixed bug where there is a race condition in scheduling.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-import f23dd7686 -> 2e05f2496


Fixed bug where there is a race condition in scheduling.

Fixes bug in test cleanup


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2e05f249
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2e05f249
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2e05f249

Branch: refs/heads/two-dot-o-import
Commit: 2e05f2496d1f20d76d1a1a8886d5199524726720
Parents: f23dd76
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Feb 6 12:04:05 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Feb 6 12:04:05 2015 -0700

----------------------------------------------------------------------
 .../persistence/entities/FileImport.java        |  22 --
 .../resources/usergrid-custom-test.properties   |   7 +-
 .../management/importer/FileImportTracker.java  |   5 +-
 .../management/importer/ImportServiceImpl.java  | 254 +++++++++----------
 .../management/importer/ImportCollectionIT.java |  20 +-
 5 files changed, 145 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2e05f249/stack/core/src/main/java/org/apache/usergrid/persistence/entities/FileImport.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/FileImport.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/FileImport.java
index 2f6ec87..9a04dd4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/FileImport.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/FileImport.java
@@ -59,11 +59,6 @@ public class FileImport extends TypedEntity {
     @EntityProperty
     protected String collectionName;
 
-    /**
-     * File completion Status
-     */
-    @EntityProperty
-    protected Boolean completed;
 
     /**
      * LastUpdatedUUID
@@ -89,7 +84,6 @@ public class FileImport extends TypedEntity {
 
 
     public FileImport() {
-        setCompleted(false);
         setLastUpdatedUUID(" ");
         setErrorMessage(" ");
         setState(FileImport.State.CREATED);
@@ -120,22 +114,6 @@ public class FileImport extends TypedEntity {
     }
 
     /**
-     * Get the completed status of the file i.e. if the file is completely parsed or not.
-     * @return completed status
-     */
-    public Boolean getCompleted() {
-        return completed;
-    }
-
-    /**
-     * Get the completed status of the file i.e. if the file is completely parsed or not.
-     * @param completed Boolean indicating whether parsing this file is complete or not
-     */
-    public void setCompleted(final Boolean completed) {
-        this.completed = completed;
-    }
-
-    /**
      * gets the state of the current job
      * @return state
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2e05f249/stack/core/src/test/resources/usergrid-custom-test.properties
----------------------------------------------------------------------
diff --git a/stack/core/src/test/resources/usergrid-custom-test.properties b/stack/core/src/test/resources/usergrid-custom-test.properties
index d093ca8..ae614db 100644
--- a/stack/core/src/test/resources/usergrid-custom-test.properties
+++ b/stack/core/src/test/resources/usergrid-custom-test.properties
@@ -12,8 +12,8 @@
 
 # Core module test properties
 
-# these settings allow tests to run and consistently pass on 16GB MacBook Pro 
-# with ug.heapmax=5000m and ug.heapmin=3000m (set in Maven settings.xml) 
+# these settings allow tests to run and consistently pass on 16GB MacBook Pro
+# with ug.heapmax=5000m and ug.heapmin=3000m (set in Maven settings.xml)
 cassandra.startup=external
 elasticsearch.startup=external
 cassandra.timeout=2000
@@ -23,3 +23,6 @@ hystrix.threadpool.graph_async.coreSize=50
 
 collection.task.pool.threadsize=8
 collection.task.pool.queuesize=8
+
+
+usergrid.scheduler.job.workers=1

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2e05f249/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportTracker.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportTracker.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportTracker.java
index cda01a6..e52d850 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportTracker.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportTracker.java
@@ -292,14 +292,17 @@ public class FileImportTracker {
             final long writtenEntities = entitiesWritten.get();
             final long failedEntities = entitiesFailed.get();
 
-            final long writtenConnections = connectionsFailed.get();
+            final long writtenConnections = connectionsWritten.get();
             final long failedConnections = connectionsFailed.get();
 
 
             fileImport.setImportedEntityCount( writtenEntities );
             fileImport.setFailedEntityCount( failedEntities );
+
             fileImport.setImportedConnectionCount( writtenConnections );
             fileImport.setFailedConnectionCount( failedConnections );
+
+
             fileImport.setState( state );
             fileImport.setErrorMessage( message );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2e05f249/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index 148a6ab..51e9a09 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@ -31,7 +31,6 @@ import org.apache.usergrid.persistence.entities.JobData;
 
 import org.apache.usergrid.utils.InflectionUtils;
 import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.JsonToken;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -40,15 +39,12 @@ import org.slf4j.LoggerFactory;
 import rx.Observable;
 import rx.Subscriber;
 import rx.functions.Action1;
-import rx.functions.Action2;
 import rx.functions.Func1;
 import rx.functions.Func2;
 import rx.schedulers.Schedulers;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.PostConstruct;
 
@@ -172,7 +168,7 @@ public class ImportServiceImpl implements ImportService {
      * @return it returns the UUID of the scheduled job
      * @throws Exception
      */
-    public UUID scheduleFile(Map<String, Object> config, String file, EntityRef importRef) throws Exception {
+    public JobData createFileTask( Map<String, Object> config, String file, EntityRef importRef ) throws Exception {
 
         logger.debug("scheduleFile() for import {}:{} file {}",
             new Object[]{importRef.getType(), importRef.getType(), file});
@@ -203,31 +199,37 @@ public class ImportServiceImpl implements ImportService {
         }
 
         // mark the File Import Job as created
-        fileImport.setState(FileImport.State.CREATED);
-        rootEm.update(fileImport);
+        fileImport.setState( FileImport.State.CREATED );
+        rootEm.update( fileImport );
 
         //set data to be transferred to the FileImport Job
         JobData jobData = new JobData();
-        jobData.setProperty("File", file);
+        jobData.setProperty( "File", file );
         jobData.setProperty(FILE_IMPORT_ID, fileImport.getUuid());
         jobData.addProperties(config);
 
-        long soonestPossible = System.currentTimeMillis() + 250; //sch grace period
 
-        // TODO SQS: Tear this part out and set the new job to be taken in here
-        // schedule file import job
-        sch.createJob(FILE_IMPORT_JOB_NAME, soonestPossible, jobData);
-
-        //probably how it should work
-        ImportQueueMessage message = new ImportQueueMessage( fileImport.getUuid(),
-            (UUID) config.get( "applicationId" ) ,file );
-        qm.sendMessage( message );
 
         //update state of the job to Scheduled
         fileImport.setState(FileImport.State.SCHEDULED);
         rootEm.update(fileImport);
 
-        return fileImport.getUuid();
+        return jobData;
+    }
+
+
+    /**
+     * Schedule the file tasks.  This must happen in 2 phases.  The first is linking the sub files to the master the
+     * second is scheduling them to run.
+     */
+    public JobData scheduleFileTasks( final JobData jobData ) {
+
+
+        long soonestPossible = System.currentTimeMillis() + 250; //sch grace period
+
+        // TODO SQS: Tear this part out and set the new job to be taken in here
+        // schedule file import job
+        return sch.createJob( FILE_IMPORT_JOB_NAME, soonestPossible, jobData );
     }
 
     /**
@@ -294,7 +296,7 @@ public class ImportServiceImpl implements ImportService {
         UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
         EntityManager importManager = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
 
-        return importManager.get(importId, Import.class);
+        return importManager.get( importId, Import.class );
     }
 
 
@@ -380,13 +382,13 @@ public class ImportServiceImpl implements ImportService {
         UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
 
         EntityManager rooteEm = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
-        Import importUG = rooteEm.get(importId, Import.class);
+        Import rootImportTask = rooteEm.get(importId, Import.class);
 
         //update the entity state to show that the job has officially started.
-        importUG.setState(Import.State.STARTED);
-        importUG.setStarted(System.currentTimeMillis());
-        importUG.setErrorMessage(" ");
-        rooteEm.update(importUG);
+        rootImportTask.setState( Import.State.STARTED );
+        rootImportTask.setStarted( System.currentTimeMillis() );
+        rootImportTask.setErrorMessage( " " );
+        rooteEm.update(rootImportTask);
         try {
             if (s3PlaceHolder != null) {
                 s3Import = (S3Import) s3PlaceHolder;
@@ -395,9 +397,9 @@ public class ImportServiceImpl implements ImportService {
             }
         } catch (Exception e) {
             logger.error("doImport(): S3Import doesn't exist");
-            importUG.setErrorMessage(e.getMessage());
-            importUG.setState(Import.State.FAILED);
-            rooteEm.update(importUG);
+            rootImportTask.setErrorMessage( e.getMessage() );
+            rootImportTask.setState( Import.State.FAILED );
+            rooteEm.update(rootImportTask);
             return;
         }
 
@@ -409,9 +411,9 @@ public class ImportServiceImpl implements ImportService {
 
             if (config.get("organizationId") == null) {
                 logger.error("doImport(): No organization could be found");
-                importUG.setErrorMessage("No organization could be found");
-                importUG.setState(Import.State.FAILED);
-                rooteEm.update(importUG);
+                rootImportTask.setErrorMessage( "No organization could be found" );
+                rootImportTask.setState( Import.State.FAILED );
+                rooteEm.update(rootImportTask);
                 return;
 
             } else {
@@ -443,16 +445,16 @@ public class ImportServiceImpl implements ImportService {
             }
 
         } catch (OrganizationNotFoundException | ApplicationNotFoundException e) {
-            importUG.setErrorMessage(e.getMessage());
-            importUG.setState(Import.State.FAILED);
-            rooteEm.update(importUG);
+            rootImportTask.setErrorMessage( e.getMessage() );
+            rootImportTask.setState( Import.State.FAILED );
+            rooteEm.update(rootImportTask);
             return;
         }
 
         if (files.size() == 0) {
-            importUG.setState(Import.State.FINISHED);
-            importUG.setErrorMessage("no files found in the bucket with the relevant context");
-            rooteEm.update(importUG);
+            rootImportTask.setState( Import.State.FINISHED );
+            rootImportTask.setErrorMessage( "no files found in the bucket with the relevant context" );
+            rooteEm.update(rootImportTask);
 
         } else {
 
@@ -460,22 +462,34 @@ public class ImportServiceImpl implements ImportService {
 
             ArrayList<Map<String, Object>> value = new ArrayList<Map<String, Object>>();
 
-            // schedule each file as a separate job
+            final List<JobData> fileJobs = new ArrayList<>(files.size());
+
+            //create the connection and set up metadata
             for (File file : files) {
 
                 // TODO SQS: replace the method inside here so that it uses sqs instead of internal q
+                final JobData jobData = createFileTask( config, file.getPath(), rootImportTask );
 
-                UUID jobID = scheduleFile(config, file.getPath(), importUG);
+                fileJobs.add( jobData) ;
+
+            }
+
+            //schedule each job
+            for(JobData jobData: fileJobs){
+
+                final JobData scheduled = scheduleFileTasks( jobData );
 
                 Map<String, Object> fileJobID = new HashMap<String, Object>();
-                fileJobID.put("FileName", file.getName());
-                fileJobID.put("JobID", jobID.toString());
+                            fileJobID.put("FileName", scheduled.getProperty( "File" ));
+                            fileJobID.put("JobID", scheduled.getUuid());
                 value.add(fileJobID);
             }
 
+
+
             fileMetadata.put("files", value);
-            importUG.addProperties(fileMetadata);
-            rooteEm.update(importUG);
+            rootImportTask.addProperties( fileMetadata );
+            rooteEm.update(rootImportTask);
         }
     }
 
@@ -490,7 +504,7 @@ public class ImportServiceImpl implements ImportService {
         logger.debug("importCollectionFromOrgApp()");
 
         //retrieves import entity
-        Import importUG = getImportEntity(jobExecution);
+        Import importUG = getImportEntity( jobExecution );
 
         ApplicationInfo application = managementService.getApplicationInfo(applicationUUID);
         if (application == null) {
@@ -504,8 +518,8 @@ public class ImportServiceImpl implements ImportService {
 
         String collectionName = config.get("collectionName").toString();
 
-        String appFileName = prepareCollectionInputFileName(
-            organizationInfo.getName(), application.getName(), collectionName);
+        String appFileName = prepareCollectionInputFileName( organizationInfo.getName(), application.getName(),
+            collectionName );
 
         return copyFileFromS3(importUG, appFileName, config, s3Import, ImportType.COLLECTION);
 
@@ -521,7 +535,7 @@ public class ImportServiceImpl implements ImportService {
         //retrieves import entity
         Import importUG = getImportEntity(jobExecution);
 
-        ApplicationInfo application = managementService.getApplicationInfo(applicationId);
+        ApplicationInfo application = managementService.getApplicationInfo( applicationId );
         if (application == null) {
             throw new ApplicationNotFoundException("Application Not Found");
         }
@@ -531,8 +545,7 @@ public class ImportServiceImpl implements ImportService {
             throw new OrganizationNotFoundException("Organization Not Found");
         }
 
-        String appFileName = prepareApplicationInputFileName(
-            organizationInfo.getName(), application.getName());
+        String appFileName = prepareApplicationInputFileName( organizationInfo.getName(), application.getName() );
 
         return copyFileFromS3(importUG, appFileName, config, s3Import, ImportType.APPLICATION);
 
@@ -548,13 +561,13 @@ public class ImportServiceImpl implements ImportService {
         // retrieves import entity
         Import importUG = getImportEntity(jobExecution);
 
-        OrganizationInfo organizationInfo = managementService.getOrganizationByUuid(organizationUUID);
+        OrganizationInfo organizationInfo = managementService.getOrganizationByUuid( organizationUUID );
         if (organizationInfo == null) {
             throw new OrganizationNotFoundException("Organization Not Found");
         }
 
         // prepares the prefix path for the files to be import depending on the endpoint being hit
-        String appFileName = prepareOrganizationInputFileName(organizationInfo.getName());
+        String appFileName = prepareOrganizationInputFileName( organizationInfo.getName() );
 
         return copyFileFromS3(importUG, appFileName, config, s3Import, ImportType.ORGANIZATION);
 
@@ -609,7 +622,7 @@ public class ImportServiceImpl implements ImportService {
     // TODO: ImportService should not have to know about JobExecution
     public void parseFileToEntities(JobExecution jobExecution) throws Exception {
 
-        FileImport fileImport = getFileImportEntity(jobExecution);
+        FileImport fileImport = getFileImportEntity( jobExecution );
         File file = new File(jobExecution.getJobData().getProperty("File").toString());
         UUID targetAppId = (UUID) jobExecution.getJobData().getProperty("applicationId");
 
@@ -617,103 +630,87 @@ public class ImportServiceImpl implements ImportService {
     }
 
 
-    public void parseFileToEntities(final JobExecution execution, final FileImport fileImport, final File file, final UUID targetAppId ) throws Exception {
+    public void parseFileToEntities( final JobExecution execution, final FileImport fileImport, final File file,
+                                     final UUID targetAppId ) throws Exception {
 
-        logger.debug("parseFileToEntities() for file {} ", file.getAbsolutePath());
+        logger.debug( "parseFileToEntities() for file {} ", file.getAbsolutePath() );
 
-        EntityManager emManagementApp = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
-        emManagementApp.update(fileImport);
+        EntityManager emManagementApp = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
+        emManagementApp.update( fileImport );
 
-        boolean completed = fileImport.getCompleted();
+        //already done, nothing to do
+        if ( FileImport.State.FAILED.equals( fileImport.getState() ) || FileImport.State.FINISHED
+            .equals( fileImport.getState() ) ) {
+            return;
+        }
 
-        // on resume, completed files will not be traversed again
-        if (!completed) {
 
-            // validates the JSON structure
-            //TODO Dave, do we really want to validate this up front?  This will require us to download the file 3x
-            if (isValidJSON(file, emManagementApp, fileImport)) {
+        // mark the File import job as started
+        fileImport.setState( FileImport.State.STARTED );
+        emManagementApp.update( fileImport );
 
-                // mark the File import job as started
-                fileImport.setState(FileImport.State.STARTED);
-                emManagementApp.update(fileImport);
+        if ( emManagementApp.get( targetAppId ) == null ) {
+            throw new IllegalArgumentException( "Application does not exist: " + targetAppId.toString() );
+        }
+        EntityManager targetEm = emf.getEntityManager( targetAppId );
+        logger.debug( "   importing into app {} file {}", targetAppId.toString(), file.getAbsolutePath() );
 
-                if (emManagementApp.get(targetAppId) == null) {
-                    throw new IllegalArgumentException("Application does not exist: " + targetAppId.toString());
-                }
-                EntityManager targetEm = emf.getEntityManager(targetAppId);
-                logger.debug("   importing into app {} file {}", targetAppId.toString(), file.getAbsolutePath());
+        importEntitiesFromFile( execution, file, targetEm, emManagementApp, fileImport );
 
-                importEntitiesFromFile(execution,  file, targetEm, emManagementApp, fileImport );
 
+        // Updates the state of file import job
 
-                // Updates the state of file import job
-                if (!fileImport.getState().toString().equals("FAILED")) {
 
-                    // mark file as completed
-                    fileImport.setCompleted(true);
-                    fileImport.setState(FileImport.State.FINISHED);
-                    emManagementApp.update(fileImport);
+        // check other files status and mark the status of
+        // import Job as Finished if all others are finished
+        Results ImportJobResults =
+            emManagementApp.getConnectingEntities( fileImport, "includes", null, Level.ALL_PROPERTIES );
 
-                    // check other files status and mark the status of
-                    // import Job as Finished if all others are finished
-                    Results ImportJobResults = emManagementApp.getConnectingEntities(
-                        fileImport, "includes", null, Level.ALL_PROPERTIES);
+        List<Entity> importEntity = ImportJobResults.getEntities();
+        UUID importId = importEntity.get( 0 ).getUuid();
+        Import importUG = emManagementApp.get( importId, Import.class );
 
-                    List<Entity> importEntity = ImportJobResults.getEntities();
-                    UUID importId = importEntity.get(0).getUuid();
-                    Import importUG = emManagementApp.get(importId, Import.class);
+        Results entities = emManagementApp.getConnectedEntities( importUG, "includes", null, Level.ALL_PROPERTIES );
 
-                    Results entities = emManagementApp.getConnectedEntities(importUG, "includes", null, Level.ALL_PROPERTIES);
-                    List<Entity> importFile = entities.getEntities();
+        PagingResultsIterator itr = new PagingResultsIterator( entities );
 
-                    int count = 0;
-                    for (Entity eachEntity : importFile) {
-                        FileImport fi = emManagementApp.get(eachEntity.getUuid(), FileImport.class);
-                        if (fi.getState().toString().equals("FINISHED")) {
-                            count++;
-                        } else if (fi.getState().toString().equals("FAILED")) {
-                            importUG.setState(Import.State.FAILED);
-                            emManagementApp.update(importUG);
-                            break;
-                        }
-                    }
-                    if (count == importFile.size()) {
-                        importUG.setState(Import.State.FINISHED);
-                        emManagementApp.update(importUG);
-                    }
-                }
+
+        int failCount = 0;
+
+        while ( itr.hasNext() ) {
+            FileImport fi = ( FileImport ) itr.next();
+
+
+            switch ( fi.getState() ) {
+                //failed, but we may not be complete so continue checking
+                case FAILED:
+                    failCount++;
+                    break;
+                //finished, we can continue checking
+                case FINISHED:
+                    break;
+                //not something we recognize as complete, short circuit
+                default:
+                    return;
             }
         }
-    }
 
-    /**
-     * Checks if a file is a valid JSON
-     *
-     * @param collectionFile the file being validated
-     * @param rootEm         the Entity Manager for the Management application
-     * @param fileImport     the file import entity
-     */
-    private boolean isValidJSON(File collectionFile, EntityManager rootEm, FileImport fileImport)
-        throws Exception {
 
-        boolean valid = false;
-        try {
-            final JsonParser jp = jsonFactory.createJsonParser(collectionFile);
-            while (jp.nextToken() != null) {
-            }
-            valid = true;
-        } catch (JsonParseException e) {
-            e.printStackTrace();
-            fileImport.setErrorMessage(e.getMessage());
-            rootEm.update(fileImport);
-        } catch (IOException e) {
-            fileImport.setErrorMessage(e.getMessage());
-            rootEm.update(fileImport);
+        if ( failCount == 0 ) {
+            importUG.setState( Import.State.FINISHED );
         }
-        return valid;
+
+        //we had failures, set it to failed
+        else {
+            importUG.setState( Import.State.FAILED );
+        }
+
+        emManagementApp.update( importUG );
     }
 
 
+
+
     /**
      * Gets the JSON parser for given file
      *
@@ -784,7 +781,7 @@ public class ImportServiceImpl implements ImportService {
         // potentially skip the first n if this is a resume operation
         final int entityNumSkip = (int)tracker.getTotalEntityCount();
 
-        entityEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
+       final int entityCount =  entityEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
             @Override
             public Boolean call( final WriteEvent writeEvent ) {
                 return !tracker.shouldStopProcessingEntities();
@@ -812,13 +809,14 @@ public class ImportServiceImpl implements ImportService {
         // TODO: move JSON parser into observable creation so open/close happens within the stream
         final JsonEntityParserObservable jsonObservableOther =
             new JsonEntityParserObservable(jp, em, rootEm, fileImport, entitiesOnly);
+
         final Observable<WriteEvent> otherEventObservable = Observable.create(jsonObservableOther);
 
         // only take while our stats tell us we should continue processing
         // potentially skip the first n if this is a resume operation
         final int connectionNumSkip = (int)tracker.getTotalConnectionCount();
 
-        otherEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
+        final int connectionCount = otherEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
             @Override
             public Boolean call( final WriteEvent writeEvent ) {
                 return !tracker.shouldStopProcessingConnections();
@@ -834,7 +832,7 @@ public class ImportServiceImpl implements ImportService {
 
         logger.debug("\n\nimportEntitiesFromFile(): Wrote others\n");
 
-        
+
         // flush the job statistics
         tracker.complete();
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2e05f249/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
index c0d215e..ef335c8 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
@@ -88,6 +88,8 @@ public class ImportCollectionIT {
     @BeforeClass
     public static void setup() throws Exception {
 
+        bucketPrefix = System.getProperty( "bucketName" );
+
         // start the scheduler after we're all set up
         JobSchedulerService jobScheduler = cassandraResource.getBean( JobSchedulerService.class );
         if ( jobScheduler.state() != Service.State.RUNNING ) {
@@ -99,9 +101,9 @@ public class ImportCollectionIT {
 
     @AfterClass
     public static void tearDown() {
-//        if ( !StringUtils.isEmpty( bucketPrefix )) {
-//            deleteBucketsWithPrefix();
-//        }
+        if ( !StringUtils.isEmpty( bucketPrefix )) {
+            deleteBucketsWithPrefix();
+        }
     }
 
 
@@ -128,7 +130,7 @@ public class ImportCollectionIT {
         organization = newOrgAppAdminRule.getOrganizationInfo();
         applicationId = newOrgAppAdminRule.getApplicationInfo().getId();
 
-        bucketPrefix = System.getProperty( "bucketName" );
+
         bucketName = bucketPrefix + RandomStringUtils.randomAlphanumeric(10).toLowerCase();
     }
 
@@ -361,9 +363,6 @@ public class ImportCollectionIT {
 
             logger.debug("\n\nQuery to see if we now have 100 entities\n");
 
-            // take this out and the test will fail
-            // TODO: fix ImportService so that it doesn't mark job finished until ALL entities are written
-            Thread.sleep(5000);
 
             Query query = Query.fromQL("select *").withLimit(101);
             List<Entity> importedThings = emDefaultApp.getCollection(
@@ -423,12 +422,13 @@ public class ImportCollectionIT {
 
         //  listener.start();
 
-        int maxRetries = 20;
-        int retries = 0;
-        while ( !importService.getState( importUUID ).equals( "FINISHED" ) && retries++ < maxRetries ) {
+//        int maxRetries = 20;
+//        int retries = 0;
+        while ( !importService.getState( importUUID ).equals( "FINISHED" ) ) {
             Thread.sleep(1000);
         }
 
+
         em.refreshIndex();
     }