You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/02/10 14:44:24 UTC
[1/4] incubator-usergrid git commit: FileImportJob will now wait for other Import -> FileImport connections to be made before making the "are we done yet" call.
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-import 93cbeb5b8 -> 62deba3fa
FileImportJob will now wait for other Import -> FileImport connections to be made before making the "are we done yet" call.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8bef092d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8bef092d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8bef092d
Branch: refs/heads/two-dot-o-import
Commit: 8bef092d275381427331a9319355a81db5a6d9cf
Parents: 18671e7
Author: Dave Johnson <dm...@apigee.com>
Authored: Mon Feb 9 17:41:58 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Mon Feb 9 17:41:58 2015 -0500
----------------------------------------------------------------------
.../usergrid/persistence/entities/Import.java | 20 ++-
.../management/export/S3ExportImpl.java | 2 -
.../management/importer/ImportServiceImpl.java | 162 ++++++++++++-------
.../management/importer/ImportCollectionIT.java | 4 +-
4 files changed, 121 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8bef092d/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Import.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Import.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Import.java
index d7a8751..83048b9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Import.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Import.java
@@ -28,12 +28,15 @@ import javax.xml.bind.annotation.XmlRootElement;
@XmlRootElement
public class Import extends TypedEntity {
- //canceled and expired states aren't used in current iteration.
+ //canceled and expired states aren't used in current iteration.
public static enum State {
CREATED, FAILED, SCHEDULED, STARTED, FINISHED, CANCELED, EXPIRED
}
@EntityProperty
+ private int fileCount;
+
+ @EntityProperty
protected State curState;
/**
@@ -49,9 +52,22 @@ public class Import extends TypedEntity {
protected String errorMessage;
- public Import() {
+ public Import() {}
+
+
+ @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+ @EntityProperty
+ public void setFileCount(int fileCount) {
+ this.fileCount = fileCount;
}
+ @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+ @EntityProperty
+ public int getFileCount() {
+ return fileCount;
+ }
+
+
/**
* get the started time for the import job
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8bef092d/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java
index 0b1fac5..e27788b 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java
@@ -19,10 +19,8 @@ package org.apache.usergrid.management.export;
import com.amazonaws.SDKGlobalConfiguration;
import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Module;
import org.jclouds.ContextBuilder;
-import org.jclouds.blobstore.AsyncBlobStore;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8bef092d/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index f724536..06be766 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@ -18,6 +18,7 @@
package org.apache.usergrid.management.importer;
import com.amazonaws.SDKGlobalConfiguration;
+import org.apache.commons.lang.RandomStringUtils;
import org.apache.usergrid.batch.JobExecution;
import org.apache.usergrid.batch.service.SchedulerService;
import org.apache.usergrid.corepersistence.CpSetup;
@@ -45,9 +46,7 @@ import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
-import rx.functions.Func1;
import rx.functions.Func2;
-import rx.schedulers.Schedulers;
import javax.annotation.PostConstruct;
import java.io.File;
@@ -168,10 +167,10 @@ public class ImportServiceImpl implements ImportService {
logger.debug("scheduleFile() for import {}:{} file {}",
new Object[]{importRef.getType(), importRef.getType(), file});
- EntityManager rootEm = null;
+ EntityManager emManagementApp = null;
try {
- rootEm = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
+ emManagementApp = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
} catch (Exception e) {
logger.error("application doesn't exist within the current context");
return null;
@@ -181,33 +180,33 @@ public class ImportServiceImpl implements ImportService {
String collectionName = config.get("collectionName").toString();
UUID applicationId = (UUID)config.get("applicationId");
FileImport fileImport = new FileImport( file, applicationId, collectionName );
- fileImport = rootEm.create(fileImport);
+ fileImport = emManagementApp.create(fileImport);
- Import importUG = rootEm.get(importRef, Import.class);
+ Import importEntity = emManagementApp.get(importRef, Import.class);
try {
// create a connection between the main import job and the sub FileImport Job
- rootEm.createConnection(importUG, "includes", fileImport);
+ emManagementApp.createConnection(importEntity, "includes", fileImport);
} catch (Exception e) {
logger.error(e.getMessage());
return null;
}
// mark the File Import Job as created
- fileImport.setState( FileImport.State.CREATED );
- rootEm.update( fileImport );
+ fileImport.setState(FileImport.State.CREATED);
+ emManagementApp.update( fileImport );
// set data to be transferred to the FileImport Job
JobData jobData = new JobData();
- jobData.setProperty( "File", file );
+ jobData.setProperty("File", file);
jobData.setProperty(FILE_IMPORT_ID, fileImport.getUuid());
jobData.addProperties(config);
// update state of the job to Scheduled
fileImport.setState(FileImport.State.SCHEDULED);
- rootEm.update(fileImport);
+ emManagementApp.update(fileImport);
- rootEm.refreshIndex();
+ emf.refreshIndex();
return jobData;
}
@@ -453,7 +452,7 @@ public class ImportServiceImpl implements ImportService {
// create the Entity Connection and set up metadata for each job
for ( String bucketFile : bucketFiles ) {
- final JobData jobData = createFileTask( config, bucketFile, rootImportTask );
+ final JobData jobData = createFileTask(config, bucketFile, rootImportTask);
fileJobs.add( jobData) ;
}
@@ -470,7 +469,8 @@ public class ImportServiceImpl implements ImportService {
}
fileMetadata.put("files", value);
- rootImportTask.addProperties( fileMetadata );
+ rootImportTask.addProperties(fileMetadata);
+ rootImportTask.setFileCount( fileJobs.size() );
emManagementApp.update(rootImportTask);
}
}
@@ -543,17 +543,50 @@ public class ImportServiceImpl implements ImportService {
// mark ImportJob FINISHED but only if all other FileImportJobs are complete
// get parent import job of this file import job
+
Results importJobResults =
emManagementApp.getConnectingEntities( fileImport, "includes", null, Level.ALL_PROPERTIES );
List<Entity> importEntities = importJobResults.getEntities();
UUID importId = importEntities.get( 0 ).getUuid();
Import importEntity = emManagementApp.get( importId, Import.class );
- // get all file import job siblings of the current job we're working now
- Results entities = emManagementApp.getConnectedEntities( importEntity, "includes", null, Level.ALL_PROPERTIES );
+ String randTag = RandomStringUtils.randomAlphanumeric(4);
+ logger.debug("{} Got importEntity {}", randTag, importEntity.getUuid() );
+
+ int retries = 0;
+ int maxRetries = 60;
+ Results entities = null;
+ boolean done = false;
+ while ( !done && retries++ < maxRetries ) {
+
+ // get all file import job siblings of the current job we're working now
+ entities = emManagementApp.getConnectedEntities(
+ importEntity, "includes", "file_import", Level.ALL_PROPERTIES);
+
+ if ( entities.size() == importEntity.getFileCount() ) {
+ logger.debug("{} got {} file_import entities, expected {} DONE!",
+ new Object[] { randTag, entities.size(), importEntity.getFileCount() });
+ done = true;
+
+ } else {
+ logger.debug("{} got {} file_import entities, expected {} waiting... ",
+ new Object[] { randTag, entities.size(), importEntity.getFileCount() });
+ Thread.sleep(1000);
+ }
+ }
+
+ if ( retries >= maxRetries ) {
+ throw new RuntimeException("Max retries was reached");
+ }
+
+
PagingResultsIterator itr = new PagingResultsIterator( entities );
+ logger.debug("{} Check {} jobs to see if we are done for file {}",
+ new Object[] { randTag, entities.size(), fileImport.getFileName() } );
+
int failCount = 0;
+ int successCount = 0;
while ( itr.hasNext() ) {
FileImport fi = ( FileImport ) itr.next();
switch ( fi.getState() ) {
@@ -561,13 +594,18 @@ public class ImportServiceImpl implements ImportService {
failCount++;
break;
case FINISHED: // finished, we can continue checking
- break;
+ successCount++;
+ continue;
default: // not something we recognize as complete, short circuit
+ logger.debug("{} not done yet, bail out...", randTag);
return;
}
}
+ logger.debug("{} successCount = {} failCount = {}", new Object[] { randTag, successCount, failCount } );
+
if ( failCount == 0 ) {
+ logger.debug("{} FINISHED", randTag);
importEntity.setState(Import.State.FINISHED);
} else {
// we had failures, set it to failed
@@ -641,35 +679,27 @@ public class ImportServiceImpl implements ImportService {
// TODO: move JSON parser into observable creation so open/close happens within the stream
final JsonEntityParserObservable jsonObservableEntities =
- new JsonEntityParserObservable(jp, em, rootEm, fileImport, entitiesOnly);
- final Observable<WriteEvent> entityEventObservable = Observable.create(jsonObservableEntities);
+ new JsonEntityParserObservable(jp, em, rootEm, fileImport, tracker, entitiesOnly);
- // only take while our stats tell us we should continue processing
- // potentially skip the first n if this is a resume operation
- final int entityNumSkip = (int)tracker.getTotalEntityCount();
+ jsonObservableEntities.call( null );
- // with this code we get asynchronous behavior and testImportWithMultipleFiles will fail
+// final Observable<WriteEvent> entityEventObservable = Observable.create(jsonObservableEntities);
+//
+// // only take while our stats tell us we should continue processing
+// // potentially skip the first n if this is a resume operation
+// final int entityNumSkip = (int)tracker.getTotalEntityCount();
+//
// final int entityCount = entityEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
// @Override
// public Boolean call( final WriteEvent writeEvent ) {
// return !tracker.shouldStopProcessingEntities();
// }
// } ).skip(entityNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
-// @Override
-// public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
-// return entityWrapperObservable.doOnNext(doWork);
-// }
-// }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
-
- entityEventObservable.parallel(
- new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
- @Override
- public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
- return entityWrapperObservable.doOnNext(doWork);
- }
- }, Schedulers.io()).toBlocking().last();
-
- jp.close();
+// @Override
+// public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
+// return entityWrapperObservable.doOnNext(doWork);
+// }
+// }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
logger.debug("\n\nparseEntitiesAndConnectionsFromJson(): Wrote entities\n");
@@ -684,15 +714,17 @@ public class ImportServiceImpl implements ImportService {
// TODO: move JSON parser into observable creation so open/close happens within the stream
final JsonEntityParserObservable jsonObservableOther =
- new JsonEntityParserObservable(jp, em, rootEm, fileImport, entitiesOnly);
-
- final Observable<WriteEvent> otherEventObservable = Observable.create(jsonObservableOther);
+ new JsonEntityParserObservable(jp, em, rootEm, fileImport, tracker, entitiesOnly);
- // only take while our stats tell us we should continue processing
- // potentially skip the first n if this is a resume operation
- final int connectionNumSkip = (int)tracker.getTotalConnectionCount();
+ jsonObservableOther.call( null );
- // with this code we get asynchronous behavior and testImportWithMultipleFiles will fail
+// final Observable<WriteEvent> otherEventObservable = Observable.create(jsonObservableOther);
+//
+// // only take while our stats tell us we should continue processing
+// // potentially skip the first n if this is a resume operation
+// final int connectionNumSkip = (int)tracker.getTotalConnectionCount();
+//
+// // with this code we get asynchronous behavior and testImportWithMultipleFiles will fail
// final int connectionCount = otherEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
// @Override
// public Boolean call( final WriteEvent writeEvent ) {
@@ -705,16 +737,6 @@ public class ImportServiceImpl implements ImportService {
// }
// }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
- otherEventObservable.parallel(
- new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
- @Override
- public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
- return entityWrapperObservable.doOnNext(doWork);
- }
- }, Schedulers.io()).toBlocking().last();
-
- jp.close();
-
logger.debug("\n\nparseEntitiesAndConnectionsFromJson(): Wrote others for file {}\n",
fileImport.getFileName());
@@ -852,6 +874,7 @@ public class ImportServiceImpl implements ImportService {
EntityManager em;
EntityManager rootEm;
FileImport fileImport;
+ FileImportTracker tracker;
boolean entitiesOnly;
@@ -860,17 +883,19 @@ public class ImportServiceImpl implements ImportService {
EntityManager em,
EntityManager rootEm,
FileImport fileImport,
+ FileImportTracker tracker,
boolean entitiesOnly) {
this.jp = parser;
this.em = em;
this.rootEm = rootEm;
this.fileImport = fileImport;
+ this.tracker = tracker;
this.entitiesOnly = entitiesOnly;
}
- @Override
+ @Override
public void call(final Subscriber<? super WriteEvent> subscriber) {
process(subscriber);
}
@@ -910,7 +935,7 @@ public class ImportServiceImpl implements ImportService {
//logger.debug("{}Got entity with uuid {}", indent, lastEntity);
WriteEvent event = new EntityEvent(uuid, collectionType, entityMap);
- subscriber.onNext(event);
+ processWriteEvent( subscriber, event );
}
} else if (token.equals(JsonToken.START_OBJECT) && "connections".equals(name)) {
@@ -929,7 +954,7 @@ public class ImportServiceImpl implements ImportService {
EntityRef entryRef = new SimpleEntityRef(target);
WriteEvent event = new ConnectionEvent(lastEntity, type, entryRef);
- subscriber.onNext(event);
+ processWriteEvent( subscriber, event );
}
}
}
@@ -945,7 +970,7 @@ public class ImportServiceImpl implements ImportService {
//new Object[] {indent, dname, dmap.size() });
WriteEvent event = new DictionaryEvent(lastEntity, dname, dmap);
- subscriber.onNext(event);
+ processWriteEvent( subscriber, event );
}
}
@@ -961,7 +986,9 @@ public class ImportServiceImpl implements ImportService {
}
}
- subscriber.onCompleted();
+ if ( subscriber != null ) {
+ subscriber.onCompleted();
+ }
logger.debug("process(): done parsing JSON");
@@ -973,9 +1000,22 @@ public class ImportServiceImpl implements ImportService {
} catch (Exception ex) {
logger.error("Error updating file import record", ex);
}
- subscriber.onError(e);
+ if ( subscriber != null ) {
+ subscriber.onError(e);
+ }
}
}
+
+ private void processWriteEvent( final Subscriber<? super WriteEvent> subscriber, WriteEvent writeEvent ) {
+
+ if ( subscriber == null ) {
+ // no Rx, just do it
+ writeEvent.doWrite(em, fileImport, tracker);
+ } else {
+ subscriber.onNext( writeEvent );
+ }
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8bef092d/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
index 9a8c977..62d9e1b 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
@@ -419,7 +419,7 @@ public class ImportCollectionIT {
}});
}});
- int maxRetries = 20;
+ int maxRetries = 120;
int retries = 0;
while ( !importService.getState( importUUID ).equals( "FINISHED" ) && retries++ < maxRetries ) {
logger.debug("Waiting for import...");
@@ -459,7 +459,7 @@ public class ImportCollectionIT {
}});
}});
- int maxRetries = 20;
+ int maxRetries = 120;
int retries = 0;
while ( !exportService.getState( exportUUID ).equals( "FINISHED" ) && retries++ < maxRetries ) {
logger.debug("Waiting for export...");
[2/4] incubator-usergrid git commit: Merge branch 'two-dot-o-import' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-import
Posted by sn...@apache.org.
Merge branch 'two-dot-o-import' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-import
Conflicts:
stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9e76c56a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9e76c56a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9e76c56a
Branch: refs/heads/two-dot-o-import
Commit: 9e76c56a557dcfe86af97177a29fcadb379792a7
Parents: 8bef092 6855e17
Author: Dave Johnson <dm...@apigee.com>
Authored: Mon Feb 9 18:10:49 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Mon Feb 9 18:10:49 2015 -0500
----------------------------------------------------------------------
.../organizations/OrganizationResource.java | 104 ------------
.../applications/ApplicationResource.java | 129 +++++++--------
.../rest/management/ImportResourceIT.java | 90 +----------
.../management/export/ExportServiceImpl.java | 16 +-
.../management/importer/ImportServiceImpl.java | 157 ++++++++++---------
.../management/importer/ImportCollectionIT.java | 47 +-----
6 files changed, 178 insertions(+), 365 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9e76c56a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index 06be766,b59492f..a613fdb
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@@ -852,18 -821,20 +843,20 @@@ public class ImportServiceImpl implemen
em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary);
} catch (Exception e) {
+ logger.error("Error writing dictionary", e);
- fileImport.setErrorMessage(e.getMessage());
- try {
-
- rootEm.update(fileImport);
- } catch (Exception ex) {
-
- // TODO should we abort at this point?
- logger.error("Error updating file import report with error message: "
- + fileImport.getErrorMessage(), ex);
- }
+ //TODO add statistics for dictionary writes and failures
-// logger.error("Error writing dictionary", e);
+ // fileImport.setErrorMessage(e.getMessage());
+ // try {
+ //
+ // rootEm.update(fileImport);
+ //
+ // } catch (Exception ex) {
+ //
+ // // TODO should we abort at this point?
+ // logger.error("Error updating file import report with error message: "
+ // + fileImport.getErrorMessage(), ex);
+ // }
}
}
}
@@@ -904,85 -873,115 +898,108 @@@
private void process(final Subscriber<? super WriteEvent> subscriber) {
try {
- boolean done = false;
-
// we ignore imported entity type information, entities get the type of the collection
- String collectionType = InflectionUtils.singularize( fileImport.getCollectionName() );
-
- Stack tokenStack = new Stack();
+ Stack<JsonToken> objectStartStack = new Stack();
+ Stack<String> objectNameStack = new Stack();
EntityRef lastEntity = null;
- while (!done) {
+ // String collectionName = null;
+ String entityType = null;
+
+ while ( true ) {
+
JsonToken token = jp.nextToken();
- String name = jp.getCurrentName();
- String indent = "";
- for (int i = 0; i < tokenStack.size(); i++) {
- indent += " ";
+ //nothing left to do.
+ if ( token == null ) {
+ break;
}
- //logger.debug("{}Token {} name {}", new Object[]{indent, token, name});
+ String name = jp.getCurrentName();
- if (token.equals(JsonToken.START_OBJECT) && "Metadata".equals(name)) {
- Map<String, Object> entityMap = jp.readValueAs(HashMap.class);
+ //start of an object with a field name
- UUID uuid = UUID.fromString((String) entityMap.get("uuid"));
- lastEntity = new SimpleEntityRef(collectionType, uuid);
+ if ( token.equals( JsonToken.START_OBJECT ) ) {
- if (entitiesOnly) {
- //logger.debug("{}Got entity with uuid {}", indent, lastEntity);
+ objectStartStack.push( token );
- WriteEvent event = new EntityEvent(uuid, collectionType, entityMap);
- processWriteEvent( subscriber, event );
+ //nothing to do
+ if ( name == null ) {
+ continue;
}
- } else if (token.equals(JsonToken.START_OBJECT) && "connections".equals(name)) {
- Map<String, Object> connectionMap = jp.readValueAs(HashMap.class);
+ if ( "Metadata".equals( name ) ) {
- for (String type : connectionMap.keySet()) {
- List targets = (List) connectionMap.get(type);
-
+ Map<String, Object> entityMap = jp.readValueAs( HashMap.class );
- for (Object targetObject : targets) {
- UUID target = UUID.fromString((String) targetObject);
+ UUID uuid = UUID.fromString( ( String ) entityMap.get( "uuid" ) );
+ lastEntity = new SimpleEntityRef( entityType, uuid );
- if (!entitiesOnly) {
- //logger.debug("{}Got connection {} to {}",
- //new Object[]{indent, type, target.toString()});
+ if ( entitiesOnly ) {
+ //logger.debug("{}Got entity with uuid {}", indent, lastEntity);
- EntityRef entryRef = new SimpleEntityRef(target);
- WriteEvent event = new ConnectionEvent(lastEntity, type, entryRef);
- processWriteEvent( subscriber, event );
- }
- WriteEvent event = new EntityEvent( uuid, entityType, entityMap );
- subscriber.onNext( event );
++ WriteEvent event = new EntityEvent(uuid, entityType, entityMap);
++ processWriteEvent( subscriber, event);
}
-
+ objectStartStack.pop();
}
+ else if ( "connections".equals( name ) ) {
- } else if (token.equals(JsonToken.START_OBJECT) && "dictionaries".equals(name)) {
-
+ Map<String, Object> connectionMap = jp.readValueAs( HashMap.class );
- Map<String, Object> dictionariesMap = jp.readValueAs(HashMap.class);
- for (String dname : dictionariesMap.keySet()) {
- Map dmap = (Map) dictionariesMap.get(dname);
+ for ( String type : connectionMap.keySet() ) {
+ List targets = ( List ) connectionMap.get( type );
- if (!entitiesOnly) {
- //logger.debug("{}Got dictionary {} size {}",
- //new Object[] {indent, dname, dmap.size() });
+ for ( Object targetObject : targets ) {
+ UUID target = UUID.fromString( ( String ) targetObject );
- WriteEvent event = new DictionaryEvent(lastEntity, dname, dmap);
- processWriteEvent( subscriber, event );
+ if ( !entitiesOnly ) {
+ //logger.debug("{}Got connection {} to {}",
- //new Object[]{indent, type, target.toString()});
++ //new Object[]{indent, type, target.toString()});
+
- EntityRef entryRef = new SimpleEntityRef( target );
- WriteEvent event = new ConnectionEvent( lastEntity, type, entryRef );
- subscriber.onNext( event );
++ EntityRef entryRef = new SimpleEntityRef(target);
++ WriteEvent event = new ConnectionEvent(lastEntity, type, entryRef);
++ processWriteEvent(subscriber, event);
+ }
+ }
}
- }
- } else if (token.equals(JsonToken.START_OBJECT)) {
- tokenStack.push(token);
+ objectStartStack.pop();
- }
- else if ( "dictionaries".equals( name ) ) {
+
- } else if (token.equals(JsonToken.END_OBJECT)) {
- tokenStack.pop();
- }
++ } else if ( "dictionaries".equals(name) ) {
+
+
+ Map<String, Object> dictionariesMap = jp.readValueAs( HashMap.class );
+ for ( String dname : dictionariesMap.keySet() ) {
+ Map dmap = ( Map ) dictionariesMap.get( dname );
+
+ if ( !entitiesOnly ) {
+ //logger.debug("{}Got dictionary {} size {}",
- //new Object[] {indent, dname, dmap.size() });
++ //new Object[] {indent, dname, dmap.size() });
+
- WriteEvent event = new DictionaryEvent( lastEntity, dname, dmap );
- subscriber.onNext( event );
++ WriteEvent event = new DictionaryEvent(lastEntity, dname, dmap);
++ processWriteEvent(subscriber, event);
+ }
+ }
+
+ objectStartStack.pop();
- }
- //push onto object names we don't immediately understand. Used for parent detection
- else{
++
++ } else {
++ // push onto object names we don't immediately understand. Used for parent detection
+ objectNameStack.push( name );
+ }
+
- }
-
- else if (token.equals( JsonToken.START_ARRAY )){
++ } else if (token.equals( JsonToken.START_ARRAY )) {
+ if( objectNameStack.size() == 1 && COLLECTION_OBJECT_NAME.equals( objectNameStack.peek() )) {
+ entityType = InflectionUtils.singularize( name );
+ }
- }
- if (token.equals(JsonToken.END_ARRAY) && tokenStack.isEmpty()) {
- done = true;
- else if ( token.equals( JsonToken.END_OBJECT ) ) {
++ } else if ( token.equals( JsonToken.END_OBJECT ) ) {
+ objectStartStack.pop();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9e76c56a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
----------------------------------------------------------------------
diff --cc stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
index 62d9e1b,22ad931..24ed292
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
@@@ -337,10 -298,10 +298,10 @@@ public class ImportCollectionIT
try {
-- // create 10 applications each with collection of 10 things, export all to S3
++ // create 4 applications each with collection of 10 things, export all to S3
logger.debug("\n\nCreating 10 applications with 10 entities each\n");
-- for (int i = 0; i < 10; i++) {
++ for (int i = 0; i < 4; i++) {
String appName = "import-test-" + i + RandomStringUtils.randomAlphanumeric(10);
UUID appId = setup.getMgmtSvc().createApplication(organization.getUuid(), appName).getId();
@@@ -354,7 -315,7 +315,7 @@@
}
// import all those exports from S3 into the default test application
-- logger.debug("\n\nCreating 10 applications with 10 entities each\n");
++ logger.debug("\n\nImporting\n");
final EntityManager emDefaultApp = setup.getEmf().getEntityManager(applicationId);
importCollection(emDefaultApp, "things");
@@@ -370,7 -331,7 +331,7 @@@
assertTrue(!importedThings.isEmpty());
-- assertEquals(100, importedThings.size());
++ assertEquals(40, importedThings.size());
} finally {
deleteBucket();
[4/4] incubator-usergrid git commit: Merge branch 'two-dot-o-import' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-import
Posted by sn...@apache.org.
Merge branch 'two-dot-o-import' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-import
Conflicts:
stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/62deba3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/62deba3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/62deba3f
Branch: refs/heads/two-dot-o-import
Commit: 62deba3fa9891880047fb0dad7b89b09e24e3e8e
Parents: 2428b86 93cbeb5
Author: Dave Johnson <dm...@apigee.com>
Authored: Tue Feb 10 07:27:44 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Tue Feb 10 07:27:44 2015 -0500
----------------------------------------------------------------------
.../persistence/entities/FileImport.java | 20 +-
.../applications/ApplicationResource.java | 5 +
.../rest/management/ImportResourceIT.java | 371 +++----------------
.../management/importer/ImportServiceImpl.java | 115 +++---
.../management/importer/ImportCollectionIT.java | 7 +-
.../resources/usergrid-custom-test.properties | 5 +-
6 files changed, 128 insertions(+), 395 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/62deba3f/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index 2d611c5,b90d835..5edaab7
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@@ -173,12 -174,11 +175,11 @@@ public class ImportServiceImpl implemen
}
// create a FileImport entity to store metadata about the fileImport job
- String collectionName = config.get("collectionName").toString();
UUID applicationId = (UUID)config.get("applicationId");
- FileImport fileImport = new FileImport( file, applicationId, collectionName );
+ FileImport fileImport = new FileImport( file, applicationId );
- fileImport = rootEm.create(fileImport);
+ fileImport = emManagementApp.create(fileImport);
- Import importUG = rootEm.get(importRef, Import.class);
+ Import importEntity = emManagementApp.get(importRef, Import.class);
try {
// create a connection between the main import job and the sub FileImport Job
@@@ -213,6 -206,30 +214,29 @@@
}
+ private int getConnectionCount( final Import importRoot ) {
+
+ try {
+ EntityManager rootEm = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
+
+ Results entities = rootEm.getConnectedEntities( importRoot, "includes", null, Level.ALL_PROPERTIES );
+ PagingResultsIterator itr = new PagingResultsIterator( entities );
+
+ int count = 0;
+
+ while ( itr.hasNext() ) {
+ itr.next();
+ count++;
+ }
+
-
+ return count;
+ }
+ catch ( Exception e ) {
+ logger.error( "application doesn't exist within the current context" );
+ throw new RuntimeException( e );
+ }
+ }
+
/**
* Schedule the file tasks. This must happen in 2 phases. The first is linking the sub files to the master the
* second is scheduling them to run.
@@@ -454,25 -471,16 +478,21 @@@
fileJobs.add( jobData) ;
}
-
- //TODO: A hack, but we have to be sure all the connections are consistent before we schedule jobs
- for(int i = 0; i < 20; i ++){
- final int count = getConnectionCount(rootImportTask);
-
- if(count == fileJobs.size()){
- break;
+ int retries = 0;
+ int maxRetries = 60;
+ Results entities;
+ boolean done = false;
+ while ( !done && retries++ < maxRetries ) {
+
- entities = emManagementApp.getConnectedEntities(
- importEntity, "includes", "file_import", Level.ALL_PROPERTIES);
-
- logger.debug("Found {} jobs", entities.size());
-
- if ( entities.size() == fileJobs.size() ) {
++ final int count = getConnectionCount(importEntity);
++ if ( count == fileJobs.size() ) {
+ done = true;
+ } else {
+ Thread.sleep(1000);
}
-
- Thread.sleep( 1000 );
+ }
+ if ( retries >= maxRetries ) {
+ throw new RuntimeException("Max retries was reached");
}
// schedule each job
@@@ -702,27 -667,29 +722,28 @@@
// TODO: move JSON parser into observable creation so open/close happens within the stream
final JsonEntityParserObservable jsonObservableEntities =
- new JsonEntityParserObservable(jp, em, rootEm, fileImport, entitiesOnly);
+ new JsonEntityParserObservable(jp, em, rootEm, fileImport, tracker, entitiesOnly);
+
- jsonObservableEntities.call( null );
+ final Observable<WriteEvent> entityEventObservable = Observable.create(jsonObservableEntities);
- // final Observable<WriteEvent> entityEventObservable = Observable.create(jsonObservableEntities);
- //
- // // only take while our stats tell us we should continue processing
- // // potentially skip the first n if this is a resume operation
- // final int entityNumSkip = (int)tracker.getTotalEntityCount();
- //
- // final int entityCount = entityEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
- // @Override
- // public Boolean call( final WriteEvent writeEvent ) {
- // return !tracker.shouldStopProcessingEntities();
- // }
- // } ).skip(entityNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
- // @Override
- // public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
- // return entityWrapperObservable.doOnNext(doWork);
- // }
- // }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
+ // only take while our stats tell us we should continue processing
+ // potentially skip the first n if this is a resume operation
+ final int entityNumSkip = (int)tracker.getTotalEntityCount();
+
+ // with this code we get asynchronous behavior and testImportWithMultipleFiles will fail
+ final int entityCount = entityEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
+ @Override
+ public Boolean call( final WriteEvent writeEvent ) {
+ return !tracker.shouldStopProcessingEntities();
+ }
+ } ).skip(entityNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+ @Override
+ public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
+ return entityWrapperObservable.doOnNext(doWork);
+ }
+ }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
+
-
-
+ jp.close();
logger.debug("\n\nparseEntitiesAndConnectionsFromJson(): Wrote entities\n");
@@@ -737,28 -704,28 +758,28 @@@
// TODO: move JSON parser into observable creation so open/close happens within the stream
final JsonEntityParserObservable jsonObservableOther =
- new JsonEntityParserObservable(jp, em, rootEm, fileImport, entitiesOnly);
+ new JsonEntityParserObservable(jp, em, rootEm, fileImport, tracker, entitiesOnly);
- jsonObservableOther.call( null );
+ final Observable<WriteEvent> otherEventObservable = Observable.create(jsonObservableOther);
- // final Observable<WriteEvent> otherEventObservable = Observable.create(jsonObservableOther);
- //
- // // only take while our stats tell us we should continue processing
- // // potentially skip the first n if this is a resume operation
- // final int connectionNumSkip = (int)tracker.getTotalConnectionCount();
- //
- // // with this code we get asynchronous behavior and testImportWithMultipleFiles will fail
- // final int connectionCount = otherEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
- // @Override
- // public Boolean call( final WriteEvent writeEvent ) {
- // return !tracker.shouldStopProcessingConnections();
- // }
- // } ).skip(connectionNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
- // @Override
- // public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
- // return entityWrapperObservable.doOnNext(doWork);
- // }
- // }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
+ // only take while our stats tell us we should continue processing
+ // potentially skip the first n if this is a resume operation
+ final int connectionNumSkip = (int)tracker.getTotalConnectionCount();
+
+ // with this code we get asynchronous behavior and testImportWithMultipleFiles will fail
+ final int connectionCount = otherEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
+ @Override
+ public Boolean call( final WriteEvent writeEvent ) {
+ return !tracker.shouldStopProcessingConnections();
+ }
+ } ).skip(connectionNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+ @Override
+ public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
+ return entityWrapperObservable.doOnNext(doWork);
+ }
+ }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
+
+ jp.close();
logger.debug("\n\nparseEntitiesAndConnectionsFromJson(): Wrote others for file {}\n",
fileImport.getFileName());
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/62deba3f/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
----------------------------------------------------------------------
diff --cc stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
index a9fd7db,22ad931..334f465
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
@@@ -54,6 -54,6 +54,7 @@@ import org.slf4j.LoggerFactory
import java.util.*;
import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@@ -321,12 -324,14 +322,14 @@@ public class ImportCollectionIT
logger.debug("\n\nQuery to see if we now have 100 entities\n");
-
Query query = Query.fromQL("select *").withLimit(101);
++
List<Entity> importedThings = emDefaultApp.getCollection(
emDefaultApp.getApplicationId(), "things", query, Level.ALL_PROPERTIES).getEntities();
-
-- assertTrue(!importedThings.isEmpty());
-- assertEquals(100, importedThings.size());
++ assertNotNull("importedThings must not be null", !importedThings.isEmpty());
++ assertTrue("importedThings must not be empty", !importedThings.isEmpty());
++ assertEquals("there must be 100 importedThings", 100, importedThings.size());
} finally {
deleteBucket();
[3/4] incubator-usergrid git commit: After the Import->FileImport
connections are made, wait for them to take effect before scheduling jobs.
Posted by sn...@apache.org.
After the Import->FileImport connections are made, wait for them to take effect before scheduling jobs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2428b862
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2428b862
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2428b862
Branch: refs/heads/two-dot-o-import
Commit: 2428b862a4b6993e89376e580dcd3dd2955182aa
Parents: 9e76c56
Author: Dave Johnson <dm...@apigee.com>
Authored: Mon Feb 9 19:57:33 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Mon Feb 9 19:57:33 2015 -0500
----------------------------------------------------------------------
.../management/importer/ImportServiceImpl.java | 122 ++++++++++++-------
.../management/importer/ImportCollectionIT.java | 9 +-
2 files changed, 78 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2428b862/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index a613fdb..2d611c5 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@ -183,6 +183,13 @@ public class ImportServiceImpl implements ImportService {
try {
// create a connection between the main import job and the sub FileImport Job
emManagementApp.createConnection(importEntity, "includes", fileImport);
+
+ logger.debug("Created connection from {}:{} to {}:{}",
+ new Object[] {
+ importEntity.getType(), importEntity.getUuid(),
+ fileImport.getType(), fileImport.getUuid()
+ });
+
} catch (Exception e) {
logger.error(e.getMessage());
return null;
@@ -202,8 +209,6 @@ public class ImportServiceImpl implements ImportService {
fileImport.setState(FileImport.State.SCHEDULED);
emManagementApp.update(fileImport);
- emf.refreshIndex();
-
return jobData;
}
@@ -372,12 +377,12 @@ public class ImportServiceImpl implements ImportService {
EntityManager emManagementApp = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
- Import rootImportTask = emManagementApp.get(importId, Import.class);
+ Import importEntity = emManagementApp.get(importId, Import.class);
- rootImportTask.setState(Import.State.STARTED);
- rootImportTask.setStarted(System.currentTimeMillis());
- rootImportTask.setErrorMessage( " " );
- emManagementApp.update(rootImportTask);
+ importEntity.setState(Import.State.STARTED);
+ importEntity.setStarted(System.currentTimeMillis());
+ importEntity.setErrorMessage(" ");
+ emManagementApp.update(importEntity);
logger.debug("doImport(): updated state");
// if no S3 importer was passed in then create one
@@ -392,9 +397,9 @@ public class ImportServiceImpl implements ImportService {
}
} catch (Exception e) {
logger.error("doImport(): Error creating S3Import", e);
- rootImportTask.setErrorMessage(e.getMessage());
- rootImportTask.setState( Import.State.FAILED );
- emManagementApp.update(rootImportTask);
+ importEntity.setErrorMessage(e.getMessage());
+ importEntity.setState(Import.State.FAILED);
+ emManagementApp.update(importEntity);
return;
}
@@ -405,9 +410,9 @@ public class ImportServiceImpl implements ImportService {
if (config.get("organizationId") == null) {
logger.error("doImport(): No organization could be found");
- rootImportTask.setErrorMessage( "No organization could be found" );
- rootImportTask.setState( Import.State.FAILED );
- emManagementApp.update(rootImportTask);
+ importEntity.setErrorMessage("No organization could be found");
+ importEntity.setState(Import.State.FAILED);
+ emManagementApp.update(importEntity);
return;
} else {
@@ -422,9 +427,9 @@ public class ImportServiceImpl implements ImportService {
}
} catch (OrganizationNotFoundException | ApplicationNotFoundException e) {
- rootImportTask.setErrorMessage( e.getMessage() );
- rootImportTask.setState( Import.State.FAILED );
- emManagementApp.update(rootImportTask);
+ importEntity.setErrorMessage(e.getMessage());
+ importEntity.setState(Import.State.FAILED);
+ emManagementApp.update(importEntity);
return;
}
@@ -432,9 +437,9 @@ public class ImportServiceImpl implements ImportService {
// schedule a FileImport job for each file found in the bucket
if ( bucketFiles.isEmpty() ) {
- rootImportTask.setState( Import.State.FINISHED );
- rootImportTask.setErrorMessage( "No files found in the bucket: " + bucketName );
- emManagementApp.update(rootImportTask);
+ importEntity.setState(Import.State.FINISHED);
+ importEntity.setErrorMessage("No files found in the bucket: " + bucketName);
+ emManagementApp.update(importEntity);
} else {
@@ -445,10 +450,31 @@ public class ImportServiceImpl implements ImportService {
// create the Entity Connection and set up metadata for each job
for ( String bucketFile : bucketFiles ) {
- final JobData jobData = createFileTask(config, bucketFile, rootImportTask);
+ final JobData jobData = createFileTask(config, bucketFile, importEntity);
fileJobs.add( jobData) ;
}
+ int retries = 0;
+ int maxRetries = 60;
+ Results entities;
+ boolean done = false;
+ while ( !done && retries++ < maxRetries ) {
+
+ entities = emManagementApp.getConnectedEntities(
+ importEntity, "includes", "file_import", Level.ALL_PROPERTIES);
+
+ logger.debug("Found {} jobs", entities.size());
+
+ if ( entities.size() == fileJobs.size() ) {
+ done = true;
+ } else {
+ Thread.sleep(1000);
+ }
+ }
+ if ( retries >= maxRetries ) {
+ throw new RuntimeException("Max retries was reached");
+ }
+
// schedule each job
for ( JobData jobData: fileJobs ) {
@@ -462,9 +488,9 @@ public class ImportServiceImpl implements ImportService {
}
fileMetadata.put("files", value);
- rootImportTask.addProperties(fileMetadata);
- rootImportTask.setFileCount( fileJobs.size() );
- emManagementApp.update(rootImportTask);
+ importEntity.addProperties(fileMetadata);
+ importEntity.setFileCount(fileJobs.size());
+ emManagementApp.update(importEntity);
}
}
@@ -546,31 +572,35 @@ public class ImportServiceImpl implements ImportService {
String randTag = RandomStringUtils.randomAlphanumeric(4);
logger.debug("{} Got importEntity {}", randTag, importEntity.getUuid() );
- int retries = 0;
- int maxRetries = 60;
- Results entities = null;
- boolean done = false;
- while ( !done && retries++ < maxRetries ) {
-
- // get all file import job siblings of the current job we're working now
- entities = emManagementApp.getConnectedEntities(
- importEntity, "includes", "file_import", Level.ALL_PROPERTIES);
+ Results entities = emManagementApp.getConnectedEntities(
+ importEntity, "includes", "file_import", Level.ALL_PROPERTIES);
- if ( entities.size() == importEntity.getFileCount() ) {
- logger.debug("{} got {} file_import entities, expected {} DONE!",
- new Object[] { randTag, entities.size(), importEntity.getFileCount() });
- done = true;
- } else {
- logger.debug("{} got {} file_import entities, expected {} waiting... ",
- new Object[] { randTag, entities.size(), importEntity.getFileCount() });
- Thread.sleep(1000);
- }
- }
-
- if ( retries >= maxRetries ) {
- throw new RuntimeException("Max retries was reached");
- }
+// int retries = 0;
+// int maxRetries = 60;
+// Results entities = null;
+// boolean done = false;
+// while ( !done && retries++ < maxRetries ) {
+//
+// // get all file import job siblings of the current job we're working now
+// entities = emManagementApp.getConnectedEntities(
+// importEntity, "includes", "file_import", Level.ALL_PROPERTIES);
+//
+// if ( entities.size() == importEntity.getFileCount() ) {
+// logger.debug("{} got {} file_import entities, expected {} DONE!",
+// new Object[] { randTag, entities.size(), importEntity.getFileCount() });
+// done = true;
+//
+// } else {
+// logger.debug("{} got {} file_import entities, expected {} waiting... ",
+// new Object[] { randTag, entities.size(), importEntity.getFileCount() });
+// Thread.sleep(1000);
+// }
+// }
+//
+// if ( retries >= maxRetries ) {
+// throw new RuntimeException("Max retries was reached");
+// }
PagingResultsIterator itr = new PagingResultsIterator( entities );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2428b862/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
index 24ed292..a9fd7db 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
@@ -285,9 +285,6 @@ public class ImportCollectionIT {
}
-
-
-
/**
* Simple import test but with multiple files.
*/
@@ -301,7 +298,7 @@ public class ImportCollectionIT {
// create 4 applications each with collection of 10 things, export all to S3
logger.debug("\n\nCreating 10 applications with 10 entities each\n");
- for (int i = 0; i < 4; i++) {
+ for (int i = 0; i < 10; i++) {
String appName = "import-test-" + i + RandomStringUtils.randomAlphanumeric(10);
UUID appId = setup.getMgmtSvc().createApplication(organization.getUuid(), appName).getId();
@@ -324,14 +321,12 @@ public class ImportCollectionIT {
logger.debug("\n\nQuery to see if we now have 100 entities\n");
-
Query query = Query.fromQL("select *").withLimit(101);
List<Entity> importedThings = emDefaultApp.getCollection(
emDefaultApp.getApplicationId(), "things", query, Level.ALL_PROPERTIES).getEntities();
-
assertTrue(!importedThings.isEmpty());
- assertEquals(40, importedThings.size());
+ assertEquals(100, importedThings.size());
} finally {
deleteBucket();