Posted to dev@usergrid.apache.org by jwest-apigee <gi...@git.apache.org> on 2015/06/11 16:48:13 UTC

[GitHub] incubator-usergrid pull request: Added threading and queueing to A...

GitHub user jwest-apigee opened a pull request:

    https://github.com/apache/incubator-usergrid/pull/275

    Added threading and queueing to Admin imports.

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jwest-apigee/incubator-usergrid master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-usergrid/pull/275.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #275
    
----
commit c0b89db57ae43ffb1b94780ea79c314b38de5a60
Author: Jeff West <jw...@apigee.com>
Date:   2015-06-11T14:47:39Z

    Added threading and queueing to imports.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Posted by snoopdave <gi...@git.apache.org>.
GitHub user snoopdave commented on a diff in the pull request:

    https://github.com/apache/incubator-usergrid/pull/275#discussion_r32321210
  
    --- Diff: stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java ---
    @@ -133,84 +163,129 @@ private void importAdminUsers() throws Exception {
          *
          * @param fileName Name of admin user data file.
          */
    -    private void importAdminUsers( String fileName ) throws Exception {
    +    private void importAdminUsers(final String fileName,
    +                                  final int writeThreadCount,
    +                                  final int auditThreadCount) throws Exception {
     
             int count = 0;
     
    -        File adminUsersFile = new File( importDir, fileName );
    +        File adminUsersFile = new File(importDir, fileName);
    +
    +        logger.info("----- Loading file: " + adminUsersFile.getAbsolutePath());
    +        JsonParser jp = getJsonParserForFile(adminUsersFile);
    +
    +        int loopCounter = 0;
     
    -        logger.info( "----- Loading file: " + adminUsersFile.getAbsolutePath() );
    -        JsonParser jp = getJsonParserForFile( adminUsersFile );
    +        BlockingQueue<Map<String, Object>> workQueue = new LinkedBlockingQueue<Map<String, Object>>();
    +        BlockingQueue<Map<String, Object>> auditQueue = new LinkedBlockingQueue<Map<String, Object>>();
    +
    +        startAdminWorkers(workQueue, auditQueue, writeThreadCount);
    +        startAdminAuditors(auditQueue, auditThreadCount);
     
             JsonToken token = jp.nextToken();
    -        validateStartArray( token );
    +        validateStartArray(token);
     
    -        EntityManager em = emf.getEntityManager( MANAGEMENT_APPLICATION_ID );
    +        while (jp.nextValue() != JsonToken.END_ARRAY) {
    +            loopCounter += 1;
     
    -        while ( jp.nextValue() != JsonToken.END_ARRAY ) {
    +            @SuppressWarnings("unchecked")
    +            Map<String, Object> entityProps = jp.readValueAs(HashMap.class);
    +            if (loopCounter % 100 == 1)
    +                logger.info("Publishing to queue... counter=" + loopCounter);
     
    -            @SuppressWarnings( "unchecked" )
    -            Map<String, Object> entityProps = jp.readValueAs( HashMap.class );
    +            workQueue.add(entityProps);
    +        }
     
    -            // Import/create the entity
    -            UUID uuid = getId( entityProps );
    -            String type = getType( entityProps );
    +        waitForQueueAndMeasure(workQueue, adminWriteThreads, "Admin Write");
    +        waitForQueueAndMeasure(auditQueue, adminAuditThreads, "Admin Audit");
     
    +        logger.info("----- End: Imported {} admin users from file {}",
    +                count, adminUsersFile.getAbsolutePath());
     
    -            try {
    -                em.create( uuid, type, entityProps );
    +        jp.close();
    +    }
     
    -                logger.debug( "Imported admin user {} {}", uuid, entityProps.get( "username" ) );
    -                count++;
    -                if ( count % 1000 == 0 ) {
    -                    logger.info("Imported {} admin users", count);
    -                }
    -            }
    -            catch ( DuplicateUniquePropertyExistsException de ) {
    -                logger.warn( "Unable to create entity. It appears to be a duplicate: " +
    -                    "id={}, type={}, name={}, username={}",
    -                    new Object[] { uuid, type, entityProps.get("name"), entityProps.get("username")});
    -                if ( logger.isDebugEnabled() ) {
    -                    logger.debug( "Exception" , de );
    -                }
    -                continue;
    -            }
    +    private static void waitForQueueAndMeasure(final BlockingQueue workQueue,
    +                                               final Map<Stoppable, Thread> threadMap,
    +                                               final String identifier) throws InterruptedException {
    +        double rateAverageSum = 0;
    +        int iterationCounter = 0;
     
    -            if ( em.get( uuid ) == null ) {
    -                logger.error( "Holy hell, we wrote an entity and it's missing.  " +
    -                                "Entity Id was {} and type is {}", uuid, type );
    -                System.exit( 1 );
    -            }
    -            echo( entityProps );
    +        while (!workQueue.isEmpty()) {
    +            iterationCounter += 1;
    +
    +            int sizeLast = workQueue.size();
    +            long lastTime = System.currentTimeMillis();
    +            logger.info("Queue {} is not empty, remaining size={}, waiting...", identifier, sizeLast);
    +            Thread.sleep(5000);
    +
    +            long timeNow = System.currentTimeMillis();
    +            int sizeNow = workQueue.size();
    +
    +            int processed = sizeLast - sizeNow;
    +
    +            long timeDelta = timeNow - lastTime;
    +
    +            double rateLast = (double) processed / (timeDelta / 1000);
    +            rateAverageSum += rateLast;
    +
    +            long timeRemaining = sizeLast / (long) (rateAverageSum / iterationCounter);
    +
    +            logger.info(
    +                    String.format("++PROGRESS (%s): sizeLast=%s nowSize=%s processed=%s rateLast=%s/s rateAvg=%s/s timeRemaining=%s(s)",
    --- End diff --
    
    The logger has built-in message formatting, which may be slightly more efficient than calling String.format() here. For example:
    
                logger.info(
                    "++PROGRESS ({}): sizeLast={} nowSize={} processed={} rateLast={}/s rateAvg={}/s timeRemaining={}(s)",
                    new Object[] { identifier, sizeLast, sizeNow, processed, rateLast, (rateAverageSum / iterationCounter), timeRemaining } );
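
One reason parameterized logging can be cheaper is that the message is only assembled if the log level is actually enabled, whereas String.format() always runs before the logger is even called. The sketch below illustrates that difference. It is only an illustration under stated assumptions: it uses java.util.logging from the JDK as a stand-in for the project's logger (so the placeholder is {0} rather than {}), and the class and method names are hypothetical, not part of ImportAdmins.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch: counts how often an argument is converted to a String
// when logging is disabled, for eager versus deferred formatting.
class DeferredFormattingSketch {

    // An argument that records every toString() call made on it.
    static class CountingValue {
        final AtomicInteger toStringCalls = new AtomicInteger();

        @Override
        public String toString() {
            toStringCalls.incrementAndGet();
            return "value";
        }
    }

    // Eager: String.format() builds the message before the logger checks its level.
    static int eagerCalls() {
        Logger logger = Logger.getLogger("sketch.eager");
        logger.setLevel(Level.OFF); // logging disabled
        CountingValue v = new CountingValue();
        logger.info(String.format("++PROGRESS (%s)", v));
        return v.toStringCalls.get();
    }

    // Deferred: the logger receives the pattern and arguments separately and
    // skips formatting entirely because the level is disabled.
    static int deferredCalls() {
        Logger logger = Logger.getLogger("sketch.deferred");
        logger.setLevel(Level.OFF); // logging disabled
        CountingValue v = new CountingValue();
        logger.log(Level.INFO, "++PROGRESS ({0})", new Object[] { v });
        return v.toStringCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("eager toString() calls:    " + eagerCalls());
        System.out.println("deferred toString() calls: " + deferredCalls());
    }
}
```

When the level is enabled, both forms do the formatting work, so any saving is likely to be small for a message that is logged once every five seconds.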




Posted by snoopdave <gi...@git.apache.org>.
GitHub user snoopdave commented on the pull request:

    https://github.com/apache/incubator-usergrid/pull/275#issuecomment-111515415
  
    Apart from the minor things above, this PR looks good. I am merging it now.



Posted by snoopdave <gi...@git.apache.org>.
GitHub user snoopdave commented on a diff in the pull request:

    https://github.com/apache/incubator-usergrid/pull/275#discussion_r32321377
  
    --- Diff: stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java ---
    @@ -133,84 +163,129 @@ private void importAdminUsers() throws Exception {
          *
          * @param fileName Name of admin user data file.
          */
    -    private void importAdminUsers( String fileName ) throws Exception {
    +    private void importAdminUsers(final String fileName,
    +                                  final int writeThreadCount,
    +                                  final int auditThreadCount) throws Exception {
     
             int count = 0;
     
    -        File adminUsersFile = new File( importDir, fileName );
    +        File adminUsersFile = new File(importDir, fileName);
    +
    +        logger.info("----- Loading file: " + adminUsersFile.getAbsolutePath());
    +        JsonParser jp = getJsonParserForFile(adminUsersFile);
    +
    +        int loopCounter = 0;
     
    -        logger.info( "----- Loading file: " + adminUsersFile.getAbsolutePath() );
    -        JsonParser jp = getJsonParserForFile( adminUsersFile );
    +        BlockingQueue<Map<String, Object>> workQueue = new LinkedBlockingQueue<Map<String, Object>>();
    +        BlockingQueue<Map<String, Object>> auditQueue = new LinkedBlockingQueue<Map<String, Object>>();
    +
    +        startAdminWorkers(workQueue, auditQueue, writeThreadCount);
    +        startAdminAuditors(auditQueue, auditThreadCount);
     
             JsonToken token = jp.nextToken();
    -        validateStartArray( token );
    +        validateStartArray(token);
     
    -        EntityManager em = emf.getEntityManager( MANAGEMENT_APPLICATION_ID );
    +        while (jp.nextValue() != JsonToken.END_ARRAY) {
    +            loopCounter += 1;
     
    -        while ( jp.nextValue() != JsonToken.END_ARRAY ) {
    +            @SuppressWarnings("unchecked")
    +            Map<String, Object> entityProps = jp.readValueAs(HashMap.class);
    +            if (loopCounter % 100 == 1)
    +                logger.info("Publishing to queue... counter=" + loopCounter);
     
    -            @SuppressWarnings( "unchecked" )
    -            Map<String, Object> entityProps = jp.readValueAs( HashMap.class );
    +            workQueue.add(entityProps);
    +        }
     
    -            // Import/create the entity
    -            UUID uuid = getId( entityProps );
    -            String type = getType( entityProps );
    +        waitForQueueAndMeasure(workQueue, adminWriteThreads, "Admin Write");
    +        waitForQueueAndMeasure(auditQueue, adminAuditThreads, "Admin Audit");
     
    +        logger.info("----- End: Imported {} admin users from file {}",
    +                count, adminUsersFile.getAbsolutePath());
     
    -            try {
    -                em.create( uuid, type, entityProps );
    +        jp.close();
    +    }
     
    -                logger.debug( "Imported admin user {} {}", uuid, entityProps.get( "username" ) );
    -                count++;
    -                if ( count % 1000 == 0 ) {
    -                    logger.info("Imported {} admin users", count);
    -                }
    -            }
    -            catch ( DuplicateUniquePropertyExistsException de ) {
    -                logger.warn( "Unable to create entity. It appears to be a duplicate: " +
    -                    "id={}, type={}, name={}, username={}",
    -                    new Object[] { uuid, type, entityProps.get("name"), entityProps.get("username")});
    -                if ( logger.isDebugEnabled() ) {
    -                    logger.debug( "Exception" , de );
    -                }
    -                continue;
    -            }
    +    private static void waitForQueueAndMeasure(final BlockingQueue workQueue,
    +                                               final Map<Stoppable, Thread> threadMap,
    +                                               final String identifier) throws InterruptedException {
    +        double rateAverageSum = 0;
    +        int iterationCounter = 0;
     
    -            if ( em.get( uuid ) == null ) {
    -                logger.error( "Holy hell, we wrote an entity and it's missing.  " +
    -                                "Entity Id was {} and type is {}", uuid, type );
    -                System.exit( 1 );
    -            }
    -            echo( entityProps );
    +        while (!workQueue.isEmpty()) {
    +            iterationCounter += 1;
    +
    +            int sizeLast = workQueue.size();
    +            long lastTime = System.currentTimeMillis();
    +            logger.info("Queue {} is not empty, remaining size={}, waiting...", identifier, sizeLast);
    +            Thread.sleep(5000);
    +
    +            long timeNow = System.currentTimeMillis();
    +            int sizeNow = workQueue.size();
    +
    +            int processed = sizeLast - sizeNow;
    +
    +            long timeDelta = timeNow - lastTime;
    +
    +            double rateLast = (double) processed / (timeDelta / 1000);
    +            rateAverageSum += rateLast;
    +
    +            long timeRemaining = sizeLast / (long) (rateAverageSum / iterationCounter);
    --- End diff --
    
    I had to change this to:
    
    long timeRemaining = (long) ( sizeLast / (rateAverageSum / iterationCounter) );
    
    Otherwise the unit tests throw a divide-by-zero error.
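
The likely cause is the order of the cast and the division. In the form from the diff, the average rate is cast to long first, so any average below 1 item/s truncates to 0 and the following integer division throws ArithmeticException. In the revised form the division happens in floating point and only the result is cast. The sketch below compares the two forms in isolation. The class and method names are hypothetical and only the two expressions are taken from the thread.

```java
// Hypothetical sketch comparing the two timeRemaining expressions.
class TimeRemainingSketch {

    // Form from the diff: the average rate is truncated to a long before
    // dividing, so an average below 1.0 becomes a divisor of 0.
    static long truncateThenDivide(int sizeLast, double rateAverageSum, int iterationCounter) {
        return sizeLast / (long) (rateAverageSum / iterationCounter);
    }

    // Revised form: divide in floating point, then truncate the result.
    // A zero average yields Infinity, which the cast saturates to
    // Long.MAX_VALUE instead of throwing.
    static long divideThenTruncate(int sizeLast, double rateAverageSum, int iterationCounter) {
        return (long) (sizeLast / (rateAverageSum / iterationCounter));
    }

    public static void main(String[] args) {
        // 100 items left, averaging 0.5 items/s over one iteration.
        System.out.println("revised form: " + divideThenTruncate(100, 0.5, 1) + "s");
        try {
            truncateThenDivide(100, 0.5, 1);
        } catch (ArithmeticException e) {
            System.out.println("form from the diff: ArithmeticException: " + e.getMessage());
        }
    }
}
```

Note that the revised form does not throw when nothing has been processed yet, but it then reports a very large estimate, so a caller may still want to treat a zero average rate as "unknown".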



Posted by asfgit <gi...@git.apache.org>.
GitHub user asfgit closed the pull request at:

    https://github.com/apache/incubator-usergrid/pull/275

