You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/07/07 01:33:42 UTC

[2/4] incubator-usergrid git commit: Logic to ensure that export and import wait until all data is exported or imported before exiting.

Logic to ensure that export and import wait until all data is exported or imported before exiting.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/59ea6e54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/59ea6e54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/59ea6e54

Branch: refs/heads/import-fixes
Commit: 59ea6e54fd48d069843cc8ceccd48cf93b0cb2a3
Parents: 67d3322
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Jul 6 19:19:31 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Jul 6 19:19:31 2015 -0400

----------------------------------------------------------------------
 .../org/apache/usergrid/tools/ExportAdmins.java | 48 +++++++++++---------
 .../org/apache/usergrid/tools/ImportAdmins.java | 44 ++++++++++++------
 2 files changed, 56 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59ea6e54/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
index d3d6371..d726ad3 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
@@ -167,11 +167,6 @@ public class ExportAdmins extends ExportingToolBase {
             ids = em.searchCollection( em.getApplicationRef(), "users", query );
         }
 
-        adminUserWriter.setDone( true );
-        for (AdminUserReader aur : readers) {
-            aur.setDone( true );
-        }
-
         logger.debug( "Waiting for write thread to complete" );
         
         boolean done = false;
@@ -202,7 +197,7 @@ public class ExportAdmins extends ExportingToolBase {
      */
     private void buildOrgMap() throws Exception {
 
-        logger.info("Building org map");
+        logger.info( "Building org map" );
 
         ExecutorService execService = Executors.newFixedThreadPool( this.readThreadCount );
 
@@ -269,8 +264,6 @@ public class ExportAdmins extends ExportingToolBase {
 
     public class AdminUserReader implements Runnable {
 
-        private boolean done = true;
-
         private final BlockingQueue<UUID> readQueue;
         private final BlockingQueue<AdminUserWriteTask> writeQueue;
 
@@ -299,9 +292,7 @@ public class ExportAdmins extends ExportingToolBase {
                 UUID uuid = null;
                 try {
                     uuid = readQueue.poll( 30, TimeUnit.SECONDS );
-                    //logger.debug("Got item from entityId queue: " + uuid );
-
-                    if ( uuid == null && done ) {
+                    if ( uuid == null ) {
                         break;
                     }
 
@@ -335,8 +326,31 @@ public class ExportAdmins extends ExportingToolBase {
 
             for (String dictionary : dictionaries) {
                 Map<Object, Object> dict = em.getDictionaryAsMap( entity, dictionary );
+                if ( dict.isEmpty() ) {
+                    continue;
+                }
                 task.dictionariesByName.put( dictionary, dict );
             }
+           
+            if ( task.dictionariesByName.isEmpty() ) {
+                logger.error( "User {}:{} has no dictionaries",
+                        new Object[]{task.adminUser.getName(), task.adminUser.getUuid() } );
+                
+            } else if ( task.dictionariesByName.get("credentials") == null ) {
+                logger.error( "User {}:{} has no credentials dictionary",
+                        new Object[]{task.adminUser.getName(), task.adminUser.getUuid() } );
+                
+            } else {
+                if ( task.dictionariesByName.get("credentials").get("password") == null ) {
+                    logger.error( "User {}:{} has no password in credential dictionary",
+                            new Object[]{task.adminUser.getName(), task.adminUser.getUuid() } );
+                }
+                if ( task.dictionariesByName.get("credentials").get("secret") == null ) {
+                    logger.error( "User {}:{} has no secret in credential dictionary",
+                            new Object[]{task.adminUser.getName(), task.adminUser.getUuid() } );
+                }
+            }
+                
         }
 
         private void addOrganizationsToTask(AdminUserWriteTask task) throws Exception {
@@ -360,16 +374,10 @@ public class ExportAdmins extends ExportingToolBase {
                         task.adminUser.getUuid() } );
             }
         }
-
-        public void setDone(boolean done) {
-            this.done = done;
-        }
     }
 
     class AdminUserWriter implements Runnable {
 
-        private boolean done = false;
-
         private final BlockingQueue<AdminUserWriteTask> taskQueue;
 
         public AdminUserWriter( BlockingQueue<AdminUserWriteTask> taskQueue ) {
@@ -404,7 +412,7 @@ public class ExportAdmins extends ExportingToolBase {
 
                 try {
                     AdminUserWriteTask task = taskQueue.poll( 30, TimeUnit.SECONDS );
-                    if ( task == null && done ) {
+                    if ( task == null ) {
                         break;
                     }
 
@@ -494,10 +502,6 @@ public class ExportAdmins extends ExportingToolBase {
 
             jg.writeEndArray();
         }
-
-        public void setDone(boolean done) {
-            this.done = done;
-        }
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/59ea6e54/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
index 39384e6..4bcdc0b 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
@@ -90,6 +90,11 @@ public class ImportAdmins extends ToolBase {
     AtomicInteger userCount = new AtomicInteger( 0 );
     AtomicInteger metadataCount = new AtomicInteger( 0 );
 
+    AtomicInteger writeEmptyCount = new AtomicInteger( 0 );
+    AtomicInteger auditEmptyCount = new AtomicInteger( 0 );
+    AtomicInteger metadataEmptyCount = new AtomicInteger( 0 );
+    
+
 
     @Override
     @SuppressWarnings("static-access")
@@ -146,7 +151,7 @@ public class ImportAdmins extends ToolBase {
             writeThreadCount = Integer.parseInt( line.getOptionValue(WRITE_THREAD_COUNT));
         }
 
-        importAdminUsers(writeThreadCount, auditThreadCount);
+        importAdminUsers( writeThreadCount, auditThreadCount );
 
         importMetadata( writeThreadCount );
     }
@@ -159,7 +164,7 @@ public class ImportAdmins extends ToolBase {
 
         String[] fileNames = importDir.list(new PrefixFileFilter(ExportAdmins.ADMIN_USERS_PREFIX + "."));
 
-        logger.info("Applications to read: " + fileNames.length);
+        logger.info( "Applications to read: " + fileNames.length );
 
         for (String fileName : fileNames) {
             try {
@@ -210,8 +215,8 @@ public class ImportAdmins extends ToolBase {
             workQueue.add( entityProps );
         }
 
-        waitForQueueAndMeasure(workQueue, adminWriteThreads, "Admin Write");
-        waitForQueueAndMeasure(auditQueue, adminAuditThreads, "Admin Audit");
+        waitForQueueAndMeasure(workQueue, writeEmptyCount, adminWriteThreads, "Admin Write");
+        waitForQueueAndMeasure(auditQueue, auditEmptyCount, adminAuditThreads, "Admin Audit");
 
         logger.info("----- End: Imported {} admin users from file {}",
                 count, adminUsersFile.getAbsolutePath());
@@ -220,12 +225,13 @@ public class ImportAdmins extends ToolBase {
     }
 
     private static void waitForQueueAndMeasure(final BlockingQueue workQueue,
+                                               final AtomicInteger emptyCounter,
                                                final Map<Stoppable, Thread> threadMap,
                                                final String identifier) throws InterruptedException {
         double rateAverageSum = 0;
         int iterations = 0;
 
-        while (!workQueue.isEmpty()) {
+        while ( emptyCounter.get() < threadMap.size() ) {
             iterations += 1;
 
             int sizeLast = workQueue.size();
@@ -312,8 +318,8 @@ public class ImportAdmins extends ToolBase {
     private void importMetadata(int writeThreadCount) throws Exception {
 
         String[] fileNames = importDir.list(
-                new PrefixFileFilter(ExportAdmins.ADMIN_USER_METADATA_PREFIX + "."));
-        logger.info("Metadata files to read: " + fileNames.length);
+                new PrefixFileFilter( ExportAdmins.ADMIN_USER_METADATA_PREFIX + "." ) );
+        logger.info( "Metadata files to read: " + fileNames.length );
 
         for (String fileName : fileNames) {
             try {
@@ -373,17 +379,22 @@ public class ImportAdmins extends ToolBase {
             if (jsonToken.equals(JsonToken.FIELD_NAME) && depth == 2) {
 
                 jp.nextToken();
-
                 String entityOwnerId = jp.getCurrentName();
-                EntityRef entityRef = em.getRef(UUID.fromString(entityOwnerId));
 
-                Map<String, Object> metadata = (Map<String, Object>) jp.readValueAs(Map.class);
-
-                workQueue.put(new ImportMetadataTask(entityRef, metadata));
+                try {
+                    EntityRef entityRef = em.getRef( UUID.fromString( entityOwnerId ) );
+                    Map<String, Object> metadata = (Map<String, Object>) jp.readValueAs( Map.class );
+                    
+                    workQueue.put( new ImportMetadataTask( entityRef, metadata ) );
+                    logger.debug( "Put user {} in metadata queue", entityRef.getUuid() );
+                    
+                } catch ( Exception e ) {
+                    logger.debug( "Error with user {}, not putting in metadata queue", entityOwnerId );
+                }
             }
         }
 
-        waitForQueueAndMeasure(workQueue, metadataWorkerThreadMap, "Metadata Load");
+        waitForQueueAndMeasure(workQueue, metadataEmptyCount, metadataWorkerThreadMap, "Metadata Load");
 
         logger.info("----- End of metadata -----");
         jp.close();
@@ -523,9 +534,11 @@ public class ImportAdmins extends ToolBase {
 
                     if (entityProps == null) {
                         logger.warn("Reading from AUDIT queue was null!");
+                        auditEmptyCount.getAndIncrement();
                         Thread.sleep(1000);
                         continue;
                     }
+                    auditEmptyCount.set(0);
 
                     count++;
                     long startTime = System.currentTimeMillis();
@@ -599,15 +612,16 @@ public class ImportAdmins extends ToolBase {
 
                     if (task == null) {
                         logger.warn("Reading from metadata queue was null!");
+                        metadataEmptyCount.getAndIncrement();
                         Thread.sleep(1000);
                         continue;
                     }
+                    metadataEmptyCount.set(0);
                     
                     long startTime = System.currentTimeMillis();
                     
                     importEntityMetadata(em, task.entityRef, task.metadata);
                     
-                    metadataCount.addAndGet( 1 );
                     long stopTime = System.currentTimeMillis();
                     long duration = stopTime - startTime;
                     durationSum += duration;
@@ -662,9 +676,11 @@ public class ImportAdmins extends ToolBase {
 
                     if (entityProps == null) {
                         logger.warn("Reading from admin import queue was null!");
+                        writeEmptyCount.getAndIncrement();
                         Thread.sleep( 1000 );
                         continue;
                     }
+                    writeEmptyCount.set(0);
 
                     // Import/create the entity
                     UUID uuid = getId(entityProps);