You are viewing a plain-text version of this content. The link to the canonical version (the "here" link in the original) is not preserved in this copy.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/07/10 15:37:41 UTC

[35/50] [abbrv] incubator-usergrid git commit: Use multiple threads to read data from Cassandra and one thread to write that data to the two output files.

Use multiple threads to read data from Cassandra and one thread to write that data to the two output files.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0631bc6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0631bc6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0631bc6b

Branch: refs/heads/two-dot-o-import
Commit: 0631bc6b9d0de0e9022284469ac32a0a2dce5d38
Parents: 5fbe31f
Author: Dave Johnson <dm...@apigee.com>
Authored: Thu Jun 11 17:08:52 2015 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Thu Jun 11 17:08:52 2015 -0400

----------------------------------------------------------------------
 .../org/apache/usergrid/tools/ExportAdmins.java | 481 +++++++++++++------
 1 file changed, 340 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0631bc6b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
index 8831b89..90d3726 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
@@ -18,39 +18,61 @@
 package org.apache.usergrid.tools;
 
 
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
 import com.google.common.collect.BiMap;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.Results.Level;
+import org.apache.usergrid.persistence.cassandra.CassandraService;
+import org.apache.usergrid.utils.StringUtils;
 import org.codehaus.jackson.JsonGenerator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.commons.cli.CommandLine;
