You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/07/14 19:31:23 UTC

[2/2] incubator-usergrid git commit: Remove queues and make the whole thing one "stream"

Remove queues and make the whole thing one "stream"


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7a870d69
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7a870d69
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7a870d69

Branch: refs/heads/rxportapp
Commit: 7a870d6929cea76f5bbec9aa8f5a8caa8dee07e4
Parents: 2b65e61
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 14 13:31:18 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 14 13:31:18 2015 -0400

----------------------------------------------------------------------
 .../org/apache/usergrid/tools/ExportApp.java    | 375 +++++--------------
 .../usergrid/tools/ExportingToolBase.java       |   2 +-
 .../apache/usergrid/tools/ExportAppTest.java    | 114 +++++-
 3 files changed, 185 insertions(+), 306 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a870d69/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index 21c63a0..59509c0 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
 import rx.Observable;
 import rx.Scheduler;
 import rx.Subscriber;
+import rx.functions.Action0;
 import rx.functions.Action1;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
@@ -42,7 +43,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
 
@@ -59,40 +61,29 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class ExportApp extends ExportingToolBase {
     static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
-    
-    private static final String READ_THREAD_COUNT = "readThreads";
-    private static final String WRITE_THREAD_COUNT = "writeThreads";
-
-    // we will write two types of files: entities and connections
-    BlockingQueue<ExportEntity> entityWriteQueue = new LinkedBlockingQueue<ExportEntity>();
-    BlockingQueue<ExportConnection> connectionWriteQueue = new LinkedBlockingQueue<ExportConnection>();
 
     static final String APPLICATION_NAME = "application";
-    
-    int pollTimeoutSeconds = 10;
-    
-    int readThreadCount = 80; 
-    int writeThreadCount = 10;
-    
+    private static final String READ_THREAD_COUNT = "readThreads";
+    private static final String WRITE_THREAD_COUNT = "writeThreads";
+   
     String applicationName;
     String organizationName;
 
-    // limiting output threads will limit output files 
+    AtomicInteger entitiesWritten = new AtomicInteger(0);
+    AtomicInteger connectionsWritten = new AtomicInteger(0);
+
     Scheduler readScheduler;
+    Scheduler writeScheduler;
 
+    ObjectMapper mapper = new ObjectMapper();
     Map<Thread, JsonGenerator> entityGeneratorsByThread  = new HashMap<Thread, JsonGenerator>();
     Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
 
-    List<String> emptyFiles = new ArrayList<String>();
+    // set via CLI
+    int readThreadCount = 80;
+    int writeThreadCount = 10; // limiting write will limit output files 
     
-    AtomicInteger activePollers = new AtomicInteger(0);
-    AtomicInteger entitiesQueued = new AtomicInteger(0);
-    AtomicInteger entitiesWritten = new AtomicInteger(0);
-    AtomicInteger connectionsWritten = new AtomicInteger(0);
-    AtomicInteger connectionsQueued = new AtomicInteger(0);
 
-    ObjectMapper mapper = new ObjectMapper();
-    
     /**
      * Tool entry point. 
      */
@@ -123,7 +114,7 @@ public class ExportApp extends ExportingToolBase {
         readScheduler = Schedulers.from( readThreadPoolExecutor );
         
         ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
-        final Scheduler writeScheduler = Schedulers.from( writeThreadPoolExecutor );
+        writeScheduler = Schedulers.from( writeThreadPoolExecutor );
        
         startSpring();
 
@@ -138,78 +129,26 @@ public class ExportApp extends ExportingToolBase {
         final EntityManager em = emf.getEntityManager( applicationId );
         organizationName = em.getApplication().getOrganizationName();
 
-        // start write queue workers
-
-        EntityWritesOnSubscribe entityWritesOnSub = new EntityWritesOnSubscribe( entityWriteQueue );
-        rx.Observable entityWritesObservable = rx.Observable.create( entityWritesOnSub );
-        entityWritesObservable.flatMap( new Func1<ExportEntity, Observable<?>>() {
-            public Observable<ExportEntity> call(ExportEntity exportEntity) {
-                return Observable.just(exportEntity).doOnNext( 
-                        new EntityWriteAction() ).subscribeOn( writeScheduler );
-            }
-        },10).subscribeOn( writeScheduler ).subscribe();
-
-        ConnectionWritesOnSubscribe connectionWritesOnSub = new ConnectionWritesOnSubscribe( connectionWriteQueue );
-        rx.Observable connectionWritesObservable = rx.Observable.create( connectionWritesOnSub );
-        connectionWritesObservable.flatMap( new Func1<ExportConnection, Observable<?>>() {
-            public Observable<ExportConnection> call(ExportConnection connection ) {
-                return Observable.just(connection).doOnNext( 
-                        new ConnectionWriteAction()).subscribeOn( writeScheduler );
+        Observable<String> collectionsObservable = Observable.create( new CollectionsObservable( em ) );
+        
+        collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
+            
+            public Observable<ExportEntity> call(String collection) {
+                return Observable.create( new EntityObservable( em, collection ))
+                        .doOnNext( new EntityWriteAction() ).subscribeOn( writeScheduler );
             }
-        },10).subscribeOn( writeScheduler ).subscribe();
-
-        // start processing data and filling up write queues
-
-        CollectionsOnSubscribe onSubscribe = new CollectionsOnSubscribe( em );
-        rx.Observable collectionsObservable = rx.Observable.create( onSubscribe );
-        collectionsObservable.flatMap( new Func1<String, Observable<String>>() {
-            public Observable<String> call(String collection) {
-                return Observable.just(collection).doOnNext( 
-                        new CollectionAction( em ) ).subscribeOn( readScheduler );
+            
+        }, 10).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
+            
+            public Observable<ExportConnection> call(ExportEntity exportEntity) {
+                return Observable.create( new ConnectionsObservable( em, exportEntity ))
+                        .doOnNext( new ConnectionWriteAction() ).subscribeOn( writeScheduler );
             }
-        },40).subscribeOn( readScheduler ).subscribe();
-        
-        // wait for write thread pollers to get started
-
-        try { Thread.sleep( 1000 ); } catch (InterruptedException ignored) {}
-
-        // wait for write-thread pollers to stop
-
-        while ( activePollers.get() > 0 ) {
-            logger.info(
-                     "Active write threads: {}\n"
-                    +"Entities written:     {}\n"
-                    +"Entities queued:      {}\n"
-                    +"Connections written:  {}\n"
-                    +"Connections queued:   {}\n",
-                    new Object[] { 
-                        activePollers.get(),
-                        entitiesWritten.get(), 
-                        entitiesQueued.get(),
-                        connectionsWritten.get(), 
-                        connectionsQueued.get()} );
-            try { Thread.sleep( 5000 ); } catch (InterruptedException ignored) {}
-        }
-
-        // wrap up files
-
-        for ( JsonGenerator gen : entityGeneratorsByThread.values() ) {
-            //gen.writeEndArray();
-            gen.flush();
-            gen.close();
-        }
-
-        for ( JsonGenerator gen : connectionGeneratorsByThread.values() ) {
-            //gen.writeEndArray();
-            gen.flush();
-            gen.close();
-        }
-
-        for ( String fileName : emptyFiles ) {
-            File emptyFile = new File(fileName);
-            emptyFile.deleteOnExit();
-        }
-
+            
+        }, 10)
+            .subscribeOn( readScheduler )
+            .doOnCompleted( new FileWrapUpAction() )
+            .toBlocking().last();
     }
 
     @Override
