You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/02/06 20:04:07 UTC
incubator-usergrid git commit: Fixed bug where there was a race
condition in scheduling.
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-import f23dd7686 -> 2e05f2496
Fixed bug where there was a race condition in scheduling.
Fixed bug in test cleanup.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2e05f249
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2e05f249
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2e05f249
Branch: refs/heads/two-dot-o-import
Commit: 2e05f2496d1f20d76d1a1a8886d5199524726720
Parents: f23dd76
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Feb 6 12:04:05 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Feb 6 12:04:05 2015 -0700
----------------------------------------------------------------------
.../persistence/entities/FileImport.java | 22 --
.../resources/usergrid-custom-test.properties | 7 +-
.../management/importer/FileImportTracker.java | 5 +-
.../management/importer/ImportServiceImpl.java | 254 +++++++++----------
.../management/importer/ImportCollectionIT.java | 20 +-
5 files changed, 145 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2e05f249/stack/core/src/main/java/org/apache/usergrid/persistence/entities/FileImport.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/FileImport.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/FileImport.java
index 2f6ec87..9a04dd4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/FileImport.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/FileImport.java
@@ -59,11 +59,6 @@ public class FileImport extends TypedEntity {
@EntityProperty
protected String collectionName;
- /**
- * File completion Status
- */
- @EntityProperty
- protected Boolean completed;
/**
* LastUpdatedUUID
@@ -89,7 +84,6 @@ public class FileImport extends TypedEntity {
public FileImport() {
- setCompleted(false);
setLastUpdatedUUID(" ");
setErrorMessage(" ");
setState(FileImport.State.CREATED);
@@ -120,22 +114,6 @@ public class FileImport extends TypedEntity {
}
/**
- * Get the completed status of the file i.e. if the file is completely parsed or not.
- * @return completed status
- */
- public Boolean getCompleted() {
- return completed;
- }
-
- /**
- * Get the completed status of the file i.e. if the file is completely parsed or not.
- * @param completed Boolean indicating whether parsing this file is complete or not
- */
- public void setCompleted(final Boolean completed) {
- this.completed = completed;
- }
-
- /**
* gets the state of the current job
* @return state
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2e05f249/stack/core/src/test/resources/usergrid-custom-test.properties
----------------------------------------------------------------------
diff --git a/stack/core/src/test/resources/usergrid-custom-test.properties b/stack/core/src/test/resources/usergrid-custom-test.properties
index d093ca8..ae614db 100644
--- a/stack/core/src/test/resources/usergrid-custom-test.properties
+++ b/stack/core/src/test/resources/usergrid-custom-test.properties
@@ -12,8 +12,8 @@
# Core module test properties
-# these settings allow tests to run and consistently pass on 16GB MacBook Pro
-# with ug.heapmax=5000m and ug.heapmin=3000m (set in Maven settings.xml)
+# these settings allow tests to run and consistently pass on 16GB MacBook Pro
+# with ug.heapmax=5000m and ug.heapmin=3000m (set in Maven settings.xml)
cassandra.startup=external
elasticsearch.startup=external
cassandra.timeout=2000
@@ -23,3 +23,6 @@ hystrix.threadpool.graph_async.coreSize=50
collection.task.pool.threadsize=8
collection.task.pool.queuesize=8
+
+
+usergrid.scheduler.job.workers=1
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2e05f249/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportTracker.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportTracker.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportTracker.java
index cda01a6..e52d850 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportTracker.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportTracker.java
@@ -292,14 +292,17 @@ public class FileImportTracker {
final long writtenEntities = entitiesWritten.get();
final long failedEntities = entitiesFailed.get();
- final long writtenConnections = connectionsFailed.get();
+ final long writtenConnections = connectionsWritten.get();
final long failedConnections = connectionsFailed.get();
fileImport.setImportedEntityCount( writtenEntities );
fileImport.setFailedEntityCount( failedEntities );
+
fileImport.setImportedConnectionCount( writtenConnections );
fileImport.setFailedConnectionCount( failedConnections );
+
+
fileImport.setState( state );
fileImport.setErrorMessage( message );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2e05f249/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index 148a6ab..51e9a09 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@ -31,7 +31,6 @@ import org.apache.usergrid.persistence.entities.JobData;
import org.apache.usergrid.utils.InflectionUtils;
import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonToken;
import org.codehaus.jackson.map.ObjectMapper;
@@ -40,15 +39,12 @@ import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
-import rx.functions.Action2;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;
import java.io.File;
-import java.io.IOException;
import java.util.*;
-import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PostConstruct;
@@ -172,7 +168,7 @@ public class ImportServiceImpl implements ImportService {
* @return it returns the UUID of the scheduled job
* @throws Exception
*/
- public UUID scheduleFile(Map<String, Object> config, String file, EntityRef importRef) throws Exception {
+ public JobData createFileTask( Map<String, Object> config, String file, EntityRef importRef ) throws Exception {
logger.debug("scheduleFile() for import {}:{} file {}",
new Object[]{importRef.getType(), importRef.getType(), file});
@@ -203,31 +199,37 @@ public class ImportServiceImpl implements ImportService {
}
// mark the File Import Job as created
- fileImport.setState(FileImport.State.CREATED);
- rootEm.update(fileImport);
+ fileImport.setState( FileImport.State.CREATED );
+ rootEm.update( fileImport );
//set data to be transferred to the FileImport Job
JobData jobData = new JobData();
- jobData.setProperty("File", file);
+ jobData.setProperty( "File", file );
jobData.setProperty(FILE_IMPORT_ID, fileImport.getUuid());
jobData.addProperties(config);
- long soonestPossible = System.currentTimeMillis() + 250; //sch grace period
- // TODO SQS: Tear this part out and set the new job to be taken in here
- // schedule file import job
- sch.createJob(FILE_IMPORT_JOB_NAME, soonestPossible, jobData);
-
- //probably how it should work
- ImportQueueMessage message = new ImportQueueMessage( fileImport.getUuid(),
- (UUID) config.get( "applicationId" ) ,file );
- qm.sendMessage( message );
//update state of the job to Scheduled
fileImport.setState(FileImport.State.SCHEDULED);
rootEm.update(fileImport);
- return fileImport.getUuid();
+ return jobData;
+ }
+
+
+ /**
+ * Schedule the file tasks. This must happen in 2 phases. The first is linking the sub files to the master the
+ * second is scheduling them to run.
+ */
+ public JobData scheduleFileTasks( final JobData jobData ) {
+
+
+ long soonestPossible = System.currentTimeMillis() + 250; //sch grace period
+
+ // TODO SQS: Tear this part out and set the new job to be taken in here
+ // schedule file import job
+ return sch.createJob( FILE_IMPORT_JOB_NAME, soonestPossible, jobData );
}
/**
@@ -294,7 +296,7 @@ public class ImportServiceImpl implements ImportService {
UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
EntityManager importManager = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
- return importManager.get(importId, Import.class);
+ return importManager.get( importId, Import.class );
}
@@ -380,13 +382,13 @@ public class ImportServiceImpl implements ImportService {
UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
EntityManager rooteEm = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
- Import importUG = rooteEm.get(importId, Import.class);
+ Import rootImportTask = rooteEm.get(importId, Import.class);
//update the entity state to show that the job has officially started.
- importUG.setState(Import.State.STARTED);
- importUG.setStarted(System.currentTimeMillis());
- importUG.setErrorMessage(" ");
- rooteEm.update(importUG);
+ rootImportTask.setState( Import.State.STARTED );
+ rootImportTask.setStarted( System.currentTimeMillis() );
+ rootImportTask.setErrorMessage( " " );
+ rooteEm.update(rootImportTask);
try {
if (s3PlaceHolder != null) {
s3Import = (S3Import) s3PlaceHolder;
@@ -395,9 +397,9 @@ public class ImportServiceImpl implements ImportService {
}
} catch (Exception e) {
logger.error("doImport(): S3Import doesn't exist");
- importUG.setErrorMessage(e.getMessage());
- importUG.setState(Import.State.FAILED);
- rooteEm.update(importUG);
+ rootImportTask.setErrorMessage( e.getMessage() );
+ rootImportTask.setState( Import.State.FAILED );
+ rooteEm.update(rootImportTask);
return;
}
@@ -409,9 +411,9 @@ public class ImportServiceImpl implements ImportService {
if (config.get("organizationId") == null) {
logger.error("doImport(): No organization could be found");
- importUG.setErrorMessage("No organization could be found");
- importUG.setState(Import.State.FAILED);
- rooteEm.update(importUG);
+ rootImportTask.setErrorMessage( "No organization could be found" );
+ rootImportTask.setState( Import.State.FAILED );
+ rooteEm.update(rootImportTask);
return;
} else {
@@ -443,16 +445,16 @@ public class ImportServiceImpl implements ImportService {
}
} catch (OrganizationNotFoundException | ApplicationNotFoundException e) {
- importUG.setErrorMessage(e.getMessage());
- importUG.setState(Import.State.FAILED);
- rooteEm.update(importUG);
+ rootImportTask.setErrorMessage( e.getMessage() );
+ rootImportTask.setState( Import.State.FAILED );
+ rooteEm.update(rootImportTask);
return;
}
if (files.size() == 0) {
- importUG.setState(Import.State.FINISHED);
- importUG.setErrorMessage("no files found in the bucket with the relevant context");
- rooteEm.update(importUG);
+ rootImportTask.setState( Import.State.FINISHED );
+ rootImportTask.setErrorMessage( "no files found in the bucket with the relevant context" );
+ rooteEm.update(rootImportTask);
} else {
@@ -460,22 +462,34 @@ public class ImportServiceImpl implements ImportService {
ArrayList<Map<String, Object>> value = new ArrayList<Map<String, Object>>();
- // schedule each file as a separate job
+ final List<JobData> fileJobs = new ArrayList<>(files.size());
+
+ //create the connection and set up metadata
for (File file : files) {
// TODO SQS: replace the method inside here so that it uses sqs instead of internal q
+ final JobData jobData = createFileTask( config, file.getPath(), rootImportTask );
- UUID jobID = scheduleFile(config, file.getPath(), importUG);
+ fileJobs.add( jobData) ;
+
+ }
+
+ //schedule each job
+ for(JobData jobData: fileJobs){
+
+ final JobData scheduled = scheduleFileTasks( jobData );
Map<String, Object> fileJobID = new HashMap<String, Object>();
- fileJobID.put("FileName", file.getName());
- fileJobID.put("JobID", jobID.toString());
+ fileJobID.put("FileName", scheduled.getProperty( "File" ));
+ fileJobID.put("JobID", scheduled.getUuid());
value.add(fileJobID);
}
+
+
fileMetadata.put("files", value);
- importUG.addProperties(fileMetadata);
- rooteEm.update(importUG);
+ rootImportTask.addProperties( fileMetadata );
+ rooteEm.update(rootImportTask);
}
}
@@ -490,7 +504,7 @@ public class ImportServiceImpl implements ImportService {
logger.debug("importCollectionFromOrgApp()");
//retrieves import entity
- Import importUG = getImportEntity(jobExecution);
+ Import importUG = getImportEntity( jobExecution );
ApplicationInfo application = managementService.getApplicationInfo(applicationUUID);
if (application == null) {
@@ -504,8 +518,8 @@ public class ImportServiceImpl implements ImportService {
String collectionName = config.get("collectionName").toString();
- String appFileName = prepareCollectionInputFileName(
- organizationInfo.getName(), application.getName(), collectionName);
+ String appFileName = prepareCollectionInputFileName( organizationInfo.getName(), application.getName(),
+ collectionName );
return copyFileFromS3(importUG, appFileName, config, s3Import, ImportType.COLLECTION);
@@ -521,7 +535,7 @@ public class ImportServiceImpl implements ImportService {
//retrieves import entity
Import importUG = getImportEntity(jobExecution);
- ApplicationInfo application = managementService.getApplicationInfo(applicationId);
+ ApplicationInfo application = managementService.getApplicationInfo( applicationId );
if (application == null) {
throw new ApplicationNotFoundException("Application Not Found");
}
@@ -531,8 +545,7 @@ public class ImportServiceImpl implements ImportService {
throw new OrganizationNotFoundException("Organization Not Found");
}
- String appFileName = prepareApplicationInputFileName(
- organizationInfo.getName(), application.getName());
+ String appFileName = prepareApplicationInputFileName( organizationInfo.getName(), application.getName() );
return copyFileFromS3(importUG, appFileName, config, s3Import, ImportType.APPLICATION);
@@ -548,13 +561,13 @@ public class ImportServiceImpl implements ImportService {
// retrieves import entity
Import importUG = getImportEntity(jobExecution);
- OrganizationInfo organizationInfo = managementService.getOrganizationByUuid(organizationUUID);
+ OrganizationInfo organizationInfo = managementService.getOrganizationByUuid( organizationUUID );
if (organizationInfo == null) {
throw new OrganizationNotFoundException("Organization Not Found");
}
// prepares the prefix path for the files to be import depending on the endpoint being hit
- String appFileName = prepareOrganizationInputFileName(organizationInfo.getName());
+ String appFileName = prepareOrganizationInputFileName( organizationInfo.getName() );
return copyFileFromS3(importUG, appFileName, config, s3Import, ImportType.ORGANIZATION);
@@ -609,7 +622,7 @@ public class ImportServiceImpl implements ImportService {
// TODO: ImportService should not have to know about JobExecution
public void parseFileToEntities(JobExecution jobExecution) throws Exception {
- FileImport fileImport = getFileImportEntity(jobExecution);
+ FileImport fileImport = getFileImportEntity( jobExecution );
File file = new File(jobExecution.getJobData().getProperty("File").toString());
UUID targetAppId = (UUID) jobExecution.getJobData().getProperty("applicationId");
@@ -617,103 +630,87 @@ public class ImportServiceImpl implements ImportService {
}
- public void parseFileToEntities(final JobExecution execution, final FileImport fileImport, final File file, final UUID targetAppId ) throws Exception {
+ public void parseFileToEntities( final JobExecution execution, final FileImport fileImport, final File file,
+ final UUID targetAppId ) throws Exception {
- logger.debug("parseFileToEntities() for file {} ", file.getAbsolutePath());
+ logger.debug( "parseFileToEntities() for file {} ", file.getAbsolutePath() );
- EntityManager emManagementApp = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
- emManagementApp.update(fileImport);
+ EntityManager emManagementApp = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
+ emManagementApp.update( fileImport );
- boolean completed = fileImport.getCompleted();
+ //already done, nothing to do
+ if ( FileImport.State.FAILED.equals( fileImport.getState() ) || FileImport.State.FINISHED
+ .equals( fileImport.getState() ) ) {
+ return;
+ }
- // on resume, completed files will not be traversed again
- if (!completed) {
- // validates the JSON structure
- //TODO Dave, do we really want to validate this up front? This will require us to download the file 3x
- if (isValidJSON(file, emManagementApp, fileImport)) {
+ // mark the File import job as started
+ fileImport.setState( FileImport.State.STARTED );
+ emManagementApp.update( fileImport );
- // mark the File import job as started
- fileImport.setState(FileImport.State.STARTED);
- emManagementApp.update(fileImport);
+ if ( emManagementApp.get( targetAppId ) == null ) {
+ throw new IllegalArgumentException( "Application does not exist: " + targetAppId.toString() );
+ }
+ EntityManager targetEm = emf.getEntityManager( targetAppId );
+ logger.debug( " importing into app {} file {}", targetAppId.toString(), file.getAbsolutePath() );
- if (emManagementApp.get(targetAppId) == null) {
- throw new IllegalArgumentException("Application does not exist: " + targetAppId.toString());
- }
- EntityManager targetEm = emf.getEntityManager(targetAppId);
- logger.debug(" importing into app {} file {}", targetAppId.toString(), file.getAbsolutePath());
+ importEntitiesFromFile( execution, file, targetEm, emManagementApp, fileImport );
- importEntitiesFromFile(execution, file, targetEm, emManagementApp, fileImport );
+ // Updates the state of file import job
- // Updates the state of file import job
- if (!fileImport.getState().toString().equals("FAILED")) {
- // mark file as completed
- fileImport.setCompleted(true);
- fileImport.setState(FileImport.State.FINISHED);
- emManagementApp.update(fileImport);
+ // check other files status and mark the status of
+ // import Job as Finished if all others are finished
+ Results ImportJobResults =
+ emManagementApp.getConnectingEntities( fileImport, "includes", null, Level.ALL_PROPERTIES );
- // check other files status and mark the status of
- // import Job as Finished if all others are finished
- Results ImportJobResults = emManagementApp.getConnectingEntities(
- fileImport, "includes", null, Level.ALL_PROPERTIES);
+ List<Entity> importEntity = ImportJobResults.getEntities();
+ UUID importId = importEntity.get( 0 ).getUuid();
+ Import importUG = emManagementApp.get( importId, Import.class );
- List<Entity> importEntity = ImportJobResults.getEntities();
- UUID importId = importEntity.get(0).getUuid();
- Import importUG = emManagementApp.get(importId, Import.class);
+ Results entities = emManagementApp.getConnectedEntities( importUG, "includes", null, Level.ALL_PROPERTIES );
- Results entities = emManagementApp.getConnectedEntities(importUG, "includes", null, Level.ALL_PROPERTIES);
- List<Entity> importFile = entities.getEntities();
+ PagingResultsIterator itr = new PagingResultsIterator( entities );
- int count = 0;
- for (Entity eachEntity : importFile) {
- FileImport fi = emManagementApp.get(eachEntity.getUuid(), FileImport.class);
- if (fi.getState().toString().equals("FINISHED")) {
- count++;
- } else if (fi.getState().toString().equals("FAILED")) {
- importUG.setState(Import.State.FAILED);
- emManagementApp.update(importUG);
- break;
- }
- }
- if (count == importFile.size()) {
- importUG.setState(Import.State.FINISHED);
- emManagementApp.update(importUG);
- }
- }
+
+ int failCount = 0;
+
+ while ( itr.hasNext() ) {
+ FileImport fi = ( FileImport ) itr.next();
+
+
+ switch ( fi.getState() ) {
+ //failed, but we may not be complete so continue checking
+ case FAILED:
+ failCount++;
+ break;
+ //finished, we can continue checking
+ case FINISHED:
+ break;
+ //not something we recognize as complete, short circuit
+ default:
+ return;
}
}
- }
- /**
- * Checks if a file is a valid JSON
- *
- * @param collectionFile the file being validated
- * @param rootEm the Entity Manager for the Management application
- * @param fileImport the file import entity
- */
- private boolean isValidJSON(File collectionFile, EntityManager rootEm, FileImport fileImport)
- throws Exception {
- boolean valid = false;
- try {
- final JsonParser jp = jsonFactory.createJsonParser(collectionFile);
- while (jp.nextToken() != null) {
- }
- valid = true;
- } catch (JsonParseException e) {
- e.printStackTrace();
- fileImport.setErrorMessage(e.getMessage());
- rootEm.update(fileImport);
- } catch (IOException e) {
- fileImport.setErrorMessage(e.getMessage());
- rootEm.update(fileImport);
+ if ( failCount == 0 ) {
+ importUG.setState( Import.State.FINISHED );
}
- return valid;
+
+ //we had failures, set it to failed
+ else {
+ importUG.setState( Import.State.FAILED );
+ }
+
+ emManagementApp.update( importUG );
}
+
+
/**
* Gets the JSON parser for given file
*
@@ -784,7 +781,7 @@ public class ImportServiceImpl implements ImportService {
// potentially skip the first n if this is a resume operation
final int entityNumSkip = (int)tracker.getTotalEntityCount();
- entityEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
+ final int entityCount = entityEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
@Override
public Boolean call( final WriteEvent writeEvent ) {
return !tracker.shouldStopProcessingEntities();
@@ -812,13 +809,14 @@ public class ImportServiceImpl implements ImportService {
// TODO: move JSON parser into observable creation so open/close happens within the stream
final JsonEntityParserObservable jsonObservableOther =
new JsonEntityParserObservable(jp, em, rootEm, fileImport, entitiesOnly);
+
final Observable<WriteEvent> otherEventObservable = Observable.create(jsonObservableOther);
// only take while our stats tell us we should continue processing
// potentially skip the first n if this is a resume operation
final int connectionNumSkip = (int)tracker.getTotalConnectionCount();
- otherEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
+ final int connectionCount = otherEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
@Override
public Boolean call( final WriteEvent writeEvent ) {
return !tracker.shouldStopProcessingConnections();
@@ -834,7 +832,7 @@ public class ImportServiceImpl implements ImportService {
logger.debug("\n\nimportEntitiesFromFile(): Wrote others\n");
-
+
// flush the job statistics
tracker.complete();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2e05f249/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
index c0d215e..ef335c8 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
@@ -88,6 +88,8 @@ public class ImportCollectionIT {
@BeforeClass
public static void setup() throws Exception {
+ bucketPrefix = System.getProperty( "bucketName" );
+
// start the scheduler after we're all set up
JobSchedulerService jobScheduler = cassandraResource.getBean( JobSchedulerService.class );
if ( jobScheduler.state() != Service.State.RUNNING ) {
@@ -99,9 +101,9 @@ public class ImportCollectionIT {
@AfterClass
public static void tearDown() {
-// if ( !StringUtils.isEmpty( bucketPrefix )) {
-// deleteBucketsWithPrefix();
-// }
+ if ( !StringUtils.isEmpty( bucketPrefix )) {
+ deleteBucketsWithPrefix();
+ }
}
@@ -128,7 +130,7 @@ public class ImportCollectionIT {
organization = newOrgAppAdminRule.getOrganizationInfo();
applicationId = newOrgAppAdminRule.getApplicationInfo().getId();
- bucketPrefix = System.getProperty( "bucketName" );
+
bucketName = bucketPrefix + RandomStringUtils.randomAlphanumeric(10).toLowerCase();
}
@@ -361,9 +363,6 @@ public class ImportCollectionIT {
logger.debug("\n\nQuery to see if we now have 100 entities\n");
- // take this out and the test will fail
- // TODO: fix ImportService so that it doesn't mark job finished until ALL entities are written
- Thread.sleep(5000);
Query query = Query.fromQL("select *").withLimit(101);
List<Entity> importedThings = emDefaultApp.getCollection(
@@ -423,12 +422,13 @@ public class ImportCollectionIT {
// listener.start();
- int maxRetries = 20;
- int retries = 0;
- while ( !importService.getState( importUUID ).equals( "FINISHED" ) && retries++ < maxRetries ) {
+// int maxRetries = 20;
+// int retries = 0;
+ while ( !importService.getState( importUUID ).equals( "FINISHED" ) ) {
Thread.sleep(1000);
}
+
em.refreshIndex();
}