You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/07/22 17:17:43 UTC

[02/15] incubator-usergrid git commit: Add readThread and writeThread CLI parameters.

Add readThread and writeThread CLI parameters.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2b65e619
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2b65e619
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2b65e619

Branch: refs/heads/two-dot-o
Commit: 2b65e619316b206720e574a21fe1b33462e0f2dd
Parents: b2bdbb5
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 14 08:03:18 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 14 08:03:18 2015 -0400

----------------------------------------------------------------------
 .../org/apache/usergrid/tools/ExportApp.java    | 153 +++++++++++++------
 1 file changed, 106 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2b65e619/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index ceb3ecd..21c63a0 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -25,6 +25,7 @@ import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.Query;
 import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.utils.StringUtils;
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.util.MinimalPrettyPrinter;
@@ -46,11 +47,22 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 
 /**
- * Export application's collections.
+ * Export all entities and connections of a Usergrid app.
+ * 
+ * Exports data files to specified directory.
+ * 
+ * Will create as many output files as there are writeThreads (by default: 10).
+ * 
+ * Will create two types of files: *.uge for Usergrid entities and *.ugc for entity-to-entity connections.
+ * 
+ * Every line of the data files is a complete JSON object.
  */
 public class ExportApp extends ExportingToolBase {
     static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
     
+    private static final String READ_THREAD_COUNT = "readThreads";
+    private static final String WRITE_THREAD_COUNT = "writeThreads";
+
     // we will write two types of files: entities and connections
     BlockingQueue<ExportEntity> entityWriteQueue = new LinkedBlockingQueue<ExportEntity>();
     BlockingQueue<ExportConnection> connectionWriteQueue = new LinkedBlockingQueue<ExportConnection>();
@@ -58,10 +70,15 @@ public class ExportApp extends ExportingToolBase {
     static final String APPLICATION_NAME = "application";
     
     int pollTimeoutSeconds = 10;
+    
+    int readThreadCount = 80; 
+    int writeThreadCount = 10;
+    
+    String applicationName;
+    String organizationName;
 
     // limiting output threads will limit output files 
-    final ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(8);
-    final Scheduler scheduler = Schedulers.from( threadPoolExecutor );
+    Scheduler readScheduler;
 
     Map<Thread, JsonGenerator> entityGeneratorsByThread  = new HashMap<Thread, JsonGenerator>();
     Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
@@ -77,29 +94,37 @@ public class ExportApp extends ExportingToolBase {
     ObjectMapper mapper = new ObjectMapper();
     
     /**
-     * Export admin users using multiple threads.
-     * <p/>
-     * How it works:
-     * In main thread we query for IDs of all admin users, add each ID to read queue.
-     * Read-queue workers read admin user data, add data to write queue.
-     * One write-queue worker reads data writes to file.
+     * Tool entry point. 
      */
     @Override
     public void runTool(CommandLine line) throws Exception {
         
-        String applicationName = line.getOptionValue( APPLICATION_NAME );
-
-//        if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
-//            try {
-//                readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
-//            } catch (NumberFormatException nfe) {
-//                logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
-//                return;
-//            }
-//        } else {
-//            readThreadCount = 20;
-//        }
+        applicationName = line.getOptionValue( APPLICATION_NAME );
+
+        if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
+            try {
+                readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
+            } catch (NumberFormatException nfe) {
+                logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+                return;
+            }
+        }
         
+        if (StringUtils.isNotEmpty( line.getOptionValue( WRITE_THREAD_COUNT ) )) {
+            try {
+                writeThreadCount = Integer.parseInt( line.getOptionValue( WRITE_THREAD_COUNT ) );
+            } catch (NumberFormatException nfe) {
+                logger.error( "-" + WRITE_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+                return;
+            }
+        }
+
+        ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
+        readScheduler = Schedulers.from( readThreadPoolExecutor );
+        
+        ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
+        final Scheduler writeScheduler = Schedulers.from( writeThreadPoolExecutor );
+       
         startSpring();
 
         setVerbose( line );
@@ -111,6 +136,7 @@ public class ExportApp extends ExportingToolBase {
        
         UUID applicationId = emf.lookupApplication( applicationName );
         final EntityManager em = emf.getEntityManager( applicationId );
+        organizationName = em.getApplication().getOrganizationName();
 
         // start write queue workers
 
@@ -119,18 +145,18 @@ public class ExportApp extends ExportingToolBase {
         entityWritesObservable.flatMap( new Func1<ExportEntity, Observable<?>>() {
             public Observable<ExportEntity> call(ExportEntity exportEntity) {
                 return Observable.just(exportEntity).doOnNext( 
-                        new EntityWriteAction() ).subscribeOn( scheduler );
+                        new EntityWriteAction() ).subscribeOn( writeScheduler );
             }
-        },10).subscribeOn( scheduler ).subscribe();
+        },10).subscribeOn( writeScheduler ).subscribe();
 
         ConnectionWritesOnSubscribe connectionWritesOnSub = new ConnectionWritesOnSubscribe( connectionWriteQueue );
         rx.Observable connectionWritesObservable = rx.Observable.create( connectionWritesOnSub );
         connectionWritesObservable.flatMap( new Func1<ExportConnection, Observable<?>>() {
             public Observable<ExportConnection> call(ExportConnection connection ) {
                 return Observable.just(connection).doOnNext( 
-                        new ConnectionWriteAction()).subscribeOn( scheduler );
+                        new ConnectionWriteAction()).subscribeOn( writeScheduler );
             }
-        },10).subscribeOn( scheduler ).subscribe();
+        },10).subscribeOn( writeScheduler ).subscribe();
 
         // start processing data and filling up write queues
 
