You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/07/22 17:17:48 UTC

[07/15] incubator-usergrid git commit: Make flatmap max observables match write thread count and use Schedulers.io() instead of a custom readScheduler.

Make flatmap max observables match write thread count and use Schedulers.io() instead of a custom readScheduler.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b58390d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b58390d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b58390d9

Branch: refs/heads/two-dot-o
Commit: b58390d96eb423287247774533b18e9c4ee43843
Parents: dab84e9
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Jul 15 13:53:55 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Jul 15 13:53:55 2015 -0400

----------------------------------------------------------------------
 .../org/apache/usergrid/tools/ExportApp.java    | 39 +++++---------------
 .../apache/usergrid/tools/ExportAppTest.java    | 12 +++---
 2 files changed, 14 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b58390d9/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index c302a74..b2da0ea 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -54,7 +54,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * 
  * Will create as many output files as there are writeThreads (by default: 10).
  * 
- * Will create two types of files: *.uge for Usegrird entities and *.ugc for entity to entity connections.
+ * Will create two types of files: *.entities for Usegrird entities and *.collections for entity to entity connections.
  * 
  * Every line of the data files is a complete JSON object.
  */
@@ -62,7 +62,6 @@ public class ExportApp extends ExportingToolBase {
     static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
 
     static final String APPLICATION_NAME = "application";
-    private static final String READ_THREAD_COUNT = "readThreads";
     private static final String WRITE_THREAD_COUNT = "writeThreads";
    
     String applicationName;
@@ -71,16 +70,13 @@ public class ExportApp extends ExportingToolBase {
     AtomicInteger entitiesWritten = new AtomicInteger(0);
     AtomicInteger connectionsWritten = new AtomicInteger(0);
 
-    Scheduler readScheduler;
     Scheduler writeScheduler;
 
     ObjectMapper mapper = new ObjectMapper();
     Map<Thread, JsonGenerator> entityGeneratorsByThread  = new HashMap<Thread, JsonGenerator>();
     Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
 
-    // set via CLI
-    int readThreadCount = 80;
-    int writeThreadCount = 10; // limiting write will limit output files 
+    int writeThreadCount = 10; // set via CLI option; limiting write will limit output files 
 
 
     @Override
@@ -93,10 +89,6 @@ public class ExportApp extends ExportingToolBase {
                 .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
         options.addOption( appNameOption );
 
-        Option readThreadsOption = OptionBuilder.hasArg().withType(0)
-                .withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
-        options.addOption( readThreadsOption );
-
         Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
                 .withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
         options.addOption( writeThreadsOption );
@@ -113,15 +105,6 @@ public class ExportApp extends ExportingToolBase {
         
         applicationName = line.getOptionValue( APPLICATION_NAME );
 
-        if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
-            try {
-                readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
-            } catch (NumberFormatException nfe) {
-                logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
-                return;
-            }
-        }
-        
         if (StringUtils.isNotEmpty( line.getOptionValue( WRITE_THREAD_COUNT ) )) {
             try {
                 writeThreadCount = Integer.parseInt( line.getOptionValue( WRITE_THREAD_COUNT ) );
@@ -144,9 +127,6 @@ public class ExportApp extends ExportingToolBase {
         final EntityManager em = emf.getEntityManager( applicationId );
         organizationName = em.getApplication().getOrganizationName();
 
-        ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
-        readScheduler = Schedulers.from( readThreadPoolExecutor );
-
         ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
         writeScheduler = Schedulers.from( writeThreadPoolExecutor );
 
@@ -155,19 +135,18 @@ public class ExportApp extends ExportingToolBase {
         collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
             
             public Observable<ExportEntity> call(String collection) {
-                return Observable.create( new EntityObservable( em, collection ))
+                return Observable.create( new EntityObservable( em, collection ) )
                         .doOnNext( new EntityWriteAction() ).subscribeOn( writeScheduler );
             }
-            
-        }, 10).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
-            
+
+        }, writeThreadCount ).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
+
             public Observable<ExportConnection> call(ExportEntity exportEntity) {
-                return Observable.create( new ConnectionsObservable( em, exportEntity ))
+                return Observable.create( new ConnectionsObservable( em, exportEntity ) )
                         .doOnNext( new ConnectionWriteAction() ).subscribeOn( writeScheduler );
             }
-            
-        }, 10)
-            .subscribeOn( readScheduler )
+
+        }, writeThreadCount)
             .doOnCompleted( new FileWrapUpAction() )
             .toBlocking().last();
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b58390d9/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index c5411fd..a1e3f6b 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -24,7 +24,6 @@ import org.apache.usergrid.management.ApplicationInfo;
 import org.apache.usergrid.management.OrganizationOwnerInfo;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
-import org.junit.Assert;
 import org.junit.ClassRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -132,9 +131,9 @@ public class ExportAppTest {
         
         File exportDir = new File(directoryName);
         assertTrue( getFileCount( exportDir, "entities"    ) > 0 );
-        assertTrue( getFileCount( exportDir, "collections" ) > 0 );
-        assertTrue( getFileCount( exportDir, "entities" ) >= 100 );
-        assertTrue( getFileCount( exportDir, "collections" ) >= 100 );
+        assertTrue( getFileCount( exportDir, "connections" ) > 0 );
+        assertTrue( getFileCount( exportDir, "entities"    ) <= 100 );
+        assertTrue( getFileCount( exportDir, "connections" ) <= 100 );
 
         File exportDir1 = new File(directoryName + "1");
         exportApp.startTool( new String[]{
@@ -147,9 +146,8 @@ public class ExportAppTest {
 
         logger.info( "1 thread time = " + (System.currentTimeMillis() - start) / 1000 + "s" );
 
-        exportDir = new File(directoryName);
-        assertEquals( 1, getFileCount( exportDir, "entities" ));
-        assertEquals( 1, getFileCount( exportDir, "collections" ));
+        assertEquals( 1, getFileCount( exportDir1, "entities" ));
+        assertEquals( 1, getFileCount( exportDir1, "connections" ));
     }
 
     private static int getFileCount(File exportDir, final String ext ) {