You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/07/21 22:09:21 UTC
[07/10] incubator-usergrid git commit: Make flatmap max observables
match write thread count and use Schedulers.io() instead of a custom
readScheduler.
Make flatmap max observables match write thread count and use Schedulers.io() instead of a custom readScheduler.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b58390d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b58390d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b58390d9
Branch: refs/heads/master
Commit: b58390d96eb423287247774533b18e9c4ee43843
Parents: dab84e9
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Jul 15 13:53:55 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Jul 15 13:53:55 2015 -0400
----------------------------------------------------------------------
.../org/apache/usergrid/tools/ExportApp.java | 39 +++++---------------
.../apache/usergrid/tools/ExportAppTest.java | 12 +++---
2 files changed, 14 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b58390d9/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index c302a74..b2da0ea 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -54,7 +54,7 @@ import java.util.concurrent.atomic.AtomicInteger;
*
* Will create as many output files as there are writeThreads (by default: 10).
*
- * Will create two types of files: *.uge for Usegrird entities and *.ugc for entity to entity connections.
+ * Will create two types of files: *.entities for Usegrird entities and *.collections for entity to entity connections.
*
* Every line of the data files is a complete JSON object.
*/
@@ -62,7 +62,6 @@ public class ExportApp extends ExportingToolBase {
static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
static final String APPLICATION_NAME = "application";
- private static final String READ_THREAD_COUNT = "readThreads";
private static final String WRITE_THREAD_COUNT = "writeThreads";
String applicationName;
@@ -71,16 +70,13 @@ public class ExportApp extends ExportingToolBase {
AtomicInteger entitiesWritten = new AtomicInteger(0);
AtomicInteger connectionsWritten = new AtomicInteger(0);
- Scheduler readScheduler;
Scheduler writeScheduler;
ObjectMapper mapper = new ObjectMapper();
Map<Thread, JsonGenerator> entityGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
- // set via CLI
- int readThreadCount = 80;
- int writeThreadCount = 10; // limiting write will limit output files
+ int writeThreadCount = 10; // set via CLI option; limiting write will limit output files
@Override
@@ -93,10 +89,6 @@ public class ExportApp extends ExportingToolBase {
.withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
options.addOption( appNameOption );
- Option readThreadsOption = OptionBuilder.hasArg().withType(0)
- .withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
- options.addOption( readThreadsOption );
-
Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
.withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
options.addOption( writeThreadsOption );
@@ -113,15 +105,6 @@ public class ExportApp extends ExportingToolBase {
applicationName = line.getOptionValue( APPLICATION_NAME );
- if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
- try {
- readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
- } catch (NumberFormatException nfe) {
- logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
- return;
- }
- }
-
if (StringUtils.isNotEmpty( line.getOptionValue( WRITE_THREAD_COUNT ) )) {
try {
writeThreadCount = Integer.parseInt( line.getOptionValue( WRITE_THREAD_COUNT ) );
@@ -144,9 +127,6 @@ public class ExportApp extends ExportingToolBase {
final EntityManager em = emf.getEntityManager( applicationId );
organizationName = em.getApplication().getOrganizationName();
- ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
- readScheduler = Schedulers.from( readThreadPoolExecutor );
-
ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
writeScheduler = Schedulers.from( writeThreadPoolExecutor );
@@ -155,19 +135,18 @@ public class ExportApp extends ExportingToolBase {
collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
public Observable<ExportEntity> call(String collection) {
- return Observable.create( new EntityObservable( em, collection ))
+ return Observable.create( new EntityObservable( em, collection ) )
.doOnNext( new EntityWriteAction() ).subscribeOn( writeScheduler );
}
-
- }, 10).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
-
+
+ }, writeThreadCount ).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
+
public Observable<ExportConnection> call(ExportEntity exportEntity) {
- return Observable.create( new ConnectionsObservable( em, exportEntity ))
+ return Observable.create( new ConnectionsObservable( em, exportEntity ) )
.doOnNext( new ConnectionWriteAction() ).subscribeOn( writeScheduler );
}
-
- }, 10)
- .subscribeOn( readScheduler )
+
+ }, writeThreadCount)
.doOnCompleted( new FileWrapUpAction() )
.toBlocking().last();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b58390d9/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index c5411fd..a1e3f6b 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -24,7 +24,6 @@ import org.apache.usergrid.management.ApplicationInfo;
import org.apache.usergrid.management.OrganizationOwnerInfo;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
-import org.junit.Assert;
import org.junit.ClassRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -132,9 +131,9 @@ public class ExportAppTest {
File exportDir = new File(directoryName);
assertTrue( getFileCount( exportDir, "entities" ) > 0 );
- assertTrue( getFileCount( exportDir, "collections" ) > 0 );
- assertTrue( getFileCount( exportDir, "entities" ) >= 100 );
- assertTrue( getFileCount( exportDir, "collections" ) >= 100 );
+ assertTrue( getFileCount( exportDir, "connections" ) > 0 );
+ assertTrue( getFileCount( exportDir, "entities" ) <= 100 );
+ assertTrue( getFileCount( exportDir, "connections" ) <= 100 );
File exportDir1 = new File(directoryName + "1");
exportApp.startTool( new String[]{
@@ -147,9 +146,8 @@ public class ExportAppTest {
logger.info( "1 thread time = " + (System.currentTimeMillis() - start) / 1000 + "s" );
- exportDir = new File(directoryName);
- assertEquals( 1, getFileCount( exportDir, "entities" ));
- assertEquals( 1, getFileCount( exportDir, "collections" ));
+ assertEquals( 1, getFileCount( exportDir1, "entities" ));
+ assertEquals( 1, getFileCount( exportDir1, "connections" ));
}
private static int getFileCount(File exportDir, final String ext ) {