@@ -139,9 +165,9 @@ public class ExportApp extends ExportingToolBase {
         collectionsObservable.flatMap( new Func1<String, Observable<String>>() {
             public Observable<String> call(String collection) {
                 return Observable.just(collection).doOnNext( 
-                        new CollectionAction( em ) ).subscribeOn( Schedulers.io() );
+                        new CollectionAction( em ) ).subscribeOn( readScheduler );
             }
-        },40).subscribeOn( Schedulers.io() ).subscribe();
+        },40).subscribeOn( readScheduler ).subscribe();
         
         // wait for write thread pollers to get started
 
@@ -192,14 +218,18 @@ public class ExportApp extends ExportingToolBase {
 
         Options options = super.createOptions();
 
-        Option readThreads = OptionBuilder.hasArg().withType("")
+        Option appNameOption = OptionBuilder.hasArg().withType("")
                 .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
-        options.addOption( readThreads );
-        
-//        Option readThreads = OptionBuilder
-//                .hasArg().withType(0).withDescription("Read Threads -" + READ_THREAD_COUNT).create(READ_THREAD_COUNT);
-//        options.addOption( readThreads );
-        
+        options.addOption( appNameOption );
+
+        Option readThreadsOption = OptionBuilder
+                .hasArg().withType(0).withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
+        options.addOption( readThreadsOption );
+
+        Option writeThreadsOption = OptionBuilder
+                .hasArg().withType(0).withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
+        options.addOption( writeThreadsOption );
+
         return options;
     }
 
@@ -276,9 +306,13 @@ public class ExportApp extends ExportingToolBase {
                                 }
                                 dictionariesByName.put( dictionary, dict );
                             }
+                            
                             ExportEntity exportEntity = new ExportEntity( 
-                                    em.getApplication().getApplicationName(), 
-                                    entity, dictionariesByName );
+                                    organizationName, 
+                                    applicationName, 
+                                    entity, 
+                                    dictionariesByName );
+                            
                             subscriber.onNext( exportEntity );
                             count++;
                             
@@ -333,11 +367,14 @@ public class ExportApp extends ExportingToolBase {
 
                     for (Entity connectedEntity : results.getEntities()) {
                         try {
+                            
                             ExportConnection connection = new ExportConnection( 
-                                    em.getApplication().getApplicationName(), 
+                                    applicationName,
+                                    organizationName,
                                     connectionType, 
                                     exportEntity.getEntity().getUuid(), 
                                     connectedEntity.getUuid());
+                            
                             subscriber.onNext( connection );
                             count++;
 
@@ -379,9 +416,9 @@ public class ExportApp extends ExportingToolBase {
             entityObservable.flatMap( new Func1<ExportEntity, Observable<ExportEntity>>() {
                 public Observable<ExportEntity> call(ExportEntity exportEntity) {
                     return Observable.just(exportEntity).doOnNext( 
-                            new EntityAction( em ) ).subscribeOn( Schedulers.io() );
+                            new EntityAction( em ) ).subscribeOn( readScheduler );
                 }
-            }, 8).subscribeOn(Schedulers.io()).toBlocking().last();
+            }, 8).subscribeOn(readScheduler).toBlocking().last();
         }
     }
 
