You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/06/22 17:40:38 UTC

[06/22] incubator-usergrid git commit: Added threading and queueing to imports.

Added threading and queueing to imports.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c0b89db5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c0b89db5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c0b89db5

Branch: refs/heads/two-dot-o-dev
Commit: c0b89db57ae43ffb1b94780ea79c314b38de5a60
Parents: 5fbe31f
Author: Jeff West <jw...@apigee.com>
Authored: Thu Jun 11 07:47:39 2015 -0700
Committer: Jeff West <jw...@apigee.com>
Committed: Thu Jun 11 07:47:39 2015 -0700

----------------------------------------------------------------------
 .../org/apache/usergrid/tools/ImportAdmins.java | 598 ++++++++++++++-----
 1 file changed, 450 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b89db5/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
index 49757ee..7d49459 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
@@ -36,10 +36,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.usergrid.persistence.Schema.PROPERTY_TYPE;
 import static org.apache.usergrid.persistence.Schema.PROPERTY_UUID;
@@ -53,55 +53,85 @@ import static org.apache.usergrid.persistence.cassandra.CassandraService.MANAGEM
  */
 public class ImportAdmins extends ToolBase {
 
-    private static final Logger logger = LoggerFactory.getLogger( ImportAdmins.class );
+    private static final Logger logger = LoggerFactory.getLogger(ImportAdmins.class);
 
     /**
      * Input directory where the .json export files are
      */
     static final String INPUT_DIR = "inputDir";
+    static final String WRITE_THREAD_COUNT = "writeThreads";
+    static final String AUDIT_THREAD_COUNT = "auditThreads";
 
     static File importDir;
 
     static final String DEFAULT_INPUT_DIR = "export";
 
+    private Map<Stoppable, Thread> adminWriteThreads = new HashMap<Stoppable, Thread>();
+    private Map<Stoppable, Thread> adminAuditThreads = new HashMap<Stoppable, Thread>();
+    private Map<Stoppable, Thread> metadataWorkerThreadMap = new HashMap<Stoppable, Thread>();
+
+
     JsonFactory jsonFactory = new JsonFactory();
 
 
     @Override
-    @SuppressWarnings( "static-access" )
+    @SuppressWarnings("static-access")
     public Options createOptions() {
 
-        Option hostOption = OptionBuilder.withArgName( "host" )
-                .hasArg().withDescription( "Cassandra host" ).create( "host" );
+        Option hostOption = OptionBuilder.withArgName("host")
+                .hasArg()
+                .withDescription("Cassandra host").create("host");
 
         Option inputDir = OptionBuilder
-                .hasArg().withDescription( "input directory -inputDir" ).create( INPUT_DIR );
+                .hasArg()
+                .withDescription("input directory -inputDir").create(INPUT_DIR);
+
+        Option writeThreads = OptionBuilder
+                .hasArg()
+                .withDescription("Write Threads -writeThreads").create(WRITE_THREAD_COUNT);
+
+        Option auditThreads = OptionBuilder
+                .hasArg()
+                .withDescription("Audit Threads -auditThreads").create(AUDIT_THREAD_COUNT);
 
         Option verbose = OptionBuilder
-                .withDescription( "Print on the console an echo of the content written to the file" )
-                .create( VERBOSE );
+                .withDescription("Print on the console an echo of the content written to the file")
+                .create(VERBOSE);
 
         Options options = new Options();
-        options.addOption( hostOption );
-        options.addOption( inputDir );
-        options.addOption( verbose );
+        options.addOption(hostOption);
+        options.addOption(writeThreads);
+        options.addOption(auditThreads);
+        options.addOption(inputDir);
+        options.addOption(verbose);
 
         return options;
     }
 
 
     @Override
-    public void runTool( CommandLine line ) throws Exception {
+    public void runTool(CommandLine line) throws Exception {
 
         startSpring();
 
-        setVerbose( line );
+        setVerbose(line);
+
+        openImportDirectory(line);
 
-        openImportDirectory( line );
+        int auditThreadCount = 1;
+        int writeThreadCount = 1;
+
+        if (line.hasOption(AUDIT_THREAD_COUNT)) {
+            auditThreadCount = Integer.parseInt(line.getOptionValue(AUDIT_THREAD_COUNT));
+        }
+
+        if (line.hasOption(WRITE_THREAD_COUNT)) {
+            writeThreadCount = Integer.parseInt(line.getOptionValue(WRITE_THREAD_COUNT));
+        }
 
-        importAdminUsers();
+        importAdminUsers(writeThreadCount, auditThreadCount);
 
-        importMetadata();
+        importMetadata(writeThreadCount);
 
         // forces the counters to flush
 //        logger.info( "Sleeping 35 seconds for batcher" );
@@ -112,17 +142,17 @@ public class ImportAdmins extends ToolBase {
     /**
      * Import admin users.
      */
-    private void importAdminUsers() throws Exception {
-        String[] fileNames = importDir.list( new PrefixFileFilter( ExportAdmins.ADMIN_USERS_PREFIX + "." ) );
-        logger.info( "Applications to read: " + fileNames.length );
+    private void importAdminUsers(int writeThreadCount, int auditThreadCount) throws Exception {
 
-        //this fails on the second run of the applications find out why.
-        for ( String fileName : fileNames ) {
+        String[] fileNames = importDir.list(new PrefixFileFilter(ExportAdmins.ADMIN_USERS_PREFIX + "."));
+
+        logger.info("Applications to read: " + fileNames.length);
+
+        for (String fileName : fileNames) {
             try {
-                importAdminUsers( fileName );
-            }
-            catch ( Exception e ) {
-                logger.warn( "Unable to import application: " + fileName, e );
+                importAdminUsers(fileName, writeThreadCount, auditThreadCount);
+            } catch (Exception e) {
+                logger.warn("Unable to import application: " + fileName, e);
             }
         }
     }
@@ -133,84 +163,129 @@ public class ImportAdmins extends ToolBase {
      *
      * @param fileName Name of admin user data file.
      */
-    private void importAdminUsers( String fileName ) throws Exception {
+    private void importAdminUsers(final String fileName,
+                                  final int writeThreadCount,
+                                  final int auditThreadCount) throws Exception {
 
         int count = 0;
 
-        File adminUsersFile = new File( importDir, fileName );
+        File adminUsersFile = new File(importDir, fileName);
+
+        logger.info("----- Loading file: " + adminUsersFile.getAbsolutePath());
+        JsonParser jp = getJsonParserForFile(adminUsersFile);
+
+        int loopCounter = 0;
 
-        logger.info( "----- Loading file: " + adminUsersFile.getAbsolutePath() );
-        JsonParser jp = getJsonParserForFile( adminUsersFile );
+        BlockingQueue<Map<String, Object>> workQueue = new LinkedBlockingQueue<Map<String, Object>>();
+        BlockingQueue<Map<String, Object>> auditQueue = new LinkedBlockingQueue<Map<String, Object>>();
+
+        startAdminWorkers(workQueue, auditQueue, writeThreadCount);
+        startAdminAuditors(auditQueue, auditThreadCount);
 
         JsonToken token = jp.nextToken();
-        validateStartArray( token );
+        validateStartArray(token);
 
-        EntityManager em = emf.getEntityManager( MANAGEMENT_APPLICATION_ID );
+        while (jp.nextValue() != JsonToken.END_ARRAY) {
+            loopCounter += 1;
 
-        while ( jp.nextValue() != JsonToken.END_ARRAY ) {
+            @SuppressWarnings("unchecked")
+            Map<String, Object> entityProps = jp.readValueAs(HashMap.class);
+            if (loopCounter % 100 == 1)
+                logger.info("Publishing to queue... counter=" + loopCounter);
 
-            @SuppressWarnings( "unchecked" )
-            Map<String, Object> entityProps = jp.readValueAs( HashMap.class );
+            workQueue.add(entityProps);
+        }
 
-            // Import/create the entity
-            UUID uuid = getId( entityProps );
-            String type = getType( entityProps );
+        waitForQueueAndMeasure(workQueue, adminWriteThreads, "Admin Write");
+        waitForQueueAndMeasure(auditQueue, adminAuditThreads, "Admin Audit");
 
+        logger.info("----- End: Imported {} admin users from file {}",
+                count, adminUsersFile.getAbsolutePath());
 
-            try {
-                em.create( uuid, type, entityProps );
+        jp.close();
+    }
 
-                logger.debug( "Imported admin user {} {}", uuid, entityProps.get( "username" ) );
-                count++;
-                if ( count % 1000 == 0 ) {
-                    logger.info("Imported {} admin users", count);
-                }
-            }
-            catch ( DuplicateUniquePropertyExistsException de ) {
-                logger.warn( "Unable to create entity. It appears to be a duplicate: " +
-                    "id={}, type={}, name={}, username={}",
-                    new Object[] { uuid, type, entityProps.get("name"), entityProps.get("username")});
-                if ( logger.isDebugEnabled() ) {
-                    logger.debug( "Exception" , de );
-                }
-                continue;
-            }
+    private static void waitForQueueAndMeasure(final BlockingQueue workQueue,
+                                               final Map<Stoppable, Thread> threadMap,
+                                               final String identifier) throws InterruptedException {
+        double rateAverageSum = 0;
+        int iterationCounter = 0;
 
-            if ( em.get( uuid ) == null ) {
-                logger.error( "Holy hell, we wrote an entity and it's missing.  " +
-                                "Entity Id was {} and type is {}", uuid, type );
-                System.exit( 1 );
-            }
-            echo( entityProps );
+        while (!workQueue.isEmpty()) {
+            iterationCounter += 1;
+
+            int sizeLast = workQueue.size();
+            long lastTime = System.currentTimeMillis();
+            logger.info("Queue {} is not empty, remaining size={}, waiting...", identifier, sizeLast);
+            Thread.sleep(5000);
+
+            long timeNow = System.currentTimeMillis();
+            int sizeNow = workQueue.size();
+
+            int processed = sizeLast - sizeNow;
+
+            long timeDelta = timeNow - lastTime;
+
+            double rateLast = (double) processed / (timeDelta / 1000);
+            rateAverageSum += rateLast;
+
+            long timeRemaining = sizeLast / (long) (rateAverageSum / iterationCounter);
+
+            logger.info(
+                    String.format("++PROGRESS (%s): sizeLast=%s nowSize=%s processed=%s rateLast=%s/s rateAvg=%s/s timeRemaining=%s(s)",
+                            identifier, sizeLast, sizeNow, processed, rateLast, (rateAverageSum / iterationCounter), timeRemaining)
+            );
         }
 
-        logger.info( "----- End: Imported {} admin users from file {}",
-                count, adminUsersFile.getAbsolutePath() );
+        for (Stoppable worker : threadMap.keySet()) {
+            worker.setDone(true);
+        }
+    }
+
+    private void startAdminAuditors(BlockingQueue<Map<String, Object>> auditQueue, int workerCount) {
+        for (int x = 0; x < workerCount; x++) {
+            AuditWorker worker = new AuditWorker(auditQueue);
+            Thread workerThread = new Thread(worker, "AdminAuditor-" + x);
+            workerThread.start();
+            adminAuditThreads.put(worker, workerThread);
+        }
 
-        jp.close();
     }
 
 
-    private String getType( Map<String, Object> entityProps ) {
-        return ( String ) entityProps.get( PROPERTY_TYPE );
+    private void startAdminWorkers(BlockingQueue<Map<String, Object>> workQueue,
+                                   BlockingQueue<Map<String, Object>> auditQueue,
+                                   int workerCount) {
+
+        for (int x = 0; x < workerCount; x++) {
+            ImportAdminWorker worker = new ImportAdminWorker(workQueue, auditQueue);
+            Thread workerThread = new Thread(worker, "AdminWriter-" + x);
+            workerThread.start();
+            adminWriteThreads.put(worker, workerThread);
+        }
     }
 
 
-    private UUID getId( Map<String, Object> entityProps ) {
-        return UUID.fromString( ( String ) entityProps.get( PROPERTY_UUID ) );
+    private String getType(Map<String, Object> entityProps) {
+        return (String) entityProps.get(PROPERTY_TYPE);
     }
 
 
-    private void validateStartArray( JsonToken token ) {
-        if ( token != JsonToken.START_ARRAY ) {
-            throw new RuntimeException( "Token should be START ARRAY but it is:" + token.asString() );
+    private UUID getId(Map<String, Object> entityProps) {
+        return UUID.fromString((String) entityProps.get(PROPERTY_UUID));
+    }
+
+
+    private void validateStartArray(JsonToken token) {
+        if (token != JsonToken.START_ARRAY) {
+            throw new RuntimeException("Token should be START ARRAY but it is:" + token.asString());
         }
     }
 
 
-    private JsonParser getJsonParserForFile( File organizationFile ) throws Exception {
-        JsonParser jp = jsonFactory.createJsonParser( organizationFile );
-        jp.setCodec( new ObjectMapper() );
+    private JsonParser getJsonParserForFile(File organizationFile) throws Exception {
+        JsonParser jp = jsonFactory.createJsonParser(organizationFile);
+        jp.setCodec(new ObjectMapper());
         return jp;
     }
 
@@ -218,66 +293,82 @@ public class ImportAdmins extends ToolBase {
     /**
      * Import collections. Collections files are named: collections.<application_name>.Timestamp.json
      */
-    private void importMetadata() throws Exception {
+    private void importMetadata(int writeThreadCount) throws Exception {
 
         String[] fileNames = importDir.list(
-                new PrefixFileFilter( ExportAdmins.ADMIN_USER_METADATA_PREFIX + "." ) );
-        logger.info( "Metadata files to read: " + fileNames.length );
+                new PrefixFileFilter(ExportAdmins.ADMIN_USER_METADATA_PREFIX + "."));
+        logger.info("Metadata files to read: " + fileNames.length);
 
-        for ( String fileName : fileNames ) {
+        for (String fileName : fileNames) {
             try {
-                importMetadata( fileName );
-            }
-            catch ( Exception e ) {
-                logger.warn( "Unable to import metadata file: " + fileName, e );
+                importMetadata(fileName, writeThreadCount);
+            } catch (Exception e) {
+                logger.warn("Unable to import metadata file: " + fileName, e);
             }
         }
     }
 
+    private void startMetadataWorkers(BlockingQueue<ImportMetadataTask> workQueue, int writeThreadCount) {
 
-    @SuppressWarnings( "unchecked" )
-    private void importMetadata( String fileName ) throws Exception {
+        for (int x = 0; x < writeThreadCount; x++) {
+            ImportMetadataWorker worker = new ImportMetadataWorker(workQueue);
+            Thread workerThread = new Thread(worker, "ImportMetadataTask-" + x);
+            workerThread.start();
+            metadataWorkerThreadMap.put(worker, workerThread);
+        }
+    }
 
-        EntityManager em = emf.getEntityManager( MANAGEMENT_APPLICATION_ID );
 
-        File metadataFile = new File( importDir, fileName );
+    @SuppressWarnings("unchecked")
+    private void importMetadata(String fileName, int writeThreads) throws Exception {
+
+        EntityManager em = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
 
-        logger.info( "----- Loading metadata file: " + metadataFile.getAbsolutePath() );
+        File metadataFile = new File(importDir, fileName);
 
-        JsonParser jp = getJsonParserForFile( metadataFile );
+        logger.info("----- Loading metadata file: " + metadataFile.getAbsolutePath());
+
+        JsonParser jp = getJsonParserForFile(metadataFile);
 
         JsonToken jsonToken = null; // jp.nextToken();// START_OBJECT this is the outer hashmap
 
         int depth = 1;
 
-        while ( depth > 0 ) {
+        BlockingQueue<ImportMetadataTask> workQueue = new LinkedBlockingQueue<ImportMetadataTask>();
+        startMetadataWorkers(workQueue, writeThreads);
+
+        while (depth > 0) {
 
             jsonToken = jp.nextToken();
 
-            if ( jsonToken == null ) {
+            if (jsonToken == null) {
                 logger.info("token is null, breaking");
                 break;
             }
 
-            if (jsonToken.equals( JsonToken.START_OBJECT )) {
+            if (jsonToken.equals(JsonToken.START_OBJECT)) {
                 depth++;
-            } else if (jsonToken.equals( JsonToken.END_OBJECT )) {
+            } else if (jsonToken.equals(JsonToken.END_OBJECT)) {
                 depth--;
             }
 
-            if (jsonToken.equals( JsonToken.FIELD_NAME ) && depth == 2 ) {
+            if (jsonToken.equals(JsonToken.FIELD_NAME) && depth == 2) {
 
                 jp.nextToken();
 
                 String entityOwnerId = jp.getCurrentName();
-                EntityRef entityRef = em.getRef( UUID.fromString( entityOwnerId ) );
+                EntityRef entityRef = em.getRef(UUID.fromString(entityOwnerId));
 
-                Map<String, Object> metadata = (Map<String, Object>)jp.readValueAs( Map.class );
-                importEntityMetadata( em, entityRef, metadata );
+                Map<String, Object> metadata = (Map<String, Object>) jp.readValueAs(Map.class);
+
+                workQueue.put(new ImportMetadataTask(entityRef, metadata));
+//                importEntityMetadata(em, entityRef, metadata);
             }
         }
 
-        logger.info( "----- End of metadata -----" );
+        waitForQueueAndMeasure(workQueue, metadataWorkerThreadMap, "Metadata Load");
+
+        logger.info("----- End of metadata -----");
         jp.close();
     }
 
@@ -287,113 +378,113 @@ public class ImportAdmins extends ToolBase {
      */
     @SuppressWarnings("unchecked")
     private void importEntityMetadata(
-        EntityManager em, EntityRef entityRef, Map<String, Object> metadata ) throws Exception {
+            EntityManager em, EntityRef entityRef, Map<String, Object> metadata) throws Exception {
 
-        Map<String, Object> connectionsMap = (Map<String, Object>) metadata.get( "connections" );
+        Map<String, Object> connectionsMap = (Map<String, Object>) metadata.get("connections");
 
         if (connectionsMap != null && !connectionsMap.isEmpty()) {
             for (String type : connectionsMap.keySet()) {
                 try {
-                    UUID uuid = UUID.fromString( (String) connectionsMap.get( type ) );
-                    EntityRef connectedEntityRef = em.getRef( uuid );
-                    em.createConnection( entityRef, type, connectedEntityRef );
+                    UUID uuid = UUID.fromString((String) connectionsMap.get(type));
+                    EntityRef connectedEntityRef = em.getRef(uuid);
+                    em.createConnection(entityRef, type, connectedEntityRef);
 
-                    logger.debug( "Creating connection from {} type {} target {}",
+                    logger.debug("Creating connection from {} type {} target {}",
                             new Object[]{entityRef, type, connectedEntityRef});
 
                 } catch (Exception e) {
                     if (logger.isDebugEnabled()) {
-                        logger.error( "Error importing connection of type "
-                                + type + " for user " + entityRef.getUuid(), e );
+                        logger.error("Error importing connection of type "
+                                + type + " for user " + entityRef.getUuid(), e);
                     } else {
-                        logger.error( "Error importing connection of type "
-                                + type + " for user " + entityRef.getUuid() );
+                        logger.error("Error importing connection of type "
+                                + type + " for user " + entityRef.getUuid());
                     }
                 }
             }
         }
 
-        Map<String, Object> dictionariesMap = (Map<String, Object>) metadata.get( "dictionaries" );
+        Map<String, Object> dictionariesMap = (Map<String, Object>) metadata.get("dictionaries");
 
         if (dictionariesMap != null && !dictionariesMap.isEmpty()) {
             for (String name : dictionariesMap.keySet()) {
                 try {
-                    Map<String, Object> dictionary = (Map<String, Object>) dictionariesMap.get( name );
-                    em.addMapToDictionary( entityRef, name, dictionary );
+                    Map<String, Object> dictionary = (Map<String, Object>) dictionariesMap.get(name);
+                    em.addMapToDictionary(entityRef, name, dictionary);
 
                     logger.debug("Creating dictionary for {} name {} map {}",
-                            new Object[] { entityRef, name, dictionary  });
+                            new Object[]{entityRef, name, dictionary});
 
                 } catch (Exception e) {
                     if (logger.isDebugEnabled()) {
-                        logger.error( "Error importing dictionary name "
-                                + name + " for user " + entityRef.getUuid(), e );
+                        logger.error("Error importing dictionary name "
+                                + name + " for user " + entityRef.getUuid(), e);
                     } else {
-                        logger.error( "Error importing dictionary name "
-                                + name + " for user " + entityRef.getUuid() );
+                        logger.error("Error importing dictionary name "
+                                + name + " for user " + entityRef.getUuid());
                     }
                 }
             }
         }
 
-        List<String> collectionsList = (List<String>) metadata.get( "collections" );
+        List<String> collectionsList = (List<String>) metadata.get("collections");
         if (collectionsList != null && !collectionsList.isEmpty()) {
             for (String name : collectionsList) {
                 try {
-                    UUID uuid = UUID.fromString( (String) connectionsMap.get( name ) );
-                    EntityRef collectedEntityRef = em.getRef( uuid );
-                    em.addToCollection( entityRef, name, collectedEntityRef );
+                    UUID uuid = UUID.fromString((String) connectionsMap.get(name));
+                    EntityRef collectedEntityRef = em.getRef(uuid);
+                    em.addToCollection(entityRef, name, collectedEntityRef);
 
                     logger.debug("Add to collection of {} name {} entity {}",
-                            new Object[] { entityRef, name, collectedEntityRef });
+                            new Object[]{entityRef, name, collectedEntityRef});
 
                 } catch (Exception e) {
                     if (logger.isDebugEnabled()) {
-                        logger.error( "Error adding to collection "
-                                + name + " for user " + entityRef.getUuid(), e );
+                        logger.error("Error adding to collection "
+                                + name + " for user " + entityRef.getUuid(), e);
                     } else {
-                        logger.error( "Error adding to collection "
-                                + name + " for user " + entityRef.getUuid() );
+                        logger.error("Error adding to collection "
+                                + name + " for user " + entityRef.getUuid());
                     }
                 }
             }
         }
 
 
-        List<Object> organizationsList = (List<Object>) metadata.get( "organizations" );
+        List<Object> organizationsList = (List<Object>) metadata.get("organizations");
         if (organizationsList != null && !organizationsList.isEmpty()) {
 
-            User user = em.get( entityRef, User.class );
+            User user = em.get(entityRef, User.class);
 
-            if ( user == null ) {
+            if (user == null) {
                 logger.error("User with uuid={} not found, not adding to organizations");
 
             } else {
 
-                final UserInfo userInfo = managementService.getAdminUserByEmail( user.getEmail() );
+                final UserInfo userInfo = managementService.getAdminUserByEmail(user.getEmail());
 
                 for (Object orgObject : organizationsList) {
 
                     Map<String, Object> orgMap = (Map<String, Object>) orgObject;
-                    UUID orgUuid = UUID.fromString( (String) orgMap.get( "uuid" ) );
-                    String orgName = (String) orgMap.get( "name" );
+                    UUID orgUuid = UUID.fromString((String) orgMap.get("uuid"));
+                    String orgName = (String) orgMap.get("name");
 
                     // create org only if it does not exist
-                    OrganizationInfo orgInfo = managementService.getOrganizationByUuid( orgUuid );
+                    OrganizationInfo orgInfo = managementService.getOrganizationByUuid(orgUuid);
                     if (orgInfo == null) {
                         try {
-                            managementService.createOrganization( orgUuid, orgName, userInfo, false );
-                            orgInfo = managementService.getOrganizationByUuid( orgUuid );
+                            managementService.createOrganization(orgUuid, orgName, userInfo, false);
+                            orgInfo = managementService.getOrganizationByUuid(orgUuid);
 
-                            logger.debug( "Created new org {} for user {}",
-                                    new Object[]{orgInfo.getName(), user.getEmail()} );
+                            logger.debug("Created new org {} for user {}",
+                                    new Object[]{orgInfo.getName(), user.getEmail()});
 
                         } catch (DuplicateUniquePropertyExistsException dpee) {
-                            logger.error( "Org {} already exists", orgName );
+                            logger.error("Org {} already exists", orgName);
                         }
                     } else {
-                        managementService.addAdminUserToOrganization( userInfo, orgInfo, false );
-                        logger.debug( "Added user {} to org {}", new Object[]{user.getEmail(), orgName} );
+                        managementService.addAdminUserToOrganization(userInfo, orgInfo, false);
+                        logger.debug("Added user {} to org {}", new Object[]{user.getEmail(), orgName});
                     }
                 }
             }
@@ -404,18 +495,229 @@ public class ImportAdmins extends ToolBase {
     /**
      * Open up the import directory based on <code>importDir</code>
      */
-    private void openImportDirectory( CommandLine line ) {
+    private void openImportDirectory(CommandLine line) {
+
+        boolean hasInputDir = line.hasOption(INPUT_DIR);
+
+        if (hasInputDir) {
+            importDir = new File(line.getOptionValue(INPUT_DIR));
+        } else {
+            importDir = new File(DEFAULT_INPUT_DIR);
+        }
+
+        logger.info("Importing from:" + importDir.getAbsolutePath());
+        logger.info("Status. Exists: " + importDir.exists() + " - Readable: " + importDir.canRead());
+    }
 
-        boolean hasInputDir = line.hasOption( INPUT_DIR );
 
-        if ( hasInputDir ) {
-            importDir = new File( line.getOptionValue( INPUT_DIR ) );
+    interface Stoppable {
+        void setDone(boolean done);
+    }
+
+    class AuditWorker implements Runnable, Stoppable {
+        private BlockingQueue<Map<String, Object>> workQueue;
+        private boolean done;
+
+        public AuditWorker(BlockingQueue<Map<String, Object>> workQueue) {
+            this.workQueue = workQueue;
         }
-        else {
-            importDir = new File( DEFAULT_INPUT_DIR );
+
+        @Override
+        public void setDone(boolean done) {
+            this.done = done;
         }
 
-        logger.info( "Importing from:" + importDir.getAbsolutePath() );
-        logger.info( "Status. Exists: " + importDir.exists() + " - Readable: " + importDir.canRead() );
+        @Override
+        public void run() {
+            int count = 0;
+
+            EntityManager em = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+
+            long durationSum = 0;
+
+            while (!done) {
+                try {
+                    Map<String, Object> entityProps = this.workQueue.poll(30, TimeUnit.SECONDS);
+
+                    if (entityProps == null) {
+                        logger.warn("Reading from AUDIT queue was null!");
+                        Thread.sleep(1000);
+                        continue;
+                    }
+
+                    count++;
+                    long startTime = System.currentTimeMillis();
+
+                    UUID uuid = (UUID) entityProps.get(PROPERTY_UUID);
+                    String type = getType(entityProps);
+
+                    if (em.get(uuid) == null) {
+                        logger.error("Holy hell, we wrote an entity and it's missing.  " +
+                                "Entity Id was {} and type is {}", uuid, type);
+                        System.exit(1);
+                    }
+
+                    echo(entityProps);
+
+                    long stopTime = System.currentTimeMillis();
+
+                    long duration = stopTime - startTime;
+                    durationSum += duration;
+                    logger.debug(String.format("Audited [%s]th admin", count));
+                    logger.info(String.format("Average Audit Rate: %s(ms)", durationSum / count));
+
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+
+            logger.warn("Done!");
+        }
+    }
+
+
+    class ImportMetadataTask {
+        public EntityRef entityRef;
+        public Map<String, Object> metadata;
+
+        public ImportMetadataTask(EntityRef entityRef, Map<String, Object> metadata) {
+            this.entityRef = entityRef;
+            this.metadata = metadata;
+        }
+    }
+
+    class ImportMetadataWorker implements Runnable, Stoppable {
+        private BlockingQueue<ImportMetadataTask> workQueue;
+        private boolean done = false;
+
+        public ImportMetadataWorker(final BlockingQueue<ImportMetadataTask> workQueue) {
+            this.workQueue = workQueue;
+
+        }
+
+        @Override
+        public void setDone(boolean done) {
+            this.done = done;
+        }
+
+        @Override
+        public void run() {
+            int count = 0;
+
+            EntityManager em = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+
+            long durationSum = 0;
+
+            while (!done) {
+                try {
+                    ImportMetadataTask task = this.workQueue.poll(30, TimeUnit.SECONDS);
+
+                    if (task == null) {
+                        logger.warn("Reading from metadata queue was null!");
+                        Thread.sleep(1000);
+                        continue;
+                    }
+
+                    count++;
+                    long startTime = System.currentTimeMillis();
+                    importEntityMetadata(em, task.entityRef, task.metadata);
+                    long stopTime = System.currentTimeMillis();
+
+                    long duration = stopTime - startTime;
+                    durationSum += duration;
+                    logger.debug(String.format("Imported [%s]th metadata", count));
+                    logger.info(String.format("Average metadata Imported Rate: %s(ms)", durationSum / count));
+
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    logger.debug("EXCEPTION", e);
+                }
+            }
+
+            logger.warn("Done!");
+        }
+    }
+
+
+    class ImportAdminWorker implements Runnable, Stoppable {
+
+        private BlockingQueue<Map<String, Object>> workQueue;
+        private BlockingQueue<Map<String, Object>> auditQueue;
+        private boolean done = false;
+
+
+        public ImportAdminWorker(final BlockingQueue<Map<String, Object>> workQueue,
+                                 final BlockingQueue<Map<String, Object>> auditQueue) {
+            logger.info("New Worker!");
+            this.workQueue = workQueue;
+            this.auditQueue = auditQueue;
+        }
+
+        @Override
+        public void setDone(boolean done) {
+            this.done = done;
+        }
+
+        @Override
+        public void run() {
+            int count = 0;
+
+            EntityManager em = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+
+            long durationSum = 0;
+
+            while (!done) {
+
+                try {
+
+                    Map<String, Object> entityProps = this.workQueue.poll(30, TimeUnit.SECONDS);
+
+                    if (entityProps == null) {
+                        logger.warn("Reading from admin import queue was null!");
+                        Thread.sleep(1000);
+                        continue;
+                    }
+
+                    // Import/create the entity
+                    UUID uuid = getId(entityProps);
+                    String type = getType(entityProps);
+
+
+                    try {
+                        long startTime = System.currentTimeMillis();
+                        em.create(uuid, type, entityProps);
+                        auditQueue.put(entityProps);
+                        long stopTime = System.currentTimeMillis();
+
+                        long duration = stopTime - startTime;
+                        durationSum += duration;
+
+                        count++;
+                        logger.debug(String.format("Imported [%s]th admin user %s  / %s", count, uuid, entityProps.get("username")));
+                        logger.info(String.format("Average Creation Rate: %s(ms)", durationSum / count));
+
+                        if (count % 100 == 0) {
+                            logger.info("Imported {} admin users", count);
+                        }
+                    } catch (DuplicateUniquePropertyExistsException de) {
+                        logger.warn("Unable to create entity. It appears to be a duplicate: " +
+                                        "id={}, type={}, name={}, username={}",
+                                new Object[]{uuid, type, entityProps.get("name"), entityProps.get("username")});
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Exception", de);
+                        }
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+
+            }
+
+            logger.warn("Done!");
+        }
     }
 }