You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/02/10 14:44:26 UTC
[3/4] incubator-usergrid git commit: After the Import->FileImport
connections are made, wait for them to take effect before scheduling jobs.
After the Import->FileImport connections are made, wait for them to take effect before scheduling jobs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2428b862
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2428b862
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2428b862
Branch: refs/heads/two-dot-o-import
Commit: 2428b862a4b6993e89376e580dcd3dd2955182aa
Parents: 9e76c56
Author: Dave Johnson <dm...@apigee.com>
Authored: Mon Feb 9 19:57:33 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Mon Feb 9 19:57:33 2015 -0500
----------------------------------------------------------------------
.../management/importer/ImportServiceImpl.java | 122 ++++++++++++-------
.../management/importer/ImportCollectionIT.java | 9 +-
2 files changed, 78 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2428b862/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index a613fdb..2d611c5 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@ -183,6 +183,13 @@ public class ImportServiceImpl implements ImportService {
try {
// create a connection between the main import job and the sub FileImport Job
emManagementApp.createConnection(importEntity, "includes", fileImport);
+
+ logger.debug("Created connection from {}:{} to {}:{}",
+ new Object[] {
+ importEntity.getType(), importEntity.getUuid(),
+ fileImport.getType(), fileImport.getUuid()
+ });
+
} catch (Exception e) {
logger.error(e.getMessage());
return null;
@@ -202,8 +209,6 @@ public class ImportServiceImpl implements ImportService {
fileImport.setState(FileImport.State.SCHEDULED);
emManagementApp.update(fileImport);
- emf.refreshIndex();
-
return jobData;
}
@@ -372,12 +377,12 @@ public class ImportServiceImpl implements ImportService {
EntityManager emManagementApp = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
- Import rootImportTask = emManagementApp.get(importId, Import.class);
+ Import importEntity = emManagementApp.get(importId, Import.class);
- rootImportTask.setState(Import.State.STARTED);
- rootImportTask.setStarted(System.currentTimeMillis());
- rootImportTask.setErrorMessage( " " );
- emManagementApp.update(rootImportTask);
+ importEntity.setState(Import.State.STARTED);
+ importEntity.setStarted(System.currentTimeMillis());
+ importEntity.setErrorMessage(" ");
+ emManagementApp.update(importEntity);
logger.debug("doImport(): updated state");
// if no S3 importer was passed in then create one
@@ -392,9 +397,9 @@ public class ImportServiceImpl implements ImportService {
}
} catch (Exception e) {
logger.error("doImport(): Error creating S3Import", e);
- rootImportTask.setErrorMessage(e.getMessage());
- rootImportTask.setState( Import.State.FAILED );
- emManagementApp.update(rootImportTask);
+ importEntity.setErrorMessage(e.getMessage());
+ importEntity.setState(Import.State.FAILED);
+ emManagementApp.update(importEntity);
return;
}
@@ -405,9 +410,9 @@ public class ImportServiceImpl implements ImportService {
if (config.get("organizationId") == null) {
logger.error("doImport(): No organization could be found");
- rootImportTask.setErrorMessage( "No organization could be found" );
- rootImportTask.setState( Import.State.FAILED );
- emManagementApp.update(rootImportTask);
+ importEntity.setErrorMessage("No organization could be found");
+ importEntity.setState(Import.State.FAILED);
+ emManagementApp.update(importEntity);
return;
} else {
@@ -422,9 +427,9 @@ public class ImportServiceImpl implements ImportService {
}
} catch (OrganizationNotFoundException | ApplicationNotFoundException e) {
- rootImportTask.setErrorMessage( e.getMessage() );
- rootImportTask.setState( Import.State.FAILED );
- emManagementApp.update(rootImportTask);
+ importEntity.setErrorMessage(e.getMessage());
+ importEntity.setState(Import.State.FAILED);
+ emManagementApp.update(importEntity);
return;
}
@@ -432,9 +437,9 @@ public class ImportServiceImpl implements ImportService {
// schedule a FileImport job for each file found in the bucket
if ( bucketFiles.isEmpty() ) {
- rootImportTask.setState( Import.State.FINISHED );
- rootImportTask.setErrorMessage( "No files found in the bucket: " + bucketName );
- emManagementApp.update(rootImportTask);
+ importEntity.setState(Import.State.FINISHED);
+ importEntity.setErrorMessage("No files found in the bucket: " + bucketName);
+ emManagementApp.update(importEntity);
} else {
@@ -445,10 +450,31 @@ public class ImportServiceImpl implements ImportService {
// create the Entity Connection and set up metadata for each job
for ( String bucketFile : bucketFiles ) {
- final JobData jobData = createFileTask(config, bucketFile, rootImportTask);
+ final JobData jobData = createFileTask(config, bucketFile, importEntity);
fileJobs.add( jobData) ;
}
+ int retries = 0;
+ int maxRetries = 60;
+ Results entities;
+ boolean done = false;
+ while ( !done && retries++ < maxRetries ) {
+
+ entities = emManagementApp.getConnectedEntities(
+ importEntity, "includes", "file_import", Level.ALL_PROPERTIES);
+
+ logger.debug("Found {} jobs", entities.size());
+
+ if ( entities.size() == fileJobs.size() ) {
+ done = true;
+ } else {
+ Thread.sleep(1000);
+ }
+ }
+ if ( retries >= maxRetries ) {
+ throw new RuntimeException("Max retries was reached");
+ }
+
// schedule each job
for ( JobData jobData: fileJobs ) {
@@ -462,9 +488,9 @@ public class ImportServiceImpl implements ImportService {
}
fileMetadata.put("files", value);
- rootImportTask.addProperties(fileMetadata);
- rootImportTask.setFileCount( fileJobs.size() );
- emManagementApp.update(rootImportTask);
+ importEntity.addProperties(fileMetadata);
+ importEntity.setFileCount(fileJobs.size());
+ emManagementApp.update(importEntity);
}
}
@@ -546,31 +572,35 @@ public class ImportServiceImpl implements ImportService {
String randTag = RandomStringUtils.randomAlphanumeric(4);
logger.debug("{} Got importEntity {}", randTag, importEntity.getUuid() );
- int retries = 0;
- int maxRetries = 60;
- Results entities = null;
- boolean done = false;
- while ( !done && retries++ < maxRetries ) {
-
- // get all file import job siblings of the current job we're working now
- entities = emManagementApp.getConnectedEntities(
- importEntity, "includes", "file_import", Level.ALL_PROPERTIES);
+ Results entities = emManagementApp.getConnectedEntities(
+ importEntity, "includes", "file_import", Level.ALL_PROPERTIES);
- if ( entities.size() == importEntity.getFileCount() ) {
- logger.debug("{} got {} file_import entities, expected {} DONE!",
- new Object[] { randTag, entities.size(), importEntity.getFileCount() });
- done = true;
- } else {
- logger.debug("{} got {} file_import entities, expected {} waiting... ",
- new Object[] { randTag, entities.size(), importEntity.getFileCount() });
- Thread.sleep(1000);
- }
- }
-
- if ( retries >= maxRetries ) {
- throw new RuntimeException("Max retries was reached");
- }
+// int retries = 0;
+// int maxRetries = 60;
+// Results entities = null;
+// boolean done = false;
+// while ( !done && retries++ < maxRetries ) {
+//
+// // get all file import job siblings of the current job we're working now
+// entities = emManagementApp.getConnectedEntities(
+// importEntity, "includes", "file_import", Level.ALL_PROPERTIES);
+//
+// if ( entities.size() == importEntity.getFileCount() ) {
+// logger.debug("{} got {} file_import entities, expected {} DONE!",
+// new Object[] { randTag, entities.size(), importEntity.getFileCount() });
+// done = true;
+//
+// } else {
+// logger.debug("{} got {} file_import entities, expected {} waiting... ",
+// new Object[] { randTag, entities.size(), importEntity.getFileCount() });
+// Thread.sleep(1000);
+// }
+// }
+//
+// if ( retries >= maxRetries ) {
+// throw new RuntimeException("Max retries was reached");
+// }
PagingResultsIterator itr = new PagingResultsIterator( entities );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2428b862/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
index 24ed292..a9fd7db 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
@@ -285,9 +285,6 @@ public class ImportCollectionIT {
}
-
-
-
/**
* Simple import test but with multiple files.
*/
@@ -301,7 +298,7 @@ public class ImportCollectionIT {
// create 10 applications each with collection of 10 things, export all to S3
logger.debug("\n\nCreating 10 applications with 10 entities each\n");
- for (int i = 0; i < 4; i++) {
+ for (int i = 0; i < 10; i++) {
String appName = "import-test-" + i + RandomStringUtils.randomAlphanumeric(10);
UUID appId = setup.getMgmtSvc().createApplication(organization.getUuid(), appName).getId();
@@ -324,14 +321,12 @@ public class ImportCollectionIT {
logger.debug("\n\nQuery to see if we now have 100 entities\n");
-
Query query = Query.fromQL("select *").withLimit(101);
List<Entity> importedThings = emDefaultApp.getCollection(
emDefaultApp.getApplicationId(), "things", query, Level.ALL_PROPERTIES).getEntities();
-
assertTrue(!importedThings.isEmpty());
- assertEquals(40, importedThings.size());
+ assertEquals(100, importedThings.size());
} finally {
deleteBucket();