@@ -413,9 +450,9 @@ public class ExportApp extends ExportingToolBase {
                     entityObservable.flatMap( new Func1<ExportConnection, Observable<ExportConnection>>() {
                         public Observable<ExportConnection> call(ExportConnection connection) {
                             return Observable.just(connection).doOnNext( 
-                                    new ConnectionsAction() ).subscribeOn( Schedulers.io() );
+                                    new ConnectionsAction() ).subscribeOn(readScheduler);
                         }
-                    }, 8).subscribeOn(Schedulers.io()).toBlocking().last();
+                    }, 8).subscribeOn(readScheduler).toBlocking().last();
                 }
                 
             } catch (Exception e) {
@@ -472,7 +509,7 @@ public class ExportApp extends ExportingToolBase {
                 count++;
             }
 
-            logger.info("Done. Wrote {} entities", count);
+            logger.info("Done. De-queued {} entities", count);
             if ( count > 0 ) {
                 subscriber.onCompleted();
             } else {
@@ -513,7 +550,7 @@ public class ExportApp extends ExportingToolBase {
                 count++;
             }
 
-            logger.info("Done. Wrote {} connections", count);
+            logger.info("Done. De-queued {} connections", count);
             if ( count > 0 ) {
                 subscriber.onCompleted();
             } else {
@@ -531,7 +568,8 @@ public class ExportApp extends ExportingToolBase {
 
             boolean wroteData = false;
 
-            String fileName = "target/" + Thread.currentThread().getName() + ".ude";
+            String [] parts = Thread.currentThread().getName().split("-");
+            String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".entities";
 
             JsonGenerator gen = entityGeneratorsByThread.get( Thread.currentThread() );
             if ( gen == null ) {
@@ -572,7 +610,8 @@ public class ExportApp extends ExportingToolBase {
 
             boolean wroteData = false;
 
-            String fileName = "target/" + Thread.currentThread().getName() + ".ugc";
+            String [] parts = Thread.currentThread().getName().split("-");
+            String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".connections";
 
             JsonGenerator gen = connectionGeneratorsByThread.get( Thread.currentThread() );
             if ( gen == null ) {
@@ -607,10 +646,12 @@ public class ExportApp extends ExportingToolBase {
 }
 
 class ExportEntity {
+    private String organization;
     private String application;
     private Entity entity;
     private Map<String, Object> dictionaries;
-    public ExportEntity( String application, Entity entity, Map<String, Object> dictionaries ) {
+    public ExportEntity( String organization, String application, Entity entity, Map<String, Object> dictionaries ) {
+        this.organization = organization;
         this.application = application;
         this.entity = entity;
         this.dictionaries = dictionaries;
@@ -639,14 +680,24 @@ class ExportEntity {
     public void setDictionaries(Map<String, Object> dictionaries) {
         this.dictionaries = dictionaries;
     }
+
+    public String getOrganization() {
+        return organization;
+    }
+
+    public void setOrganization(String organization) {
+        this.organization = organization;
+    }
 }
 
 class ExportConnection {
+    private String organization;
     private String application;
     private String connectionType;
     private UUID sourceUuid;
     private UUID targetUuid;
-    public ExportConnection(String application, String connectionType, UUID sourceUuid, UUID targetUuid) {
+    public ExportConnection(String organization, String application, String connectionType, UUID sourceUuid, UUID targetUuid) {
+        this.organization= organization;
         this.application = application;
         this.connectionType = connectionType;
         this.sourceUuid = sourceUuid;
@@ -684,4 +735,12 @@ class ExportConnection {
     public void setTargetUuid(UUID targetUuid) {
         this.targetUuid = targetUuid;
     }
+
+    public String getOrganization() {
+        return organization;
+    }
+
+    public void setOrganization(String organization) {
+        this.organization = organization;
+    }
 }