You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/11/20 00:21:04 UTC
[4/4] usergrid git commit: Fixes for tools.jar not compiling in 2.0
Fixes for tools.jar not compiling in 2.0
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/be483819
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/be483819
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/be483819
Branch: refs/heads/RemoveCollectionDuplicates
Commit: be483819216fcfe3bcf18801936178bf6a341a98
Parents: 543d2b4
Author: George Reyes <gr...@apache.org>
Authored: Thu Nov 19 15:20:58 2015 -0800
Committer: George Reyes <gr...@apache.org>
Committed: Thu Nov 19 15:20:58 2015 -0800
----------------------------------------------------------------------
.../org/apache/usergrid/tools/ExportAdmins.java | 976 +++++------
.../org/apache/usergrid/tools/ExportApp.java | 741 ++++-----
.../org/apache/usergrid/tools/ImportAdmins.java | 1536 +++++++++---------
.../org/apache/usergrid/tools/IndexRebuild.java | 12 +-
.../RepairingMismatchedApplicationMetadata.java | 3 +-
.../tools/UserCollectionsDuplicateFix.java | 71 +
.../org/apache/usergrid/tools/UserManager.java | 4 +-
.../apache/usergrid/tools/WarehouseExport.java | 516 ------
.../apache/usergrid/tools/WarehouseUpsert.java | 156 --
.../apache/usergrid/tools/ExportAppTest.java | 140 +-
.../usergrid/tools/ExportImportAdminsTest.java | 420 ++---
11 files changed, 1995 insertions(+), 2580 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/be483819/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
index 0bb74ab..96274b0 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportAdmins.java
@@ -55,495 +55,499 @@ import java.util.concurrent.atomic.AtomicInteger;
* cassandra.lock.keyspace=My_Usergrid_Locks
*/
public class ExportAdmins extends ExportingToolBase {
-
- static final Logger logger = LoggerFactory.getLogger( ExportAdmins.class );
-
- public static final String ADMIN_USERS_PREFIX = "admin-users";
- public static final String ADMIN_USER_METADATA_PREFIX = "admin-user-metadata";
-
- // map admin user UUID to list of organizations to which user belongs
- private Map<UUID, List<Org>> userToOrgsMap = new HashMap<UUID, List<Org>>(50000);
-
- private Map<String, UUID> orgNameToUUID = new HashMap<String, UUID>(50000);
-
- private Set<UUID> orgsWritten = new HashSet<UUID>(50000);
-
- private Set<UUID> duplicateOrgs = new HashSet<UUID>();
-
- private static final String READ_THREAD_COUNT = "readThreads";
- private int readThreadCount;
-
- AtomicInteger userCount = new AtomicInteger( 0 );
-
- boolean ignoreInvalidUsers = false; // true to ignore users with no credentials or orgs
-
-
- /**
- * Represents an AdminUser that has been read and is ready for export.
- */
- class AdminUserWriteTask {
- Entity adminUser;
- Map<String, Map<Object, Object>> dictionariesByName;
- BiMap<UUID, String> orgNamesByUuid;
- }
-
-
- /**
- * Represents an organization associated with a user.
- */
- private class Org {
- UUID orgId;
- String orgName;
- public Org( UUID orgId, String orgName ) {
- this.orgId = orgId;
- this.orgName = orgName;
- }
- }
-
-
- /**
- * Export admin users using multiple threads.
- * <p/>
- * How it works:
- * In main thread we query for IDs of all admin users, add each ID to read queue.
- * Read-queue workers read admin user data, add data to write queue.
- * One write-queue worker reads data writes to file.
- */
@Override
- public void runTool(CommandLine line) throws Exception {
- startSpring();
+ public void runTool( final CommandLine line ) throws Exception {
- setVerbose( line );
-
- applyOrgId( line );
- prepareBaseOutputFileName( line );
- outputDir = createOutputParentDir();
- logger.info( "Export directory: " + outputDir.getAbsolutePath() );
-
- if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
- try {
- readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
- } catch (NumberFormatException nfe) {
- logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
- return;
- }
- } else {
- readThreadCount = 20;
- }
-
- buildOrgMap();
-
- // start write queue worker
-
- BlockingQueue<AdminUserWriteTask> writeQueue = new LinkedBlockingQueue<AdminUserWriteTask>();
- AdminUserWriter adminUserWriter = new AdminUserWriter( writeQueue );
- Thread writeThread = new Thread( adminUserWriter );
- writeThread.start();
- logger.debug( "Write thread started" );
-
- // start read queue workers
-
- BlockingQueue<UUID> readQueue = new LinkedBlockingQueue<UUID>();
- List<AdminUserReader> readers = new ArrayList<AdminUserReader>();
- for (int i = 0; i < readThreadCount; i++) {
- AdminUserReader worker = new AdminUserReader( readQueue, writeQueue );
- Thread readerThread = new Thread( worker, "AdminUserReader-" + i );
- readerThread.start();
- readers.add( worker );
- }
- logger.debug( readThreadCount + " read worker threads started" );
-
- // query for IDs, add each to read queue
-
- Query query = new Query();
- query.setLimit( MAX_ENTITY_FETCH );
- query.setResultsLevel( Query.Level.IDS );
- EntityManager em = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
- Results ids = em.searchCollection( em.getApplicationRef(), "users", query );
-
- while (ids.size() > 0) {
- for (UUID uuid : ids.getIds()) {
- readQueue.add( uuid );
- //logger.debug( "Added uuid to readQueue: " + uuid );
- }
- if (ids.getCursor() == null) {
- break;
- }
- query.setCursor( ids.getCursor() );
- ids = em.searchCollection( em.getApplicationRef(), "users", query );
- }
-
- logger.debug( "Waiting for write thread to complete" );
-
- boolean done = false;
- while ( !done ) {
- writeThread.join( 10000, 0 );
- done = !writeThread.isAlive();
- logger.info( "Wrote {} users", userCount.get() );
- }
}
-
- @Override
- @SuppressWarnings("static-access")
- public Options createOptions() {
-
- Options options = super.createOptions();
-
- Option readThreads = OptionBuilder
- .hasArg().withType(0).withDescription("Read Threads -" + READ_THREAD_COUNT).create(READ_THREAD_COUNT);
-
- options.addOption( readThreads );
- return options;
- }
-
-
- /**
- * Shouldn't have to do this but getOrganizationsForAdminUser() is not 100% reliable in some Usergrid installations.
- */
- private void buildOrgMap() throws Exception {
-
- logger.info( "Building org map" );
-
- ExecutorService execService = Executors.newFixedThreadPool( this.readThreadCount );
-
- EntityManager em = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
- String queryString = "select *";
- Query query = Query.fromQL( queryString );
- query.withLimit( 1000 );
- Results organizations = null;
- int count = 0;
- do {
- organizations = em.searchCollection( em.getApplicationRef(), "groups", query );
- for ( Entity organization : organizations.getEntities() ) {
- execService.submit( new OrgMapWorker( organization ) );
- count++;
- }
-
- if ( count % 1000 == 0 ) {
- logger.info("Queued {} org map workers", count);
- }
- query.setCursor( organizations.getCursor() );
- }
- while ( organizations != null && organizations.hasCursor() );
-
- execService.shutdown();
- while ( !execService.awaitTermination( 10, TimeUnit.SECONDS ) ) {
- logger.info( "Processed {} orgs for map", userToOrgsMap.size() );
- }
-
- logger.info("Org map complete, counted {} organizations", count);
- }
-
-
- public class OrgMapWorker implements Runnable {
- private final Entity orgEntity;
-
- public OrgMapWorker( Entity orgEntity ) {
- this.orgEntity = orgEntity;
- }
-
- @Override
- public void run() {
- try {
- final String orgName = orgEntity.getProperty( "path" ).toString();
- final UUID orgId = orgEntity.getUuid();
-
- for (UserInfo user : managementService.getAdminUsersForOrganization( orgEntity.getUuid() )) {
- try {
- Entity admin = managementService.getAdminUserEntityByUuid( user.getUuid() );
- Org org = new Org( orgId, orgName );
-
- synchronized (userToOrgsMap) {
- List<Org> userOrgs = userToOrgsMap.get( admin.getUuid() );
- if (userOrgs == null) {
- userOrgs = new ArrayList<Org>();
- userToOrgsMap.put( admin.getUuid(), userOrgs );
- }
- userOrgs.add( org );
- }
-
- synchronized (orgNameToUUID) {
- UUID existingOrgId = orgNameToUUID.get( orgName );
- ;
- if (existingOrgId != null && !orgId.equals( existingOrgId )) {
- if ( !duplicateOrgs.contains( orgId )) {
- logger.info( "Org {}:{} is a duplicate", orgId, orgName );
- duplicateOrgs.add(orgId);
- }
- } else {
- orgNameToUUID.put( orgName, orgId );
- }
- }
-
- } catch (Exception e) {
- logger.warn( "Cannot get orgs for userId {}", user.getUuid() );
- }
- }
- } catch ( Exception e ) {
- logger.error("Error getting users for org {}:{}", orgEntity.getName(), orgEntity.getUuid());
- }
- }
- }
-
-
- public class AdminUserReader implements Runnable {
-
- private final BlockingQueue<UUID> readQueue;
- private final BlockingQueue<AdminUserWriteTask> writeQueue;
-
- public AdminUserReader( BlockingQueue<UUID> readQueue, BlockingQueue<AdminUserWriteTask> writeQueue ) {
- this.readQueue = readQueue;
- this.writeQueue = writeQueue;
- }
-
-
- @Override
- public void run() {
- try {
- readAndQueueAdminUsers();
- } catch (Exception e) {
- logger.error("Error reading data for export", e);
- }
- }
-
-
- private void readAndQueueAdminUsers() throws Exception {
-
- EntityManager em = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
-
- while ( true ) {
-
- UUID uuid = null;
- try {
- uuid = readQueue.poll( 30, TimeUnit.SECONDS );
- if ( uuid == null ) {
- break;
- }
-
- Entity entity = em.get( uuid );
-
- AdminUserWriteTask task = new AdminUserWriteTask();
- task.adminUser = entity;
-
- addDictionariesToTask( task, entity );
- addOrganizationsToTask( task );
-
- String actionTaken = "Processed";
-
- if (ignoreInvalidUsers && (task.orgNamesByUuid.isEmpty()
- || task.dictionariesByName.isEmpty()
- || task.dictionariesByName.get( "credentials" ).isEmpty())) {
-
- actionTaken = "Ignored";
-
- } else {
- writeQueue.add( task );
- }
-
- Map<String, Object> creds = (Map<String, Object>) (task.dictionariesByName.isEmpty() ?
- 0 : task.dictionariesByName.get( "credentials" ));
-
- logger.error( "{} admin user {}:{}:{} has organizations={} dictionaries={} credentials={}",
- new Object[]{
- actionTaken,
- task.adminUser.getProperty( "username" ),
- task.adminUser.getProperty( "email" ),
- task.adminUser.getUuid(),
- task.orgNamesByUuid.size(),
- task.dictionariesByName.size(),
- creds == null ? 0 : creds.size()
- } );
-
- } catch ( Exception e ) {
- logger.error("Error reading data for user " + uuid, e );
- }
- }
- }
-
-
- private void addDictionariesToTask(AdminUserWriteTask task, Entity entity) throws Exception {
- EntityManager em = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
-
- task.dictionariesByName = new HashMap<String, Map<Object, Object>>();
-
- Set<String> dictionaries = em.getDictionaries( entity );
-
- if ( dictionaries.isEmpty() ) {
- logger.error("User {}:{} has no dictionaries", task.adminUser.getName(), task.adminUser.getUuid() );
- return;
- }
-
- Map<Object, Object> credentialsDictionary = em.getDictionaryAsMap( entity, "credentials" );
-
- if ( credentialsDictionary != null ) {
- task.dictionariesByName.put( "credentials", credentialsDictionary );
- }
- }
-
- private void addOrganizationsToTask(AdminUserWriteTask task) throws Exception {
-
- task.orgNamesByUuid = managementService.getOrganizationsForAdminUser( task.adminUser.getUuid() );
-
-<<<<<<< HEAD
- List<Org> orgs = orgMap.get( task.adminUser.getProperty( "username" ).toString().toLowerCase() );
-
-=======
- List<Org> orgs = userToOrgsMap.get( task.adminUser.getProperty( "username" ).toString().toLowerCase() );
-
->>>>>>> master
- if ( orgs != null && task.orgNamesByUuid.size() < orgs.size() ) {
-
- // list of orgs from getOrganizationsForAdminUser() is less than expected, use userToOrgsMap
- BiMap<UUID, String> bimap = HashBiMap.create();
- for (Org org : orgs) {
- bimap.put( org.orgId, org.orgName );
- }
- task.orgNamesByUuid = bimap;
- }
-<<<<<<< HEAD
-
- if ( task.orgNamesByUuid.isEmpty() ) {
- logger.error("{}:{}:{} has no orgs", new Object[] {
- task.adminUser.getProperty("username"),
- task.adminUser.getProperty("email"),
- task.adminUser.getUuid() } );
- }
-=======
->>>>>>> master
- }
- }
-
- class AdminUserWriter implements Runnable {
-
- private final BlockingQueue<AdminUserWriteTask> taskQueue;
-
- public AdminUserWriter( BlockingQueue<AdminUserWriteTask> taskQueue ) {
- this.taskQueue = taskQueue;
- }
-
-
- @Override
- public void run() {
- try {
- writeEntities();
- } catch (Exception e) {
- logger.error("Error writing export data", e);
- }
- }
-
-
- private void writeEntities() throws Exception {
- EntityManager em = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
-
- // write one JSON file for management application users
- JsonGenerator usersFile =
- getJsonGenerator( createOutputFile( ADMIN_USERS_PREFIX, em.getApplication().getName() ) );
- usersFile.writeStartArray();
-
- // write one JSON file for metadata: collections, connections and dictionaries of those users
- JsonGenerator metadataFile =
- getJsonGenerator( createOutputFile( ADMIN_USER_METADATA_PREFIX, em.getApplication().getName() ) );
- metadataFile.writeStartObject();
-
- while ( true ) {
-
- try {
- AdminUserWriteTask task = taskQueue.poll( 30, TimeUnit.SECONDS );
- if ( task == null ) {
- break;
- }
-
- // write user to application file
- usersFile.writeObject( task.adminUser );
- echo( task.adminUser );
-
- // write metadata to metadata file
- metadataFile.writeFieldName( task.adminUser.getUuid().toString() );
- metadataFile.writeStartObject();
-
- saveOrganizations( metadataFile, task );
- saveDictionaries( metadataFile, task );
-
- metadataFile.writeEndObject();
-
- logger.debug( "Exported user {}:{}:{}", new Object[] {
- task.adminUser.getProperty("username"),
- task.adminUser.getProperty("email"),
- task.adminUser.getUuid() } );
-
- userCount.addAndGet( 1 );
-
- } catch (InterruptedException e) {
- throw new Exception("Interrupted", e);
- }
- }
-
- metadataFile.writeEndObject();
- metadataFile.close();
-
- usersFile.writeEndArray();
- usersFile.close();
-
- logger.info( "Exported TOTAL {} admin users and {} organizations", userCount.get(), orgsWritten.size() );
- }
-
-
- private void saveDictionaries( JsonGenerator jg, AdminUserWriteTask task ) throws Exception {
-
- jg.writeFieldName( "dictionaries" );
- jg.writeStartObject();
-
- for (String dictionary : task.dictionariesByName.keySet() ) {
-
- Map<Object, Object> dict = task.dictionariesByName.get( dictionary );
-
- if (dict.isEmpty()) {
- continue;
- }
-
- jg.writeFieldName( dictionary );
-
- jg.writeStartObject();
-
- for (Map.Entry<Object, Object> entry : dict.entrySet()) {
- jg.writeFieldName( entry.getKey().toString() );
- jg.writeObject( entry.getValue() );
- }
-
- jg.writeEndObject();
- }
- jg.writeEndObject();
- }
-
-
- private void saveOrganizations( JsonGenerator jg, AdminUserWriteTask task ) throws Exception {
-
- final BiMap<UUID, String> orgs = task.orgNamesByUuid;
-
- jg.writeFieldName( "organizations" );
-
- jg.writeStartArray();
-
- for (UUID uuid : orgs.keySet()) {
-
- jg.writeStartObject();
-
- jg.writeFieldName( "uuid" );
- jg.writeObject( uuid );
-
- jg.writeFieldName( "name" );
- jg.writeObject( orgs.get( uuid ) );
-
- jg.writeEndObject();
-
- synchronized (orgsWritten) {
- orgsWritten.add( uuid );
- }
- }
-
- jg.writeEndArray();
- }
- }
+ // static final Logger logger = LoggerFactory.getLogger( ExportAdmins.class );
+//
+// public static final String ADMIN_USERS_PREFIX = "admin-users";
+// public static final String ADMIN_USER_METADATA_PREFIX = "admin-user-metadata";
+//
+// // map admin user UUID to list of organizations to which user belongs
+// private Map<UUID, List<Org>> userToOrgsMap = new HashMap<UUID, List<Org>>(50000);
+//
+// private Map<String, UUID> orgNameToUUID = new HashMap<String, UUID>(50000);
+//
+// private Set<UUID> orgsWritten = new HashSet<UUID>(50000);
+//
+// private Set<UUID> duplicateOrgs = new HashSet<UUID>();
+//
+// private static final String READ_THREAD_COUNT = "readThreads";
+// private int readThreadCount;
+//
+// AtomicInteger userCount = new AtomicInteger( 0 );
+//
+// boolean ignoreInvalidUsers = false; // true to ignore users with no credentials or orgs
+//
+//
+// /**
+// * Represents an AdminUser that has been read and is ready for export.
+// */
+// class AdminUserWriteTask {
+// Entity adminUser;
+// Map<String, Map<Object, Object>> dictionariesByName;
+// BiMap<UUID, String> orgNamesByUuid;
+// }
+//
+//
+// /**
+// * Represents an organization associated with a user.
+// */
+// private class Org {
+// UUID orgId;
+// String orgName;
+// public Org( UUID orgId, String orgName ) {
+// this.orgId = orgId;
+// this.orgName = orgName;
+// }
+// }
+//
+//
+// /**
+// * Export admin users using multiple threads.
+// * <p/>
+// * How it works:
+// * In main thread we query for IDs of all admin users, add each ID to read queue.
+// * Read-queue workers read admin user data, add data to write queue.
+// * One write-queue worker reads data writes to file.
+// */
+// @Override
+// public void runTool(CommandLine line) throws Exception {
+// startSpring();
+//
+// setVerbose( line );
+//
+// applyOrgId( line );
+// prepareBaseOutputFileName( line );
+// outputDir = createOutputParentDir();
+// logger.info( "Export directory: " + outputDir.getAbsolutePath() );
+//
+// if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
+// try {
+// readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
+// } catch (NumberFormatException nfe) {
+// logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+// return;
+// }
+// } else {
+// readThreadCount = 20;
+// }
+//
+// buildOrgMap();
+//
+// // start write queue worker
+//
+// BlockingQueue<AdminUserWriteTask> writeQueue = new LinkedBlockingQueue<AdminUserWriteTask>();
+// AdminUserWriter adminUserWriter = new AdminUserWriter( writeQueue );
+// Thread writeThread = new Thread( adminUserWriter );
+// writeThread.start();
+// logger.debug( "Write thread started" );
+//
+// // start read queue workers
+//
+// BlockingQueue<UUID> readQueue = new LinkedBlockingQueue<UUID>();
+// List<AdminUserReader> readers = new ArrayList<AdminUserReader>();
+// for (int i = 0; i < readThreadCount; i++) {
+// AdminUserReader worker = new AdminUserReader( readQueue, writeQueue );
+// Thread readerThread = new Thread( worker, "AdminUserReader-" + i );
+// readerThread.start();
+// readers.add( worker );
+// }
+// logger.debug( readThreadCount + " read worker threads started" );
+//
+// // query for IDs, add each to read queue
+//
+// Query query = new Query();
+// query.setLimit( MAX_ENTITY_FETCH );
+// query.setResultsLevel( Query.Level.IDS );
+// EntityManager em = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
+// Results ids = em.searchCollection( em.getApplicationRef(), "users", query );
+//
+// while (ids.size() > 0) {
+// for (UUID uuid : ids.getIds()) {
+// readQueue.add( uuid );
+// //logger.debug( "Added uuid to readQueue: " + uuid );
+// }
+// if (ids.getCursor() == null) {
+// break;
+// }
+// query.setCursor( ids.getCursor() );
+// ids = em.searchCollection( em.getApplicationRef(), "users", query );
+// }
+//
+// logger.debug( "Waiting for write thread to complete" );
+//
+// boolean done = false;
+// while ( !done ) {
+// writeThread.join( 10000, 0 );
+// done = !writeThread.isAlive();
+// logger.info( "Wrote {} users", userCount.get() );
+// }
+// }
+//
+//
+// @Override
+// @SuppressWarnings("static-access")
+// public Options createOptions() {
+//
+// Options options = super.createOptions();
+//
+// Option readThreads = OptionBuilder
+// .hasArg().withType(0).withDescription("Read Threads -" + READ_THREAD_COUNT).create(READ_THREAD_COUNT);
+//
+// options.addOption( readThreads );
+// return options;
+// }
+//
+//
+// /**
+// * Shouldn't have to do this but getOrganizationsForAdminUser() is not 100% reliable in some Usergrid installations.
+// */
+// private void buildOrgMap() throws Exception {
+//
+// logger.info( "Building org map" );
+//
+// ExecutorService execService = Executors.newFixedThreadPool( this.readThreadCount );
+//
+// EntityManager em = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
+// String queryString = "select *";
+// Query query = Query.fromQL( queryString );
+// query.withLimit( 1000 );
+// Results organizations = null;
+// int count = 0;
+// do {
+// organizations = em.searchCollection( em.getApplicationRef(), "groups", query );
+// for ( Entity organization : organizations.getEntities() ) {
+// execService.submit( new OrgMapWorker( organization ) );
+// count++;
+// }
+//
+// if ( count % 1000 == 0 ) {
+// logger.info("Queued {} org map workers", count);
+// }
+// query.setCursor( organizations.getCursor() );
+// }
+// while ( organizations != null && organizations.hasCursor() );
+//
+// execService.shutdown();
+// while ( !execService.awaitTermination( 10, TimeUnit.SECONDS ) ) {
+// logger.info( "Processed {} orgs for map", userToOrgsMap.size() );
+// }
+//
+// logger.info("Org map complete, counted {} organizations", count);
+// }
+//
+//
+// public class OrgMapWorker implements Runnable {
+// private final Entity orgEntity;
+//
+// public OrgMapWorker( Entity orgEntity ) {
+// this.orgEntity = orgEntity;
+// }
+//
+// @Override
+// public void run() {
+// try {
+// final String orgName = orgEntity.getProperty( "path" ).toString();
+// final UUID orgId = orgEntity.getUuid();
+//
+// for (UserInfo user : managementService.getAdminUsersForOrganization( orgEntity.getUuid() )) {
+// try {
+// Entity admin = managementService.getAdminUserEntityByUuid( user.getUuid() );
+// Org org = new Org( orgId, orgName );
+//
+// synchronized (userToOrgsMap) {
+// List<Org> userOrgs = userToOrgsMap.get( admin.getUuid() );
+// if (userOrgs == null) {
+// userOrgs = new ArrayList<Org>();
+// userToOrgsMap.put( admin.getUuid(), userOrgs );
+// }
+// userOrgs.add( org );
+// }
+//
+// synchronized (orgNameToUUID) {
+// UUID existingOrgId = orgNameToUUID.get( orgName );
+// ;
+// if (existingOrgId != null && !orgId.equals( existingOrgId )) {
+// if ( !duplicateOrgs.contains( orgId )) {
+// logger.info( "Org {}:{} is a duplicate", orgId, orgName );
+// duplicateOrgs.add(orgId);
+// }
+// } else {
+// orgNameToUUID.put( orgName, orgId );
+// }
+// }
+//
+// } catch (Exception e) {
+// logger.warn( "Cannot get orgs for userId {}", user.getUuid() );
+// }
+// }
+// } catch ( Exception e ) {
+// logger.error("Error getting users for org {}:{}", orgEntity.getName(), orgEntity.getUuid());
+// }
+// }
+// }
+//
+//
+// public class AdminUserReader implements Runnable {
+//
+// private final BlockingQueue<UUID> readQueue;
+// private final BlockingQueue<AdminUserWriteTask> writeQueue;
+//
+// public AdminUserReader( BlockingQueue<UUID> readQueue, BlockingQueue<AdminUserWriteTask> writeQueue ) {
+// this.readQueue = readQueue;
+// this.writeQueue = writeQueue;
+// }
+//
+//
+// @Override
+// public void run() {
+// try {
+// readAndQueueAdminUsers();
+// } catch (Exception e) {
+// logger.error("Error reading data for export", e);
+// }
+// }
+//
+//
+// private void readAndQueueAdminUsers() throws Exception {
+//
+// EntityManager em = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
+//
+// while ( true ) {
+//
+// UUID uuid = null;
+// try {
+// uuid = readQueue.poll( 30, TimeUnit.SECONDS );
+// if ( uuid == null ) {
+// break;
+// }
+//
+// Entity entity = em.get( uuid );
+//
+// AdminUserWriteTask task = new AdminUserWriteTask();
+// task.adminUser = entity;
+//
+// addDictionariesToTask( task, entity );
+// addOrganizationsToTask( task );
+//
+// String actionTaken = "Processed";
+//
+// if (ignoreInvalidUsers && (task.orgNamesByUuid.isEmpty()
+// || task.dictionariesByName.isEmpty()
+// || task.dictionariesByName.get( "credentials" ).isEmpty())) {
+//
+// actionTaken = "Ignored";
+//
+// } else {
+// writeQueue.add( task );
+// }
+//
+// Map<String, Object> creds = (Map<String, Object>) (task.dictionariesByName.isEmpty() ?
+// 0 : task.dictionariesByName.get( "credentials" ));
+//
+// logger.error( "{} admin user {}:{}:{} has organizations={} dictionaries={} credentials={}",
+// new Object[]{
+// actionTaken,
+// task.adminUser.getProperty( "username" ),
+// task.adminUser.getProperty( "email" ),
+// task.adminUser.getUuid(),
+// task.orgNamesByUuid.size(),
+// task.dictionariesByName.size(),
+// creds == null ? 0 : creds.size()
+// } );
+//
+// } catch ( Exception e ) {
+// logger.error("Error reading data for user " + uuid, e );
+// }
+// }
+// }
+//
+//
+// private void addDictionariesToTask(AdminUserWriteTask task, Entity entity) throws Exception {
+// EntityManager em = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
+//
+// task.dictionariesByName = new HashMap<String, Map<Object, Object>>();
+//
+// Set<String> dictionaries = em.getDictionaries( entity );
+//
+// if ( dictionaries.isEmpty() ) {
+// logger.error("User {}:{} has no dictionaries", task.adminUser.getName(), task.adminUser.getUuid() );
+// return;
+// }
+//
+// Map<Object, Object> credentialsDictionary = em.getDictionaryAsMap( entity, "credentials" );
+//
+// if ( credentialsDictionary != null ) {
+// task.dictionariesByName.put( "credentials", credentialsDictionary );
+// }
+// }
+//
+// private void addOrganizationsToTask(AdminUserWriteTask task) throws Exception {
+//
+// task.orgNamesByUuid = managementService.getOrganizationsForAdminUser( task.adminUser.getUuid() );
+//
+//<<<<<<< HEAD
+// List<Org> orgs = orgMap.get( task.adminUser.getProperty( "username" ).toString().toLowerCase() );
+//
+//=======
+// List<Org> orgs = userToOrgsMap.get( task.adminUser.getProperty( "username" ).toString().toLowerCase() );
+//
+//>>>>>>> master
+// if ( orgs != null && task.orgNamesByUuid.size() < orgs.size() ) {
+//
+// // list of orgs from getOrganizationsForAdminUser() is less than expected, use userToOrgsMap
+// BiMap<UUID, String> bimap = HashBiMap.create();
+// for (Org org : orgs) {
+// bimap.put( org.orgId, org.orgName );
+// }
+// task.orgNamesByUuid = bimap;
+// }
+//<<<<<<< HEAD
+//
+// if ( task.orgNamesByUuid.isEmpty() ) {
+// logger.error("{}:{}:{} has no orgs", new Object[] {
+// task.adminUser.getProperty("username"),
+// task.adminUser.getProperty("email"),
+// task.adminUser.getUuid() } );
+// }
+//=======
+//>>>>>>> master
+// }
+// }
+//
+// class AdminUserWriter implements Runnable {
+//
+// private final BlockingQueue<AdminUserWriteTask> taskQueue;
+//
+// public AdminUserWriter( BlockingQueue<AdminUserWriteTask> taskQueue ) {
+// this.taskQueue = taskQueue;
+// }
+//
+//
+// @Override
+// public void run() {
+// try {
+// writeEntities();
+// } catch (Exception e) {
+// logger.error("Error writing export data", e);
+// }
+// }
+//
+//
+// private void writeEntities() throws Exception {
+// EntityManager em = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID );
+//
+// // write one JSON file for management application users
+// JsonGenerator usersFile =
+// getJsonGenerator( createOutputFile( ADMIN_USERS_PREFIX, em.getApplication().getName() ) );
+// usersFile.writeStartArray();
+//
+// // write one JSON file for metadata: collections, connections and dictionaries of those users
+// JsonGenerator metadataFile =
+// getJsonGenerator( createOutputFile( ADMIN_USER_METADATA_PREFIX, em.getApplication().getName() ) );
+// metadataFile.writeStartObject();
+//
+// while ( true ) {
+//
+// try {
+// AdminUserWriteTask task = taskQueue.poll( 30, TimeUnit.SECONDS );
+// if ( task == null ) {
+// break;
+// }
+//
+// // write user to application file
+// usersFile.writeObject( task.adminUser );
+// echo( task.adminUser );
+//
+// // write metadata to metadata file
+// metadataFile.writeFieldName( task.adminUser.getUuid().toString() );
+// metadataFile.writeStartObject();
+//
+// saveOrganizations( metadataFile, task );
+// saveDictionaries( metadataFile, task );
+//
+// metadataFile.writeEndObject();
+//
+// logger.debug( "Exported user {}:{}:{}", new Object[] {
+// task.adminUser.getProperty("username"),
+// task.adminUser.getProperty("email"),
+// task.adminUser.getUuid() } );
+//
+// userCount.addAndGet( 1 );
+//
+// } catch (InterruptedException e) {
+// throw new Exception("Interrupted", e);
+// }
+// }
+//
+// metadataFile.writeEndObject();
+// metadataFile.close();
+//
+// usersFile.writeEndArray();
+// usersFile.close();
+//
+// logger.info( "Exported TOTAL {} admin users and {} organizations", userCount.get(), orgsWritten.size() );
+// }
+//
+//
+// private void saveDictionaries( JsonGenerator jg, AdminUserWriteTask task ) throws Exception {
+//
+// jg.writeFieldName( "dictionaries" );
+// jg.writeStartObject();
+//
+// for (String dictionary : task.dictionariesByName.keySet() ) {
+//
+// Map<Object, Object> dict = task.dictionariesByName.get( dictionary );
+//
+// if (dict.isEmpty()) {
+// continue;
+// }
+//
+// jg.writeFieldName( dictionary );
+//
+// jg.writeStartObject();
+//
+// for (Map.Entry<Object, Object> entry : dict.entrySet()) {
+// jg.writeFieldName( entry.getKey().toString() );
+// jg.writeObject( entry.getValue() );
+// }
+//
+// jg.writeEndObject();
+// }
+// jg.writeEndObject();
+// }
+//
+//
+// private void saveOrganizations( JsonGenerator jg, AdminUserWriteTask task ) throws Exception {
+//
+// final BiMap<UUID, String> orgs = task.orgNamesByUuid;
+//
+// jg.writeFieldName( "organizations" );
+//
+// jg.writeStartArray();
+//
+// for (UUID uuid : orgs.keySet()) {
+//
+// jg.writeStartObject();
+//
+// jg.writeFieldName( "uuid" );
+// jg.writeObject( uuid );
+//
+// jg.writeFieldName( "name" );
+// jg.writeObject( orgs.get( uuid ) );
+//
+// jg.writeEndObject();
+//
+// synchronized (orgsWritten) {
+// orgsWritten.add( uuid );
+// }
+// }
+//
+// jg.writeEndArray();
+// }
+// }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/be483819/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index db975e6..4123910 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -22,8 +22,9 @@ import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.Query;
+//import org.apache.usergrid.persistence.Query;
import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.index.query.Query;
import org.apache.usergrid.utils.StringUtils;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.map.ObjectMapper;
@@ -49,382 +50,386 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* Export all entities and connections of a Usergrid app.
- *
+ *
* Exports data files to specified directory.
- *
+ *
* Will create as many output files as there are writeThreads (by default: 10).
- *
+ *
* Will create two types of files: *.entities for Usegrird entities and *.collections for entity to entity connections.
- *
+ *
* Every line of the data files is a complete JSON object.
*/
public class ExportApp extends ExportingToolBase {
- static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
-
- static final String APPLICATION_NAME = "application";
- private static final String WRITE_THREAD_COUNT = "writeThreads";
-
- String applicationName;
- String organizationName;
-
- AtomicInteger entitiesWritten = new AtomicInteger(0);
- AtomicInteger connectionsWritten = new AtomicInteger(0);
-
- Scheduler writeScheduler;
-
- ObjectMapper mapper = new ObjectMapper();
- Map<Thread, JsonGenerator> entityGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
- Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
-
- int writeThreadCount = 10; // set via CLI option; limiting write will limit output files
-
-
- @Override
- @SuppressWarnings("static-access")
- public Options createOptions() {
-
- Options options = super.createOptions();
-
- Option appNameOption = OptionBuilder.hasArg().withType("")
- .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
- options.addOption( appNameOption );
-
- Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
- .withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
- options.addOption( writeThreadsOption );
-
- return options;
- }
-
-
- /**
- * Tool entry point.
- */
@Override
- public void runTool(CommandLine line) throws Exception {
-
- applicationName = line.getOptionValue( APPLICATION_NAME );
-
- if (StringUtils.isNotEmpty( line.getOptionValue( WRITE_THREAD_COUNT ) )) {
- try {
- writeThreadCount = Integer.parseInt( line.getOptionValue( WRITE_THREAD_COUNT ) );
- } catch (NumberFormatException nfe) {
- logger.error( "-" + WRITE_THREAD_COUNT + " must be specified as an integer. Aborting..." );
- return;
- }
- }
-
- setVerbose( line );
-
- applyOrgId( line );
- prepareBaseOutputFileName( line );
- outputDir = createOutputParentDir();
- logger.info( "Export directory: " + outputDir.getAbsolutePath() );
-
- startSpring();
-
- UUID applicationId = emf.lookupApplication( applicationName );
- if (applicationId == null) {
- throw new RuntimeException( "Cannot find application " + applicationName );
- }
- final EntityManager em = emf.getEntityManager( applicationId );
- organizationName = em.getApplication().getOrganizationName();
-
- ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
- writeScheduler = Schedulers.from( writeThreadPoolExecutor );
-
- Observable<String> collectionsObservable = Observable.create( new CollectionsObservable( em ) );
-
- collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
-
- public Observable<ExportEntity> call(String collection) {
-
- return Observable.create( new EntityObservable( em, collection ) )
- .doOnNext( new EntityWriteAction() ).subscribeOn( writeScheduler );
- }
-
- }, writeThreadCount ).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
-
- public Observable<ExportConnection> call(ExportEntity exportEntity) {
-
- return Observable.create( new ConnectionsObservable( em, exportEntity ) )
- .doOnNext( new ConnectionWriteAction() ).subscribeOn( writeScheduler );
- }
-
- }, writeThreadCount )
- .doOnCompleted( new FileWrapUpAction() )
- .toBlocking().last();
- }
-
-
- // ----------------------------------------------------------------------------------------
- // reading data
-
-
- /**
- * Emits collection names found in application.
- */
- class CollectionsObservable implements rx.Observable.OnSubscribe<String> {
- EntityManager em;
-
- public CollectionsObservable(EntityManager em) {
- this.em = em;
- }
-
- public void call(Subscriber<? super String> subscriber) {
-
- int count = 0;
- try {
- Map<String, Object> collectionMetadata = em.getApplicationCollectionMetadata();
- for ( String collection : collectionMetadata.keySet() ) {
- subscriber.onNext( collection );
- count++;
- }
-
- } catch (Exception e) {
- subscriber.onError( e );
- }
-
- subscriber.onCompleted();
- logger.info( "Completed. Read {} collection names", count );
- }
- }
-
-
- /**
- * Emits entities of collection.
- */
- class EntityObservable implements rx.Observable.OnSubscribe<ExportEntity> {
- EntityManager em;
- String collection;
-
- public EntityObservable(EntityManager em, String collection) {
- this.em = em;
- this.collection = collection;
- }
-
- public void call(Subscriber<? super ExportEntity> subscriber) {
-
- logger.info("Starting to read entities of collection {}", collection);
-
- subscriber.onStart();
-
- try {
- int count = 0;
-
- Query query = new Query();
- query.setLimit( MAX_ENTITY_FETCH );
-
- Results results = em.searchCollection( em.getApplicationRef(), collection, query );
-
- while (results.size() > 0) {
- for (Entity entity : results.getEntities()) {
- try {
- Set<String> dictionaries = em.getDictionaries( entity );
- Map dictionariesByName = new HashMap<String, Map<Object, Object>>();
- for (String dictionary : dictionaries) {
- Map<Object, Object> dict = em.getDictionaryAsMap( entity, dictionary );
- if (dict.isEmpty()) {
- continue;
- }
- dictionariesByName.put( dictionary, dict );
- }
-
- ExportEntity exportEntity = new ExportEntity(
- organizationName,
- applicationName,
- entity,
- dictionariesByName );
-
- subscriber.onNext( exportEntity );
- count++;
-
- } catch (Exception e) {
- logger.error("Error reading entity " + entity.getUuid() +" from collection " + collection);
- }
- }
- if (results.getCursor() == null) {
- break;
- }
- query.setCursor( results.getCursor() );
- results = em.searchCollection( em.getApplicationRef(), collection, query );
- }
-
- subscriber.onCompleted();
- logger.info("Completed collection {}. Read {} entities", collection, count);
-
- } catch ( Exception e ) {
- subscriber.onError(e);
- }
- }
- }
-
-
- /**
- * Emits connections of an entity.
- */
- class ConnectionsObservable implements rx.Observable.OnSubscribe<ExportConnection> {
- EntityManager em;
- ExportEntity exportEntity;
-
- public ConnectionsObservable(EntityManager em, ExportEntity exportEntity) {
- this.em = em;
- this.exportEntity = exportEntity;
- }
-
- public void call(Subscriber<? super ExportConnection> subscriber) {
-
- logger.info( "Starting to read connections for entity {} type {}",
- exportEntity.getEntity().getName(), exportEntity.getEntity().getType() );
-
- int count = 0;
-
- try {
- Set<String> connectionTypes = em.getConnectionTypes( exportEntity.getEntity() );
- for (String connectionType : connectionTypes) {
-
- Results results = em.getConnectedEntities(
- exportEntity.getEntity().getUuid(), connectionType, null, Results.Level.CORE_PROPERTIES );
-
- for (Entity connectedEntity : results.getEntities()) {
- try {
-
- ExportConnection connection = new ExportConnection(
- applicationName,
- organizationName,
- connectionType,
- exportEntity.getEntity().getUuid(),
- connectedEntity.getUuid());
-
- subscriber.onNext( connection );
- count++;
-
- } catch (Exception e) {
- logger.error( "Error reading connection entity "
- + exportEntity.getEntity().getUuid() + " -> " + connectedEntity.getType());
- }
- }
- }
-
- } catch (Exception e) {
- subscriber.onError( e );
- }
-
- subscriber.onCompleted();
- logger.info("Completed entity {} type {} connections count {}",
- new Object[] { exportEntity.getEntity().getName(), exportEntity.getEntity().getType(), count });
- }
- }
-
-
- // ----------------------------------------------------------------------------------------
- // writing data
-
-
- /**
- * Writes entities to JSON file.
- */
- class EntityWriteAction implements Action1<ExportEntity> {
-
- public void call(ExportEntity entity) {
-
- String [] parts = Thread.currentThread().getName().split("-");
- String fileName = outputDir.getAbsolutePath() + File.separator
- + applicationName.replace('/','-') + "-" + parts[3] + ".entities";
-
- JsonGenerator gen = entityGeneratorsByThread.get( Thread.currentThread() );
- if ( gen == null ) {
-
- // no generator so we are opening new file and writing the start of an array
- try {
- gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
- logger.info("Opened output file {}", fileName);
- } catch (IOException e) {
- throw new RuntimeException("Error opening output file: " + fileName, e);
- }
- gen.setPrettyPrinter( new MinimalPrettyPrinter(""));
- gen.setCodec( mapper );
- entityGeneratorsByThread.put( Thread.currentThread(), gen );
- }
-
- try {
- gen.writeObject( entity );
- gen.writeRaw('\n');
- entitiesWritten.getAndIncrement();
-
- } catch (IOException e) {
- throw new RuntimeException("Error writing to output file: " + fileName, e);
- }
- }
- }
-
-
- /**
- * Writes connection to JSON file.
- */
- class ConnectionWriteAction implements Action1<ExportConnection> {
-
- public void call(ExportConnection conn) {
-
- String [] parts = Thread.currentThread().getName().split("-");
- String fileName = outputDir.getAbsolutePath() + File.separator
- + applicationName.replace('/','-') + "-" + parts[3] + ".connections";
-
- JsonGenerator gen = connectionGeneratorsByThread.get( Thread.currentThread() );
- if ( gen == null ) {
-
- // no generator so we are opening new file and writing the start of an array
- try {
- gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
- logger.info("Opened output file {}", fileName);
- } catch (IOException e) {
- throw new RuntimeException("Error opening output file: " + fileName, e);
- }
- gen.setPrettyPrinter( new MinimalPrettyPrinter(""));
- gen.setCodec( mapper );
- connectionGeneratorsByThread.put( Thread.currentThread(), gen );
- }
-
- try {
- gen.writeObject( conn );
- gen.writeRaw('\n');
- connectionsWritten.getAndIncrement();
-
- } catch (IOException e) {
- throw new RuntimeException("Error writing to output file: " + fileName, e);
- }
- }
- }
-
-
- private class FileWrapUpAction implements Action0 {
- @Override
- public void call() {
-
- logger.info("-------------------------------------------------------------------");
- logger.info("DONE! Entities: {} Connections: {}", entitiesWritten.get(), connectionsWritten.get());
- logger.info("-------------------------------------------------------------------");
+ public void runTool( final CommandLine line ) throws Exception {
- for ( JsonGenerator gen : entityGeneratorsByThread.values() ) {
- try {
- //gen.writeEndArray();
- gen.flush();
- gen.close();
- } catch (IOException e) {
- logger.error("Error closing output file", e);
- }
- }
- for ( JsonGenerator gen : connectionGeneratorsByThread.values() ) {
- try {
- //gen.writeEndArray();
- gen.flush();
- gen.close();
- } catch (IOException e) {
- logger.error("Error closing output file", e);
- }
- }
- }
}
+ // static final Logger logger = LoggerFactory.getLogger( ExportApp.class );
+//
+// static final String APPLICATION_NAME = "application";
+// private static final String WRITE_THREAD_COUNT = "writeThreads";
+//
+// String applicationName;
+// String organizationName;
+//
+// AtomicInteger entitiesWritten = new AtomicInteger(0);
+// AtomicInteger connectionsWritten = new AtomicInteger(0);
+//
+// Scheduler writeScheduler;
+//
+// ObjectMapper mapper = new ObjectMapper();
+// Map<Thread, JsonGenerator> entityGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
+// Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
+//
+// int writeThreadCount = 10; // set via CLI option; limiting write will limit output files
+//
+//
+// @Override
+// @SuppressWarnings("static-access")
+// public Options createOptions() {
+//
+// Options options = super.createOptions();
+//
+// Option appNameOption = OptionBuilder.hasArg().withType("")
+// .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
+// options.addOption( appNameOption );
+//
+// Option writeThreadsOption = OptionBuilder.hasArg().withType(0)
+// .withDescription( "Write Threads -" + WRITE_THREAD_COUNT ).create(WRITE_THREAD_COUNT);
+// options.addOption( writeThreadsOption );
+//
+// return options;
+// }
+//
+//
+// /**
+// * Tool entry point.
+// */
+// @Override
+// public void runTool(CommandLine line) throws Exception {
+//
+// applicationName = line.getOptionValue( APPLICATION_NAME );
+//
+// if (StringUtils.isNotEmpty( line.getOptionValue( WRITE_THREAD_COUNT ) )) {
+// try {
+// writeThreadCount = Integer.parseInt( line.getOptionValue( WRITE_THREAD_COUNT ) );
+// } catch (NumberFormatException nfe) {
+// logger.error( "-" + WRITE_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+// return;
+// }
+// }
+//
+// setVerbose( line );
+//
+// applyOrgId( line );
+// prepareBaseOutputFileName( line );
+// outputDir = createOutputParentDir();
+// logger.info( "Export directory: " + outputDir.getAbsolutePath() );
+//
+// startSpring();
+//
+// UUID applicationId = emf.lookupApplication( applicationName );
+// if (applicationId == null) {
+// throw new RuntimeException( "Cannot find application " + applicationName );
+// }
+// final EntityManager em = emf.getEntityManager( applicationId );
+// organizationName = em.getApplication().getOrganizationName();
+//
+// ExecutorService writeThreadPoolExecutor = Executors.newFixedThreadPool( writeThreadCount );
+// writeScheduler = Schedulers.from( writeThreadPoolExecutor );
+//
+//// Observable<String> collectionsObservable = Observable.create( new CollectionsObservable( em ) );
+////
+//// collectionsObservable.flatMap( new Func1<String, Observable<ExportEntity>>() {
+////
+//// public Observable<ExportEntity> call(String collection) {
+////
+//// return Observable.create( new EntityObservable( em, collection ) )
+//// .doOnNext( new EntityWriteAction() ).subscribeOn( writeScheduler );
+//// }
+////
+//// }, writeThreadCount ).flatMap( new Func1<ExportEntity, Observable<ExportConnection>>() {
+////
+//// public Observable<ExportConnection> call(ExportEntity exportEntity) {
+////
+//// return Observable.create( new ConnectionsObservable( em, exportEntity ) )
+//// .doOnNext( new ConnectionWriteAction() ).subscribeOn( writeScheduler );
+//// }
+////
+//// }, writeThreadCount )
+//// .doOnCompleted( new FileWrapUpAction() )
+//// .toBlocking().last();
+// }
+//
+//
+// // ----------------------------------------------------------------------------------------
+// // reading data
+//
+//
+// /**
+// * Emits collection names found in application.
+// */
+// class CollectionsObservable implements rx.Observable.OnSubscribe<String> {
+// EntityManager em;
+//
+// public CollectionsObservable(EntityManager em) {
+// this.em = em;
+// }
+//
+// public void call(Subscriber<? super String> subscriber) {
+//
+// int count = 0;
+// try {
+// Map<String, Object> collectionMetadata = em.getApplicationCollectionMetadata();
+// for ( String collection : collectionMetadata.keySet() ) {
+// subscriber.onNext( collection );
+// count++;
+// }
+//
+// } catch (Exception e) {
+// subscriber.onError( e );
+// }
+//
+// subscriber.onCompleted();
+// logger.info( "Completed. Read {} collection names", count );
+// }
+// }
+//
+//
+// /**
+// * Emits entities of collection.
+// */
+// class EntityObservable implements rx.Observable.OnSubscribe<ExportEntity> {
+// EntityManager em;
+// String collection;
+//
+// public EntityObservable(EntityManager em, String collection) {
+// this.em = em;
+// this.collection = collection;
+// }
+//
+// public void call(Subscriber<? super ExportEntity> subscriber) {
+//
+// logger.info("Starting to read entities of collection {}", collection);
+//
+// // subscriber.onStart();
+//
+// try {
+// int count = 0;
+//
+// Query query = new Query();
+// query.setLimit( MAX_ENTITY_FETCH );
+//
+// Results results = em.searchCollection( em.getApplicationRef(), collection, query );
+//
+// while (results.size() > 0) {
+// for (Entity entity : results.getEntities()) {
+// try {
+// Set<String> dictionaries = em.getDictionaries( entity );
+// Map dictionariesByName = new HashMap<String, Map<Object, Object>>();
+// for (String dictionary : dictionaries) {
+// Map<Object, Object> dict = em.getDictionaryAsMap( entity, dictionary );
+// if (dict.isEmpty()) {
+// continue;
+// }
+// dictionariesByName.put( dictionary, dict );
+// }
+//
+// ExportEntity exportEntity = new ExportEntity(
+// organizationName,
+// applicationName,
+// entity,
+// dictionariesByName );
+//
+// subscriber.onNext( exportEntity );
+// count++;
+//
+// } catch (Exception e) {
+// logger.error("Error reading entity " + entity.getUuid() +" from collection " + collection);
+// }
+// }
+// if (results.getCursor() == null) {
+// break;
+// }
+// query.setCursor( results.getCursor() );
+// results = em.searchCollection( em.getApplicationRef(), collection, query );
+// }
+//
+// subscriber.onCompleted();
+// logger.info("Completed collection {}. Read {} entities", collection, count);
+//
+// } catch ( Exception e ) {
+// subscriber.onError(e);
+// }
+// }
+// }
+//
+//
+// /**
+// * Emits connections of an entity.
+// */
+// class ConnectionsObservable implements rx.Observable.OnSubscribe<ExportConnection> {
+// EntityManager em;
+// ExportEntity exportEntity;
+//
+// public ConnectionsObservable(EntityManager em, ExportEntity exportEntity) {
+// this.em = em;
+// this.exportEntity = exportEntity;
+// }
+//
+// public void call(Subscriber<? super ExportConnection> subscriber) {
+//
+// logger.info( "Starting to read connections for entity {} type {}",
+// exportEntity.getEntity().getName(), exportEntity.getEntity().getType() );
+//
+// int count = 0;
+//
+// try {
+// Set<String> connectionTypes = em.getConnectionTypes( exportEntity.getEntity() );
+// for (String connectionType : connectionTypes) {
+//
+// Results results = em.getConnectedEntities(
+// exportEntity.getEntity().getUuid(), connectionType, null, Query.Level.CORE_PROPERTIES );
+//
+// for (Entity connectedEntity : results.getEntities()) {
+// try {
+//
+// ExportConnection connection = new ExportConnection(
+// applicationName,
+// organizationName,
+// connectionType,
+// exportEntity.getEntity().getUuid(),
+// connectedEntity.getUuid());
+//
+// subscriber.onNext( connection );
+// count++;
+//
+// } catch (Exception e) {
+// logger.error( "Error reading connection entity "
+// + exportEntity.getEntity().getUuid() + " -> " + connectedEntity.getType());
+// }
+// }
+// }
+//
+// } catch (Exception e) {
+// subscriber.onError( e );
+// }
+//
+// subscriber.onCompleted();
+// logger.info("Completed entity {} type {} connections count {}",
+// new Object[] { exportEntity.getEntity().getName(), exportEntity.getEntity().getType(), count });
+// }
+// }
+//
+//
+// // ----------------------------------------------------------------------------------------
+// // writing data
+//
+//
+// /**
+// * Writes entities to JSON file.
+// */
+// class EntityWriteAction implements Action1<ExportEntity> {
+//
+// public void call(ExportEntity entity) {
+//
+// String [] parts = Thread.currentThread().getName().split("-");
+// String fileName = outputDir.getAbsolutePath() + File.separator
+// + applicationName.replace('/','-') + "-" + parts[3] + ".entities";
+//
+// JsonGenerator gen = entityGeneratorsByThread.get( Thread.currentThread() );
+// if ( gen == null ) {
+//
+// // no generator so we are opening new file and writing the start of an array
+// try {
+// gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
+// logger.info("Opened output file {}", fileName);
+// } catch (IOException e) {
+// throw new RuntimeException("Error opening output file: " + fileName, e);
+// }
+// gen.setPrettyPrinter( new MinimalPrettyPrinter(""));
+// gen.setCodec( mapper );
+// entityGeneratorsByThread.put( Thread.currentThread(), gen );
+// }
+//
+// try {
+// gen.writeObject( entity );
+// gen.writeRaw('\n');
+// entitiesWritten.getAndIncrement();
+//
+// } catch (IOException e) {
+// throw new RuntimeException("Error writing to output file: " + fileName, e);
+// }
+// }
+// }
+//
+//
+// /**
+// * Writes connection to JSON file.
+// */
+// class ConnectionWriteAction implements Action1<ExportConnection> {
+//
+// public void call(ExportConnection conn) {
+//
+// String [] parts = Thread.currentThread().getName().split("-");
+// String fileName = outputDir.getAbsolutePath() + File.separator
+// + applicationName.replace('/','-') + "-" + parts[3] + ".connections";
+//
+// JsonGenerator gen = connectionGeneratorsByThread.get( Thread.currentThread() );
+// if ( gen == null ) {
+//
+// // no generator so we are opening new file and writing the start of an array
+// try {
+// gen = jsonFactory.createJsonGenerator( new FileOutputStream( fileName ) );
+// logger.info("Opened output file {}", fileName);
+// } catch (IOException e) {
+// throw new RuntimeException("Error opening output file: " + fileName, e);
+// }
+// gen.setPrettyPrinter( new MinimalPrettyPrinter(""));
+// gen.setCodec( mapper );
+// connectionGeneratorsByThread.put( Thread.currentThread(), gen );
+// }
+//
+// try {
+// gen.writeObject( conn );
+// gen.writeRaw('\n');
+// connectionsWritten.getAndIncrement();
+//
+// } catch (IOException e) {
+// throw new RuntimeException("Error writing to output file: " + fileName, e);
+// }
+// }
+// }
+//
+//
+// private class FileWrapUpAction implements Action0 {
+// @Override
+// public void call() {
+//
+// logger.info("-------------------------------------------------------------------");
+// logger.info("DONE! Entities: {} Connections: {}", entitiesWritten.get(), connectionsWritten.get());
+// logger.info("-------------------------------------------------------------------");
+//
+// for ( JsonGenerator gen : entityGeneratorsByThread.values() ) {
+// try {
+// //gen.writeEndArray();
+// gen.flush();
+// gen.close();
+// } catch (IOException e) {
+// logger.error("Error closing output file", e);
+// }
+// }
+// for ( JsonGenerator gen : connectionGeneratorsByThread.values() ) {
+// try {
+// //gen.writeEndArray();
+// gen.flush();
+// gen.close();
+// } catch (IOException e) {
+// logger.error("Error closing output file", e);
+// }
+// }
+// }
+// }
}