You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/07/22 17:17:43 UTC
[02/15] incubator-usergrid git commit: Add readThread and writeThread
CLI parameters.
Add readThread and writeThread CLI parameters.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2b65e619
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2b65e619
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2b65e619
Branch: refs/heads/two-dot-o
Commit: 2b65e619316b206720e574a21fe1b33462e0f2dd
Parents: b2bdbb5
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Jul 14 08:03:18 2015 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Jul 14 08:03:18 2015 -0400
----------------------------------------------------------------------
.../org/apache/usergrid/tools/ExportApp.java | 153 +++++++++++++------
1 file changed, 106 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2b65e619/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index ceb3ecd..21c63a0 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -25,6 +25,7 @@ import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.Query;
import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.utils.StringUtils;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.util.MinimalPrettyPrinter;
@@ -46,11 +47,22 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
- * Export application's collections.
+ * Export all entities and connections of a Usergrid app.
+ *
+ * Exports data files to specified directory.
+ *
+ * Will create as many output files as there are writeThreads (by default: 10).
+ *
+ * Will create two types of files: *.entities for Usergrid entities and *.connections for entity-to-entity connections.
+ *
+ * Every line of the data files is a complete JSON object.
*/
public class ExportApp extends ExportingToolBase {
static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
+ private static final String READ_THREAD_COUNT = "readThreads";
+ private static final String WRITE_THREAD_COUNT = "writeThreads";
+
// we will write two types of files: entities and connections
BlockingQueue<ExportEntity> entityWriteQueue = new LinkedBlockingQueue<ExportEntity>();
BlockingQueue<ExportConnection> connectionWriteQueue = new LinkedBlockingQueue<ExportConnection>();
@@ -58,10 +70,15 @@ public class ExportApp extends ExportingToolBase {
static final String APPLICATION_NAME = "application";
int pollTimeoutSeconds = 10;
+
+ int readThreadCount = 80;
+ int writeThreadCount = 10;
+
+ String applicationName;
+ String organizationName;
// limiting output threads will limit output files
- final ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(8);
- final Scheduler scheduler = Schedulers.from( threadPoolExecutor );
+ Scheduler readScheduler;
Map<Thread, JsonGenerator> entityGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
@@ -77,29 +94,37 @@ public class ExportApp extends ExportingToolBase {
ObjectMapper mapper = new ObjectMapper();
/**
- * Export admin users using multiple threads.
- * <p/>
- * How it works:
- * In main thread we query for IDs of all admin users, add each ID to read queue.
- * Read-queue workers read admin user data, add data to write queue.
- * One write-queue worker reads data writes to file.
+ * Tool entry point.
*/
@Override
public void runTool(CommandLine line) throws Exception {
- String applicationName = line.getOptionValue( APPLICATION_NAME );
-
-// if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
-// try {
-// readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
-// } catch (NumberFormatException nfe) {
-// logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
-// return;
-// }
-// } else {
-// readThreadCount = 20;
-// }
+ applicationName = line.getOptionValue( APPLICATION_NAME );
+
+ if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
+ try {
+ readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
+ } catch (NumberFormatException nfe) {
+ logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+ return;
+ }
+ }
+ if (StringUtils.isNotEmpty( line.getOptionValue( WRITE_THREAD_COUNT ) )) {
+ try {
+ writeThreadCount = Integer.parseInt( line.getOptionValue( WRITE_THREAD_COUNT ) );
+ } catch (NumberFormatException nfe) {
+ logger.error( "-" + WRITE_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+ return;
+ }
+ }
+
+ ExecutorService readThreadPoolExecutor = Executors.newFixedThreadPool( readThreadCount );
+ readScheduler = Schedulers.from( readThreadPoolExecutor );
+
+ ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
+ final Scheduler writeScheduler = Schedulers.from( writeThreadPoolExecutor );
+
startSpring();
setVerbose( line );
@@ -111,6 +136,7 @@ public class ExportApp extends ExportingToolBase {
UUID applicationId = emf.lookupApplication( applicationName );
final EntityManager em = emf.getEntityManager( applicationId );
+ organizationName = em.getApplication().getOrganizationName();
// start write queue workers
@@ -119,18 +145,18 @@ public class ExportApp extends ExportingToolBase {
entityWritesObservable.flatMap( new Func1<ExportEntity, Observable<?>>() {
public Observable<ExportEntity> call(ExportEntity exportEntity) {
return Observable.just(exportEntity).doOnNext(
- new EntityWriteAction() ).subscribeOn( scheduler );
+ new EntityWriteAction() ).subscribeOn( writeScheduler );
}
- },10).subscribeOn( scheduler ).subscribe();
+ },10).subscribeOn( writeScheduler ).subscribe();
ConnectionWritesOnSubscribe connectionWritesOnSub = new ConnectionWritesOnSubscribe( connectionWriteQueue );
rx.Observable connectionWritesObservable = rx.Observable.create( connectionWritesOnSub );
connectionWritesObservable.flatMap( new Func1<ExportConnection, Observable<?>>() {
public Observable<ExportConnection> call(ExportConnection connection ) {
return Observable.just(connection).doOnNext(
- new ConnectionWriteAction()).subscribeOn( scheduler );
+ new ConnectionWriteAction()).subscribeOn( writeScheduler );
}
- },10).subscribeOn( scheduler ).subscribe();
+ },10).subscribeOn( writeScheduler ).subscribe();
// start processing data and filling up write queues
@@ -139,9 +165,9 @@ public class ExportApp extends ExportingToolBase {
collectionsObservable.flatMap( new Func1<String, Observable<String>>() {
public Observable<String> call(String collection) {
return Observable.just(collection).doOnNext(
- new CollectionAction( em ) ).subscribeOn( Schedulers.io() );
+ new CollectionAction( em ) ).subscribeOn( readScheduler );
}
- },40).subscribeOn( Schedulers.io() ).subscribe();
+ },40).subscribeOn( readScheduler ).subscribe();
// wait for write thread pollers to get started
@@ -192,14 +218,18 @@ public class ExportApp extends ExportingToolBase {
Options options = super.createOptions();
- Option readThreads = OptionBuilder.hasArg().withType("")
+ Option appNameOption = OptionBuilder.hasArg().withType("")
.withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
- options.addOption( readThreads );
-
-// Option readThreads = OptionBuilder
-// .hasArg().withType(0).withDescription("Read Threads -" + READ_THREAD_COUNT).create(READ_THREAD_COUNT);
-// options.addOption( readThreads );
-
+ options.addOption( appNameOption );
+
+ Option readThreadsOption = OptionBuilder
+ .hasArg().withType(0).withDescription( "Read Threads -" + READ_THREAD_COUNT ).create( READ_THREAD_COUNT );
+ options.addOption( readThreadsOption );
+
+ Option writeThreadsOption = OptionBuilder
+ .hasArg().withType(0).withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
+ options.addOption( writeThreadsOption );
+
return options;
}
@@ -276,9 +306,13 @@ public class ExportApp extends ExportingToolBase {
}
dictionariesByName.put( dictionary, dict );
}
+
ExportEntity exportEntity = new ExportEntity(
- em.getApplication().getApplicationName(),
- entity, dictionariesByName );
+ organizationName,
+ applicationName,
+ entity,
+ dictionariesByName );
+
subscriber.onNext( exportEntity );
count++;
@@ -333,11 +367,14 @@ public class ExportApp extends ExportingToolBase {
for (Entity connectedEntity : results.getEntities()) {
try {
+
ExportConnection connection = new ExportConnection(
- em.getApplication().getApplicationName(),
+ applicationName,
+ organizationName,
connectionType,
exportEntity.getEntity().getUuid(),
connectedEntity.getUuid());
+
subscriber.onNext( connection );
count++;
@@ -379,9 +416,9 @@ public class ExportApp extends ExportingToolBase {
entityObservable.flatMap( new Func1<ExportEntity, Observable<ExportEntity>>() {
public Observable<ExportEntity> call(ExportEntity exportEntity) {
return Observable.just(exportEntity).doOnNext(
- new EntityAction( em ) ).subscribeOn( Schedulers.io() );
+ new EntityAction( em ) ).subscribeOn( readScheduler );
}
- }, 8).subscribeOn(Schedulers.io()).toBlocking().last();
+ }, 8).subscribeOn(readScheduler).toBlocking().last();
}
}
@@ -413,9 +450,9 @@ public class ExportApp extends ExportingToolBase {
entityObservable.flatMap( new Func1<ExportConnection, Observable<ExportConnection>>() {
public Observable<ExportConnection> call(ExportConnection connection) {
return Observable.just(connection).doOnNext(
- new ConnectionsAction() ).subscribeOn( Schedulers.io() );
+ new ConnectionsAction() ).subscribeOn(readScheduler);
}
- }, 8).subscribeOn(Schedulers.io()).toBlocking().last();
+ }, 8).subscribeOn(readScheduler).toBlocking().last();
}
} catch (Exception e) {
@@ -472,7 +509,7 @@ public class ExportApp extends ExportingToolBase {
count++;
}
- logger.info("Done. Wrote {} entities", count);
+ logger.info("Done. De-queued {} entities", count);
if ( count > 0 ) {
subscriber.onCompleted();
} else {
@@ -513,7 +550,7 @@ public class ExportApp extends ExportingToolBase {
count++;
}
- logger.info("Done. Wrote {} connections", count);
+ logger.info("Done. De-queued {} connections", count);
if ( count > 0 ) {
subscriber.onCompleted();
} else {
@@ -531,7 +568,8 @@ public class ExportApp extends ExportingToolBase {
boolean wroteData = false;
- String fileName = "target/" + Thread.currentThread().getName() + ".ude";
+ String [] parts = Thread.currentThread().getName().split("-");
+ String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".entities";
JsonGenerator gen = entityGeneratorsByThread.get( Thread.currentThread() );
if ( gen == null ) {
@@ -572,7 +610,8 @@ public class ExportApp extends ExportingToolBase {
boolean wroteData = false;
- String fileName = "target/" + Thread.currentThread().getName() + ".ugc";
+ String [] parts = Thread.currentThread().getName().split("-");
+ String fileName = "target/" + applicationName + "-" + organizationName + parts[3] + ".connections";
JsonGenerator gen = connectionGeneratorsByThread.get( Thread.currentThread() );
if ( gen == null ) {
@@ -607,10 +646,12 @@ public class ExportApp extends ExportingToolBase {
}
class ExportEntity {
+ private String organization;
private String application;
private Entity entity;
private Map<String, Object> dictionaries;
- public ExportEntity( String application, Entity entity, Map<String, Object> dictionaries ) {
+ public ExportEntity( String organization, String application, Entity entity, Map<String, Object> dictionaries ) {
+ this.organization = organization;
this.application = application;
this.entity = entity;
this.dictionaries = dictionaries;
@@ -639,14 +680,24 @@ class ExportEntity {
public void setDictionaries(Map<String, Object> dictionaries) {
this.dictionaries = dictionaries;
}
+
+ public String getOrganization() {
+ return organization;
+ }
+
+ public void setOrganization(String organization) {
+ this.organization = organization;
+ }
}
class ExportConnection {
+ private String organization;
private String application;
private String connectionType;
private UUID sourceUuid;
private UUID targetUuid;
- public ExportConnection(String application, String connectionType, UUID sourceUuid, UUID targetUuid) {
+ public ExportConnection(String organization, String application, String connectionType, UUID sourceUuid, UUID targetUuid) {
+ this.organization= organization;
this.application = application;
this.connectionType = connectionType;
this.sourceUuid = sourceUuid;
@@ -684,4 +735,12 @@ class ExportConnection {
public void setTargetUuid(UUID targetUuid) {
this.targetUuid = targetUuid;
}
+
+ public String getOrganization() {
+ return organization;
+ }
+
+ public void setOrganization(String organization) {
+ this.organization = organization;
+ }
}