You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/07/14 19:31:22 UTC
[1/2] incubator-usergrid git commit: Add readThread and writeThread
CLI parameters.
Repository: incubator-usergrid
Updated Branches:
refs/heads/rxportapp b2bdbb545 -> 7a870d692
Add readThread and writeThread CLI parameters.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2b65e619
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2b65e619
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2b65e619
Branch: refs/heads/rxportapp
Commit: 2b65e619316b206720e574a21fe1b33462e0f2dd
Parents: b2bdbb5
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 14 08:03:18 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 14 08:03:18 2015 -0400
----------------------------------------------------------------------
.../org/apache/usergrid/tools/ExportApp.java | 153 +++++++++++++------
1 file changed, 106 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2b65e619/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index ceb3ecd..21c63a0 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -25,6 +25,7 @@ import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.Query;
import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.utils.StringUtils;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.util.MinimalPrettyPrinter;
@@ -46,11 +47,22 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
- * Export application's collections.
+ * Export all entities and connections of a Usergrid app.
+ *
+ * Exports data files to specified directory.
+ *
+ * Will create as many output files as there are writeThreads (by default: 10).
+ *
+ * Will create two types of files: *.uge for Usergrid entities and *.ugc for entity to entity connections.
+ *
+ * Every line of the data files is a complete JSON object.
*/
public class ExportApp extends ExportingToolBase {
static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
+ private static final String READ_THREAD_COUNT = "readThreads";
+ private static final String WRITE_THREAD_COUNT = "writeThreads";
+
// we will write two types of files: entities and connections
BlockingQueue<ExportEntity> entityWriteQueue = new LinkedBlockingQueue<ExportEntity>();
BlockingQueue<ExportConnection> connectionWriteQueue = new LinkedBlockingQueue<ExportConnection>();
@@ -58,10 +70,15 @@ public class ExportApp extends ExportingToolBase {
static final String APPLICATION_NAME = "application";
int pollTimeoutSeconds = 10;
+
+ int readThreadCount = 80;
+ int writeThreadCount = 10;
+
+ String applicationName;
+ String organizationName;
// limiting output threads will limit output files
- final ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(8);
- final Scheduler scheduler = Schedulers.from( threadPoolExecutor );
+ Scheduler readScheduler;
Map<Thread, JsonGenerator> entityGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
@@ -77,29 +94,37 @@ public class ExportApp extends ExportingToolBase {
ObjectMapper mapper = new ObjectMapper();
/**
- * Export admin users using multiple threads.
- * <p/>
- * How it works:
- * In main thread we query for IDs of all admin users, add each ID to read queue.
- * Read-queue workers read admin user data, add data to write queue.
- * One write-queue worker reads data writes to file.
+ * Tool entry point.
*/
@Override
public void runTool(CommandLine line) throws Exception {
- String applicationName = line.getOptionValue( APPLICATION_NAME );
-
-// if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
-// try {
-// readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
-// } catch (NumberFormatException nfe) {
-// logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
-// return;
-// }
-// } else {
-// readThreadCount = 20;
-// }
+ applicationName = line.getOptionValue( APPLICATION_NAME );
+
+ if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
+ try {
+ readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
+ } catch (NumberFormatException nfe) {
+ logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+ return;
+ }
+ }
+ if (StringUtils.isNotEmpty( line.getOptionValue( WRITE_THREAD_COUNT ) )) {
+ try {
+ writeThreadCount = Integer.parseInt( line.getOptionValue( WRITE_THREAD_COUNT ) );
+ } catch (NumberFormatException nfe) {
+ logger.error( "-" + WRITE_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+ return;
+ }
+ }
+
+ ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
+ readScheduler = Schedulers.from( readThreadPoolExecutor );
+
+ ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
+ final Scheduler writeScheduler = Schedulers.from( writeThreadPoolExecutor );
+
startSpring();
setVerbose( line );
@@ -111,6 +136,7 @@ public class ExportApp extends ExportingToolBase {
UUID applicationId = emf.lookupApplication( applicationName );
final EntityManager em = emf.getEntityManager( applicationId );
+ organizationName = em.getApplication().getOrganizationName();
// start write queue workers
@@ -119,18 +145,18 @@ public class ExportApp extends ExportingToolBase {
entityWritesObservable.flatMap( new Func1<ExportEntity, Observable<?>>() {
public Observable<ExportEntity> call(ExportEntity exportEntity) {
return Observable.just(exportEntity).doOnNext(
- new EntityWriteAction() ).subscribeOn( scheduler );
+ new EntityWriteAction() ).subscribeOn( writeScheduler );
}
- },10).subscribeOn( scheduler ).subscribe();
+ },10).subscribeOn( writeScheduler ).subscribe();
ConnectionWritesOnSubscribe connectionWritesOnSub = new ConnectionWritesOnSubscribe( connectionWriteQueue );
rx.Observable connectionWritesObservable = rx.Observable.create( connectionWritesOnSub );
connectionWritesObservable.flatMap( new Func1<ExportConnection, Observable<?>>() {
public Observable<ExportConnection> call(ExportConnection connection ) {
return Observable.just(connection).doOnNext(
- new ConnectionWriteAction()).subscribeOn( scheduler );
+ new ConnectionWriteAction()).subscribeOn( writeScheduler );
}
- },10).subscribeOn( scheduler ).subscribe();
+ },10).subscribeOn( writeScheduler ).subscribe();
// start processing data and filling up write queues
@@ -139,9 +165,9 @@ public class ExportApp extends ExportingToolBase {
collectionsObservable.flatMap( new Func1<String, Observable<String>>() {
public Observable<String> call(String collection) {
return Observable.just(collection).doOnNext(
- new CollectionAction( em ) ).subscribeOn( Schedulers.io() );
+ new CollectionAction( em ) ).subscribeOn( readScheduler );
}
- },40).subscribeOn( Schedulers.io() ).subscribe();
+ },40).subscribeOn( readScheduler ).subscribe();
// wait for write thread pollers to get started
@@ -192,14 +218,18 @@ public class ExportApp extends ExportingToolBase {
Options options = super.createOptions();
- Option readThreads = OptionBuilder.hasArg().withType("")
+ Option appNameOption = OptionBuilder.hasArg().withType("")
.withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
- options.addOption( readThreads );
-
-// Option readThreads = OptionBuilder
-// .hasArg().withType(0).withDescription("Read Threads -" + READ_THREAD_COUNT).create(READ_THREAD_COUNT);
-// options.addOption( readThreads );
-
+ options.addOption( appNameOption );
+
+ Option readThreadsOption = OptionBuilder
+ .hasArg().withType(0).withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
+ options.addOption( readThreadsOption );
+
+ Option writeThreadsOption = OptionBuilder
+ .hasArg().withType(0).withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
+ options.addOption( writeThreadsOption );
+
return options;
}
@@ -276,9 +306,13 @@ public class ExportApp extends ExportingToolBase {
}
dictionariesByName.put( dictionary, dict );
}
+
ExportEntity exportEntity = new ExportEntity(
- em.getApplication().getApplicationName(),
- entity, dictionariesByName );
+ organizationName,
+ applicationName,
+ entity,
+ dictionariesByName );
+
subscriber.onNext( exportEntity );
count++;
@@ -333,11 +367,14 @@ public class ExportApp extends ExportingToolBase {
for (Entity connectedEntity : results.getEntities()) {
try {
+
ExportConnection connection = new ExportConnection(
- em.getApplication().getApplicationName(),
+ applicationName,
+ organizationName,
connectionType,
exportEntity.getEntity().getUuid(),
connectedEntity.getUuid());
+
subscriber.onNext( connection );
count++;
@@ -379,9 +416,9 @@ public class ExportApp extends ExportingToolBase {
entityObservable.flatMap( new Func1<ExportEntity, Observable<ExportEntity>>() {
public Observable<ExportEntity> call(ExportEntity exportEntity) {
return Observable.just(exportEntity).doOnNext(
- new EntityAction( em ) ).subscribeOn( Schedulers.io() );
+ new EntityAction( em ) ).subscribeOn( readScheduler );
}
- }, 8).subscribeOn(Schedulers.io()).toBlocking().last();
+ }, 8).subscribeOn(readScheduler).toBlocking().last();
}
}
@@ -413,9 +450,9 @@ public class ExportApp extends ExportingToolBase {
entityObservable.flatMap( new Func1<ExportConnection, Observable<ExportConnection>>() {
public Observable<ExportConnection> call(ExportConnection connection) {
return Observable.just(connection).doOnNext(
- new ConnectionsAction() ).subscribeOn( Schedulers.io() );
+ new ConnectionsAction() ).subscribeOn(readScheduler);
}
- }, 8).subscribeOn(Schedulers.io()).toBlocking().last();
+ }, 8).subscribeOn(readScheduler).toBlocking().last();
}
} catch (Exception e) {
@@ -472,7 +509,7 @@ public class ExportApp extends ExportingToolBase {
count++;
}
- logger.info("Done. Wrote {} entities", count);
+ logger.info("Done. De-queued {} entities", count);
if ( count > 0 ) {
subscriber.onCompleted();
} else {
@@ -513,7 +550,7 @@ public class ExportApp extends ExportingToolBase {
count++;
}
- logger.info("Done. Wrote {} connections", count);
+ logger.info("Done. De-queued {} connections", count);
if ( count > 0 ) {
subscriber.onCompleted();
} else {
@@ -531,7 +568,8 @@ public class ExportApp extends ExportingToolBase {
boolean wroteData = false;
- String fileName = "target/" + Thread.currentThread().getName() + ".ude";
+ String [] parts = Thread.currentThread().getName().split("-");
+ String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".entities";
JsonGenerator gen = entityGeneratorsByThread.get( Thread.currentThread() );
if ( gen == null ) {
@@ -572,7 +610,8 @@ public class ExportApp extends ExportingToolBase {
boolean wroteData = false;
- String fileName = "target/" + Thread.currentThread().getName() + ".ugc";
+ String [] parts = Thread.currentThread().getName().split("-");
+ String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".connections";
JsonGenerator gen = connectionGeneratorsByThread.get( Thread.currentThread() );
if ( gen == null ) {
@@ -607,10 +646,12 @@ public class ExportApp extends ExportingToolBase {
}
class ExportEntity {
+ private String organization;
private String application;
private Entity entity;
private Map<String, Object> dictionaries;
- public ExportEntity( String application, Entity entity, Map<String, Object> dictionaries ) {
+ public ExportEntity( String organization, String application, Entity entity, Map<String, Object> dictionaries ) {
+ this.organization = organization;
this.application = application;
this.entity = entity;
this.dictionaries = dictionaries;
@@ -639,14 +680,24 @@ class ExportEntity {
public void setDictionaries(Map<String, Object> dictionaries) {
this.dictionaries = dictionaries;
}
+
+ public String getOrganization() {
+ return organization;
+ }
+
+ public void setOrganization(String organization) {
+ this.organization = organization;
+ }
}
class ExportConnection {
+ private String organization;
private String application;
private String connectionType;
private UUID sourceUuid;
private UUID targetUuid;
- public ExportConnection(String application, String connectionType, UUID sourceUuid, UUID targetUuid) {
+ public ExportConnection(String organization, String application, String connectionType, UUID sourceUuid, UUID targetUuid) {
+ this.organization= organization;
this.application = application;
this.connectionType = connectionType;
this.sourceUuid = sourceUuid;
@@ -684,4 +735,12 @@ class ExportConnection {
public void setTargetUuid(UUID targetUuid) {
this.targetUuid = targetUuid;
}
+
+ public String getOrganization() {
+ return organization;
+ }
+
+ public void setOrganization(String organization) {
+ this.organization = organization;
+ }
}
[2/2] incubator-usergrid git commit: Remove queues and make the whole
thing one "stream"
Posted by sn...@apache.org.
Remove queues and make the whole thing one "stream"
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7a870d69
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7a870d69
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7a870d69
Branch: refs/heads/rxportapp
Commit: 7a870d6929cea76f5bbec9aa8f5a8caa8dee07e4
Parents: 2b65e61
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 14 13:31:18 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 14 13:31:18 2015 -0400
----------------------------------------------------------------------
.../org/apache/usergrid/tools/ExportApp.java | 375 +++++--------------
.../usergrid/tools/ExportingToolBase.java | 2 +-
.../apache/usergrid/tools/ExportAppTest.java | 114 +++++-
3 files changed, 185 insertions(+), 306 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a870d69/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index 21c63a0..59509c0 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
+import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
@@ -42,7 +43,8 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
@@ -59,40 +61,29 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class ExportApp extends ExportingToolBase {
static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
-
- private static final String READ_THREAD_COUNT = "readThreads";
- private static final String WRITE_THREAD_COUNT = "writeThreads";
-
- // we will write two types of files: entities and connections
- BlockingQueue<ExportEntity> entityWriteQueue = new LinkedBlockingQueue<ExportEntity>();
- BlockingQueue<ExportConnection> connectionWriteQueue = new LinkedBlockingQueue<ExportConnection>();
static final String APPLICATION_NAME = "application";
-
- int pollTimeoutSeconds = 10;
-
- int readThreadCount = 80;
- int writeThreadCount = 10;
-
+ private static final String READ_THREAD_COUNT = "readThreads";
+ private static final String WRITE_THREAD_COUNT = "writeThreads";
+
String applicationName;
String organizationName;
- // limiting output threads will limit output files
+ AtomicInteger entitiesWritten = new AtomicInteger(0);
+ AtomicInteger connectionsWritten = new AtomicInteger(0);
+
Scheduler readScheduler;
+ Scheduler writeScheduler;
+ ObjectMapper mapper = new ObjectMapper();
Map<Thread, JsonGenerator> entityGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
- List<String> emptyFiles = new ArrayList<String>();
+ // set via CLI
+ int readThreadCount = 80;
+ int writeThreadCount = 10; // limiting write will limit output files
- AtomicInteger activePollers = new AtomicInteger(0);
- AtomicInteger entitiesQueued = new AtomicInteger(0);
- AtomicInteger entitiesWritten = new AtomicInteger(0);
- AtomicInteger connectionsWritten = new AtomicInteger(0);
- AtomicInteger connectionsQueued = new AtomicInteger(0);
- ObjectMapper mapper = new ObjectMapper();
-
/**
* Tool entry point.
*/
@@ -123,7 +114,7 @@ public class ExportApp extends ExportingToolBase {
readScheduler = Schedulers.from( readThreadPoolExecutor );
ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
- final Scheduler writeScheduler = Schedulers.from( writeThreadPoolExecutor );
+ writeScheduler = Schedulers.from( writeThreadPoolExecutor );
startSpring();
@@ -138,78 +129,26 @@ public class ExportApp extends ExportingToolBase {
final EntityManager em = emf.getEntityManager( applicationId );
organizationName = em.getApplication().getOrganizationName();
- // start write queue workers
-
- EntityWritesOnSubscribe entityWritesOnSub = new EntityWritesOnSubscribe( entityWriteQueue );
- rx.Observable entityWritesObservable = rx.Observable.create( entityWritesOnSub );
- entityWritesObservable.flatMap( new Func1<ExportEntity, Observable<?>>() {
- public Observable<ExportEntity> call(ExportEntity exportEntity) {
- return Observable.just(exportEntity).doOnNext(
- new EntityWriteAction() ).subscribeOn( writeScheduler );
- }
- },10).subscribeOn( writeScheduler ).subscribe();
-
- ConnectionWritesOnSubscribe connectionWritesOnSub = new ConnectionWritesOnSubscribe( connectionWriteQueue );
- rx.Observable connectionWritesObservable = rx.Observable.create( connectionWritesOnSub );
- connectionWritesObservable.flatMap( new Func1<ExportConnection, Observable<?>>() {
- public Observable<ExportConnection> call(ExportConnection connection ) {
- return Observable.just(connection).doOnNext(
- new ConnectionWriteAction()).subscribeOn( writeScheduler );
+ Observable<String> collectionsObservable = Observable.create( new CollectionsObservable( em ) );
+
+ collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
+
+ public Observable<ExportEntity> call(String collection) {
+ return Observable.create( new EntityObservable( em, collection ))
+ .doOnNext( new EntityWriteAction() ).subscribeOn( writeScheduler );
}
- },10).subscribeOn( writeScheduler ).subscribe();
-
- // start processing data and filling up write queues
-
- CollectionsOnSubscribe onSubscribe = new CollectionsOnSubscribe( em );
- rx.Observable collectionsObservable = rx.Observable.create( onSubscribe );
- collectionsObservable.flatMap( new Func1<String, Observable<String>>() {
- public Observable<String> call(String collection) {
- return Observable.just(collection).doOnNext(
- new CollectionAction( em ) ).subscribeOn( readScheduler );
+
+ }, 10).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
+
+ public Observable<ExportConnection> call(ExportEntity exportEntity) {
+ return Observable.create( new ConnectionsObservable( em, exportEntity ))
+ .doOnNext( new ConnectionWriteAction() ).subscribeOn( writeScheduler );
}
- },40).subscribeOn( readScheduler ).subscribe();
-
- // wait for write thread pollers to get started
-
- try { Thread.sleep( 1000 ); } catch (InterruptedException ignored) {}
-
- // wait for write-thread pollers to stop
-
- while ( activePollers.get() > 0 ) {
- logger.info(
- "Active write threads: {}\n"
- +"Entities written: {}\n"
- +"Entities queued: {}\n"
- +"Connections written: {}\n"
- +"Connections queued: {}\n",
- new Object[] {
- activePollers.get(),
- entitiesWritten.get(),
- entitiesQueued.get(),
- connectionsWritten.get(),
- connectionsQueued.get()} );
- try { Thread.sleep( 5000 ); } catch (InterruptedException ignored) {}
- }
-
- // wrap up files
-
- for ( JsonGenerator gen : entityGeneratorsByThread.values() ) {
- //gen.writeEndArray();
- gen.flush();
- gen.close();
- }
-
- for ( JsonGenerator gen : connectionGeneratorsByThread.values() ) {
- //gen.writeEndArray();
- gen.flush();
- gen.close();
- }
-
- for ( String fileName : emptyFiles ) {
- File emptyFile = new File(fileName);
- emptyFile.deleteOnExit();
- }
-
+
+ }, 10)
+ .subscribeOn( readScheduler )
+ .doOnCompleted( new FileWrapUpAction() )
+ .toBlocking().last();
}
@Override
@@ -222,12 +161,12 @@ public class ExportApp extends ExportingToolBase {
.withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
options.addOption( appNameOption );
- Option readThreadsOption = OptionBuilder
- .hasArg().withType(0).withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
+ Option readThreadsOption = OptionBuilder.hasArg().withType(0)
+ .withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
options.addOption( readThreadsOption );
- Option writeThreadsOption = OptionBuilder
- .hasArg().withType(0).withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
+ Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
+ .withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
options.addOption( writeThreadsOption );
return options;
@@ -239,17 +178,15 @@ public class ExportApp extends ExportingToolBase {
/**
* Emits collection names found in application.
*/
- class CollectionsOnSubscribe implements rx.Observable.OnSubscribe<String> {
+ class CollectionsObservable implements rx.Observable.OnSubscribe<String> {
EntityManager em;
- public CollectionsOnSubscribe( EntityManager em ) {
+ public CollectionsObservable(EntityManager em) {
this.em = em;
}
public void call(Subscriber<? super String> subscriber) {
- logger.info("Starting to read collections");
-
int count = 0;
try {
Map<String, Object> collectionMetadata = em.getApplicationCollectionMetadata();
@@ -261,11 +198,12 @@ public class ExportApp extends ExportingToolBase {
} catch (Exception e) {
subscriber.onError( e );
}
- logger.info("Done. Read {} collection names", count);
if ( count > 0 ) {
subscriber.onCompleted();
+ logger.info( "Completed. Read {} collection names", count );
} else {
subscriber.unsubscribe();
+ logger.info( "No collections found" );
}
}
}
@@ -273,11 +211,11 @@ public class ExportApp extends ExportingToolBase {
/**
* Emits entities of collection.
*/
- class EntityOnSubscribe implements rx.Observable.OnSubscribe<ExportEntity> {
+ class EntityObservable implements rx.Observable.OnSubscribe<ExportEntity> {
EntityManager em;
String collection;
- public EntityOnSubscribe(EntityManager em, String collection) {
+ public EntityObservable(EntityManager em, String collection) {
this.em = em;
this.collection = collection;
}
@@ -286,6 +224,8 @@ public class ExportApp extends ExportingToolBase {
logger.info("Starting to read entities of collection {}", collection);
+ subscriber.onStart();
+
try {
int count = 0;
@@ -327,10 +267,11 @@ public class ExportApp extends ExportingToolBase {
results = em.searchCollection( em.getApplicationRef(), collection, query );
}
- logger.info("Done. Read {} entities", count);
if ( count > 0 ) {
subscriber.onCompleted();
+ logger.info("Completed collection {}. Read {} entities", collection, count);
} else {
+ logger.info("Completed collection {} empty", collection );
subscriber.unsubscribe();
}
@@ -343,18 +284,19 @@ public class ExportApp extends ExportingToolBase {
/**
* Emits connections of an entity.
*/
- class ConnectionsOnSubscribe implements rx.Observable.OnSubscribe<ExportConnection> {
+ class ConnectionsObservable implements rx.Observable.OnSubscribe<ExportConnection> {
EntityManager em;
ExportEntity exportEntity;
- public ConnectionsOnSubscribe(EntityManager em, ExportEntity exportEntity) {
+ public ConnectionsObservable(EntityManager em, ExportEntity exportEntity) {
this.em = em;
this.exportEntity = exportEntity;
}
public void call(Subscriber<? super ExportConnection> subscriber) {
- logger.info("Starting to read connections for entity type {}", exportEntity.getEntity().getType());
+ logger.info( "Starting to read connections for entity {} type {}",
+ exportEntity.getEntity().getName(), exportEntity.getEntity().getType() );
int count = 0;
@@ -388,173 +330,16 @@ public class ExportApp extends ExportingToolBase {
} catch (Exception e) {
subscriber.onError( e );
}
-
- logger.info("Done. Read {} connections", count);
- if ( count > 0 ) {
- subscriber.onCompleted();
- } else {
- subscriber.unsubscribe();
- }
- }
- }
-
- /**
- * Process collection by starting processing of its entities.
- */
- class CollectionAction implements Action1<String> {
- EntityManager em;
-
- public CollectionAction( EntityManager em ) {
- this.em = em;
- }
-
- public void call(String collection) {
-
- // process entities of collection in parallel
- EntityOnSubscribe onSubscribe = new EntityOnSubscribe( em, collection );
- rx.Observable entityObservable = rx.Observable.create( onSubscribe );
- entityObservable.flatMap( new Func1<ExportEntity, Observable<ExportEntity>>() {
- public Observable<ExportEntity> call(ExportEntity exportEntity) {
- return Observable.just(exportEntity).doOnNext(
- new EntityAction( em ) ).subscribeOn( readScheduler );
- }
- }, 8).subscribeOn(readScheduler).toBlocking().last();
- }
- }
-
- /**
- * Process entity by adding it to entityWriteQueue and starting processing of its connections.
- */
- class EntityAction implements Action1<ExportEntity> {
- EntityManager em;
-
- public EntityAction( EntityManager em ) {
- this.em = em;
- }
-
- public void call(ExportEntity exportEntity) {
- //logger.debug( "Processing entity: " + exportEntity.getEntity().getUuid() );
-
- entityWriteQueue.add( exportEntity );
- entitiesQueued.getAndIncrement();
-
- // if entity has connections, process them in parallel
- try {
- Results connectedEntities = em.getConnectedEntities(
- exportEntity.getEntity().getUuid(), null, null, Results.Level.CORE_PROPERTIES );
-
- if ( !connectedEntities.isEmpty() ) {
- ConnectionsOnSubscribe onSubscribe = new ConnectionsOnSubscribe( em, exportEntity );
- rx.Observable entityObservable = rx.Observable.create( onSubscribe );
-
- entityObservable.flatMap( new Func1<ExportConnection, Observable<ExportConnection>>() {
- public Observable<ExportConnection> call(ExportConnection connection) {
- return Observable.just(connection).doOnNext(
- new ConnectionsAction() ).subscribeOn(readScheduler);
- }
- }, 8).subscribeOn(readScheduler).toBlocking().last();
- }
-
- } catch (Exception e) {
- throw new RuntimeException( "Error getting connections", e );
- }
- }
- }
-
- /**
- * Process connection by adding it to connectionWriteQueue.
- */
- class ConnectionsAction implements Action1<ExportConnection> {
-
- public void call(ExportConnection conn) {
- //logger.debug( "Processing connections for entity: " + conn.getSourceUuid() );
- connectionWriteQueue.add(conn);
- connectionsQueued.getAndIncrement();
- }
- }
-
-
- // ----------------------------------------------------------------------------------------
- // writing data
-
- /**
- * Emits entities to be written.
- */
- class EntityWritesOnSubscribe implements rx.Observable.OnSubscribe<ExportEntity> {
- BlockingQueue<ExportEntity> queue;
-
- public EntityWritesOnSubscribe( BlockingQueue<ExportEntity> queue ) {
- this.queue = queue;
- }
-
- public void call(Subscriber<? super ExportEntity> subscriber) {
- int count = 0;
-
- while ( true ) {
- ExportEntity entity = null;
- try {
- //logger.debug( "Wrote {}. Polling for entity to write...", count );
- activePollers.getAndIncrement();
- entity = queue.poll( pollTimeoutSeconds, TimeUnit.SECONDS );
- } catch (InterruptedException e) {
- logger.error("Entity poll interrupted", e);
- continue;
- } finally {
- activePollers.getAndDecrement();
- }
- if ( entity == null ) {
- break;
- }
- subscriber.onNext( entity );
- count++;
- }
-
- logger.info("Done. De-queued {} entities", count);
- if ( count > 0 ) {
- subscriber.onCompleted();
- } else {
- subscriber.unsubscribe();
- }
- }
- }
-
- /**
- * Emits connections to be written.
- */
- class ConnectionWritesOnSubscribe implements rx.Observable.OnSubscribe<ExportConnection> {
- BlockingQueue<ExportConnection> queue;
-
- public ConnectionWritesOnSubscribe( BlockingQueue<ExportConnection> queue ) {
- this.queue = queue;
- }
-
- public void call(Subscriber<? super ExportConnection> subscriber) {
- int count = 0;
- while ( true ) {
- ExportConnection connection = null;
- try {
- //logger.debug( "Wrote {}. Polling for connection to write", count );
- activePollers.getAndIncrement();
- connection = queue.poll( pollTimeoutSeconds, TimeUnit.SECONDS );
- } catch (InterruptedException e) {
- logger.error("Connection poll interrupted", e);
- continue;
- } finally {
- activePollers.getAndDecrement();
- }
- if ( connection == null ) {
- break;
- }
- subscriber.onNext( connection );
- count++;
- }
-
- logger.info("Done. De-queued {} connections", count);
if ( count > 0 ) {
subscriber.onCompleted();
+ logger.info("Completed entity {} type {} connections count {}",
+ new Object[] { exportEntity.getEntity().getName(), exportEntity.getEntity().getType(), count });
+
} else {
subscriber.unsubscribe();
+ logger.info( "Entity {} type {} has no connections",
+ exportEntity.getEntity().getName(), exportEntity.getEntity().getType() );
}
}
}
@@ -566,10 +351,9 @@ public class ExportApp extends ExportingToolBase {
public void call(ExportEntity entity) {
- boolean wroteData = false;
-
String [] parts = Thread.currentThread().getName().split("-");
- String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".entities";
+ String fileName = outputDir.getAbsolutePath() + File.separator
+ + applicationName.replace('/','-') + "-" + parts[3] + ".entities";
JsonGenerator gen = entityGeneratorsByThread.get( Thread.currentThread() );
if ( gen == null ) {
@@ -577,6 +361,7 @@ public class ExportApp extends ExportingToolBase {
// no generator so we are opening new file and writing the start of an array
try {
gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
+ logger.info("Opened output file {}", fileName);
} catch (IOException e) {
throw new RuntimeException("Error opening output file: " + fileName, e);
}
@@ -589,15 +374,10 @@ public class ExportApp extends ExportingToolBase {
gen.writeObject( entity );
gen.writeRaw('\n');
entitiesWritten.getAndIncrement();
- wroteData = true;
} catch (IOException e) {
throw new RuntimeException("Error writing to output file: " + fileName, e);
}
-
- if ( !wroteData ) {
- emptyFiles.add( fileName );
- }
}
}
@@ -608,10 +388,9 @@ public class ExportApp extends ExportingToolBase {
public void call(ExportConnection conn) {
- boolean wroteData = false;
-
String [] parts = Thread.currentThread().getName().split("-");
- String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".connections";
+ String fileName = outputDir.getAbsolutePath() + File.separator
+ + applicationName.replace('/','-') + "-" + parts[3] + ".connections";
JsonGenerator gen = connectionGeneratorsByThread.get( Thread.currentThread() );
if ( gen == null ) {
@@ -619,6 +398,7 @@ public class ExportApp extends ExportingToolBase {
// no generator so we are opening new file and writing the start of an array
try {
gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
+ logger.info("Opened output file {}", fileName);
} catch (IOException e) {
throw new RuntimeException("Error opening output file: " + fileName, e);
}
@@ -631,18 +411,41 @@ public class ExportApp extends ExportingToolBase {
gen.writeObject( conn );
gen.writeRaw('\n');
connectionsWritten.getAndIncrement();
- wroteData = true;
} catch (IOException e) {
throw new RuntimeException("Error writing to output file: " + fileName, e);
}
+ }
+ }
+
+ private class FileWrapUpAction implements Action0 {
+ @Override
+ public void call() {
+
+ logger.info("-------------------------------------------------------------------");
+ logger.info("DONE! Entities: {} Connections: {}", entitiesWritten.get(), connectionsWritten.get());
+ logger.info("-------------------------------------------------------------------");
- if ( !wroteData ) {
- emptyFiles.add( fileName );
+ for ( JsonGenerator gen : entityGeneratorsByThread.values() ) {
+ try {
+ //gen.writeEndArray();
+ gen.flush();
+ gen.close();
+ } catch (IOException e) {
+ logger.error("Error closing output file", e);
+ }
+ }
+ for ( JsonGenerator gen : connectionGeneratorsByThread.values() ) {
+ try {
+ //gen.writeEndArray();
+ gen.flush();
+ gen.close();
+ } catch (IOException e) {
+ logger.error("Error closing output file", e);
+ }
}
}
}
-
}
class ExportEntity {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a870d69/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
index 3de220c..a7c3905 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
@@ -138,7 +138,7 @@ public abstract class ExportingToolBase extends ToolBase {
if ( !file.mkdirs() ) {
- throw new RuntimeException( String.format( "Unable to create diretory %s", dirName ) );
+ throw new RuntimeException( String.format( "Unable to create directory %s", dirName ) );
}
return file;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7a870d69/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
index 14b9311..af8306f 100644
--- a/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
+++ b/stack/tools/src/test/java/org/apache/usergrid/tools/ExportAppTest.java
@@ -27,14 +27,26 @@ import org.apache.usergrid.persistence.EntityManager;
import org.junit.ClassRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Scheduler;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
public class ExportAppTest {
static final Logger logger = LoggerFactory.getLogger( ExportAppTest.class );
+
+ int NUM_COLLECTIONS = 5;
+ int NUM_ENTITIES = 10;
+ int NUM_CONNECTIONS = 1;
@ClassRule
public static ServiceITSetup setup = new ServiceITSetupImpl( ServiceITSuite.cassandraResource );
@@ -52,46 +64,110 @@ public class ExportAppTest {
ApplicationInfo appInfo = setup.getMgmtSvc().createApplication(
orgInfo.getOrganization().getUuid(), "app_" + rand );
- EntityManager em = setup.getEmf().getEntityManager( appInfo.getId() );
+ final EntityManager em = setup.getEmf().getEntityManager( appInfo.getId() );
- // create 10 connected things
+ // create connected things
- List<Entity> connectedThings = new ArrayList<Entity>();
+ final List<Entity> connectedThings = new ArrayList<Entity>();
String connectedType = "connected_thing";
em.createApplicationCollection(connectedType);
- for ( int j=0; j<10; j++) {
+ for ( int j=0; j<NUM_CONNECTIONS; j++) {
final String name = "connected_thing_" + j;
connectedThings.add( em.create( connectedType, new HashMap<String, Object>() {{
put( "name", name );
}} ) );
}
- // create 10 collections of 10 things, every other thing is connected to the connected things
-
- for ( int i=0; i<10; i++) {
- String type = "thing_"+i;
- em.createApplicationCollection(type);
- for ( int j=0; j<10; j++) {
- final String name = "thing_" + j;
- Entity source = em.create(type, new HashMap<String, Object>() {{ put("name", name); }});
- if ( j % 2 == 0 ) {
- for ( Entity target : connectedThings ) {
- em.createConnection( source, "has", target );
+ // create collections of things, every other thing is connected to the connected things
+
+ final AtomicInteger entitiesCount = new AtomicInteger(0);
+ final AtomicInteger connectionCount = new AtomicInteger(0);
+
+ ExecutorService execService = Executors.newFixedThreadPool( 50);
+ final Scheduler scheduler = Schedulers.from( execService );
+
+ Observable.range( 0, NUM_COLLECTIONS ).flatMap( new Func1<Integer, Observable<?>>() {
+ @Override
+ public Observable<?> call(Integer i) {
+
+ return Observable.just( i ).doOnNext( new Action1<Integer>() {
+ @Override
+ public void call(Integer i) {
+
+ final String type = "thing_"+i;
+ try {
+ em.createApplicationCollection( type );
+ connectionCount.getAndIncrement();
+
+ } catch (Exception e) {
+ throw new RuntimeException( "Error creating collection", e );
+ }
+
+ Observable.range( 0, NUM_ENTITIES ).flatMap( new Func1<Integer, Observable<?>>() {
+ @Override
+ public Observable<?> call(Integer j) {
+ return Observable.just( j ).doOnNext( new Action1<Integer>() {
+ @Override
+ public void call(Integer j) {
+
+ final String name = "thing_" + j;
+ try {
+ final Entity source = em.create(
+ type, new HashMap<String, Object>() {{ put("name", name); }});
+ entitiesCount.getAndIncrement();
+ logger.info( "Created entity {} type {}", name, type );
+
+ for ( Entity target : connectedThings ) {
+ em.createConnection( source, "has", target );
+ connectionCount.getAndIncrement();
+ logger.info( "Created connection from entity {} type {} to {}",
+ new Object[]{name, type, target.getName()} );
+ }
+
+
+ } catch (Exception e) {
+ throw new RuntimeException( "Error creating collection", e );
+ }
+
+
+ }
+
+ } );
+
+ }
+ }, 50 ).subscribeOn( scheduler ).subscribe(); // toBlocking().last();
+
}
- }
+ } );
+
+
}
+ }, 30 ).subscribeOn( scheduler ).toBlocking().last();
+
+ while ( entitiesCount.get() < NUM_COLLECTIONS * NUM_ENTITIES ) {
+ Thread.sleep( 5000 );
+ logger.info( "Still working. Created {} entities and {} connections",
+ entitiesCount.get(), connectionCount.get() );
}
-
- // export to file
- String directoryName = "./target/export" + rand;
+ logger.info( "Done. Created {} entities and {} connections", entitiesCount.get(), connectionCount.get() );
+
+ long start = System.currentTimeMillis();
+
+ String directoryName = "target/export" + rand;
ExportApp exportApp = new ExportApp();
exportApp.startTool( new String[]{
"-application", appInfo.getName(),
+ "-readThreads", "50",
+ "-writeThreads", "10",
"-host", "localhost:" + ServiceITSuite.cassandraResource.getRpcPort(),
"-outputDir", directoryName
}, false );
+ logger.info("time = " + (System.currentTimeMillis() - start)/1000 + "s");
+
+
+
}
}
\ No newline at end of file