-
-import org.apache.usergrid.persistence.ConnectionRef;
-import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.Results.Level;
-import org.apache.usergrid.persistence.cassandra.CassandraService;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 
 /**
+ * Export Admin Users and metadata including organizations.
+ *
  * java -jar usergrid-tools.jar ExportAdmins
  */
 public class ExportAdmins extends ExportingToolBase {
 
     static final Logger logger = LoggerFactory.getLogger( ExportAdmins.class );
-
     public static final String ADMIN_USERS_PREFIX = "admin-users";
     public static final String ADMIN_USER_METADATA_PREFIX = "admin-user-metadata";
+    private static final String READ_THREAD_COUNT = "readThreads";
+    private int readThreadCount;
+
+
+    /**
+     * Represents an AdminUser that has been read and is ready for export.
+     */
+    class AdminUserWriteTask {
+        Entity                           adminUser;
+        Map<String, List<UUID>>          collectionsByName;
+        Map<String, List<ConnectionRef>> connectionsByType;
+        Map<String, Map<Object, Object>> dictionariesByName;
+        BiMap<UUID, String>              orgNamesByUuid;
+    }
 
+
+    /**
+     * Export admin users using multiple threads.
+     * <p/>
+     * How it works:
+     * In the main thread, we query for the IDs of all admin users and add each ID to the read queue.
+     * Read-queue workers read each admin user's data and add it to the write queue.
+     * One write-queue worker takes that data from the write queue and writes it to the output files.
+     */
     @Override
-    public void runTool( CommandLine line ) throws Exception {
+    public void runTool(CommandLine line) throws Exception {
         startSpring();
 
         setVerbose( line );
@@ -60,213 +82,390 @@ public class ExportAdmins extends ExportingToolBase {
         outputDir = createOutputParentDir();
         logger.info( "Export directory: " + outputDir.getAbsolutePath() );
 
-        exportAdminUsers();
-    }
+        if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
+            try {
+                readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
+            } catch (NumberFormatException nfe) {
+                logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+                return;
+            }
+        } else {
+            readThreadCount = 20;
+        }
 
+        // start write queue worker
 
-    private void exportAdminUsers() throws Exception {
+        BlockingQueue<AdminUserWriteTask> writeQueue = new LinkedBlockingQueue<AdminUserWriteTask>();
+        AdminUserWriter adminUserWriter = new AdminUserWriter( writeQueue );
+        Thread writeThread = new Thread( adminUserWriter );
+        writeThread.start();
+        logger.debug( "Write thread started" );
 
-        int count = 0;
+        // start read queue workers
 
+        BlockingQueue<UUID> readQueue = new LinkedBlockingQueue<UUID>();
+        List<AdminUserReader> readers = new ArrayList<AdminUserReader>();
+        for (int i = 0; i < readThreadCount; i++) {
+            AdminUserReader worker = new AdminUserReader( readQueue, writeQueue );
+            Thread readerThread = new Thread( worker, "AdminUserReader-" + i );
+            readerThread.start();
+            readers.add( worker );
+        }
+        logger.debug( readThreadCount + " read worker threads started" );
+
+        // query for IDs, add each to read queue
+
+        Query query = new Query();
+        query.setLimit( MAX_ENTITY_FETCH );
+        query.setResultsLevel( Level.IDS );
         EntityManager em = emf.getEntityManager( CassandraService.MANAGEMENT_APPLICATION_ID );
+        Results ids = em.searchCollection( em.getApplicationRef(), "users", query );
 
-        // write one JSON file for management application users
+        while (ids.size() > 0) {
+            for (UUID uuid : ids.getIds()) {
+                readQueue.add( uuid );
+                logger.debug( "Added uuid to readQueue: " + uuid );
+            }
+            if (ids.getCursor() == null) {
+                break;
+            }
+            query.setCursor( ids.getCursor() );
+            ids = em.searchCollection( em.getApplicationRef(), "users", query );
+        }
 
-        JsonGenerator usersFile =
-                getJsonGenerator( createOutputFile( ADMIN_USERS_PREFIX, em.getApplication().getName() ) );
-        usersFile.writeStartArray();
+        adminUserWriter.setDone( true );
+        for (AdminUserReader aur : readers) {
+            aur.setDone( true );
+        }
 
-        // write one JSON file for metadata: collections, connections and dictionaries of those users
+        logger.debug( "Waiting for write thread to complete" );
+        writeThread.join();
+    }
 
-        JsonGenerator metadataFile =
-                getJsonGenerator( createOutputFile( ADMIN_USER_METADATA_PREFIX, em.getApplication().getName() ) );
-        metadataFile.writeStartObject();
 
-        // query for and loop through all users in management application
+    @Override
+    @SuppressWarnings("static-access")
+    public Options createOptions() {
 
-        Query query = new Query();
-        query.setLimit( MAX_ENTITY_FETCH );
-        query.setResultsLevel( Results.Level.ALL_PROPERTIES );
+        Options options = super.createOptions();
 
-        Results entities = em.searchCollection( em.getApplicationRef(), "users", query );
+        Option readThreads = OptionBuilder
+                .hasArg().withType(0).withDescription("Read Threads -" + READ_THREAD_COUNT).create(READ_THREAD_COUNT);
 
-        while ( entities.size() > 0 ) {
+        options.addOption( readThreads );
+        return options;
+    }
 
-            for ( Entity entity : entities ) {
 
-                // write user to application file
-                usersFile.writeObject( entity );
-                echo( entity );
+    public class AdminUserReader implements Runnable {
 
-                // write user's collections, connections, etc. to collections file
-                saveEntityMetadata( metadataFile, em, null, entity );
+        private boolean done = true;
 
-                logger.debug("Exported user {}", entity.getProperty( "email" ));
+        private final BlockingQueue<UUID> readQueue;
+        private final BlockingQueue<AdminUserWriteTask> writeQueue;
+
+        public AdminUserReader( BlockingQueue<UUID> readQueue, BlockingQueue<AdminUserWriteTask> writeQueue ) {
+            this.readQueue = readQueue;
+            this.writeQueue = writeQueue;
+        }
 
-                count++;
-                if ( count % 1000 == 0 ) {
-                    logger.info("Exported {} admin users", count);
-                }
 
+        @Override
+        public void run() {
+            try {
+                readAndQueueAdminUsers();
+            } catch (Exception e) {
+                logger.error("Error reading data for export", e);
             }
+        }
 
-            if ( entities.getCursor() == null ) {
-                break;
+
+        private void readAndQueueAdminUsers() throws Exception {
+
+            EntityManager em = emf.getEntityManager( CassandraService.MANAGEMENT_APPLICATION_ID );
+
+            while ( true ) {
+
+                UUID uuid = null;
+                try {
+                    uuid = readQueue.poll( 30, TimeUnit.SECONDS );
+                    logger.debug("Got item from entityId queue: " + uuid );
+
+                    if ( uuid == null && done ) {
+                        break;
+                    }
+
+                    Entity entity = em.get( uuid );
+
+                    AdminUserWriteTask task = new AdminUserWriteTask();
+                    task.adminUser = entity;
+
+                    addCollectionsToTask(   task, entity );
+                    addDictionariesToTask(  task, entity );
+                    addConnectionsToTask(   task, entity );
+                    addOrganizationsToTask( task, entity );
+
+                    writeQueue.add( task );
+
+                } catch ( Exception e ) {
+                    logger.error("Error reading data for user " + uuid, e );
+                }
             }
-            query.setCursor( entities.getCursor() );
-            entities = em.searchCollection( em.getApplicationRef(), "users", query );
         }
 
-        metadataFile.writeEndObject();
-        metadataFile.close();
 
-        usersFile.writeEndArray();
-        usersFile.close();
+        private void addCollectionsToTask(AdminUserWriteTask task, Entity entity) throws Exception {
 
-        logger.info("Exported total of {} admin users", count);
-    }
+            EntityManager em = emf.getEntityManager( CassandraService.MANAGEMENT_APPLICATION_ID );
+            Set<String> collections = em.getCollections( entity );
+            if ((collections == null) || collections.isEmpty()) {
+                return;
+            }
 
+            task.collectionsByName = new HashMap<String, List<UUID>>();
 
-    /**
-     * Serialize and save the collection members of this <code>entity</code>
-     *
-     * @param em Entity Manager
-     * @param application Application name
-     * @param entity entity
-     */
-    private void saveEntityMetadata(
-            JsonGenerator jg, EntityManager em, String application, Entity entity) throws Exception {
+            for (String collectionName : collections) {
+
+                List<UUID> uuids = task.collectionsByName.get( collectionName );
+                if ( uuids == null ) {
+                    uuids = new ArrayList<UUID>();
+                    task.collectionsByName.put( collectionName, uuids );
+                }
 
-        saveCollections( jg, em, entity );
-        saveConnections( entity, em, jg );
-        saveOrganizations( entity, em, jg );
-        saveDictionaries( entity, em, jg );
+                Results collectionMembers = em.getCollection( entity, collectionName, null, 100000, Level.IDS, false );
 
-        // End the object if it was Started
-        jg.writeEndObject();
-    }
+                List<UUID> entityIds = collectionMembers.getIds();
+
+                if ((entityIds != null) && !entityIds.isEmpty()) {
+                    for (UUID childEntityUUID : entityIds) {
+                        uuids.add( childEntityUUID );
+                    }
+                }
+            }
+        }
 
 
-    private void saveCollections(JsonGenerator jg, EntityManager em, Entity entity) throws Exception {
+        private void addDictionariesToTask(AdminUserWriteTask task, Entity entity) throws Exception {
+            EntityManager em = emf.getEntityManager( CassandraService.MANAGEMENT_APPLICATION_ID );
 
-        Set<String> collections = em.getCollections( entity );
+            Set<String> dictionaries = em.getDictionaries( entity );
 
-        // Only create entry for Entities that have collections
-        if ( ( collections == null ) || collections.isEmpty() ) {
-            return;
+            task.dictionariesByName = new HashMap<String, Map<Object, Object>>();
+
+            for (String dictionary : dictionaries) {
+                Map<Object, Object> dict = em.getDictionaryAsMap( entity, dictionary );
+                task.dictionariesByName.put( dictionary, dict );
+            }
         }
 
-        jg.writeFieldName( entity.getUuid().toString() );
-        jg.writeStartObject();
 
-        for ( String collectionName : collections ) {
+        private void addConnectionsToTask(AdminUserWriteTask task, Entity entity) throws Exception {
+            EntityManager em = emf.getEntityManager( CassandraService.MANAGEMENT_APPLICATION_ID );
 
-            jg.writeFieldName( collectionName );
-            // Start collection array.
-            jg.writeStartArray();
+            task.connectionsByType = new HashMap<String, List<ConnectionRef>>();
 
-            Results collectionMembers = em.getCollection( entity, collectionName, null, 100000, Level.IDS, false );
+            Set<String> connectionTypes = em.getConnectionTypes( entity );
+            for (String connectionType : connectionTypes) {
 
-            List<UUID> entityIds = collectionMembers.getIds();
+                List<ConnectionRef> connRefs = task.connectionsByType.get( connectionType );
+                if ( connRefs == null ) {
+                    connRefs = new ArrayList<ConnectionRef>();
+                }
 
-            if ( ( entityIds != null ) && !entityIds.isEmpty() ) {
-                for ( UUID childEntityUUID : entityIds ) {
-                    jg.writeObject( childEntityUUID.toString() );
+                Results results = em.getConnectedEntities( entity.getUuid(), connectionType, null, Level.IDS );
+                List<ConnectionRef> connections = results.getConnections();
+
+                for (ConnectionRef connectionRef : connections) {
+                    connRefs.add( connectionRef );
                 }
             }
+        }
 
-            // End collection array.
-            jg.writeEndArray();
+
+        private void addOrganizationsToTask(AdminUserWriteTask task, Entity entity) throws Exception {
+            task.orgNamesByUuid = managementService.getOrganizationsForAdminUser( entity.getUuid() );
+        }
+
+        public void setDone(boolean done) {
+            this.done = done;
         }
     }
 
+    class AdminUserWriter implements Runnable {
 
-    /**
-     * Persists the connection for this entity.
-     */
-    private void saveDictionaries( Entity entity, EntityManager em, JsonGenerator jg ) throws Exception {
+        private boolean done = false;
 
-        jg.writeFieldName( "dictionaries" );
-        jg.writeStartObject();
+        private final BlockingQueue<AdminUserWriteTask> taskQueue;
 
-        Set<String> dictionaries = em.getDictionaries( entity );
-        for ( String dictionary : dictionaries ) {
+        public AdminUserWriter( BlockingQueue<AdminUserWriteTask> taskQueue ) {
+            this.taskQueue = taskQueue;
+        }
 
-            Map<Object, Object> dict = em.getDictionaryAsMap( entity, dictionary );
 
-            // nothing to do
-            if ( dict.isEmpty() ) {
-                continue;
+        @Override
+        public void run() {
+            try {
+                writeEntities();
+            } catch (Exception e) {
+                logger.error("Error writing export data", e);
             }
+        }
+
+
+        private void writeEntities() throws Exception {
+            EntityManager em = emf.getEntityManager( CassandraService.MANAGEMENT_APPLICATION_ID );
+
+            // write one JSON file for management application users
+            JsonGenerator usersFile =
+                    getJsonGenerator( createOutputFile( ADMIN_USERS_PREFIX, em.getApplication().getName() ) );
+            usersFile.writeStartArray();
+
+            // write one JSON file for metadata: collections, connections and dictionaries of those users
+            JsonGenerator metadataFile =
+                    getJsonGenerator( createOutputFile( ADMIN_USER_METADATA_PREFIX, em.getApplication().getName() ) );
+            metadataFile.writeStartObject();
+
+            int count = 0;
+
+            while ( true ) {
+
+                try {
+                    AdminUserWriteTask task = taskQueue.poll( 30, TimeUnit.SECONDS );
+                    if ( task == null && done ) {
+                        break;
+                    }
 
-            jg.writeFieldName( dictionary );
+                    // write user to application file
+                    usersFile.writeObject( task.adminUser );
+                    //usersFile.writeEndObject();
+                    echo( task.adminUser );
 
+                    // write metadata to metadata file
+                    saveCollections(   metadataFile, task );
+                    saveConnections(   metadataFile, task );
+                    saveOrganizations( metadataFile, task );
+                    saveDictionaries(  metadataFile, task );
+
+                    logger.debug("Exported user {}", task.adminUser.getProperty( "email" ));
+
+                    count++;
+                    if ( count % 1000 == 0 ) {
+                        logger.info("Exported {} admin users", count);
+                    }
+
+
+                } catch (InterruptedException e) {
+                    throw new Exception("Interrupted", e);
+                }
+            }
+
+            metadataFile.writeEndObject();
+            metadataFile.close();
+
+            usersFile.writeEndArray();
+            usersFile.close();
+        }
+
+
+        private void saveCollections( JsonGenerator jg, AdminUserWriteTask task ) throws Exception {
+
+            jg.writeFieldName( task.adminUser.getUuid().toString() );
             jg.writeStartObject();
 
-            for ( Map.Entry<Object, Object> entry : dict.entrySet() ) {
-                jg.writeFieldName( entry.getKey().toString() );
-                jg.writeObject( entry.getValue() );
+            for (String collectionName : task.collectionsByName.keySet() ) {
+
+                jg.writeFieldName( collectionName );
+                jg.writeStartArray();
+
+                List<UUID> entityIds = task.collectionsByName.get( collectionName );
+
+                if ((entityIds != null) && !entityIds.isEmpty()) {
+                    for (UUID childEntityUUID : entityIds) {
+                        jg.writeObject( childEntityUUID.toString() );
+                    }
+                }
+
+                jg.writeEndArray();
             }
+        }
+
 
+        private void saveDictionaries( JsonGenerator jg, AdminUserWriteTask task ) throws Exception {
+
+            jg.writeFieldName( "dictionaries" );
+            jg.writeStartObject();
+
+            for (String dictionary : task.dictionariesByName.keySet() ) {
+
+                Map<Object, Object> dict = task.dictionariesByName.get( dictionary );
+
+                if (dict.isEmpty()) {
+                    continue;
+                }
+
+                jg.writeFieldName( dictionary );
+
+                jg.writeStartObject();
+
+                for (Map.Entry<Object, Object> entry : dict.entrySet()) {
+                    jg.writeFieldName( entry.getKey().toString() );
+                    jg.writeObject( entry.getValue() );
+                }
+
+                jg.writeEndObject();
+            }
             jg.writeEndObject();
         }
-        jg.writeEndObject();
-    }
 
 
-    /**
-     * Persists the outgoing connections for this entity.
-     */
-    private void saveConnections( Entity entity, EntityManager em, JsonGenerator jg ) throws Exception {
+        private void saveConnections( JsonGenerator jg, AdminUserWriteTask task ) throws Exception {
 
-        jg.writeFieldName( "connections" );
-        jg.writeStartObject();
+            jg.writeFieldName( "connections" );
+            jg.writeStartObject();
 
-        Set<String> connectionTypes = em.getConnectionTypes( entity );
-        for ( String connectionType : connectionTypes ) {
+            for (String connectionType : task.connectionsByType.keySet() ) {
 
-            jg.writeFieldName( connectionType );
-            jg.writeStartArray();
+                jg.writeFieldName( connectionType );
+                jg.writeStartArray();
 
-            Results results = em.getConnectedEntities( entity.getUuid(), connectionType, null, Level.IDS );
-            List<ConnectionRef> connections = results.getConnections();
+                List<ConnectionRef> connections = task.connectionsByType.get( connectionType );
+                for (ConnectionRef connectionRef : connections) {
+                    jg.writeObject( connectionRef.getConnectedEntity().getUuid() );
+                }
 
-            for ( ConnectionRef connectionRef : connections ) {
-                jg.writeObject( connectionRef.getConnectedEntity().getUuid() );
+                jg.writeEndArray();
             }
-
-            jg.writeEndArray();
+            jg.writeEndObject();
         }
-        jg.writeEndObject();
-    }
 
 
-    /**
-     * Persists the incoming connections for this entity.
-     */
-    private void saveOrganizations(Entity entity, EntityManager em, JsonGenerator jg) throws Exception {
+        private void saveOrganizations( JsonGenerator jg, AdminUserWriteTask task ) throws Exception {
 
-        final BiMap<UUID, String> orgs = managementService.getOrganizationsForAdminUser( entity.getUuid() );
+            final BiMap<UUID, String> orgs = task.orgNamesByUuid;
 
-        jg.writeFieldName( "organizations" );
+            jg.writeFieldName( "organizations" );
+
+            jg.writeStartArray();
 
-        jg.writeStartArray();
+            for (UUID uuid : orgs.keySet()) {
 
-        for ( UUID uuid : orgs.keySet() ) {
+                jg.writeStartObject();
 
-             jg.writeStartObject();
+                jg.writeFieldName( "uuid" );
+                jg.writeObject( uuid );
 
-             jg.writeFieldName( "uuid" );
-             jg.writeObject( uuid );
+                jg.writeFieldName( "name" );
+                jg.writeObject( orgs.get( uuid ) );
 
-             jg.writeFieldName( "name" );
-             jg.writeObject( orgs.get( uuid ) );
+                jg.writeEndObject();
+            }
 
-             jg.writeEndObject();
+            jg.writeEndArray();
         }
 
-        jg.writeEndArray();
+        public void setDone(boolean done) {
+            this.done = done;
+        }
     }
-
 }