Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/02/10 14:44:24 UTC

[1/4] incubator-usergrid git commit: FileImportJob will now wait for other Import -> FileImport connections to be made before making the "are we done yet" call.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-import 93cbeb5b8 -> 62deba3fa


FileImportJob will now wait for other Import -> FileImport connections to be made before making the "are we done yet" call.
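
In outline, the fix polls the connection query until it returns the expected number of
FileImport entities, sleeping between attempts. A minimal standalone sketch of that
pattern follows; IntSupplier (Java 8) stands in for the Usergrid connection query, and
this is not the committed code (the real change is in the diff below):

    import java.util.function.IntSupplier;

    // Sketch of the wait loop this commit introduces; stand-in types only.
    public class ConnectionWaiter {

        /**
         * Poll until observedCount reports the expected value, sleeping one
         * second between attempts, for at most maxRetries attempts.
         */
        public static void waitForCount(IntSupplier observedCount, int expected, int maxRetries)
                throws InterruptedException {
            int retries = 0;
            while (retries++ < maxRetries) {
                if (observedCount.getAsInt() == expected) {
                    return; // every connection is visible, safe to proceed
                }
                Thread.sleep(1000); // index not yet consistent, wait and re-query
            }
            throw new RuntimeException("Max retries was reached");
        }
    }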


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8bef092d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8bef092d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8bef092d

Branch: refs/heads/two-dot-o-import
Commit: 8bef092d275381427331a9319355a81db5a6d9cf
Parents: 18671e7
Author: Dave Johnson <dm...@apigee.com>
Authored: Mon Feb 9 17:41:58 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Mon Feb 9 17:41:58 2015 -0500

----------------------------------------------------------------------
 .../usergrid/persistence/entities/Import.java   |  20 ++-
 .../management/export/S3ExportImpl.java         |   2 -
 .../management/importer/ImportServiceImpl.java  | 162 ++++++++++++-------
 .../management/importer/ImportCollectionIT.java |   4 +-
 4 files changed, 121 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8bef092d/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Import.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Import.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Import.java
index d7a8751..83048b9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Import.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Import.java
@@ -28,12 +28,15 @@ import javax.xml.bind.annotation.XmlRootElement;
 @XmlRootElement
 public class Import extends TypedEntity {
 
-    //canceled  and expired states aren't used in current iteration.
+   //canceled  and expired states aren't used in current iteration.
     public static enum State {
         CREATED, FAILED, SCHEDULED, STARTED, FINISHED, CANCELED, EXPIRED
     }
 
     @EntityProperty
+    private int fileCount;
+
+    @EntityProperty
     protected State curState;
 
     /**
@@ -49,9 +52,22 @@ public class Import extends TypedEntity {
     protected String errorMessage;
 
 
-    public Import() {
+    public Import() {}
+
+
+    @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+    @EntityProperty
+    public void setFileCount(int fileCount) {
+        this.fileCount = fileCount;
     }
 
+    @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+    @EntityProperty
+    public int getFileCount() {
+        return fileCount;
+    }
+
+
     /**
      * get the started time for the import job
      */
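
The new fileCount property exists so the completion check can compare what the
connection query returns against what the scheduler actually created. A one-method
sketch of that comparison (simplified, not the actual Usergrid method):

    // Sketch only: the done-check that the fileCount property enables.
    public class DoneCheck {

        /** True once every expected Import -> FileImport connection is visible. */
        static boolean allFileImportsVisible(int connectedFileImports, int expectedFileCount) {
            return connectedFileImports == expectedFileCount;
        }
    }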

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8bef092d/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java
index 0b1fac5..e27788b 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java
@@ -19,10 +19,8 @@ package org.apache.usergrid.management.export;
 
 import com.amazonaws.SDKGlobalConfiguration;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.inject.Module;
 import org.jclouds.ContextBuilder;
-import org.jclouds.blobstore.AsyncBlobStore;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.BlobStoreContext;
 import org.jclouds.blobstore.domain.Blob;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8bef092d/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index f724536..06be766 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@ -18,6 +18,7 @@
 package org.apache.usergrid.management.importer;
 
 import com.amazonaws.SDKGlobalConfiguration;
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.usergrid.batch.JobExecution;
 import org.apache.usergrid.batch.service.SchedulerService;
 import org.apache.usergrid.corepersistence.CpSetup;
@@ -45,9 +46,7 @@ import org.slf4j.LoggerFactory;
 import rx.Observable;
 import rx.Subscriber;
 import rx.functions.Action1;
-import rx.functions.Func1;
 import rx.functions.Func2;
-import rx.schedulers.Schedulers;
 
 import javax.annotation.PostConstruct;
 import java.io.File;
@@ -168,10 +167,10 @@ public class ImportServiceImpl implements ImportService {
         logger.debug("scheduleFile() for import {}:{} file {}",
             new Object[]{importRef.getType(), importRef.getType(), file});
 
-        EntityManager rootEm = null;
+        EntityManager emManagementApp = null;
 
         try {
-            rootEm = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
+            emManagementApp = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
         } catch (Exception e) {
             logger.error("application doesn't exist within the current context");
             return null;
@@ -181,33 +180,33 @@ public class ImportServiceImpl implements ImportService {
         String collectionName = config.get("collectionName").toString();
         UUID applicationId = (UUID)config.get("applicationId");
         FileImport fileImport = new FileImport( file, applicationId, collectionName );
-        fileImport = rootEm.create(fileImport);
+        fileImport = emManagementApp.create(fileImport);
 
-        Import importUG = rootEm.get(importRef, Import.class);
+        Import importEntity = emManagementApp.get(importRef, Import.class);
 
         try {
             // create a connection between the main import job and the sub FileImport Job
-            rootEm.createConnection(importUG, "includes", fileImport);
+            emManagementApp.createConnection(importEntity, "includes", fileImport);
         } catch (Exception e) {
             logger.error(e.getMessage());
             return null;
         }
 
         // mark the File Import Job as created
-        fileImport.setState( FileImport.State.CREATED );
-        rootEm.update( fileImport );
+        fileImport.setState(FileImport.State.CREATED);
+        emManagementApp.update( fileImport );
 
         // set data to be transferred to the FileImport Job
         JobData jobData = new JobData();
-        jobData.setProperty( "File", file );
+        jobData.setProperty("File", file);
         jobData.setProperty(FILE_IMPORT_ID, fileImport.getUuid());
         jobData.addProperties(config);
 
         // update state of the job to Scheduled
         fileImport.setState(FileImport.State.SCHEDULED);
-        rootEm.update(fileImport);
+        emManagementApp.update(fileImport);
 
-        rootEm.refreshIndex();
+        emf.refreshIndex();
 
         return jobData;
     }
@@ -453,7 +452,7 @@ public class ImportServiceImpl implements ImportService {
             // create the Entity Connection and set up metadata for each job
 
             for ( String bucketFile : bucketFiles ) {
-                final JobData jobData = createFileTask( config, bucketFile, rootImportTask );
+                final JobData jobData = createFileTask(config, bucketFile, rootImportTask);
                 fileJobs.add( jobData) ;
             }
 
@@ -470,7 +469,8 @@ public class ImportServiceImpl implements ImportService {
             }
 
             fileMetadata.put("files", value);
-            rootImportTask.addProperties( fileMetadata );
+            rootImportTask.addProperties(fileMetadata);
+            rootImportTask.setFileCount( fileJobs.size() );
             emManagementApp.update(rootImportTask);
         }
     }
@@ -543,17 +543,50 @@ public class ImportServiceImpl implements ImportService {
         // mark ImportJob FINISHED but only if all other FileImportJobs are complete
 
         // get parent import job of this file import job
+
         Results importJobResults =
             emManagementApp.getConnectingEntities( fileImport, "includes", null, Level.ALL_PROPERTIES );
         List<Entity> importEntities = importJobResults.getEntities();
         UUID importId = importEntities.get( 0 ).getUuid();
         Import importEntity = emManagementApp.get( importId, Import.class );
 
-        // get all file import job siblings of the current job we're working now
-        Results entities = emManagementApp.getConnectedEntities( importEntity, "includes", null, Level.ALL_PROPERTIES );
+        String randTag = RandomStringUtils.randomAlphanumeric(4);
+        logger.debug("{} Got importEntity {}", randTag, importEntity.getUuid() );
+
+        int retries = 0;
+        int maxRetries = 60;
+        Results entities = null;
+        boolean done = false;
+        while ( !done && retries++ < maxRetries ) {
+
+            // get all file import job siblings of the current job we're working now
+            entities = emManagementApp.getConnectedEntities(
+                importEntity, "includes", "file_import", Level.ALL_PROPERTIES);
+
+            if ( entities.size() == importEntity.getFileCount() ) {
+                logger.debug("{} got {} file_import entities, expected {} DONE!",
+                    new Object[] { randTag, entities.size(), importEntity.getFileCount() });
+                done = true;
+
+            } else {
+                logger.debug("{} got {} file_import entities, expected {} waiting... ",
+                    new Object[] { randTag, entities.size(), importEntity.getFileCount() });
+                Thread.sleep(1000);
+            }
+        }
+
+        if ( retries >= maxRetries ) {
+            throw new RuntimeException("Max retries was reached");
+        }
+
+
         PagingResultsIterator itr = new PagingResultsIterator( entities );
 
+        logger.debug("{} Check {} jobs to see if we are done for file {}",
+            new Object[] { randTag, entities.size(), fileImport.getFileName() } );
+
         int failCount = 0;
+        int successCount = 0;
         while ( itr.hasNext() ) {
             FileImport fi = ( FileImport ) itr.next();
             switch ( fi.getState() ) {
@@ -561,13 +594,18 @@ public class ImportServiceImpl implements ImportService {
                     failCount++;
                     break;
                 case FINISHED:   // finished, we can continue checking
-                    break;
+                    successCount++;
+                    continue;
                 default:         // not something we recognize as complete, short circuit
+                    logger.debug("{} not done yet, bail out...", randTag);
                     return;
             }
         }
 
+        logger.debug("{} successCount = {} failCount = {}", new Object[] { randTag, successCount, failCount } );
+
         if ( failCount == 0 ) {
+            logger.debug("{} FINISHED", randTag);
             importEntity.setState(Import.State.FINISHED);
         }  else {
             // we had failures, set it to failed
@@ -641,35 +679,27 @@ public class ImportServiceImpl implements ImportService {
 
         // TODO: move JSON parser into observable creation so open/close happens within the stream
         final JsonEntityParserObservable jsonObservableEntities =
-            new JsonEntityParserObservable(jp, em, rootEm, fileImport, entitiesOnly);
-        final Observable<WriteEvent> entityEventObservable = Observable.create(jsonObservableEntities);
+            new JsonEntityParserObservable(jp, em, rootEm, fileImport, tracker, entitiesOnly);
 
-        // only take while our stats tell us we should continue processing
-        // potentially skip the first n if this is a resume operation
-        final int entityNumSkip = (int)tracker.getTotalEntityCount();
+        jsonObservableEntities.call( null );
 
-        // with this code we get asynchronous behavior and testImportWithMultipleFiles will fail
+//        final Observable<WriteEvent> entityEventObservable = Observable.create(jsonObservableEntities);
+//
+//        // only take while our stats tell us we should continue processing
+//        // potentially skip the first n if this is a resume operation
+//        final int entityNumSkip = (int)tracker.getTotalEntityCount();
+//
 //       final int entityCount =  entityEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
 //            @Override
 //            public Boolean call( final WriteEvent writeEvent ) {
 //                return !tracker.shouldStopProcessingEntities();
 //            }
 //        } ).skip(entityNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
-//            @Override
-//            public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
-//                return entityWrapperObservable.doOnNext(doWork);
-//            }
-//        }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
-
-        entityEventObservable.parallel(
-            new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
-                @Override
-                public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
-                    return entityWrapperObservable.doOnNext(doWork);
-                }
-            }, Schedulers.io()).toBlocking().last();
-
-        jp.close();
+//           @Override
+//           public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
+//               return entityWrapperObservable.doOnNext(doWork);
+//           }
+//       }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
 
         logger.debug("\n\nparseEntitiesAndConnectionsFromJson(): Wrote entities\n");
 
@@ -684,15 +714,17 @@ public class ImportServiceImpl implements ImportService {
 
         // TODO: move JSON parser into observable creation so open/close happens within the stream
         final JsonEntityParserObservable jsonObservableOther =
-            new JsonEntityParserObservable(jp, em, rootEm, fileImport, entitiesOnly);
-
-        final Observable<WriteEvent> otherEventObservable = Observable.create(jsonObservableOther);
+            new JsonEntityParserObservable(jp, em, rootEm, fileImport, tracker, entitiesOnly);
 
-        // only take while our stats tell us we should continue processing
-        // potentially skip the first n if this is a resume operation
-        final int connectionNumSkip = (int)tracker.getTotalConnectionCount();
+        jsonObservableOther.call( null );
 
-        // with this code we get asynchronous behavior and testImportWithMultipleFiles will fail
+//        final Observable<WriteEvent> otherEventObservable = Observable.create(jsonObservableOther);
+//
+//        // only take while our stats tell us we should continue processing
+//        // potentially skip the first n if this is a resume operation
+//        final int connectionNumSkip = (int)tracker.getTotalConnectionCount();
+//
+//        // with this code we get asynchronous behavior and testImportWithMultipleFiles will fail
 //        final int connectionCount = otherEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
 //            @Override
 //            public Boolean call( final WriteEvent writeEvent ) {
@@ -705,16 +737,6 @@ public class ImportServiceImpl implements ImportService {
 //            }
 //        }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
 
-       otherEventObservable.parallel(
-            new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
-                @Override
-                public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
-                    return entityWrapperObservable.doOnNext(doWork);
-                }
-            }, Schedulers.io()).toBlocking().last();
-
-        jp.close();
-
         logger.debug("\n\nparseEntitiesAndConnectionsFromJson(): Wrote others for file {}\n",
             fileImport.getFileName());
 
@@ -852,6 +874,7 @@ public class ImportServiceImpl implements ImportService {
         EntityManager em;
         EntityManager rootEm;
         FileImport fileImport;
+        FileImportTracker tracker;
         boolean entitiesOnly;
 
 
@@ -860,17 +883,19 @@ public class ImportServiceImpl implements ImportService {
             EntityManager em,
             EntityManager rootEm,
             FileImport fileImport,
+            FileImportTracker tracker,
             boolean entitiesOnly) {
 
             this.jp = parser;
             this.em = em;
             this.rootEm = rootEm;
             this.fileImport = fileImport;
+            this.tracker = tracker;
             this.entitiesOnly = entitiesOnly;
         }
 
 
-        @Override
+       @Override
         public void call(final Subscriber<? super WriteEvent> subscriber) {
             process(subscriber);
         }
@@ -910,7 +935,7 @@ public class ImportServiceImpl implements ImportService {
                             //logger.debug("{}Got entity with uuid {}", indent, lastEntity);
 
                             WriteEvent event = new EntityEvent(uuid, collectionType, entityMap);
-                            subscriber.onNext(event);
+                            processWriteEvent( subscriber, event );
                         }
 
                     } else if (token.equals(JsonToken.START_OBJECT) && "connections".equals(name)) {
@@ -929,7 +954,7 @@ public class ImportServiceImpl implements ImportService {
 
                                     EntityRef entryRef = new SimpleEntityRef(target);
                                     WriteEvent event = new ConnectionEvent(lastEntity, type, entryRef);
-                                    subscriber.onNext(event);
+                                    processWriteEvent( subscriber, event );
                                 }
                             }
                         }
@@ -945,7 +970,7 @@ public class ImportServiceImpl implements ImportService {
                                     //new Object[] {indent, dname, dmap.size() });
 
                                 WriteEvent event = new DictionaryEvent(lastEntity, dname, dmap);
-                                subscriber.onNext(event);
+                                processWriteEvent( subscriber, event );
                             }
                         }
 
@@ -961,7 +986,9 @@ public class ImportServiceImpl implements ImportService {
                     }
                 }
 
-                subscriber.onCompleted();
+                if ( subscriber != null ) {
+                    subscriber.onCompleted();
+                }
 
                 logger.debug("process(): done parsing JSON");
 
@@ -973,9 +1000,22 @@ public class ImportServiceImpl implements ImportService {
                 } catch (Exception ex) {
                     logger.error("Error updating file import record", ex);
                 }
-                subscriber.onError(e);
+                if ( subscriber != null ) {
+                    subscriber.onError(e);
+                }
             }
         }
+
+        private void processWriteEvent( final Subscriber<? super WriteEvent> subscriber, WriteEvent writeEvent ) {
+
+            if ( subscriber == null ) {
+                // no Rx, just do it
+                writeEvent.doWrite(em, fileImport, tracker);
+            } else {
+                subscriber.onNext( writeEvent );
+            }
+        }
+
     }
 }
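
The processWriteEvent() change above lets the parser run with or without Rx: a null
subscriber means "write inline". A standalone sketch of that dispatch, where Consumer
stands in for rx.Subscriber and WriteEvent is a stub interface rather than the
Usergrid class:

    import java.util.function.Consumer;

    // Stand-in for the Usergrid WriteEvent hierarchy.
    interface WriteEvent {
        void doWrite();
    }

    final class WriteDispatcher {

        /** Emit to the downstream consumer if present, else write inline. */
        static void dispatch(Consumer<WriteEvent> subscriber, WriteEvent event) {
            if (subscriber == null) {
                event.doWrite();          // no Rx, just do the write synchronously
            } else {
                subscriber.accept(event); // hand off to the reactive pipeline
            }
        }
    }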
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8bef092d/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
index 9a8c977..62d9e1b 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
@@ -419,7 +419,7 @@ public class ImportCollectionIT {
             }});
         }});
 
-        int maxRetries = 20;
+        int maxRetries = 120;
         int retries = 0;
         while ( !importService.getState( importUUID ).equals( "FINISHED" ) && retries++ < maxRetries ) {
             logger.debug("Waiting for import...");
@@ -459,7 +459,7 @@ public class ImportCollectionIT {
             }});
         }});
 
-        int maxRetries = 20;
+        int maxRetries = 120;
         int retries = 0;
         while ( !exportService.getState( exportUUID ).equals( "FINISHED" ) && retries++ < maxRetries ) {
             logger.debug("Waiting for export...");


[2/4] incubator-usergrid git commit: Merge branch 'two-dot-o-import' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-import

Posted by sn...@apache.org.
Merge branch 'two-dot-o-import' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-import

Conflicts:
	stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9e76c56a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9e76c56a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9e76c56a

Branch: refs/heads/two-dot-o-import
Commit: 9e76c56a557dcfe86af97177a29fcadb379792a7
Parents: 8bef092 6855e17
Author: Dave Johnson <dm...@apigee.com>
Authored: Mon Feb 9 18:10:49 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Mon Feb 9 18:10:49 2015 -0500

----------------------------------------------------------------------
 .../organizations/OrganizationResource.java     | 104 ------------
 .../applications/ApplicationResource.java       | 129 +++++++--------
 .../rest/management/ImportResourceIT.java       |  90 +----------
 .../management/export/ExportServiceImpl.java    |  16 +-
 .../management/importer/ImportServiceImpl.java  | 157 ++++++++++---------
 .../management/importer/ImportCollectionIT.java |  47 +-----
 6 files changed, 178 insertions(+), 365 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9e76c56a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index 06be766,b59492f..a613fdb
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@@ -852,18 -821,20 +843,20 @@@ public class ImportServiceImpl implemen
                  em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary);
  
              } catch (Exception e) {
 +                logger.error("Error writing dictionary", e);
-                 fileImport.setErrorMessage(e.getMessage());
-                 try {
- 
-                     rootEm.update(fileImport);
  
-                 } catch (Exception ex) {
- 
-                     // TODO should we abort at this point?
-                     logger.error("Error updating file import report with error message: "
-                         + fileImport.getErrorMessage(), ex);
-                 }
+                 //TODO add statistics for dictionary writes and failures
 -//                logger.error("Error writing dictionary", e);
+ //                fileImport.setErrorMessage(e.getMessage());
+ //                try {
+ //
+ //                    rootEm.update(fileImport);
+ //
+ //                } catch (Exception ex) {
+ //
+ //                    // TODO should we abort at this point?
+ //                    logger.error("Error updating file import report with error message: "
+ //                        + fileImport.getErrorMessage(), ex);
+ //                }
              }
          }
      }
@@@ -904,85 -873,115 +898,108 @@@
          private void process(final Subscriber<? super WriteEvent> subscriber) {
  
              try {
-                 boolean done = false;
  
 -
                  // we ignore imported entity type information, entities get the type of the collection
-                 String collectionType = InflectionUtils.singularize( fileImport.getCollectionName() );
- 
-                 Stack tokenStack = new Stack();
+                 Stack<JsonToken> objectStartStack = new Stack();
+                 Stack<String> objectNameStack = new Stack();
                  EntityRef lastEntity = null;
  
-                 while (!done) {
+ //                String collectionName = null;
+                 String entityType = null;
+ 
+                 while ( true ) {
+ 
  
                      JsonToken token = jp.nextToken();
-                     String name = jp.getCurrentName();
  
-                     String indent = "";
-                     for (int i = 0; i < tokenStack.size(); i++) {
-                         indent += "   ";
+                     //nothing left to do.
+                     if ( token == null ) {
+                         break;
                      }
  
-                     //logger.debug("{}Token {} name {}", new Object[]{indent, token, name});
+                     String name = jp.getCurrentName();
  
-                     if (token.equals(JsonToken.START_OBJECT) && "Metadata".equals(name)) {
  
-                         Map<String, Object> entityMap = jp.readValueAs(HashMap.class);
+                     //start of an object with a field name
  
-                         UUID uuid = UUID.fromString((String) entityMap.get("uuid"));
-                         lastEntity = new SimpleEntityRef(collectionType, uuid);
+                     if ( token.equals( JsonToken.START_OBJECT ) ) {
  
-                         if (entitiesOnly) {
-                             //logger.debug("{}Got entity with uuid {}", indent, lastEntity);
+                         objectStartStack.push( token );
  
-                             WriteEvent event = new EntityEvent(uuid, collectionType, entityMap);
-                             processWriteEvent( subscriber, event );
+                         //nothing to do
+                         if ( name == null ) {
+                             continue;
                          }
  
-                     } else if (token.equals(JsonToken.START_OBJECT) && "connections".equals(name)) {
  
-                         Map<String, Object> connectionMap = jp.readValueAs(HashMap.class);
+                         if ( "Metadata".equals( name ) ) {
  
-                         for (String type : connectionMap.keySet()) {
-                             List targets = (List) connectionMap.get(type);
 -
+                             Map<String, Object> entityMap = jp.readValueAs( HashMap.class );
  
-                             for (Object targetObject : targets) {
-                                 UUID target = UUID.fromString((String) targetObject);
+                             UUID uuid = UUID.fromString( ( String ) entityMap.get( "uuid" ) );
+                             lastEntity = new SimpleEntityRef( entityType, uuid );
  
-                                 if (!entitiesOnly) {
-                                     //logger.debug("{}Got connection {} to {}",
-                                         //new Object[]{indent, type, target.toString()});
+                             if ( entitiesOnly ) {
+                                 //logger.debug("{}Got entity with uuid {}", indent, lastEntity);
  
-                                     EntityRef entryRef = new SimpleEntityRef(target);
-                                     WriteEvent event = new ConnectionEvent(lastEntity, type, entryRef);
-                                     processWriteEvent( subscriber, event );
-                                 }
 -                                WriteEvent event = new EntityEvent( uuid, entityType, entityMap );
 -                                subscriber.onNext( event );
++                                WriteEvent event = new EntityEvent(uuid, entityType, entityMap);
++                                processWriteEvent( subscriber, event);
                              }
 -
+                             objectStartStack.pop();
                          }
+                         else if ( "connections".equals( name ) ) {
  
-                     } else if (token.equals(JsonToken.START_OBJECT) && "dictionaries".equals(name)) {
 -
+                             Map<String, Object> connectionMap = jp.readValueAs( HashMap.class );
  
-                         Map<String, Object> dictionariesMap = jp.readValueAs(HashMap.class);
-                         for (String dname : dictionariesMap.keySet()) {
-                             Map dmap = (Map) dictionariesMap.get(dname);
+                             for ( String type : connectionMap.keySet() ) {
+                                 List targets = ( List ) connectionMap.get( type );
  
-                             if (!entitiesOnly) {
-                                 //logger.debug("{}Got dictionary {} size {}",
-                                     //new Object[] {indent, dname, dmap.size() });
+                                 for ( Object targetObject : targets ) {
+                                     UUID target = UUID.fromString( ( String ) targetObject );
  
-                                 WriteEvent event = new DictionaryEvent(lastEntity, dname, dmap);
-                                 processWriteEvent( subscriber, event );
+                                     if ( !entitiesOnly ) {
+                                         //logger.debug("{}Got connection {} to {}",
 -                                        //new Object[]{indent, type, target.toString()});
++                                            //new Object[]{indent, type, target.toString()});
+ 
 -                                        EntityRef entryRef = new SimpleEntityRef( target );
 -                                        WriteEvent event = new ConnectionEvent( lastEntity, type, entryRef );
 -                                        subscriber.onNext( event );
++                                        EntityRef entryRef = new SimpleEntityRef(target);
++                                        WriteEvent event = new ConnectionEvent(lastEntity, type, entryRef);
++                                        processWriteEvent(subscriber, event);
+                                     }
+                                 }
                              }
-                         }
  
-                     } else if (token.equals(JsonToken.START_OBJECT)) {
-                         tokenStack.push(token);
+                             objectStartStack.pop();
 -                        }
 -                        else if ( "dictionaries".equals( name ) ) {
 +
-                     } else if (token.equals(JsonToken.END_OBJECT)) {
-                         tokenStack.pop();
-                     }
++                        } else if ( "dictionaries".equals(name) ) {
+ 
+ 
+                             Map<String, Object> dictionariesMap = jp.readValueAs( HashMap.class );
+                             for ( String dname : dictionariesMap.keySet() ) {
+                                 Map dmap = ( Map ) dictionariesMap.get( dname );
+ 
+                                 if ( !entitiesOnly ) {
+                                     //logger.debug("{}Got dictionary {} size {}",
 -                                    //new Object[] {indent, dname, dmap.size() });
++                                        //new Object[] {indent, dname, dmap.size() });
+ 
 -                                    WriteEvent event = new DictionaryEvent( lastEntity, dname, dmap );
 -                                    subscriber.onNext( event );
++                                    WriteEvent event = new DictionaryEvent(lastEntity, dname, dmap);
++                                    processWriteEvent(subscriber, event);
+                                 }
+                             }
+ 
+                             objectStartStack.pop();
 -                        }
 -                        //push onto object names we don't immediately understand.  Used for parent detection
 -                        else{
++
++                        } else {
++                            // push onto object names we don't immediately understand.  Used for parent detection
+                             objectNameStack.push( name );
+                         }
+ 
 -                    }
 -
 -                    else if (token.equals( JsonToken.START_ARRAY )){
++                    }  else if (token.equals( JsonToken.START_ARRAY )) {
+                          if( objectNameStack.size() == 1 && COLLECTION_OBJECT_NAME.equals( objectNameStack.peek() )) {
+                             entityType = InflectionUtils.singularize( name );
+                          }
 -                    }
  
-                     if (token.equals(JsonToken.END_ARRAY) && tokenStack.isEmpty()) {
-                         done = true;
 -                    else if ( token.equals( JsonToken.END_OBJECT ) ) {
++                    } else if ( token.equals( JsonToken.END_OBJECT ) ) {
+                         objectStartStack.pop();
                      }
                  }
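
The merged parser replaces the single done flag with two stacks: START_OBJECT pushes,
END_OBJECT pops, and the entity type is derived from the array name seen directly under
the collection object. A standalone sketch of that token walk, written against the
com.fasterxml Jackson 2 streaming API (the project may pin a different Jackson line):

    import com.fasterxml.jackson.core.JsonFactory;
    import com.fasterxml.jackson.core.JsonParser;
    import com.fasterxml.jackson.core.JsonToken;

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Simplified stand-in for the merged parser's control flow.
    public class TokenWalk {

        public static void walk(String json) throws Exception {
            JsonParser jp = new JsonFactory().createParser(json);
            Deque<JsonToken> objectStartStack = new ArrayDeque<JsonToken>();

            for (JsonToken token = jp.nextToken(); token != null; token = jp.nextToken()) {
                String name = jp.getCurrentName();
                if (token == JsonToken.START_OBJECT) {
                    objectStartStack.push(token);   // entering a nested object
                } else if (token == JsonToken.END_OBJECT) {
                    objectStartStack.pop();         // leaving it
                } else if (token == JsonToken.START_ARRAY && name != null) {
                    // the real parser singularizes this name into entityType
                    System.out.println("array '" + name + "' at depth " + objectStartStack.size());
                }
            }
            jp.close();
        }
    }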
  

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9e76c56a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
----------------------------------------------------------------------
diff --cc stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
index 62d9e1b,22ad931..24ed292
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
@@@ -337,10 -298,10 +298,10 @@@ public class ImportCollectionIT 
  
          try {
  
--            // create 10 applications each with collection of 10 things, export all to S3
++            // create 4 applications each with collection of 10 things, export all to S3
              logger.debug("\n\nCreating 10 applications with 10 entities each\n");
  
--            for (int i = 0; i < 10; i++) {
++            for (int i = 0; i < 4; i++) {
  
                  String appName = "import-test-" + i + RandomStringUtils.randomAlphanumeric(10);
                  UUID appId = setup.getMgmtSvc().createApplication(organization.getUuid(), appName).getId();
@@@ -354,7 -315,7 +315,7 @@@
              }
  
              // import all those exports from S3 into the default test application
--            logger.debug("\n\nCreating 10 applications with 10 entities each\n");
++            logger.debug("\n\nImporting\n");
  
              final EntityManager emDefaultApp = setup.getEmf().getEntityManager(applicationId);
              importCollection(emDefaultApp, "things");
@@@ -370,7 -331,7 +331,7 @@@
  
  
              assertTrue(!importedThings.isEmpty());
--            assertEquals(100, importedThings.size());
++            assertEquals(40, importedThings.size());
  
          } finally {
              deleteBucket();


[4/4] incubator-usergrid git commit: Merge branch 'two-dot-o-import' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-import

Posted by sn...@apache.org.
Merge branch 'two-dot-o-import' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-import

Conflicts:
	stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/62deba3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/62deba3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/62deba3f

Branch: refs/heads/two-dot-o-import
Commit: 62deba3fa9891880047fb0dad7b89b09e24e3e8e
Parents: 2428b86 93cbeb5
Author: Dave Johnson <dm...@apigee.com>
Authored: Tue Feb 10 07:27:44 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Tue Feb 10 07:27:44 2015 -0500

----------------------------------------------------------------------
 .../persistence/entities/FileImport.java        |  20 +-
 .../applications/ApplicationResource.java       |   5 +
 .../rest/management/ImportResourceIT.java       | 371 +++----------------
 .../management/importer/ImportServiceImpl.java  | 115 +++---
 .../management/importer/ImportCollectionIT.java |   7 +-
 .../resources/usergrid-custom-test.properties   |   5 +-
 6 files changed, 128 insertions(+), 395 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/62deba3f/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index 2d611c5,b90d835..5edaab7
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@@ -173,12 -174,11 +175,11 @@@ public class ImportServiceImpl implemen
          }
  
          // create a FileImport entity to store metadata about the fileImport job
-         String collectionName = config.get("collectionName").toString();
          UUID applicationId = (UUID)config.get("applicationId");
-         FileImport fileImport = new FileImport( file, applicationId, collectionName );
+         FileImport fileImport = new FileImport( file, applicationId );
 -        fileImport = rootEm.create(fileImport);
 +        fileImport = emManagementApp.create(fileImport);
  
 -        Import importUG = rootEm.get(importRef, Import.class);
 +        Import importEntity = emManagementApp.get(importRef, Import.class);
  
          try {
              // create a connection between the main import job and the sub FileImport Job
@@@ -213,6 -206,30 +214,29 @@@
      }
  
  
+     private int getConnectionCount( final Import importRoot ) {
+ 
+         try {
+             EntityManager rootEm = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
+ 
+             Results entities = rootEm.getConnectedEntities( importRoot, "includes", null, Level.ALL_PROPERTIES );
+             PagingResultsIterator itr = new PagingResultsIterator( entities );
+ 
+             int count = 0;
+ 
+             while ( itr.hasNext() ) {
+                 itr.next();
+                 count++;
+             }
+ 
 -
+             return count;
+         }
+         catch ( Exception e ) {
+             logger.error( "application doesn't exist within the current context" );
+             throw new RuntimeException( e );
+         }
+     }
+ 
      /**
       * Schedule the file tasks.  This must happen in 2 phases.  The first is linking the sub files to the master the
       * second is scheduling them to run.
@@@ -454,25 -471,16 +478,21 @@@
                  fileJobs.add( jobData) ;
              }
  
 -
 -            //TODO: A hack, but we have to be sure all the connections are consistent before we schedule jobs
 -            for(int i = 0; i < 20; i ++){
 -                final int count = getConnectionCount(rootImportTask);
 -
 -                if(count == fileJobs.size()){
 -                    break;
 +            int retries = 0;
 +            int maxRetries = 60;
 +            Results entities;
 +            boolean done = false;
 +            while ( !done && retries++ < maxRetries ) {
 +
-                 entities = emManagementApp.getConnectedEntities(
-                     importEntity, "includes", "file_import", Level.ALL_PROPERTIES);
- 
-                 logger.debug("Found {} jobs", entities.size());
-                 
-                 if ( entities.size() == fileJobs.size() ) {
++                final int count = getConnectionCount(importEntity);
++                if ( count == fileJobs.size() ) {
 +                    done = true;
 +                } else {
 +                    Thread.sleep(1000);
                  }
 -
 -                Thread.sleep( 1000 );
 +            }
 +            if ( retries >= maxRetries ) {
 +                throw new RuntimeException("Max retries was reached");
              }
  
              // schedule each job
@@@ -702,27 -667,29 +722,28 @@@
  
          // TODO: move JSON parser into observable creation so open/close happens within the stream
          final JsonEntityParserObservable jsonObservableEntities =
 -            new JsonEntityParserObservable(jp, em, rootEm, fileImport, entitiesOnly);
 +            new JsonEntityParserObservable(jp, em, rootEm, fileImport, tracker, entitiesOnly);
 +
-         jsonObservableEntities.call( null );
+         final Observable<WriteEvent> entityEventObservable = Observable.create(jsonObservableEntities);
  
- //        final Observable<WriteEvent> entityEventObservable = Observable.create(jsonObservableEntities);
- //
- //        // only take while our stats tell us we should continue processing
- //        // potentially skip the first n if this is a resume operation
- //        final int entityNumSkip = (int)tracker.getTotalEntityCount();
- //
- //       final int entityCount =  entityEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
- //            @Override
- //            public Boolean call( final WriteEvent writeEvent ) {
- //                return !tracker.shouldStopProcessingEntities();
- //            }
- //        } ).skip(entityNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
- //           @Override
- //           public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
- //               return entityWrapperObservable.doOnNext(doWork);
- //           }
- //       }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
+         // only take while our stats tell us we should continue processing
+         // potentially skip the first n if this is a resume operation
+         final int entityNumSkip = (int)tracker.getTotalEntityCount();
+ 
+         // with this code we get asynchronous behavior and testImportWithMultipleFiles will fail
+        final int entityCount =  entityEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
+             @Override
+             public Boolean call( final WriteEvent writeEvent ) {
+                 return !tracker.shouldStopProcessingEntities();
+             }
+         } ).skip(entityNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+             @Override
+             public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
+                 return entityWrapperObservable.doOnNext(doWork);
+             }
+         }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
+ 
 -
 -
+         jp.close();
  
          logger.debug("\n\nparseEntitiesAndConnectionsFromJson(): Wrote entities\n");
  
@@@ -737,28 -704,28 +758,28 @@@
  
          // TODO: move JSON parser into observable creation so open/close happens within the stream
          final JsonEntityParserObservable jsonObservableOther =
 -            new JsonEntityParserObservable(jp, em, rootEm, fileImport, entitiesOnly);
 +            new JsonEntityParserObservable(jp, em, rootEm, fileImport, tracker, entitiesOnly);
  
-         jsonObservableOther.call( null );
+         final Observable<WriteEvent> otherEventObservable = Observable.create(jsonObservableOther);
  
- //        final Observable<WriteEvent> otherEventObservable = Observable.create(jsonObservableOther);
- //
- //        // only take while our stats tell us we should continue processing
- //        // potentially skip the first n if this is a resume operation
- //        final int connectionNumSkip = (int)tracker.getTotalConnectionCount();
- //
- //        // with this code we get asynchronous behavior and testImportWithMultipleFiles will fail
- //        final int connectionCount = otherEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
- //            @Override
- //            public Boolean call( final WriteEvent writeEvent ) {
- //                return !tracker.shouldStopProcessingConnections();
- //            }
- //        } ).skip(connectionNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
- //            @Override
- //            public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
- //                return entityWrapperObservable.doOnNext(doWork);
- //            }
- //        }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
+         // only take while our stats tell us we should continue processing
+         // potentially skip the first n if this is a resume operation
+         final int connectionNumSkip = (int)tracker.getTotalConnectionCount();
+ 
+         // with this code we get asynchronous behavior and testImportWithMultipleFiles will fail
+         final int connectionCount = otherEventObservable.takeWhile( new Func1<WriteEvent, Boolean>() {
+             @Override
+             public Boolean call( final WriteEvent writeEvent ) {
+                 return !tracker.shouldStopProcessingConnections();
+             }
+         } ).skip(connectionNumSkip).parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+             @Override
+             public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
+                 return entityWrapperObservable.doOnNext(doWork);
+             }
+         }, Schedulers.io()).reduce(0, heartbeatReducer).toBlocking().last();
+ 
+         jp.close();
  
          logger.debug("\n\nparseEntitiesAndConnectionsFromJson(): Wrote others for file {}\n",
              fileImport.getFileName());
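
The getConnectionCount() helper above sizes the connection set by exhausting a paging
iterator rather than trusting Results.size(), presumably because the reported size may
not cover every page while the index catches up. The same shape in miniature, with
Iterator standing in for Usergrid's PagingResultsIterator:

    import java.util.Iterator;

    // Sketch only: count by walking the pages, not by asking for a size.
    public class ConnectionCounter {

        public static int count(Iterator<?> itr) {
            int count = 0;
            while (itr.hasNext()) {
                itr.next();
                count++;
            }
            return count;
        }
    }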

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/62deba3f/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
----------------------------------------------------------------------
diff --cc stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
index a9fd7db,22ad931..334f465
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
@@@ -54,6 -54,6 +54,7 @@@ import org.slf4j.LoggerFactory
  import java.util.*;
  
  import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertNotNull;
  import static org.junit.Assert.assertTrue;
  
  
@@@ -321,12 -324,14 +322,14 @@@ public class ImportCollectionIT 
  
              logger.debug("\n\nQuery to see if we now have 100 entities\n");
  
 -
              Query query = Query.fromQL("select *").withLimit(101);
++            
              List<Entity> importedThings = emDefaultApp.getCollection(
                  emDefaultApp.getApplicationId(), "things", query, Level.ALL_PROPERTIES).getEntities();
  
 -
--            assertTrue(!importedThings.isEmpty());
--            assertEquals(100, importedThings.size());
++            assertNotNull("importedThings must not be null", !importedThings.isEmpty());
++            assertTrue("importedThings must not be empty", !importedThings.isEmpty());
++            assertEquals("there must be 100 importedThings", 100, importedThings.size());
  
          } finally {
              deleteBucket();


[3/4] incubator-usergrid git commit: After the Import->FileImport connections are made, wait for them to take effect before scheduling jobs.

Posted by sn...@apache.org.
After the Import->FileImport connections are made, wait for them to take effect before scheduling jobs.
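
In outline: create every FileImport and its connection first, poll until the query sees
them all, and only then schedule. A hedged sketch of that ordering, where every name is
a simplified stand-in for the types in the diff below:

    import java.util.ArrayList;
    import java.util.List;

    // Stand-in types and methods; not the Usergrid signatures.
    public class LinkThenSchedule {

        static class JobData {
            final String file;
            JobData(String file) { this.file = file; }
        }

        static JobData createFileTask(String bucketFile) { return new JobData(bucketFile); }
        static int visibleConnectionCount() { return 0; } // stand-in for the index query
        static void scheduleJob(JobData jobData) { }      // hand off to the scheduler

        public static void run(List<String> bucketFiles) throws InterruptedException {
            List<JobData> fileJobs = new ArrayList<JobData>();
            for (String bucketFile : bucketFiles) {
                fileJobs.add(createFileTask(bucketFile)); // phase 1: link
            }
            int retries = 0;
            int maxRetries = 60;
            while (visibleConnectionCount() != fileJobs.size() && retries++ < maxRetries) {
                Thread.sleep(1000); // wait for the connections to take effect
            }
            if (visibleConnectionCount() != fileJobs.size()) {
                throw new RuntimeException("Max retries was reached");
            }
            for (JobData jobData : fileJobs) {
                scheduleJob(jobData); // phase 2: safe, every link is visible
            }
        }
    }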


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2428b862
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2428b862
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2428b862

Branch: refs/heads/two-dot-o-import
Commit: 2428b862a4b6993e89376e580dcd3dd2955182aa
Parents: 9e76c56
Author: Dave Johnson <dm...@apigee.com>
Authored: Mon Feb 9 19:57:33 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Mon Feb 9 19:57:33 2015 -0500

----------------------------------------------------------------------
 .../management/importer/ImportServiceImpl.java  | 122 ++++++++++++-------
 .../management/importer/ImportCollectionIT.java |   9 +-
 2 files changed, 78 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2428b862/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index a613fdb..2d611c5 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@ -183,6 +183,13 @@ public class ImportServiceImpl implements ImportService {
         try {
             // create a connection between the main import job and the sub FileImport Job
             emManagementApp.createConnection(importEntity, "includes", fileImport);
+
+            logger.debug("Created connection from {}:{} to {}:{}",
+                new Object[] {
+                    importEntity.getType(), importEntity.getUuid(),
+                    fileImport.getType(), fileImport.getUuid()
+                });
+
         } catch (Exception e) {
             logger.error(e.getMessage());
             return null;
@@ -202,8 +209,6 @@ public class ImportServiceImpl implements ImportService {
         fileImport.setState(FileImport.State.SCHEDULED);
         emManagementApp.update(fileImport);
 
-        emf.refreshIndex();
-
         return jobData;
     }
 
@@ -372,12 +377,12 @@ public class ImportServiceImpl implements ImportService {
 
         EntityManager emManagementApp = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
         UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
-        Import rootImportTask = emManagementApp.get(importId, Import.class);
+        Import importEntity = emManagementApp.get(importId, Import.class);
 
-        rootImportTask.setState(Import.State.STARTED);
-        rootImportTask.setStarted(System.currentTimeMillis());
-        rootImportTask.setErrorMessage( " " );
-        emManagementApp.update(rootImportTask);
+        importEntity.setState(Import.State.STARTED);
+        importEntity.setStarted(System.currentTimeMillis());
+        importEntity.setErrorMessage(" ");
+        emManagementApp.update(importEntity);
         logger.debug("doImport(): updated state");
 
         // if no S3 importer was passed in then create one
@@ -392,9 +397,9 @@ public class ImportServiceImpl implements ImportService {
             }
         } catch (Exception e) {
             logger.error("doImport(): Error creating S3Import", e);
-            rootImportTask.setErrorMessage(e.getMessage());
-            rootImportTask.setState( Import.State.FAILED );
-            emManagementApp.update(rootImportTask);
+            importEntity.setErrorMessage(e.getMessage());
+            importEntity.setState(Import.State.FAILED);
+            emManagementApp.update(importEntity);
             return;
         }
 
@@ -405,9 +410,9 @@ public class ImportServiceImpl implements ImportService {
 
             if (config.get("organizationId") == null) {
                 logger.error("doImport(): No organization could be found");
-                rootImportTask.setErrorMessage( "No organization could be found" );
-                rootImportTask.setState( Import.State.FAILED );
-                emManagementApp.update(rootImportTask);
+                importEntity.setErrorMessage("No organization could be found");
+                importEntity.setState(Import.State.FAILED);
+                emManagementApp.update(importEntity);
                 return;
 
             } else {
@@ -422,9 +427,9 @@ public class ImportServiceImpl implements ImportService {
             }
 
         } catch (OrganizationNotFoundException | ApplicationNotFoundException e) {
-            rootImportTask.setErrorMessage( e.getMessage() );
-            rootImportTask.setState( Import.State.FAILED );
-            emManagementApp.update(rootImportTask);
+            importEntity.setErrorMessage(e.getMessage());
+            importEntity.setState(Import.State.FAILED);
+            emManagementApp.update(importEntity);
             return;
         }
 
@@ -432,9 +437,9 @@ public class ImportServiceImpl implements ImportService {
         // schedule a FileImport job for each file found in the bucket
 
         if ( bucketFiles.isEmpty() )  {
-            rootImportTask.setState( Import.State.FINISHED );
-            rootImportTask.setErrorMessage( "No files found in the bucket: " + bucketName );
-            emManagementApp.update(rootImportTask);
+            importEntity.setState(Import.State.FINISHED);
+            importEntity.setErrorMessage("No files found in the bucket: " + bucketName);
+            emManagementApp.update(importEntity);
 
         } else {
 
@@ -445,10 +450,31 @@ public class ImportServiceImpl implements ImportService {
             // create the Entity Connection and set up metadata for each job
 
             for ( String bucketFile : bucketFiles ) {
-                final JobData jobData = createFileTask(config, bucketFile, rootImportTask);
+                final JobData jobData = createFileTask(config, bucketFile, importEntity);
                 fileJobs.add( jobData) ;
             }
 
+            int retries = 0;
+            int maxRetries = 60;
+            Results entities;
+            boolean done = false;
+            while ( !done && retries++ < maxRetries ) {
+
+                entities = emManagementApp.getConnectedEntities(
+                    importEntity, "includes", "file_import", Level.ALL_PROPERTIES);
+
+                logger.debug("Found {} jobs", entities.size());
+                
+                if ( entities.size() == fileJobs.size() ) {
+                    done = true;
+                } else {
+                    Thread.sleep(1000);
+                }
+            }
+            if ( retries >= maxRetries ) {
+                throw new RuntimeException("Max retries was reached");
+            }
+
             // schedule each job
 
             for ( JobData jobData: fileJobs ) {
@@ -462,9 +488,9 @@ public class ImportServiceImpl implements ImportService {
             }
 
             fileMetadata.put("files", value);
-            rootImportTask.addProperties(fileMetadata);
-            rootImportTask.setFileCount( fileJobs.size() );
-            emManagementApp.update(rootImportTask);
+            importEntity.addProperties(fileMetadata);
+            importEntity.setFileCount(fileJobs.size());
+            emManagementApp.update(importEntity);
         }
     }
 
@@ -546,31 +572,35 @@ public class ImportServiceImpl implements ImportService {
         String randTag = RandomStringUtils.randomAlphanumeric(4);
         logger.debug("{} Got importEntity {}", randTag, importEntity.getUuid() );
 
-        int retries = 0;
-        int maxRetries = 60;
-        Results entities = null;
-        boolean done = false;
-        while ( !done && retries++ < maxRetries ) {
-
-            // get all file import job siblings of the current job we're working now
-            entities = emManagementApp.getConnectedEntities(
-                importEntity, "includes", "file_import", Level.ALL_PROPERTIES);
+        Results entities = emManagementApp.getConnectedEntities(
+            importEntity, "includes", "file_import", Level.ALL_PROPERTIES);
 
-            if ( entities.size() == importEntity.getFileCount() ) {
-                logger.debug("{} got {} file_import entities, expected {} DONE!",
-                    new Object[] { randTag, entities.size(), importEntity.getFileCount() });
-                done = true;
 
-            } else {
-                logger.debug("{} got {} file_import entities, expected {} waiting... ",
-                    new Object[] { randTag, entities.size(), importEntity.getFileCount() });
-                Thread.sleep(1000);
-            }
-        }
-
-        if ( retries >= maxRetries ) {
-            throw new RuntimeException("Max retries was reached");
-        }
+//        int retries = 0;
+//        int maxRetries = 60;
+//        Results entities = null;
+//        boolean done = false;
+//        while ( !done && retries++ < maxRetries ) {
+//
+//            // get all file import job siblings of the current job we're working now
+//            entities = emManagementApp.getConnectedEntities(
+//                importEntity, "includes", "file_import", Level.ALL_PROPERTIES);
+//
+//            if ( entities.size() == importEntity.getFileCount() ) {
+//                logger.debug("{} got {} file_import entities, expected {} DONE!",
+//                    new Object[] { randTag, entities.size(), importEntity.getFileCount() });
+//                done = true;
+//
+//            } else {
+//                logger.debug("{} got {} file_import entities, expected {} waiting... ",
+//                    new Object[] { randTag, entities.size(), importEntity.getFileCount() });
+//                Thread.sleep(1000);
+//            }
+//        }
+//
+//        if ( retries >= maxRetries ) {
+//            throw new RuntimeException("Max retries was reached");
+//        }
 
 
         PagingResultsIterator itr = new PagingResultsIterator( entities );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2428b862/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
index 24ed292..a9fd7db 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportCollectionIT.java
@@ -285,9 +285,6 @@ public class ImportCollectionIT {
     }
 
 
-
-
-
    /**
      * Simple import test but with multiple files.
      */
@@ -301,7 +298,7 @@ public class ImportCollectionIT {
             // create 4 applications each with collection of 10 things, export all to S3
             logger.debug("\n\nCreating 10 applications with 10 entities each\n");
 
-            for (int i = 0; i < 4; i++) {
+            for (int i = 0; i < 10; i++) {
 
                 String appName = "import-test-" + i + RandomStringUtils.randomAlphanumeric(10);
                 UUID appId = setup.getMgmtSvc().createApplication(organization.getUuid(), appName).getId();
@@ -324,14 +321,12 @@ public class ImportCollectionIT {
 
             logger.debug("\n\nQuery to see if we now have 100 entities\n");
 
-
             Query query = Query.fromQL("select *").withLimit(101);
             List<Entity> importedThings = emDefaultApp.getCollection(
                 emDefaultApp.getApplicationId(), "things", query, Level.ALL_PROPERTIES).getEntities();
 
-
             assertTrue(!importedThings.isEmpty());
-            assertEquals(40, importedThings.size());
+            assertEquals(100, importedThings.size());
 
         } finally {
             deleteBucket();