@@ -222,12 +161,12 @@ public class ExportApp extends ExportingToolBase {
                 .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
         options.addOption( appNameOption );
 
-        Option readThreadsOption = OptionBuilder
-                .hasArg().withType(0).withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
+        Option readThreadsOption = OptionBuilder.hasArg().withType(0)
+                .withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
         options.addOption( readThreadsOption );
 
-        Option writeThreadsOption = OptionBuilder
-                .hasArg().withType(0).withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
+        Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
+                .withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
         options.addOption( writeThreadsOption );
 
         return options;
@@ -239,17 +178,15 @@ public class ExportApp extends ExportingToolBase {
     /**
      * Emits collection names found in application.
      */
-    class CollectionsOnSubscribe implements rx.Observable.OnSubscribe<String> {
+    class CollectionsObservable implements rx.Observable.OnSubscribe<String> {
         EntityManager em;
                 
-        public CollectionsOnSubscribe( EntityManager em ) {
+        public CollectionsObservable(EntityManager em) {
             this.em = em;
         }
 
         public void call(Subscriber<? super String> subscriber) {
             
-            logger.info("Starting to read collections");
-            
             int count = 0;
             try {
                 Map<String, Object> collectionMetadata = em.getApplicationCollectionMetadata();
@@ -261,11 +198,12 @@ public class ExportApp extends ExportingToolBase {
             } catch (Exception e) {
                 subscriber.onError( e );
             }
-            logger.info("Done. Read {} collection names", count);
             if ( count > 0 ) {
                 subscriber.onCompleted();
+                logger.info( "Completed. Read {} collection names", count );
             } else {
                 subscriber.unsubscribe();
+                logger.info( "No collections found" );
             }
         }
     }
@@ -273,11 +211,11 @@ public class ExportApp extends ExportingToolBase {
     /**
      * Emits entities of collection.
      */
-    class EntityOnSubscribe implements rx.Observable.OnSubscribe<ExportEntity> {
+    class EntityObservable implements rx.Observable.OnSubscribe<ExportEntity> {
         EntityManager em;
         String collection;
 
-        public EntityOnSubscribe(EntityManager em, String collection) {
+        public EntityObservable(EntityManager em, String collection) {
             this.em = em;
             this.collection = collection;
         }
@@ -286,6 +224,8 @@ public class ExportApp extends ExportingToolBase {
 
             logger.info("Starting to read entities of collection {}", collection);
             
+            subscriber.onStart();
+            
             try {
                 int count = 0;
 
@@ -327,10 +267,11 @@ public class ExportApp extends ExportingToolBase {
                     results = em.searchCollection( em.getApplicationRef(), collection, query );
                 }
 
-                logger.info("Done. Read {} entities", count);
                 if ( count > 0 ) {
                     subscriber.onCompleted();
+                    logger.info("Completed collection {}. Read {} entities", collection, count);
                 } else {
+                    logger.info("Completed collection {} empty", collection );
                     subscriber.unsubscribe();
                 }
                 
@@ -343,18 +284,19 @@ public class ExportApp extends ExportingToolBase {
     /**
      * Emits connections of an entity.
      */
-    class ConnectionsOnSubscribe implements rx.Observable.OnSubscribe<ExportConnection> {
+    class ConnectionsObservable implements rx.Observable.OnSubscribe<ExportConnection> {
         EntityManager em;
         ExportEntity exportEntity;
 
-        public ConnectionsOnSubscribe(EntityManager em, ExportEntity exportEntity) {
+        public ConnectionsObservable(EntityManager em, ExportEntity exportEntity) {
             this.em = em;
             this.exportEntity = exportEntity;
         }
 
         public void call(Subscriber<? super ExportConnection> subscriber) {
 
-            logger.info("Starting to read connections for entity type {}", exportEntity.getEntity().getType());
+            logger.info( "Starting to read connections for entity {} type {}",
+                    exportEntity.getEntity().getName(), exportEntity.getEntity().getType() );
             
             int count = 0;
             
@@ -388,173 +330,16 @@ public class ExportApp extends ExportingToolBase {
             } catch (Exception e) {
                 subscriber.onError( e );
             }
-
-            logger.info("Done. Read {} connections", count);
-            if ( count > 0 ) {
-                subscriber.onCompleted();
-            } else {
-                subscriber.unsubscribe();
-            }
-        }
-    }
-
-    /**
-     * Process collection by starting processing of its entities.
-     */
-    class CollectionAction implements Action1<String> {
-        EntityManager em;
-
-        public CollectionAction( EntityManager em ) {
-            this.em = em;
-        }
-
-        public void call(String collection) {
-
-            // process entities of collection in parallel            
-            EntityOnSubscribe onSubscribe = new EntityOnSubscribe( em, collection );
-            rx.Observable entityObservable = rx.Observable.create( onSubscribe );
-            entityObservable.flatMap( new Func1<ExportEntity, Observable<ExportEntity>>() {
-                public Observable<ExportEntity> call(ExportEntity exportEntity) {
-                    return Observable.just(exportEntity).doOnNext( 
-                            new EntityAction( em ) ).subscribeOn( readScheduler );
-                }
-            }, 8).subscribeOn(readScheduler).toBlocking().last();
-        }
-    }
-
-    /**
-     * Process entity by adding it to entityWriteQueue and starting processing of its connections.
-     */
-    class EntityAction implements Action1<ExportEntity> {
-        EntityManager em;
-
-        public EntityAction( EntityManager em ) {
-            this.em = em;
-        }
-        
-        public void call(ExportEntity exportEntity) {
-            //logger.debug( "Processing entity: " + exportEntity.getEntity().getUuid() );
-
-            entityWriteQueue.add( exportEntity );
-            entitiesQueued.getAndIncrement();
-
-            // if entity has connections, process them in parallel
-            try {
-                Results connectedEntities = em.getConnectedEntities(
-                        exportEntity.getEntity().getUuid(), null, null, Results.Level.CORE_PROPERTIES );
-
-                if ( !connectedEntities.isEmpty() ) {
-                    ConnectionsOnSubscribe onSubscribe = new ConnectionsOnSubscribe( em, exportEntity );
-                    rx.Observable entityObservable = rx.Observable.create( onSubscribe );
-                    
-                    entityObservable.flatMap( new Func1<ExportConnection, Observable<ExportConnection>>() {
-                        public Observable<ExportConnection> call(ExportConnection connection) {
-                            return Observable.just(connection).doOnNext( 
-                                    new ConnectionsAction() ).subscribeOn(readScheduler);
-                        }
-                    }, 8).subscribeOn(readScheduler).toBlocking().last();
-                }
-                
-            } catch (Exception e) {
-                throw new RuntimeException( "Error getting connections", e );
-            }
-        }
-    }
-
-    /**
-     * Process connection by adding it to connectionWriteQueue. 
-     */
-    class ConnectionsAction implements Action1<ExportConnection> {
-
-        public void call(ExportConnection conn) {
-            //logger.debug( "Processing connections for entity: " + conn.getSourceUuid() );
-            connectionWriteQueue.add(conn);
-            connectionsQueued.getAndIncrement();
-        }
-    }
-
-
-    // ----------------------------------------------------------------------------------------
-    // writing data
-
-    /**
-     * Emits entities to be written.
-     */
-    class EntityWritesOnSubscribe implements rx.Observable.OnSubscribe<ExportEntity> {
-        BlockingQueue<ExportEntity> queue;
-
-        public EntityWritesOnSubscribe( BlockingQueue<ExportEntity> queue ) {
-            this.queue = queue;
-        }
-
-        public void call(Subscriber<? super ExportEntity> subscriber) {
-            int count = 0;
-            
-            while ( true ) {
-                ExportEntity entity = null;
-                try {
-                    //logger.debug( "Wrote {}. Polling for entity to write...", count );
-                    activePollers.getAndIncrement();
-                    entity = queue.poll( pollTimeoutSeconds, TimeUnit.SECONDS );
-                } catch (InterruptedException e) {
-                    logger.error("Entity poll interrupted", e);
-                    continue;
-                } finally {
-                    activePollers.getAndDecrement();
-                }
-                if ( entity == null ) {
-                    break;
-                }
-                subscriber.onNext( entity );
-                count++;
-            }
-
-            logger.info("Done. De-queued {} entities", count);
-            if ( count > 0 ) {
-                subscriber.onCompleted();
-            } else {
-                subscriber.unsubscribe();
-            }
-        }
-    }
-
-    /**
-     * Emits connections to be written.
-     */
-    class ConnectionWritesOnSubscribe implements rx.Observable.OnSubscribe<ExportConnection> {
-        BlockingQueue<ExportConnection> queue;
-
-        public ConnectionWritesOnSubscribe( BlockingQueue<ExportConnection> queue ) {
-            this.queue = queue;
-        }
-
-        public void call(Subscriber<? super ExportConnection> subscriber) {
-            int count = 0;
             
-            while ( true ) {
-                ExportConnection connection = null;
-                try {
-                    //logger.debug( "Wrote {}. Polling for connection to write", count );
-                    activePollers.getAndIncrement();
-                    connection = queue.poll( pollTimeoutSeconds, TimeUnit.SECONDS );
-                } catch (InterruptedException e) {
-                    logger.error("Connection poll interrupted", e);
-                    continue;
-                } finally {
-                    activePollers.getAndDecrement();
-                }
-                if ( connection == null ) {
-                    break;
-                }
-                subscriber.onNext( connection );
-                count++;
-            }
-
-            logger.info("Done. De-queued {} connections", count);
             if ( count > 0 ) {
                 subscriber.onCompleted();
+                logger.info("Completed entity {} type {} connections count {}",
+                        new Object[] { exportEntity.getEntity().getName(), exportEntity.getEntity().getType(), count });
+                
             } else {
                 subscriber.unsubscribe();
+                logger.info( "Entity {} type {} has no connections",
+                        exportEntity.getEntity().getName(), exportEntity.getEntity().getType() );
             }
         }
     }
@@ -566,10 +351,9 @@ public class ExportApp extends ExportingToolBase {
 
         public void call(ExportEntity entity) {
 
-            boolean wroteData = false;
-
             String [] parts = Thread.currentThread().getName().split("-");
-            String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".entities";
+            String fileName = outputDir.getAbsolutePath() + File.separator
+                    + applicationName.replace('/','-') + "-" + parts[3] + ".entities";
 
             JsonGenerator gen = entityGeneratorsByThread.get( Thread.currentThread() );
             if ( gen == null ) {
@@ -577,6 +361,7 @@ public class ExportApp extends ExportingToolBase {
                 // no generator so we are opening new file and writing the start of an array
                 try {
                     gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
+                    logger.info("Opened output file {}", fileName);
                 } catch (IOException e) {
                     throw new RuntimeException("Error opening output file: " + fileName, e);
                 }
@@ -589,15 +374,10 @@ public class ExportApp extends ExportingToolBase {
                 gen.writeObject( entity );
                 gen.writeRaw('\n');
                 entitiesWritten.getAndIncrement();
-                wroteData = true;
 
             } catch (IOException e) {
                 throw new RuntimeException("Error writing to output file: " + fileName, e);
             }
-
-            if ( !wroteData ) {
-                emptyFiles.add( fileName );
-            }
         }
     }
 
@@ -608,10 +388,9 @@ public class ExportApp extends ExportingToolBase {
 
         public void call(ExportConnection conn) {
 
-            boolean wroteData = false;
-
             String [] parts = Thread.currentThread().getName().split("-");
-            String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".connections";
+            String fileName = outputDir.getAbsolutePath() + File.separator
+                    + applicationName.replace('/','-') + "-" + parts[3] + ".connections";
 
             JsonGenerator gen = connectionGeneratorsByThread.get( Thread.currentThread() );
             if ( gen == null ) {
@@ -619,6 +398,7 @@ public class ExportApp extends ExportingToolBase {
                 // no generator so we are opening new file and writing the start of an array
                 try {
                     gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
+                    logger.info("Opened output file {}", fileName);
                 } catch (IOException e) {
                     throw new RuntimeException("Error opening output file: " + fileName, e);
                 }
@@ -631,18 +411,41 @@ public class ExportApp extends ExportingToolBase {
                 gen.writeObject( conn );
                 gen.writeRaw('\n');
                 connectionsWritten.getAndIncrement();
-                wroteData = true;
 
             } catch (IOException e) {
                 throw new RuntimeException("Error writing to output file: " + fileName, e);
             }
+        }
+    }
+
+    private class FileWrapUpAction implements Action0 {
+        @Override
+        public void call() {
+
+            logger.info("-------------------------------------------------------------------");
+            logger.info("DONE! Entities: {} Connections: {}", entitiesWritten.get(), connectionsWritten.get());
+            logger.info("-------------------------------------------------------------------");
 
-            if ( !wroteData ) {
-                emptyFiles.add( fileName );
+            for ( JsonGenerator gen : entityGeneratorsByThread.values() ) {
+                try {
+                    //gen.writeEndArray();
+                    gen.flush();
+                    gen.close();
+                } catch (IOException e) {
+                    logger.error("Error closing output file", e);
+                }
+            }
+            for ( JsonGenerator gen : connectionGeneratorsByThread.values() ) {
+                try {
+                    //gen.writeEndArray();
+                    gen.flush();
+                    gen.close();
+                } catch (IOException e) {
+                    logger.error("Error closing output file", e);
+                }
             }
         }
     }
-
 }
 
 class ExportEntity {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a870d69/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
index 3de220c..a7c3905 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
@@ -138,7 +138,7 @@ public abstract class ExportingToolBase extends ToolBase {
 
         if ( !file.mkdirs() ) {
 
-            throw new RuntimeException( String.format( "Unable to create diretory %s", dirName ) );
+            throw new RuntimeException( String.format( "Unable to create directory %s", dirName ) );
         }
 
         return file;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a870d69/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index 14b9311..af8306f 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -27,14 +27,26 @@ import org.apache.usergrid.persistence.EntityManager;
 import org.junit.ClassRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 
 
 public class ExportAppTest {
     static final Logger logger = LoggerFactory.getLogger( ExportAppTest.class );
+    
+    int NUM_COLLECTIONS = 5;
+    int NUM_ENTITIES = 10; 
+    int NUM_CONNECTIONS = 1;
 
     @ClassRule
     public static ServiceITSetup setup = new ServiceITSetupImpl( ServiceITSuite.cassandraResource );
@@ -52,46 +64,110 @@ public class ExportAppTest {
         ApplicationInfo appInfo = setup.getMgmtSvc().createApplication(
                 orgInfo.getOrganization().getUuid(), "app_" + rand );
 
-        EntityManager em = setup.getEmf().getEntityManager( appInfo.getId() );
+        final EntityManager em = setup.getEmf().getEntityManager( appInfo.getId() );
         
-        // create 10 connected things
+        // create connected things
 
-        List<Entity> connectedThings = new ArrayList<Entity>();
+        final List<Entity> connectedThings = new ArrayList<Entity>();
         String connectedType = "connected_thing";
         em.createApplicationCollection(connectedType);
-        for ( int j=0; j<10; j++) {
+        for ( int j=0; j<NUM_CONNECTIONS; j++) {
             final String name = "connected_thing_" + j;
             connectedThings.add( em.create( connectedType, new HashMap<String, Object>() {{
                 put( "name", name );
             }} ) );
         }
        
-        // create 10 collections of 10 things, every other thing is connected to the connected things
-        
-        for ( int i=0; i<10; i++) {
-            String type = "thing_"+i;
-            em.createApplicationCollection(type);
-            for ( int j=0; j<10; j++) {
-                final String name = "thing_" + j;
-                Entity source = em.create(type, new HashMap<String, Object>() {{ put("name", name); }});
-                if ( j % 2 == 0 ) {
-                    for ( Entity target : connectedThings ) {
-                        em.createConnection( source, "has", target );
+        // create collections of things, every other thing is connected to the connected things
+
+        final AtomicInteger entitiesCount = new AtomicInteger(0);
+        final AtomicInteger connectionCount = new AtomicInteger(0);
+
+        ExecutorService execService = Executors.newFixedThreadPool( 50);
+        final Scheduler scheduler = Schedulers.from( execService );
+
+        Observable.range( 0, NUM_COLLECTIONS ).flatMap( new Func1<Integer, Observable<?>>() {
+            @Override
+            public Observable<?> call(Integer i) {
+                
+                return Observable.just( i ).doOnNext( new Action1<Integer>() {
+                    @Override
+                    public void call(Integer i) {
+                        
+                        final String type = "thing_"+i;
+                        try {
+                            em.createApplicationCollection( type );
+                            connectionCount.getAndIncrement();
+                            
+                        } catch (Exception e) {
+                            throw new RuntimeException( "Error creating collection", e );
+                        }
+                       
+                        Observable.range( 0, NUM_ENTITIES ).flatMap( new Func1<Integer, Observable<?>>() {
+                            @Override
+                            public Observable<?> call(Integer j) {
+                                return Observable.just( j ).doOnNext( new Action1<Integer>() {
+                                    @Override
+                                    public void call(Integer j) {
+                                        
+                                        final String name = "thing_" + j;
+                                        try {
+                                            final Entity source = em.create( 
+                                                    type, new HashMap<String, Object>() {{ put("name", name); }});
+                                            entitiesCount.getAndIncrement();
+                                            logger.info( "Created entity {} type {}", name, type );
+                                            
+                                            for ( Entity target : connectedThings ) {
+                                                em.createConnection( source, "has", target );
+                                                connectionCount.getAndIncrement();
+                                                logger.info( "Created connection from entity {} type {} to {}",
+                                                        new Object[]{name, type, target.getName()} );
+                                            }
+
+
+                                        } catch (Exception e) {
+                                            throw new RuntimeException( "Error creating collection", e );
+                                        }
+                                        
+                                        
+                                    }
+                                    
+                                } );
+
+                            }
+                        }, 50 ).subscribeOn( scheduler ).subscribe(); // toBlocking().last();
+                        
                     }
-                }
+                } );
+                
+
             }
+        }, 30 ).subscribeOn( scheduler ).toBlocking().last();
+
+        while ( entitiesCount.get() < NUM_COLLECTIONS * NUM_ENTITIES ) {
+            Thread.sleep( 5000 );
+            logger.info( "Still working. Created {} entities and {} connections", 
+                    entitiesCount.get(), connectionCount.get() );
         }
-        
-        // export to file
 
-        String directoryName = "./target/export" + rand;
+        logger.info( "Done. Created {} entities and {} connections", entitiesCount.get(), connectionCount.get() );
+
+        long start = System.currentTimeMillis();
+        
+        String directoryName = "target/export" + rand;
 
         ExportApp exportApp = new ExportApp();
         exportApp.startTool( new String[]{
                 "-application", appInfo.getName(),
+                "-readThreads", "50",
+                "-writeThreads", "10",
                 "-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
                 "-outputDir", directoryName
         }, false );
         
+        logger.info("time = " + (System.currentTimeMillis() - start)/1000 + "s");
+
+
+        
     }
 }
\ No newline at end of file