You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/02/03 16:32:47 UTC

[1/3] incubator-usergrid git commit: Using two-pass approach to ensure Entities are created before Connections.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-import 63088a2fa -> c652c39bc


Using two-pass approach to ensure Entities are created before Connections.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/858cbc0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/858cbc0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/858cbc0e

Branch: refs/heads/two-dot-o-import
Commit: 858cbc0e8003fbbb79763e0602a8efc9a6b93783
Parents: 41852d6
Author: Dave Johnson <dm...@apigee.com>
Authored: Fri Jan 30 08:10:04 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Fri Jan 30 08:10:04 2015 -0500

----------------------------------------------------------------------
 .../usergrid/management/export/ExportJob.java   |  2 +-
 .../usergrid/management/importer/ImportJob.java |  6 +-
 .../management/importer/ImportServiceImpl.java  | 91 +++++++++++++++-----
 .../management/importer/ImportServiceIT.java    | 19 ++--
 4 files changed, 79 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/858cbc0e/stack/services/src/main/java/org/apache/usergrid/management/export/ExportJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/export/ExportJob.java b/stack/services/src/main/java/org/apache/usergrid/management/export/ExportJob.java
index 8e845b4..3bdfac9 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/export/ExportJob.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/export/ExportJob.java
@@ -46,7 +46,7 @@ public class ExportJob extends OnlyOnceJob {
 
     @Override
     public void doJob( JobExecution jobExecution ) throws Exception {
-        logger.info( "execute ExportJob {}", jobExecution );
+        logger.info( "execute ExportJob {}", jobExecution.getJobId().toString() );
 
         JobData jobData = jobExecution.getJobData();
         if ( jobData == null ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/858cbc0e/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java
index 9ebb65e..24393cd 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java
@@ -42,13 +42,13 @@ public class ImportJob extends OnlyOnceJob {
     @Autowired
     ImportService importService;
 
-    public ImportJob() {
+    public ImportJob(){
         logger.info( "ImportJob created " + this );
     }
 
     @Override
     protected void doJob(JobExecution jobExecution) throws Exception {
-        logger.info( "execute ImportJob {}", jobExecution );
+        logger.info( "execute ImportJob {}", jobExecution.getJobId().toString() );
 
         JobData jobData = jobExecution.getJobData();
         if ( jobData == null ) {
@@ -76,7 +76,7 @@ public class ImportJob extends OnlyOnceJob {
         this.importService = importService;
     }
 
-    
+
     /**
      * This method is called when the job is retried maximum times by the
      * scheduler but still fails. Thus the scheduler marks it as DEAD.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/858cbc0e/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index 128f69e..e78675b 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@ -861,9 +861,16 @@ public class ImportServiceImpl implements ImportService {
                 if ( entityRef.getType() == null ) {
                     entityRef = em.get(ownerEntityRef.getUuid());
                 }
+
+                logger.debug("Creating connection from {}:{} to {}:{}",
+                    new Object[] {
+                        ownerEntityRef.getType(), ownerEntityRef.getUuid(),
+                        entityRef.getType(), entityRef.getUuid() });
+
                 em.createConnection(ownerEntityRef, connectionType, entityRef);
 
             } catch (Exception e) {
+                logger.error("Error writing connection", e);
                 fileImport.setErrorMessage(e.getMessage());
                 try {
                     rootEm.update(fileImport);
@@ -895,11 +902,19 @@ public class ImportServiceImpl implements ImportService {
         public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport) {
             EntityManager rootEm = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
             try {
+                
+                logger.debug("Adding map to {}:{} dictionary {}",
+                    new Object[] {ownerEntityRef.getType(), ownerEntityRef.getType(), dictionaryName });
+
                 em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary);
+
             } catch (Exception e) {
+                logger.error("Error writing dictionary", e);
                 fileImport.setErrorMessage(e.getMessage());
                 try {
+
                     rootEm.update(fileImport);
+
                 } catch (Exception ex) {
 
                     // TODO should we abort at this point?
@@ -925,19 +940,36 @@ public class ImportServiceImpl implements ImportService {
             this.fileImport = fileImport;
         }
 
+
         @Override
         public void call(final Subscriber<? super WriteEvent> subscriber) {
 
-            WriteEvent entityWrapper = null;
-            EntityRef ownerEntityRef = null;
-            String entityUuid = "";
-            String entityType = "";
+            // have to do this in two passes so that entities are created before connections
+
+            // first entities
+            process( subscriber, true );
+
+            // next connections and dictionaries
+            process( subscriber, false);
+        }
+
+
+        private void process(final Subscriber<? super WriteEvent> subscriber, boolean entities ) {
+
+            logger.debug("process(): entities = " + entities );
+
+            EntityRef lastEntitySeenRef = null;
+
             try {
+
                 while (!subscriber.isUnsubscribed() && jp.nextToken() != JsonToken.END_OBJECT) {
+
                     String collectionName = jp.getCurrentName();
 
+                    logger.debug("Processing currentName: " + jp.getCurrentName());
+
                     // create the  wrapper for connections
-                    if ( collectionName != null && collectionName.equals("connections")) {
+                    if (collectionName != null && collectionName.equals("connections")) {
 
                         jp.nextToken(); // START_OBJECT
                         while (jp.nextToken() != JsonToken.END_OBJECT) {
@@ -947,49 +979,59 @@ public class ImportServiceImpl implements ImportService {
                             while (jp.nextToken() != JsonToken.END_ARRAY) {
                                 String entryId = jp.getText();
 
-                                EntityRef entryRef = new SimpleEntityRef(UUID.fromString(entryId));
-                                entityWrapper = new ConnectionEvent(ownerEntityRef, connectionType, entryRef);
-
-                                // Creates a new subscriber to the observer with the given connection wrapper
-                                subscriber.onNext(entityWrapper);
+                                if ( !entities ) {
+                                    EntityRef entryRef = new SimpleEntityRef(UUID.fromString(entryId));
+                                    WriteEvent entityWrapper = new ConnectionEvent(lastEntitySeenRef, connectionType, entryRef);
+                                    subscriber.onNext(entityWrapper);
+                                }
                             }
                         }
 
                     }
                     // create the  wrapper for dictionaries
-                    else if ( collectionName != null && collectionName.equals("dictionaries")) {
+                    else if (collectionName != null && collectionName.equals("dictionaries")) {
 
                         jp.nextToken(); // START_OBJECT
                         while (jp.nextToken() != JsonToken.END_OBJECT) {
 
                             String dictionaryName = jp.getCurrentName();
-
                             jp.nextToken();
-
                             Map<String, Object> dictionary = jp.readValueAs(HashMap.class);
-                            entityWrapper = new DictionaryEvent(ownerEntityRef, dictionaryName, dictionary);
 
-                            // Creates a new subscriber to the observer with the given dictionary wrapper
-                            subscriber.onNext(entityWrapper);
+                            if ( !entities ) {
+                                WriteEvent entityWrapper = new DictionaryEvent(
+                                    lastEntitySeenRef, dictionaryName, dictionary);
+                                subscriber.onNext(entityWrapper);
+                            }
                         }
                         subscriber.onCompleted();
 
                     } else {
 
                         // Regular collections
+
                         jp.nextToken(); // START_OBJECT
 
-                        Map<String, Object> properties = new HashMap<String, Object>();
                         JsonToken token = jp.nextToken();
 
+                        Map<String, Object> properties = new HashMap<>();
+
+                        String entityUuid = null;
+                        String entityType = null;
+
                         while (token != JsonToken.END_OBJECT) {
+
                             if (token == JsonToken.VALUE_STRING || token == JsonToken.VALUE_NUMBER_INT) {
+
                                 String key = jp.getCurrentName();
+                                logger.debug("   currentName: " + jp.getText());
+
                                 if (key.equals("uuid")) {
                                     entityUuid = jp.getText();
 
                                 } else if (key.equals("type")) {
                                     entityType = jp.getText();
+
                                 } else if (key.length() != 0 && jp.getText().length() != 0) {
                                     String value = jp.getText();
                                     properties.put(key, value);
@@ -998,24 +1040,29 @@ public class ImportServiceImpl implements ImportService {
                             token = jp.nextToken();
                         }
 
-                        ownerEntityRef = new SimpleEntityRef(entityType, UUID.fromString(entityUuid));
-                        entityWrapper = new EntityEvent(UUID.fromString(entityUuid), entityType, properties);
-
-                        // Creates a new subscriber to the observer with the given dictionary wrapper
-                        subscriber.onNext(entityWrapper);
+                        if ( entities ) {
+                            WriteEvent entityWrapper = new EntityEvent(
+                                UUID.fromString(entityUuid), entityType, properties);
+                            subscriber.onNext(entityWrapper);
+                        }
 
+                        // don't save it, but do keep track of last entity seen
+                        lastEntitySeenRef = new SimpleEntityRef( entityType, UUID.fromString(entityUuid) );
                     }
                 }
+
             } catch (Exception e) {
                 // skip illegal entity UUID and go to next one
                 fileImport.setErrorMessage(e.getMessage());
                 try {
                     rootEm.update(fileImport);
                 } catch (Exception ex) {
+                    logger.error("Error updating file import record", ex);
                 }
                 subscriber.onError(e);
             }
         }
+
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/858cbc0e/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java
index 4a028b9..178909b 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java
@@ -202,21 +202,14 @@ public class ImportServiceIT {
 
             assertTrue( !importedThings.isEmpty() );
 
-            // first two things have connections
-            for (int i = 0; i < 2; i++) {
-                Results r = em2.getConnectedEntities(
-                    importedThings.get(i), "related", null, Level.IDS);
+            // two things have connections
+            int conCount = 0;
+            for ( Entity e : importedThings ) {
+                Results r = em2.getConnectedEntities( e, "related", null, Level.IDS);
                 List<ConnectionRef> connections = r.getConnections();
-                assertTrue( !connections.isEmpty() );
-            }
-
-            // other things do not have connections
-            for (int i = 3; i < 10; i++) {
-                Results r = em2.getConnectedEntities(
-                    importedThings.get(i), "related", null, Level.IDS);
-                List<ConnectionRef> connections = r.getConnections();
-                assertTrue( connections.isEmpty() );
+                conCount += connections.size();
             }
+            assertEquals( 2, conCount );
 
             logger.debug("\n\nCheck dictionary\n");
 


[3/3] incubator-usergrid git commit: Merge branch 'two-dot-o-import' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-import

Posted by sn...@apache.org.
Merge branch 'two-dot-o-import' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-import

Conflicts:
	stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
	stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c652c39b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c652c39b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c652c39b

Branch: refs/heads/two-dot-o-import
Commit: c652c39bc7a87db15dd68349b7709df60cc8566d
Parents: b22c37a 63088a2
Author: Dave Johnson <dm...@apigee.com>
Authored: Tue Feb 3 10:32:32 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Tue Feb 3 10:32:32 2015 -0500

----------------------------------------------------------------------
 .../queue/impl/QueueScopeFactoryImpl.java       |   1 +
 .../queue/impl/SQSQueueManagerImpl.java         |  51 --------
 .../queue/impl/UsergridAwsCredentials.java      |  59 +++++++++
 .../impl/UsergridAwsCredentialsProvider.java    |  59 +++++++++
 .../organizations/OrganizationResource.java     |  26 ++--
 .../applications/ApplicationResource.java       |  48 ++++---
 .../rest/management/ExportResourceIT.java       |  11 +-
 .../rest/management/ImportResourceIT.java       |  14 +-
 .../management/export/ExportServiceImpl.java    |   2 +-
 .../management/export/S3ExportImpl.java         |   5 +-
 .../management/importer/FileImportJob.java      |   4 +-
 .../management/importer/ImportService.java      |  19 +--
 .../management/importer/ImportServiceImpl.java  | 128 +++++++++++++------
 .../management/importer/S3ImportImpl.java       |   6 +-
 .../notifications/NotificationsService.java     |   2 +-
 .../services/queues/ImportQueueListener.java    |  26 ++--
 .../services/queues/ImportQueueManager.java     |  65 ++++++++++
 .../services/queues/ImportQueueMessage.java     |  42 +++++-
 .../usergrid/services/queues/QueueListener.java |   4 +-
 .../resources/usergrid-services-context.xml     |  23 +++-
 .../management/export/ExportServiceIT.java      |  23 ++--
 .../management/importer/ImportServiceIT.java    |  48 +++++--
 .../src/test/resources/log4j.properties         |   1 +
 23 files changed, 466 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c652c39b/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java
index 6f2eddd,6f2eddd..0b1fac5
--- a/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/export/S3ExportImpl.java
@@@ -17,6 -17,6 +17,7 @@@
  package org.apache.usergrid.management.export;
  
  
++import com.amazonaws.SDKGlobalConfiguration;
  import com.google.common.collect.ImmutableSet;
  import com.google.common.util.concurrent.ListenableFuture;
  import com.google.inject.Module;
@@@ -50,8 -50,8 +51,8 @@@ public class S3ExportImpl implements S3
          Map<String, Object> storage_info = (Map<String,Object>)properties.get( "storage_info" );
  
          String bucketName = ( String ) storage_info.get( "bucket_location" );
--        String accessId = ( String ) storage_info.get( "s3_access_id" );
--        String secretKey = ( String ) storage_info.get( "s3_key" );
++        String accessId = ( String ) storage_info.get( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR );
++        String secretKey = ( String ) storage_info.get( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR );
  
          Properties overrides = new Properties();
          overrides.setProperty( "s3" + ".identity", accessId );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c652c39b/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
index cb83ce8,edeed1c..089c8e5
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
@@@ -54,33 -54,19 +54,33 @@@ public class FileImportJob extends Only
  
      @Override
      protected void doJob(JobExecution jobExecution) throws Exception {
 -        logger.info( "execute FileImportJob {}", jobExecution.toString() );
 +        logger.info("execute FileImportJob {}", jobExecution.toString());
  
 -        JobData jobData = jobExecution.getJobData();
 -        if ( jobData == null ) {
 -            logger.error( "jobData cannot be null" );
 -            return;
 -        }
 +        try {
 +            JobData jobData = jobExecution.getJobData();
 +            if (jobData == null) {
 +                logger.error("jobData cannot be null");
 +                return;
 +            }
 +
 +            // heartbeat to indicate job has started
 +            jobExecution.heartbeat();
  
 -        // heartbeat to indicate job has started
 -        jobExecution.heartbeat();
 +            // call the File Parser for the file set in job execution
 +            importService.parseFileToEntities(jobExecution);
-             
+ 
 -        // call the File Parser for the file set in job execution
 -        //importService.parseFileToEntities(jobExecution);
 +        } catch ( Throwable t ) {
 +            logger.debug("Error importing file", t);
 +
 +            // update file import record
 +            UUID fileImportId = (UUID) jobExecution.getJobData().getProperty(FILE_IMPORT_ID);
 +            EntityManager em = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
 +            FileImport fileImport = em.get(fileImportId, FileImport.class);
 +            fileImport.setState( FileImport.State.FAILED );
 +            em.update( fileImport );
 +
 +            throw t;
 +        }
  
          logger.error("File Import Service completed job");
      }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c652c39b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java
index bc90f49,26fb31f..2275888
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportService.java
@@@ -46,17 -47,17 +47,18 @@@ public interface ImportService 
  
      /**
       * Parses the input file and creates entities
--     *
--     * @param jobExecution
--     * @throws Exception
+      */
+     void parseFileToEntities(ImportQueueMessage importQueueMessage) throws Exception;
+ 
+     /**
++     * Parses the input file and creates entities
 +     */
 +    void parseFileToEntities(JobExecution jobExecution) throws Exception;
 +
 +    /**
       * Get the state for the Job with UUID
       * @param uuid Job UUID
       * @return State of Job
--     * @throws Exception
       */
      String getState(UUID uuid) throws Exception;
  
@@@ -64,21 -65,21 +66,22 @@@
       * Returns error message for the job with UUID
       * @param uuid Job UUID
       * @return error message
--     * @throws Exception
       */
      String getErrorMessage(UUID uuid) throws Exception;
  
      /**
--     * @param jobExecution
       * @return FileImportEntity
--     * @throws Exception
+      */
+     FileImport getFileImportEntity(final ImportQueueMessage importQueueMessage) throws Exception;
+ 
+     /**
++     * @return FileImportEntity
 +     */
 +    FileImport getFileImportEntity(final JobExecution jobExecution) throws Exception;
 +
 +    /**
       * @param jobExecution
       * @return ImportEntity
--     * @throws Exception
       */
      Import getImportEntity(final JobExecution jobExecution) throws Exception;
  

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c652c39b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index 7339087,b8431fa..1357ccb
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@@ -27,6 -28,9 +28,7 @@@ import org.apache.usergrid.persistence.
  import org.apache.usergrid.persistence.entities.FileImport;
  import org.apache.usergrid.persistence.entities.Import;
  import org.apache.usergrid.persistence.entities.JobData;
+ 
 -import org.aspectj.lang.annotation.Before;
  import org.codehaus.jackson.JsonFactory;
  import org.codehaus.jackson.JsonParseException;
  import org.codehaus.jackson.JsonParser;
@@@ -44,7 -48,18 +46,17 @@@ import java.io.File
  import java.io.IOException;
  import java.util.*;
  
+ import javax.annotation.PostConstruct;
+ 
+ 
  import org.apache.usergrid.persistence.index.query.Query.Level;
 -import org.apache.usergrid.persistence.index.utils.UUIDUtils;
+ import org.apache.usergrid.persistence.queue.QueueManager;
+ import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+ import org.apache.usergrid.persistence.queue.QueueScope;
+ import org.apache.usergrid.persistence.queue.QueueScopeFactory;
+ import org.apache.usergrid.services.ServiceManagerFactory;
+ import org.apache.usergrid.services.queues.ImportQueueListener;
+ import org.apache.usergrid.services.queues.ImportQueueMessage;
  
  
  public class ImportServiceImpl implements ImportService {
@@@ -62,16 -77,39 +74,38 @@@
      //dependency injection
      private SchedulerService sch;
  
+     private ServiceManagerFactory smf;
+ 
+     //Dependency injection through spring
+     private QueueManager qm;
+ 
+     private QueueManagerFactory queueManagerFactory;
+ 
 -
      //inject Management Service to access Organization Data
      private ManagementService managementService;
      private JsonFactory jsonFactory = new JsonFactory();
  
++
+     @PostConstruct
+     public void init(){
++
+         //TODO: move this to a before or initialization method.
+ 
+         //TODO: made queueName clearly defined.
+         //smf = getApplicationContext().getBean(ServiceManagerFactory.class);
+ 
+         String name = ImportQueueListener.QUEUE_NAME;
+         QueueScopeFactory queueScopeFactory = CpSetup.getInjector().getInstance(QueueScopeFactory.class);
+         QueueScope queueScope = queueScopeFactory.getScope(CpNamingUtils.MANAGEMENT_APPLICATION_ID, name);
+         queueManagerFactory = CpSetup.getInjector().getInstance(QueueManagerFactory.class);
+         qm = queueManagerFactory.getQueueManager(queueScope);
 -
+     }
+ 
      /**
       * This schedules the main import Job
       *
       * @param config configuration of the job to be scheduled
       * @return it returns the UUID of the scheduled job
--     * @throws Exception
       */
      @Override
      public UUID schedule(Map<String, Object> config) throws Exception {
@@@ -258,27 -302,19 +299,32 @@@
  
      /**
       * Returns the File Import Entity that stores all meta-data for the particular sub File import Job
-      *
-      * @param jobExecution the file import job details
       * @return File Import Entity
--     * @throws Exception
+      */
+     @Override
+     public FileImport getFileImportEntity(final ImportQueueMessage queueMessage) throws Exception {
+ 
+         EntityManager em = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
+ 
+         return em.get(queueMessage.getFileId(), FileImport.class);
+     }
+ 
+ 
++    /**
++     * Returns the File Import Entity that stores all meta-data for the particular sub File import Job
++     * @return File Import Entity
 +     */
 +    @Override
 +    public FileImport getFileImportEntity(final JobExecution jobExecution) throws Exception {
 +
 +        UUID fileImportId = (UUID) jobExecution.getJobData().getProperty(FILE_IMPORT_ID);
++
 +        EntityManager em = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
 +
 +        return em.get(fileImportId, FileImport.class);
 +    }
 +
-     /**
-      * This returns the temporary files downloaded form s3
-      */
- //    @Override
- //    public ArrayList<File> getEphemeralFile() {
- //        return files;
- //    }
+ 
      public SchedulerService getSch() {
          return sch;
      }
@@@ -557,21 -591,21 +603,33 @@@
      }
  
  
--    /**
--     * The loops through each temp file and parses it to store the entities from the json back into usergrid
--     *
--     * @throws Exception
--     */
      @Override
-     public void parseFileToEntities(JobExecution jobExecution) throws Exception {
++    // TODO: ImportService should not have to know about ImportQueueMessage
+     public void parseFileToEntities(ImportQueueMessage queueMessage) throws Exception {
  
-         logger.debug("parseFileToEntities() for job {} status {}",
-             jobExecution.getJobName(), jobExecution.getStatus().toString());
 -        logger.debug("parseFileToEntities() for job {} ",
 -            queueMessage.getFileName());
 -
 -        // add properties to the import entity
+         FileImport fileImport = getFileImportEntity(queueMessage);
 -
+         File file = new File(queueMessage.getFileName());
++        UUID targetAppId = queueMessage.getApplicationId();
++
++        parseFileToEntities( fileImport, file, targetAppId );
++    }
 +
-         // add properties to the import entity
-         FileImport fileImport = getFileImportEntity(jobExecution);
 +
++    @Override
++    // TODO: ImportService should not have to know about JobExecution
++    public void parseFileToEntities(JobExecution jobExecution) throws Exception {
++
++        FileImport fileImport = getFileImportEntity(jobExecution);
 +        File file = new File(jobExecution.getJobData().getProperty("File").toString());
++        UUID targetAppId = (UUID) jobExecution.getJobData().getProperty("applicationId");
++
++        parseFileToEntities( fileImport, file, targetAppId );
++    }
++
++
++    public void parseFileToEntities( FileImport fileImport, File file, UUID targetAppId ) throws Exception {
++
++        logger.debug("parseFileToEntities() for file {} ", file.getAbsolutePath());
  
          EntityManager emManagementApp = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
          emManagementApp.update(fileImport);
@@@ -588,46 -622,44 +646,43 @@@
                  fileImport.setState(FileImport.State.STARTED);
                  emManagementApp.update(fileImport);
  
--                // Get target application ID from the job data (NOT from the filename)
-                 UUID targetAppId = (UUID) jobExecution.getJobData().getProperty("applicationId");
 -                UUID targetAppId = queueMessage.getApplicationId();
--
 -                if ( emManagementApp.get( targetAppId ) == null ) {
 +                if (emManagementApp.get(targetAppId) == null) {
                      throw new IllegalArgumentException("Application does not exist: " + targetAppId.toString());
                  }
 -                EntityManager targetEm = emf.getEntityManager( targetAppId );
 +                EntityManager targetEm = emf.getEntityManager(targetAppId);
                  logger.debug("   importing into app {} file {}", targetAppId.toString(), file.getAbsolutePath());
  
-                 importEntitiesFromFile(file, targetEm, emManagementApp, fileImport, jobExecution);
 -                JsonParser jp = getJsonParserForFile(file);
 -
 -                // in case of resume, retrieve the last updated UUID for this file
 -                String lastUpdatedUUID = fileImport.getLastUpdatedUUID();
 -
 -                // this handles partially completed files by updating entities from the point of failure
 -                if (!lastUpdatedUUID.equals(" ")) {
 -
 -                    // go till the last updated entity
 -                    while (!jp.getText().equals(lastUpdatedUUID)) {
 -                        jp.nextToken();
 -                    }
 -
 -                    // skip the last one and start from the next one
 -                    while (!(jp.getCurrentToken() == JsonToken.END_OBJECT
 -                            && jp.nextToken() == JsonToken.START_OBJECT)) {
 -                        jp.nextToken();
 -                    }
 -                }
++                importEntitiesFromFile(file, targetEm, emManagementApp, fileImport );
  
 -                // get to start of an object i.e next entity.
 -                while (jp.getCurrentToken() != JsonToken.START_OBJECT) {
 -                    jp.nextToken();
 -                }
 +                // TODO: fix the resume on error feature
  
 -                while (jp.nextToken() != JsonToken.END_ARRAY) {
 -                    importEntitiesFromFile(jp, targetEm, emManagementApp, fileImport);
 -                }
 -                jp.close();
 +//                // in case of resume, retrieve the last updated UUID for this file
 +//                String lastUpdatedUUID = fileImport.getLastUpdatedUUID();
 +//
 +//                // this handles partially completed files by updating entities from the point of failure
 +//                if (!lastUpdatedUUID.equals(" ")) {
 +//
 +//                    // go till the last updated entity
 +//                    while (!jp.getText().equals(lastUpdatedUUID)) {
 +//                        jp.nextToken();
 +//                    }
 +//
 +//                    // skip the last one and start from the next one
 +//                    while (!(jp.getCurrentToken() == JsonToken.END_OBJECT
 +//                        && jp.nextToken() == JsonToken.START_OBJECT)) {
 +//                        jp.nextToken();
 +//                    }
 +//                }
 +//
 +//                // get to start of an object i.e next entity.
 +//                while (jp.getCurrentToken() != JsonToken.START_OBJECT) {
 +//                    jp.nextToken();
 +//                }
 +//
 +//                while (jp.nextToken() != JsonToken.END_ARRAY) {
 +//                    importEntitiesFromFile(jp, targetEm, emManagementApp, fileImport, jobExecution);
 +//                }
 +//                jp.close();
  
                  // Updates the state of file import job
                  if (!fileImport.getState().toString().equals("FAILED")) {
@@@ -711,33 -741,26 +766,32 @@@
      }
  
  
 -
      /**
       * Imports the entity's connecting references (collections, connections and dictionaries)
 -     * @param jp  JsonParser pointing to the beginning of the object.
 -     * @param em Entity Manager for the application being imported
 -     * @param rootEm Entity manager for the root applicaition
 -     * @param fileImport the file import entity
 +     *
-      * @param jp           JsonParser pointing to the beginning of the object.
++     * @param file         The file to be imported
 +     * @param em           Entity Manager for the application being imported
 +     * @param rootEm       Entity manager for the root applicaition
-      * @param fileImport   the file import entity
-      * @param jobExecution execution details for the import jbo
++     * @param fileImport   The file import entity
       */
      private void importEntitiesFromFile(
 -        final JsonParser jp,
 +        final File file,
          final EntityManager em,
          final EntityManager rootEm,
-         final FileImport fileImport,
-         final JobExecution jobExecution) throws Exception {
+         final FileImport fileImport) throws Exception {
  
 -        final JsonParserObservable subscribe = new JsonParserObservable(jp, em, rootEm, fileImport);
  
 -        final Observable<WriteEvent> observable = Observable.create(subscribe);
 +        // first we do entities
 +        boolean entitiesOnly = true;
  
 -        // This is the action we want to perform for every UUID we receive
 +        // observable that parses JSON and emits write events
 +        JsonParser jp = getJsonParserForFile(file);
 +        final JsonEntityParserObservable jsonObservableEntities =
 +            new JsonEntityParserObservable(jp, em, rootEm, fileImport, entitiesOnly);
 +        final Observable<WriteEvent> entityEventObservable = Observable.create(jsonObservableEntities);
 +
 +        // function to execute for each write event
+         //TODO: job execution no longer needed due to having queueMessage.
          final Action1<WriteEvent> doWork = new Action1<WriteEvent>() {
              @Override
              public void call(WriteEvent writeEvent) {
@@@ -792,30 -818,6 +846,30 @@@
  
              }
          }, Schedulers.io()).toBlocking().last();
 +        jp.close();
 +
 +        logger.debug("\n\nWrote entities\n");
 +
 +        // now do other stuff: connections and dictionaries
 +        entitiesOnly = false;
 +
 +        // observable that parses JSON and emits write events
 +        jp = getJsonParserForFile(file);
 +        final JsonEntityParserObservable jsonObservableOther =
 +            new JsonEntityParserObservable(jp, em, rootEm, fileImport, entitiesOnly);
 +        final Observable<WriteEvent> otherEventObservable = Observable.create(jsonObservableOther);
 +
 +        otherEventObservable.parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
 +            @Override
 +            public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
 +                return entityWrapperObservable.doOnNext(doWork);
 +
 +            }
 +        }, Schedulers.io()).toBlocking().last();
-         
++
 +        jp.close();
 +
 +        logger.debug("\n\nWrote others\n");
      }
  
  
@@@ -932,22 -927,14 +986,22 @@@
  
          // adds map to the dictionary
          @Override
-         public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport) {
+         public void doWrite(EntityManager em, FileImport fileImport) {
              EntityManager rootEm = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
              try {
 +
 +                logger.debug("Adding map to {}:{} dictionary {}",
 +                    new Object[]{ownerEntityRef.getType(), ownerEntityRef.getType(), dictionaryName});
 +
                  em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary);
 +
              } catch (Exception e) {
 +                logger.error("Error writing dictionary", e);
                  fileImport.setErrorMessage(e.getMessage());
                  try {
 +
                      rootEm.update(fileImport);
 +
                  } catch (Exception ex) {
  
                      // TODO should we abort at this point?

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c652c39b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
index 53bf144,07e0798..922c16a
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
@@@ -29,11 -35,11 +35,10 @@@ import com.google.inject.Inject
  import com.google.inject.Singleton;
  
  
--/**
-- * Created by ApigeeCorporation on 1/15/15.
-- *///TODO: make sure this is properly instantiated by guice
++//TODO: make sure this is properly instantiated by guice
  @Singleton
  public class ImportQueueListener extends QueueListener {
++    
      /**
       * Initializes the QueueListener. Need to wire the factories up in guice.
       */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c652c39b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java
----------------------------------------------------------------------
diff --cc stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java
index 742c33e,f9a0753..b9c3bb9
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java
@@@ -97,12 -103,12 +103,13 @@@ public class ImportServiceIT 
          applicationId = setup.getMgmtSvc().createApplication( organization.getUuid(), username+"app" ).getId();
      }
  
++
      @Before
      public void before() {
  
          boolean configured =
-                    !StringUtils.isEmpty(System.getProperty("secretKey"))
-                 && !StringUtils.isEmpty(System.getProperty("accessKey"))
+                    !StringUtils.isEmpty(System.getProperty( SDKGlobalConfiguration.SECRET_KEY_ENV_VAR))
 -                && !StringUtils.isEmpty(System.getProperty(SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR))
++                && !StringUtils.isEmpty(System.getProperty( SDKGlobalConfiguration.ACCESS_KEY_ENV_VAR))
                  && !StringUtils.isEmpty(System.getProperty("bucketName"));
  
          if ( !configured ) {
@@@ -416,7 -445,7 +439,7 @@@
          while ( !exportService.getState( exportUUID ).equals( "FINISHED" ) ) {
              ;
          }
--        //TODo: can check if the temp files got created
++        //TODO: can check if the temp files got created
  
          // import
          S3Import s3Import = new S3ImportImpl();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c652c39b/stack/services/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --cc stack/services/src/test/resources/log4j.properties
index 3ef4d3b,3ef4d3b..a63d719
--- a/stack/services/src/test/resources/log4j.properties
+++ b/stack/services/src/test/resources/log4j.properties
@@@ -57,6 -57,6 +57,7 @@@ log4j.logger.org.apache.usergrid.lockin
  #log4j.logger.org.apache.usergrid.corepersistence=DEBUG
  
  #log4j.logger.org.apache.usergrid.persistence.index=DEBUG
++#log4j.logger.org.apache.usergrid.batch=DEBUG
  log4j.logger.org.apache.usergrid.management.export=DEBUG
  log4j.logger.org.apache.usergrid.management.importer=DEBUG
  


[2/3] incubator-usergrid git commit: Rewrote import parser, fixed some error handing and now ImportServiceIT tests now passing.

Posted by sn...@apache.org.
Rewrote import parser, fixed some error handing and now ImportServiceIT tests now passing.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b22c37ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b22c37ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b22c37ae

Branch: refs/heads/two-dot-o-import
Commit: b22c37aeefeb8c9c84b2170e24c966d15112fd6f
Parents: 858cbc0
Author: Dave Johnson <dm...@apigee.com>
Authored: Tue Feb 3 09:26:26 2015 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Tue Feb 3 09:26:26 2015 -0500

----------------------------------------------------------------------
 stack/services/pom.xml                          |   3 +-
 .../management/importer/FileImportJob.java      |  42 ++-
 .../usergrid/management/importer/ImportJob.java |  41 ++-
 .../management/importer/ImportServiceImpl.java  | 329 ++++++++++---------
 .../management/importer/ImportServiceIT.java    |   8 +-
 5 files changed, 239 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b22c37ae/stack/services/pom.xml
----------------------------------------------------------------------
diff --git a/stack/services/pom.xml b/stack/services/pom.xml
index 83d49e3..820020d 100644
--- a/stack/services/pom.xml
+++ b/stack/services/pom.xml
@@ -56,7 +56,7 @@
         <directory>src/main/resources</directory>
         <filtering>true</filtering>
         <includes>
-          <include>**/*.xml</include>
+            <include>**/*.xml</include>
         </includes>
       </resource>
       <resource>
@@ -72,6 +72,7 @@
           <include>**/*.xml</include>
           <include>**/*.properties</include>
           <include>**/*.p12</include>
+          <include>**/*.json</include>
         </includes>
       </testResource>
       <testResource>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b22c37ae/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
index 25295fc..cb83ce8 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/FileImportJob.java
@@ -42,8 +42,8 @@ public class FileImportJob extends OnlyOnceJob {
     public static final String FILE_IMPORT_ID = "fileImportId";
     private static final Logger logger = LoggerFactory.getLogger(FileImportJob.class);
 
-    // injected the Entity Manager Factory
-    protected EntityManagerFactory emf;
+    @Autowired
+    EntityManagerFactory emf;
 
     @Autowired
     ImportService importService;
@@ -54,20 +54,34 @@ public class FileImportJob extends OnlyOnceJob {
 
     @Override
     protected void doJob(JobExecution jobExecution) throws Exception {
-        logger.info( "execute FileImportJob {}", jobExecution.toString() );
-
-        JobData jobData = jobExecution.getJobData();
-        if ( jobData == null ) {
-            logger.error( "jobData cannot be null" );
-            return;
+        logger.info("execute FileImportJob {}", jobExecution.toString());
+
+        try {
+            JobData jobData = jobExecution.getJobData();
+            if (jobData == null) {
+                logger.error("jobData cannot be null");
+                return;
+            }
+
+            // heartbeat to indicate job has started
+            jobExecution.heartbeat();
+
+            // call the File Parser for the file set in job execution
+            importService.parseFileToEntities(jobExecution);
+            
+        } catch ( Throwable t ) {
+            logger.debug("Error importing file", t);
+
+            // update file import record
+            UUID fileImportId = (UUID) jobExecution.getJobData().getProperty(FILE_IMPORT_ID);
+            EntityManager em = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
+            FileImport fileImport = em.get(fileImportId, FileImport.class);
+            fileImport.setState( FileImport.State.FAILED );
+            em.update( fileImport );
+
+            throw t;
         }
 
-        // heartbeat to indicate job has started
-        jobExecution.heartbeat();
-
-        // call the File Parser for the file set in job execution
-        importService.parseFileToEntities(jobExecution);
-
         logger.error("File Import Service completed job");
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b22c37ae/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java
index 24393cd..995850c 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportJob.java
@@ -23,6 +23,7 @@ import org.apache.usergrid.batch.job.OnlyOnceJob;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.entities.FileImport;
 import org.apache.usergrid.persistence.entities.Import;
 import org.apache.usergrid.persistence.entities.JobData;
 import org.slf4j.Logger;
@@ -30,6 +31,8 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.UUID;
+
 
 @Component("importJob")
 public class ImportJob extends OnlyOnceJob {
@@ -37,8 +40,9 @@ public class ImportJob extends OnlyOnceJob {
     public static final String IMPORT_ID = "importId";
     private static final Logger logger = LoggerFactory.getLogger(ImportJob.class);
 
-    //injected the Entity Manager Factory
+    @Autowired
     protected EntityManagerFactory emf;
+
     @Autowired
     ImportService importService;
 
@@ -50,18 +54,33 @@ public class ImportJob extends OnlyOnceJob {
     protected void doJob(JobExecution jobExecution) throws Exception {
         logger.info( "execute ImportJob {}", jobExecution.getJobId().toString() );
 
-        JobData jobData = jobExecution.getJobData();
-        if ( jobData == null ) {
-            logger.error( "jobData cannot be null" );
-            return;
-        }
+        try {
+            JobData jobData = jobExecution.getJobData();
+            if (jobData == null) {
+                logger.error("jobData cannot be null");
+                return;
+            }
 
-        // heartbeat to indicate job has started
-        jobExecution.heartbeat();
+            // heartbeat to indicate job has started
+            jobExecution.heartbeat();
 
-        // call the doImport method from import service which
-        // schedules the sub-jobs i.e. parsing of files to FileImport Job
-        importService.doImport( jobExecution );
+            // call the doImport method from import service which
+            // schedules the sub-jobs i.e. parsing of files to FileImport Job
+            importService.doImport(jobExecution);
+
+        } catch ( Throwable t ) {
+            logger.error("Error calling in importJob", t);
+
+            // update import job record
+            UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
+            EntityManager mgmtApp = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
+            Import importEntity = mgmtApp.get(importId, Import.class);
+            importEntity.setState(Import.State.FAILED);
+            importEntity.setErrorMessage(t.getMessage());
+            mgmtApp.update(importEntity);
+
+            throw t;
+        }
 
         logger.error("Import Service completed job");
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b22c37ae/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
index e78675b..7339087 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
@@ -24,7 +24,6 @@ import org.apache.usergrid.management.ApplicationInfo;
 import org.apache.usergrid.management.ManagementService;
 import org.apache.usergrid.management.OrganizationInfo;
 import org.apache.usergrid.persistence.*;
-import org.apache.usergrid.persistence.entities.Application;
 import org.apache.usergrid.persistence.entities.FileImport;
 import org.apache.usergrid.persistence.entities.Import;
 import org.apache.usergrid.persistence.entities.JobData;
@@ -84,7 +83,7 @@ public class ImportServiceImpl implements ImportService {
 
         EntityManager rootEm = null;
         try {
-            rootEm = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID);
+            rootEm = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
             Set<String> collections = rootEm.getApplicationCollections();
             if (!collections.contains("imports")) {
                 rootEm.createApplicationCollection("imports");
@@ -132,10 +131,10 @@ public class ImportServiceImpl implements ImportService {
      * @return it returns the UUID of the scheduled job
      * @throws Exception
      */
-    public UUID scheduleFile( Map<String, Object> config, String file, EntityRef importRef) throws Exception {
+    public UUID scheduleFile(Map<String, Object> config, String file, EntityRef importRef) throws Exception {
 
         logger.debug("scheduleFile() for import {}:{} file {}",
-            new Object[] { importRef.getType(), importRef.getType(), file});
+            new Object[]{importRef.getType(), importRef.getType(), file});
 
         EntityManager rootEm = null;
 
@@ -172,9 +171,9 @@ public class ImportServiceImpl implements ImportService {
 
         //set data to be transferred to the FileImport Job
         JobData jobData = new JobData();
-        jobData.setProperty( "File", file);
-        jobData.setProperty( FILE_IMPORT_ID, fileImport.getUuid());
-        jobData.addProperties( config );
+        jobData.setProperty("File", file);
+        jobData.setProperty(FILE_IMPORT_ID, fileImport.getUuid());
+        jobData.addProperties(config);
 
         long soonestPossible = System.currentTimeMillis() + 250; //sch grace period
 
@@ -242,6 +241,7 @@ public class ImportServiceImpl implements ImportService {
 
     /**
      * Returns the Import Entity that stores all meta-data for the particular import Job
+     *
      * @param jobExecution the import job details
      * @return Import Entity
      * @throws Exception
@@ -258,6 +258,7 @@ public class ImportServiceImpl implements ImportService {
 
     /**
      * Returns the File Import Entity that stores all meta-data for the particular sub File import Job
+     *
      * @param jobExecution the file import job details
      * @return File Import Entity
      * @throws Exception
@@ -278,7 +279,6 @@ public class ImportServiceImpl implements ImportService {
 //    public ArrayList<File> getEphemeralFile() {
 //        return files;
 //    }
-
     public SchedulerService getSch() {
         return sch;
     }
@@ -312,6 +312,7 @@ public class ImportServiceImpl implements ImportService {
 
     /**
      * This method gets the files from s3 and also creates sub-jobs for each file i.e. File Import Jobs
+     *
      * @param jobExecution the job created by the scheduler with all the required config data
      */
     @Override
@@ -375,7 +376,7 @@ public class ImportServiceImpl implements ImportService {
 
                     // import All the applications from an organization
                     //importApplicationsFromOrg(
-                        //(UUID) config.get("organizationId"), config, jobExecution, s3Import);
+                    //(UUID) config.get("organizationId"), config, jobExecution, s3Import);
 
                 } else if (config.get("collectionName") == null) {
 
@@ -383,13 +384,13 @@ public class ImportServiceImpl implements ImportService {
 
                     // imports an Application from a single organization
                     //importApplicationFromOrg( (UUID) config.get("organizationId"),
-                        // (UUID) config.get("applicationId"), config, jobExecution, s3Import);
+                    // (UUID) config.get("applicationId"), config, jobExecution, s3Import);
 
                 } else {
 
                     // imports a single collection from an app org combo
                     files = importCollectionFromOrgApp(
-                        (UUID)config.get("organizationId"), (UUID)config.get("applicationId"),
+                        (UUID) config.get("organizationId"), (UUID) config.get("applicationId"),
                         config, jobExecution, s3Import);
                 }
             }
@@ -417,7 +418,7 @@ public class ImportServiceImpl implements ImportService {
 
                 // TODO SQS: replace the method inside here so that it uses sqs instead of internal q
 
-                UUID jobID = scheduleFile( config, file.getPath(), importUG);
+                UUID jobID = scheduleFile(config, file.getPath(), importUG);
 
                 Map<String, Object> fileJobID = new HashMap<String, Object>();
                 fileJobID.put("FileName", file.getName());
@@ -457,9 +458,9 @@ public class ImportServiceImpl implements ImportService {
         String collectionName = config.get("collectionName").toString();
 
         String appFileName = prepareCollectionInputFileName(
-            organizationInfo.getName(), application.getName(), collectionName );
+            organizationInfo.getName(), application.getName(), collectionName);
 
-       return copyFileFromS3(importUG, appFileName, config, s3Import, ImportType.COLLECTION);
+        return copyFileFromS3(importUG, appFileName, config, s3Import, ImportType.COLLECTION);
 
     }
 
@@ -506,30 +507,31 @@ public class ImportServiceImpl implements ImportService {
         }
 
         // prepares the prefix path for the files to be import depending on the endpoint being hit
-        String appFileName = prepareOrganizationInputFileName( organizationInfo.getName());
+        String appFileName = prepareOrganizationInputFileName(organizationInfo.getName());
 
         return copyFileFromS3(importUG, appFileName, config, s3Import, ImportType.ORGANIZATION);
 
     }
 
 
-    protected String prepareCollectionInputFileName(String orgName, String appName, String collectionName ) {
+    protected String prepareCollectionInputFileName(String orgName, String appName, String collectionName) {
         return orgName + "/" + appName + "." + collectionName + ".";
     }
 
 
-    protected String prepareApplicationInputFileName(String orgName, String appName ) {
+    protected String prepareApplicationInputFileName(String orgName, String appName) {
         return orgName + "/" + appName + ".";
     }
 
 
-    protected String prepareOrganizationInputFileName(String orgName ) {
+    protected String prepareOrganizationInputFileName(String orgName) {
         return orgName + "/";
     }
 
 
     /**
      * Copies file from S3.
+     *
      * @param importUG    Import instance
      * @param appFileName the base file name for the files to be downloaded
      * @param config      the config information for the import job
@@ -537,7 +539,7 @@ public class ImportServiceImpl implements ImportService {
      * @param type        it indicates the type of import
      */
     public ArrayList<File> copyFileFromS3(Import importUG, String appFileName,
-        Map<String, Object> config, S3Import s3Import, ImportType type) throws Exception {
+                                          Map<String, Object> config, S3Import s3Import, ImportType type) throws Exception {
 
         EntityManager rootEm = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
         ArrayList<File> copyFiles = new ArrayList<>();
@@ -587,43 +589,45 @@ public class ImportServiceImpl implements ImportService {
                 emManagementApp.update(fileImport);
 
                 // Get target application ID from the job data (NOT from the filename)
-                UUID targetAppId = (UUID)jobExecution.getJobData().getProperty("applicationId");
+                UUID targetAppId = (UUID) jobExecution.getJobData().getProperty("applicationId");
 
-                if ( emManagementApp.get( targetAppId ) == null ) {
+                if (emManagementApp.get(targetAppId) == null) {
                     throw new IllegalArgumentException("Application does not exist: " + targetAppId.toString());
                 }
-                EntityManager targetEm = emf.getEntityManager( targetAppId );
+                EntityManager targetEm = emf.getEntityManager(targetAppId);
                 logger.debug("   importing into app {} file {}", targetAppId.toString(), file.getAbsolutePath());
 
-                JsonParser jp = getJsonParserForFile(file);
-
-                // in case of resume, retrieve the last updated UUID for this file
-                String lastUpdatedUUID = fileImport.getLastUpdatedUUID();
-
-                // this handles partially completed files by updating entities from the point of failure
-                if (!lastUpdatedUUID.equals(" ")) {
+                importEntitiesFromFile(file, targetEm, emManagementApp, fileImport, jobExecution);
 
-                    // go till the last updated entity
-                    while (!jp.getText().equals(lastUpdatedUUID)) {
-                        jp.nextToken();
-                    }
-
-                    // skip the last one and start from the next one
-                    while (!(jp.getCurrentToken() == JsonToken.END_OBJECT
-                            && jp.nextToken() == JsonToken.START_OBJECT)) {
-                        jp.nextToken();
-                    }
-                }
+                // TODO: fix the resume on error feature
 
-                // get to start of an object i.e next entity.
-                while (jp.getCurrentToken() != JsonToken.START_OBJECT) {
-                    jp.nextToken();
-                }
-
-                while (jp.nextToken() != JsonToken.END_ARRAY) {
-                    importEntitiesFromFile(jp, targetEm, emManagementApp, fileImport, jobExecution);
-                }
-                jp.close();
+//                // in case of resume, retrieve the last updated UUID for this file
+//                String lastUpdatedUUID = fileImport.getLastUpdatedUUID();
+//
+//                // this handles partially completed files by updating entities from the point of failure
+//                if (!lastUpdatedUUID.equals(" ")) {
+//
+//                    // go till the last updated entity
+//                    while (!jp.getText().equals(lastUpdatedUUID)) {
+//                        jp.nextToken();
+//                    }
+//
+//                    // skip the last one and start from the next one
+//                    while (!(jp.getCurrentToken() == JsonToken.END_OBJECT
+//                        && jp.nextToken() == JsonToken.START_OBJECT)) {
+//                        jp.nextToken();
+//                    }
+//                }
+//
+//                // get to start of an object i.e next entity.
+//                while (jp.getCurrentToken() != JsonToken.START_OBJECT) {
+//                    jp.nextToken();
+//                }
+//
+//                while (jp.nextToken() != JsonToken.END_ARRAY) {
+//                    importEntitiesFromFile(jp, targetEm, emManagementApp, fileImport, jobExecution);
+//                }
+//                jp.close();
 
                 // Updates the state of file import job
                 if (!fileImport.getState().toString().equals("FAILED")) {
@@ -642,7 +646,7 @@ public class ImportServiceImpl implements ImportService {
                     UUID importId = importEntity.get(0).getUuid();
                     Import importUG = emManagementApp.get(importId, Import.class);
 
-                    Results entities = emManagementApp.getConnectedEntities( importUG, "includes", null, Level.ALL_PROPERTIES);
+                    Results entities = emManagementApp.getConnectedEntities(importUG, "includes", null, Level.ALL_PROPERTIES);
                     List<Entity> importFile = entities.getEntities();
 
                     int count = 0;
@@ -667,9 +671,10 @@ public class ImportServiceImpl implements ImportService {
 
     /**
      * Checks if a file is a valid JSON
+     *
      * @param collectionFile the file being validated
-     * @param rootEm    the Entity Manager for the Management application
-     * @param fileImport the file import entity
+     * @param rootEm         the Entity Manager for the Management application
+     * @param fileImport     the file import entity
      * @return
      * @throws Exception
      */
@@ -696,6 +701,7 @@ public class ImportServiceImpl implements ImportService {
 
     /**
      * Gets the JSON parser for given file
+     *
      * @param collectionFile the file for which JSON parser is required
      */
     private JsonParser getJsonParserForFile(File collectionFile) throws Exception {
@@ -705,27 +711,33 @@ public class ImportServiceImpl implements ImportService {
     }
 
 
-
     /**
      * Imports the entity's connecting references (collections, connections and dictionaries)
-     * @param jp  JsonParser pointing to the beginning of the object.
-     * @param em Entity Manager for the application being imported
-     * @param rootEm Entity manager for the root applicaition
-     * @param fileImport the file import entity
-     * @param jobExecution  execution details for the import jbo
+     *
+     * @param jp           JsonParser pointing to the beginning of the object.
+     * @param em           Entity Manager for the application being imported
+     * @param rootEm       Entity manager for the root applicaition
+     * @param fileImport   the file import entity
+     * @param jobExecution execution details for the import jbo
      */
     private void importEntitiesFromFile(
-        final JsonParser jp,
+        final File file,
         final EntityManager em,
         final EntityManager rootEm,
         final FileImport fileImport,
         final JobExecution jobExecution) throws Exception {
 
-        final JsonParserObservable subscribe = new JsonParserObservable(jp, em, rootEm, fileImport);
 
-        final Observable<WriteEvent> observable = Observable.create(subscribe);
+        // first we do entities
+        boolean entitiesOnly = true;
 
-        // This is the action we want to perform for every UUID we receive
+        // observable that parses JSON and emits write events
+        JsonParser jp = getJsonParserForFile(file);
+        final JsonEntityParserObservable jsonObservableEntities =
+            new JsonEntityParserObservable(jp, em, rootEm, fileImport, entitiesOnly);
+        final Observable<WriteEvent> entityEventObservable = Observable.create(jsonObservableEntities);
+
+        // function to execute for each write event
         final Action1<WriteEvent> doWork = new Action1<WriteEvent>() {
             @Override
             public void call(WriteEvent writeEvent) {
@@ -736,16 +748,13 @@ public class ImportServiceImpl implements ImportService {
 //        final AtomicLong entityCounter = new AtomicLong();
 //        final AtomicLong eventCounter = new AtomicLong();
 
-        // This is boilerplate glue code. We have to follow this for the parallel operation.
-        // In the "call" method we want to simply return the input observable + the chain of
-        // operations we want to invoke
-
-        observable.parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+        // start parsing JSON
+        entityEventObservable.parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
             @Override
             public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
 
-            // TODO: need to fixed so that number of entities created can be counted correctly and
-            // TODO: also update last updated UUID for fileImport which is a must for resume-ability
+                // TODO: need to fixed so that number of entities created can be counted correctly and
+                // TODO: also update last updated UUID for fileImport which is a must for resume-ability
 
 //                return entityWrapperObservable.doOnNext(doWork).doOnNext(new Action1<WriteEvent>() {
 //
@@ -783,6 +792,30 @@ public class ImportServiceImpl implements ImportService {
 
             }
         }, Schedulers.io()).toBlocking().last();
+        jp.close();
+
+        logger.debug("\n\nWrote entities\n");
+
+        // now do other stuff: connections and dictionaries
+        entitiesOnly = false;
+
+        // observable that parses JSON and emits write events
+        jp = getJsonParserForFile(file);
+        final JsonEntityParserObservable jsonObservableOther =
+            new JsonEntityParserObservable(jp, em, rootEm, fileImport, entitiesOnly);
+        final Observable<WriteEvent> otherEventObservable = Observable.create(jsonObservableOther);
+
+        otherEventObservable.parallel(new Func1<Observable<WriteEvent>, Observable<WriteEvent>>() {
+            @Override
+            public Observable<WriteEvent> call(Observable<WriteEvent> entityWrapperObservable) {
+                return entityWrapperObservable.doOnNext(doWork);
+
+            }
+        }, Schedulers.io()).toBlocking().last();
+        
+        jp.close();
+
+        logger.debug("\n\nWrote others\n");
     }
 
 
@@ -813,7 +846,7 @@ public class ImportServiceImpl implements ImportService {
 
             try {
                 logger.debug("Writing imported entity {}:{} into app {}",
-                    new Object[] { entityType, entityUuid, em.getApplication().getUuid() });
+                    new Object[]{entityType, entityUuid, em.getApplication().getUuid()});
 
                 em.create(entityUuid, entityType, properties);
 
@@ -858,14 +891,14 @@ public class ImportServiceImpl implements ImportService {
                 // TODO: what happens if ConnectionEvents  happen before all entities are saved?
 
                 // Connections are specified as UUIDs with no type
-                if ( entityRef.getType() == null ) {
+                if (entityRef.getType() == null) {
                     entityRef = em.get(ownerEntityRef.getUuid());
                 }
 
                 logger.debug("Creating connection from {}:{} to {}:{}",
-                    new Object[] {
+                    new Object[]{
                         ownerEntityRef.getType(), ownerEntityRef.getUuid(),
-                        entityRef.getType(), entityRef.getUuid() });
+                        entityRef.getType(), entityRef.getUuid()});
 
                 em.createConnection(ownerEntityRef, connectionType, entityRef);
 
@@ -902,9 +935,9 @@ public class ImportServiceImpl implements ImportService {
         public void doWrite(EntityManager em, JobExecution jobExecution, FileImport fileImport) {
             EntityManager rootEm = emf.getEntityManager(CpNamingUtils.MANAGEMENT_APPLICATION_ID);
             try {
-                
+
                 logger.debug("Adding map to {}:{} dictionary {}",
-                    new Object[] {ownerEntityRef.getType(), ownerEntityRef.getType(), dictionaryName });
+                    new Object[]{ownerEntityRef.getType(), ownerEntityRef.getType(), dictionaryName});
 
                 em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary);
 
@@ -926,131 +959,120 @@ public class ImportServiceImpl implements ImportService {
     }
 
 
-    private final class JsonParserObservable implements Observable.OnSubscribe<WriteEvent> {
+    private final class JsonEntityParserObservable implements Observable.OnSubscribe<WriteEvent> {
         private final JsonParser jp;
         EntityManager em;
         EntityManager rootEm;
         FileImport fileImport;
+        boolean entitiesOnly;
 
 
-        JsonParserObservable(JsonParser parser, EntityManager em, EntityManager rootEm, FileImport fileImport) {
+        JsonEntityParserObservable(
+            JsonParser parser,
+            EntityManager em,
+            EntityManager rootEm,
+            FileImport fileImport,
+            boolean entitiesOnly) {
             this.jp = parser;
             this.em = em;
             this.rootEm = rootEm;
             this.fileImport = fileImport;
+            this.entitiesOnly = entitiesOnly;
         }
 
 
         @Override
         public void call(final Subscriber<? super WriteEvent> subscriber) {
-
-            // have to do this in two passes so that entities are created before connections
-
-            // first entities
-            process( subscriber, true );
-
-            // next connections and dictionaries
-            process( subscriber, false);
+            process(subscriber);
         }
 
 
-        private void process(final Subscriber<? super WriteEvent> subscriber, boolean entities ) {
-
-            logger.debug("process(): entities = " + entities );
-
-            EntityRef lastEntitySeenRef = null;
+        private void process(final Subscriber<? super WriteEvent> subscriber) {
 
             try {
 
-                while (!subscriber.isUnsubscribed() && jp.nextToken() != JsonToken.END_OBJECT) {
-
-                    String collectionName = jp.getCurrentName();
+                boolean done = false;
+                Stack tokenStack = new Stack();
+                EntityRef lastEntity = null;
 
-                    logger.debug("Processing currentName: " + jp.getCurrentName());
+                while (!done) {
 
-                    // create the  wrapper for connections
-                    if (collectionName != null && collectionName.equals("connections")) {
-
-                        jp.nextToken(); // START_OBJECT
-                        while (jp.nextToken() != JsonToken.END_OBJECT) {
-                            String connectionType = jp.getCurrentName();
-
-                            jp.nextToken(); // START_ARRAY
-                            while (jp.nextToken() != JsonToken.END_ARRAY) {
-                                String entryId = jp.getText();
-
-                                if ( !entities ) {
-                                    EntityRef entryRef = new SimpleEntityRef(UUID.fromString(entryId));
-                                    WriteEvent entityWrapper = new ConnectionEvent(lastEntitySeenRef, connectionType, entryRef);
-                                    subscriber.onNext(entityWrapper);
-                                }
-                            }
-                        }
+                    JsonToken token = jp.nextToken();
+                    String name = jp.getCurrentName();
 
+                    String indent = "";
+                    for (int i = 0; i < tokenStack.size(); i++) {
+                        indent += "   ";
                     }
-                    // create the  wrapper for dictionaries
-                    else if (collectionName != null && collectionName.equals("dictionaries")) {
 
-                        jp.nextToken(); // START_OBJECT
-                        while (jp.nextToken() != JsonToken.END_OBJECT) {
+                    logger.debug("{}Token {} name {}", new Object[]{indent, token, name});
 
-                            String dictionaryName = jp.getCurrentName();
-                            jp.nextToken();
-                            Map<String, Object> dictionary = jp.readValueAs(HashMap.class);
+                    if (token.equals(JsonToken.START_OBJECT) && "Metadata".equals(name)) {
 
-                            if ( !entities ) {
-                                WriteEvent entityWrapper = new DictionaryEvent(
-                                    lastEntitySeenRef, dictionaryName, dictionary);
-                                subscriber.onNext(entityWrapper);
-                            }
-                        }
-                        subscriber.onCompleted();
+                        Map<String, Object> entityMap = jp.readValueAs(HashMap.class);
 
-                    } else {
+                        String type = (String) entityMap.get("type");
+                        UUID uuid = UUID.fromString((String) entityMap.get("uuid"));
+                        lastEntity = new SimpleEntityRef(type, uuid);
 
-                        // Regular collections
+                        logger.debug("{}Got entity with uuid {}", indent, lastEntity);
+                        if (entitiesOnly) {
+                            WriteEvent event = new EntityEvent(uuid, type, entityMap);
+                            subscriber.onNext(event);
+                        }
 
-                        jp.nextToken(); // START_OBJECT
+                    } else if (token.equals(JsonToken.START_OBJECT) && "connections".equals(name)) {
 
-                        JsonToken token = jp.nextToken();
+                        Map<String, Object> connectionMap = jp.readValueAs(HashMap.class);
 
-                        Map<String, Object> properties = new HashMap<>();
+                        for (String type : connectionMap.keySet()) {
+                            List targets = (List) connectionMap.get(type);
 
-                        String entityUuid = null;
-                        String entityType = null;
+                            for (Object targetObject : targets) {
+                                UUID target = UUID.fromString((String) targetObject);
 
-                        while (token != JsonToken.END_OBJECT) {
+                                logger.debug("{}Got connection {} to {}",
+                                    new Object[]{indent, type, target.toString()});
 
-                            if (token == JsonToken.VALUE_STRING || token == JsonToken.VALUE_NUMBER_INT) {
+                                if (!entitiesOnly) {
+                                    EntityRef entryRef = new SimpleEntityRef(target);
+                                    WriteEvent event = new ConnectionEvent(lastEntity, type, entryRef);
+                                    subscriber.onNext(event);
+                                }
+                            }
+                        }
 
-                                String key = jp.getCurrentName();
-                                logger.debug("   currentName: " + jp.getText());
+                    } else if (token.equals(JsonToken.START_OBJECT) && "dictionaries".equals(name)) {
 
-                                if (key.equals("uuid")) {
-                                    entityUuid = jp.getText();
+                        Map<String, Object> dictionariesMap = jp.readValueAs(HashMap.class);
+                        for (String dname : dictionariesMap.keySet()) {
+                            Map dmap = (Map) dictionariesMap.get(dname);
 
-                                } else if (key.equals("type")) {
-                                    entityType = jp.getText();
+                            logger.debug("{}Got dictionary {} size {}",
+                                new Object[] {indent, dname, dmap.size() });
 
-                                } else if (key.length() != 0 && jp.getText().length() != 0) {
-                                    String value = jp.getText();
-                                    properties.put(key, value);
-                                }
+                            if (!entitiesOnly) {
+                                WriteEvent event = new DictionaryEvent(lastEntity, dname, dmap);
+                                subscriber.onNext(event);
                             }
-                            token = jp.nextToken();
                         }
 
-                        if ( entities ) {
-                            WriteEvent entityWrapper = new EntityEvent(
-                                UUID.fromString(entityUuid), entityType, properties);
-                            subscriber.onNext(entityWrapper);
-                        }
+                    } else if (token.equals(JsonToken.START_OBJECT)) {
+                        tokenStack.push(token);
+
+                    } else if (token.equals(JsonToken.END_OBJECT)) {
+                        tokenStack.pop();
+                    }
 
-                        // don't save it, but do keep track of last entity seen
-                        lastEntitySeenRef = new SimpleEntityRef( entityType, UUID.fromString(entityUuid) );
+                    if (token.equals(JsonToken.END_ARRAY) && tokenStack.isEmpty()) {
+                        done = true;
                     }
                 }
 
+                subscriber.onCompleted();
+
+                logger.debug("process(): done parsing JSON");
+
             } catch (Exception e) {
                 // skip illegal entity UUID and go to next one
                 fileImport.setErrorMessage(e.getMessage());
@@ -1062,7 +1084,6 @@ public class ImportServiceImpl implements ImportService {
                 subscriber.onError(e);
             }
         }
-
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b22c37ae/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java
index 178909b..742c33e 100644
--- a/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/management/importer/ImportServiceIT.java
@@ -233,10 +233,10 @@ public class ImportServiceIT {
                 }
             }
 
-//            // all things should have been updated
-//            for ( Entity e : importedThings ) {
-//                assertTrue(e.getModified() > thingsMap.get(e.getUuid()).getModified());
-//            }
+            // all things should have been updated
+            for ( Entity e : importedThings ) {
+                assertTrue(e.getModified() > thingsMap.get(e.getUuid()).getModified());
+            }
 
         }
         finally {