You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/11/20 00:21:03 UTC
[3/4] usergrid git commit: Fixes for tools.jar not compiling in 2.0
http://git-wip-us.apache.org/repos/asf/usergrid/blob/be483819/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
index f39ef9b..d9823407 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ImportAdmins.java
@@ -27,7 +27,7 @@ import org.apache.usergrid.management.OrganizationInfo;
import org.apache.usergrid.management.UserInfo;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Identifier;
+//import org.apache.usergrid.persistence.Identifier;
import org.apache.usergrid.persistence.SimpleEntityRef;
import org.apache.usergrid.persistence.entities.User;
import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
@@ -47,18 +47,18 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.usergrid.persistence.Schema.PROPERTY_TYPE;
import static org.apache.usergrid.persistence.Schema.PROPERTY_UUID;
-import static org.apache.usergrid.persistence.cassandra.CassandraService.MANAGEMENT_APPLICATION_ID;
+//import static org.apache.usergrid.persistence.cassandra.CassandraService.MANAGEMENT_APPLICATION_ID;
/**
* Import Admin Users and metadata including organizations and passwords.
- *
- * Usage Example:
- *
+ *
+ * Usage Example:
+ *
* java -Xmx8000m -Dlog4j.configuration=file:/home/me/log4j.properties -classpath . \
* -jar usergrid-tools-1.0.2.jar ImportAdmins -writeThreads 100 -auditThreads 100 \
- * -host casshost -inputDir=/home/me/import-data
- *
+ * -host casshost -inputDir=/home/me/import-data
+ *
* If you want to provide any property overrides, put properties file named usergrid-custom-tools.properties
* in the same directory where you run the above command. For example, you might want to set the Cassandra
* client threads and import to a specific set of keyspaces:
@@ -69,767 +69,771 @@ import static org.apache.usergrid.persistence.cassandra.CassandraService.MANAGEM
* cassandra.lock.keyspace=My_Other_Usergrid_Locks
*/
public class ImportAdmins extends ToolBase {
-
- private static final Logger logger = LoggerFactory.getLogger(ImportAdmins.class);
-
- /**
- * Input directory where the .json export files are
- */
- static final String INPUT_DIR = "inputDir";
- static final String WRITE_THREAD_COUNT = "writeThreads";
- static final String AUDIT_THREAD_COUNT = "auditThreads";
-
- static File importDir;
-
- static final String DEFAULT_INPUT_DIR = "export";
-
- private Map<Stoppable, Thread> adminWriteThreads = new HashMap<Stoppable, Thread>();
- private Map<Stoppable, Thread> adminAuditThreads = new HashMap<Stoppable, Thread>();
- private Map<Stoppable, Thread> metadataWorkerThreadMap = new HashMap<Stoppable, Thread>();
-
- Map<UUID, DuplicateUser> dupsByDupUuid = new HashMap<UUID, DuplicateUser>(200);
-
- JsonFactory jsonFactory = new JsonFactory();
-
- AtomicInteger userCount = new AtomicInteger( 0 );
- AtomicInteger metadataCount = new AtomicInteger( 0 );
-
- AtomicInteger writeEmptyCount = new AtomicInteger( 0 );
- AtomicInteger auditEmptyCount = new AtomicInteger( 0 );
- AtomicInteger metadataEmptyCount = new AtomicInteger( 0 );
-
-
- static class DuplicateUser {
- String email;
- String username;
- public DuplicateUser( String propName, Map<String, Object> user ) {
- if ( "email".equals(propName)) {
- email = user.get("email").toString();
- } else {
- username = user.get("username").toString();
- }
- }
- }
-
-
-
- @Override
- @SuppressWarnings("static-access")
- public Options createOptions() {
-
- Option hostOption = OptionBuilder.withArgName("host")
- .hasArg()
- .withDescription("Cassandra host").create("host");
-
- Option inputDir = OptionBuilder
- .hasArg()
- .withDescription("input directory -inputDir").create(INPUT_DIR);
-
- Option writeThreads = OptionBuilder
- .hasArg()
- .withDescription("Write Threads -writeThreads").create(WRITE_THREAD_COUNT);
-
- Option auditThreads = OptionBuilder
- .hasArg()
- .withDescription("Audit Threads -auditThreads").create(AUDIT_THREAD_COUNT);
-
- Option verbose = OptionBuilder
- .withDescription("Print on the console an echo of the content written to the file")
- .create(VERBOSE);
-
- Options options = new Options();
- options.addOption(hostOption);
- options.addOption(writeThreads);
- options.addOption(auditThreads);
- options.addOption( inputDir );
- options.addOption( verbose );
-
- return options;
- }
-
-
@Override
- public void runTool(CommandLine line) throws Exception {
-
- startSpring();
-
- setVerbose(line);
-
- openImportDirectory(line);
-
- int auditThreadCount = 1;
- int writeThreadCount = 1;
-
- if (line.hasOption(AUDIT_THREAD_COUNT)) {
- auditThreadCount = Integer.parseInt(line.getOptionValue(AUDIT_THREAD_COUNT));
- }
-
- if (line.hasOption(WRITE_THREAD_COUNT)) {
- writeThreadCount = Integer.parseInt( line.getOptionValue(WRITE_THREAD_COUNT));
- }
-
- importAdminUsers( writeThreadCount, auditThreadCount );
-
- importMetadata( writeThreadCount );
- }
-
-
- /**
- * Import admin users.
- */
- private void importAdminUsers(int writeThreadCount, int auditThreadCount) throws Exception {
-
- String[] fileNames = importDir.list(new PrefixFileFilter(ExportAdmins.ADMIN_USERS_PREFIX + "."));
-
- logger.info( "Applications to read: " + fileNames.length );
-
- for (String fileName : fileNames) {
- try {
- importAdminUsers(fileName, writeThreadCount, auditThreadCount);
- } catch (Exception e) {
- logger.warn("Unable to import application: " + fileName, e);
- }
- }
- }
-
-
- /**
- * Imports admin users.
- *
- * @param fileName Name of admin user data file.
- */
- private void importAdminUsers(final String fileName,
- final int writeThreadCount,
- final int auditThreadCount) throws Exception {
-
- int count = 0;
-
- File adminUsersFile = new File(importDir, fileName);
-
- logger.info("----- Loading file: " + adminUsersFile.getAbsolutePath());
- JsonParser jp = getJsonParserForFile(adminUsersFile);
-
- int loopCounter = 0;
-
- BlockingQueue<Map<String, Object>> workQueue = new LinkedBlockingQueue<Map<String, Object>>();
- BlockingQueue<Map<String, Object>> auditQueue = new LinkedBlockingQueue<Map<String, Object>>();
-
- startAdminWorkers(workQueue, auditQueue, writeThreadCount);
- startAdminAuditors(auditQueue, auditThreadCount);
-
- JsonToken token = jp.nextToken();
- validateStartArray(token);
-
- while (jp.nextValue() != JsonToken.END_ARRAY) {
- loopCounter += 1;
+ public void runTool( final CommandLine line ) throws Exception {
- @SuppressWarnings("unchecked")
- Map<String, Object> entityProps = jp.readValueAs(HashMap.class);
- if (loopCounter % 1000 == 0) {
- logger.debug( "Publishing to queue... counter=" + loopCounter );
- }
-
- workQueue.add( entityProps );
- }
-
- waitForQueueAndMeasure(workQueue, writeEmptyCount, adminWriteThreads, "Admin Write");
- waitForQueueAndMeasure(auditQueue, auditEmptyCount, adminAuditThreads, "Admin Audit");
-
- logger.info("----- End: Imported {} admin users from file {}",
- count, adminUsersFile.getAbsolutePath());
-
- jp.close();
- }
-
- private static void waitForQueueAndMeasure(final BlockingQueue workQueue,
- final AtomicInteger emptyCounter,
- final Map<Stoppable, Thread> threadMap,
- final String identifier) throws InterruptedException {
- double rateAverageSum = 0;
- int iterations = 0;
-
- while ( emptyCounter.get() < threadMap.size() ) {
- iterations += 1;
-
- int sizeLast = workQueue.size();
- long lastTime = System.currentTimeMillis();
- logger.info("Queue {} is not empty, remaining size={}, waiting...", identifier, sizeLast);
- Thread.sleep(10000);
-
- long timeNow = System.currentTimeMillis();
- int sizeNow = workQueue.size();
-
- int processed = sizeLast - sizeNow;
-
- long timeDelta = timeNow - lastTime;
-
- double rateLast = (double) processed / (timeDelta / 1000);
- rateAverageSum += rateLast;
-
- long timeRemaining = (long) ( sizeLast / (rateAverageSum / iterations) );
-
- logger.info("++PROGRESS ({}): sizeLast={} nowSize={} processed={} rateLast={}/s rateAvg={}/s timeRemaining={}s",
- new Object[] {
- identifier, sizeLast, sizeNow, processed, rateLast, (rateAverageSum / iterations), timeRemaining } );
- }
-
- for (Stoppable worker : threadMap.keySet()) {
- worker.setDone(true);
- }
- }
-
- private void startAdminAuditors(BlockingQueue<Map<String, Object>> auditQueue, int workerCount) {
- for (int x = 0; x < workerCount; x++) {
- AuditWorker worker = new AuditWorker(auditQueue);
- Thread workerThread = new Thread(worker, "AdminAuditor-" + x);
- workerThread.start();
- adminAuditThreads.put(worker, workerThread);
- }
- logger.info("Started {} admin auditors", workerCount);
-
- }
-
-
- private void startAdminWorkers(BlockingQueue<Map<String, Object>> workQueue,
- BlockingQueue<Map<String, Object>> auditQueue,
- int workerCount) {
-
- for (int x = 0; x < workerCount; x++) {
- ImportAdminWorker worker = new ImportAdminWorker(workQueue, auditQueue);
- Thread workerThread = new Thread(worker, "AdminWriter-" + x);
- workerThread.start();
- adminWriteThreads.put(worker, workerThread);
- }
-
- logger.info("Started {} admin workers", workerCount);
- }
-
-
- private String getType(Map<String, Object> entityProps) {
- return (String) entityProps.get(PROPERTY_TYPE);
}
-
- private UUID getId(Map<String, Object> entityProps) {
- return UUID.fromString((String) entityProps.get(PROPERTY_UUID));
- }
-
-
- private void validateStartArray(JsonToken token) {
- if (token != JsonToken.START_ARRAY) {
- throw new RuntimeException("Token should be START ARRAY but it is:" + token.asString());
- }
- }
-
-
- private JsonParser getJsonParserForFile(File organizationFile) throws Exception {
- JsonParser jp = jsonFactory.createJsonParser( organizationFile );
- jp.setCodec( new ObjectMapper() );
- return jp;
- }
-
-
- /**
- * Import collections. Collections files are named: collections.<application_name>.Timestamp.json
- */
- private void importMetadata(int writeThreadCount) throws Exception {
-
- String[] fileNames = importDir.list(
- new PrefixFileFilter( ExportAdmins.ADMIN_USER_METADATA_PREFIX + "." ) );
- logger.info( "Metadata files to read: " + fileNames.length );
-
- for (String fileName : fileNames) {
- try {
- importMetadata(fileName, writeThreadCount);
- } catch (Exception e) {
- logger.warn("Unable to import metadata file: " + fileName, e);
- }
- }
- }
-
- private void startMetadataWorkers(BlockingQueue<ImportMetadataTask> workQueue, int writeThreadCount) {
-
- for (int x = 0; x < writeThreadCount; x++) {
- ImportMetadataWorker worker = new ImportMetadataWorker(workQueue);
- Thread workerThread = new Thread(worker, "ImportMetadataTask-" + x );
- workerThread.start();
- metadataWorkerThreadMap.put(worker, workerThread);
- }
-
- logger.info( "Started {} metadata workers", writeThreadCount );
- }
-
-
- @SuppressWarnings("unchecked")
- private void importMetadata(String fileName, int writeThreads) throws Exception {
-
- EntityManager em = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
-
- File metadataFile = new File(importDir, fileName);
-
- logger.info("----- Loading metadata file: " + metadataFile.getAbsolutePath());
-
- JsonParser jp = getJsonParserForFile(metadataFile);
-
- JsonToken jsonToken = null; // jp.nextToken();// START_OBJECT this is the outer hashmap
-
- int depth = 1;
-
- BlockingQueue<ImportMetadataTask> workQueue = new LinkedBlockingQueue<ImportMetadataTask>();
- startMetadataWorkers(workQueue, writeThreads);
-
- while (depth > 0) {
-
- jsonToken = jp.nextToken();
-
- if (jsonToken == null) {
- logger.info("token is null, breaking");
- break;
- }
-
- if (jsonToken.equals(JsonToken.START_OBJECT)) {
- depth++;
- } else if (jsonToken.equals(JsonToken.END_OBJECT)) {
- depth--;
- }
-
- if (jsonToken.equals(JsonToken.FIELD_NAME) && depth == 2) {
-
- jp.nextToken();
- String entityOwnerId = jp.getCurrentName();
-
- try {
- EntityRef entityRef = new SimpleEntityRef( "user", UUID.fromString( entityOwnerId ) );
- Map<String, Object> metadata = (Map<String, Object>) jp.readValueAs( Map.class );
-
- workQueue.put( new ImportMetadataTask( entityRef, metadata ) );
- logger.debug( "Put user {} in metadata queue", entityRef.getUuid() );
-
- } catch ( Exception e ) {
- logger.debug( "Error with user {}, not putting in metadata queue", entityOwnerId );
- }
- }
- }
-
- waitForQueueAndMeasure(workQueue, metadataEmptyCount, metadataWorkerThreadMap, "Metadata Load");
-
- logger.info("----- End of metadata -----");
- jp.close();
- }
-
-
- /**
- * Imports the entity's connecting references (collections and connections)
- */
- @SuppressWarnings("unchecked")
- private void importEntityMetadata(
- EntityManager em, EntityRef entityRef, Map<String, Object> metadata) throws Exception {
-
- DuplicateUser dup = dupsByDupUuid.get( entityRef.getUuid() );
-
- if ( dup == null ) { // not a duplicate
-
- User user = em.get( entityRef, User.class );
- final UserInfo userInfo = managementService.getAdminUserByEmail( user.getEmail() );
-
- if (user == null || userInfo == null) {
- logger.error( "User {} does not exist, not processing metadata", entityRef.getUuid() );
- return;
- }
-
- List<Object> organizationsList = (List<Object>) metadata.get("organizations");
- if (organizationsList != null && !organizationsList.isEmpty()) {
-
- for (Object orgObject : organizationsList) {
-
- Map<String, Object> orgMap = (Map<String, Object>) orgObject;
- UUID orgUuid = UUID.fromString( (String) orgMap.get( "uuid" ) );
- String orgName = (String) orgMap.get( "name" );
-
- OrganizationInfo orgInfo = managementService.getOrganizationByUuid( orgUuid );
-
- if (orgInfo == null) { // org does not exist yet, create it and add user
- try {
- managementService.createOrganization( orgUuid, orgName, userInfo, false );
- orgInfo = managementService.getOrganizationByUuid( orgUuid );
-
- logger.debug( "Created new org {} for user {}",
- new Object[]{orgInfo.getName(), user.getEmail()} );
-
- } catch (DuplicateUniquePropertyExistsException dpee) {
- logger.debug( "Org {} already exists", orgName );
- }
- } else { // org exists, add original user to it
- try {
- managementService.addAdminUserToOrganization( userInfo, orgInfo, false );
- logger.debug( "Added to org user {}:{}:{}",
- new Object[]{
- orgInfo.getName(),
- user.getUsername(),
- user.getEmail(),
- user.getUuid()
- });
-
- } catch (Exception e) {
- logger.error( "Error Adding user {} to org {}", new Object[]{user.getEmail(), orgName} );
- }
- }
- }
- }
-
- Map<String, Object> dictionariesMap = (Map<String, Object>) metadata.get("dictionaries");
- if (dictionariesMap != null && !dictionariesMap.isEmpty()) {
- for (String name : dictionariesMap.keySet()) {
- try {
- Map<String, Object> dictionary = (Map<String, Object>) dictionariesMap.get(name);
- em.addMapToDictionary( entityRef, name, dictionary);
-
- logger.debug( "Creating dictionary for {} name {}",
- new Object[]{entityRef, name} );
-
- } catch (Exception e) {
- if (logger.isDebugEnabled()) {
- logger.error("Error importing dictionary name "
- + name + " for user " + entityRef.getUuid(), e);
- } else {
- logger.error("Error importing dictionary name "
- + name + " for user " + entityRef.getUuid());
- }
- }
- }
-
- } else {
- logger.warn("User {} has no dictionaries", entityRef.getUuid() );
- }
-
- } else { // this is a duplicate user, so merge orgs
-
- logger.info("Processing duplicate username={} email={}", dup.email, dup.username );
-
- Identifier identifier = dup.email != null ?
- Identifier.fromEmail( dup.email ) : Identifier.from( dup.username );
- User originalUser = em.get( em.getUserByIdentifier(identifier), User.class );
-
- // get map of original user's orgs
-
- UserInfo originalUserInfo = managementService.getAdminUserByEmail( originalUser.getEmail() );
- Map<String, Object> originalUserOrgData =
- managementService.getAdminUserOrganizationData( originalUser.getUuid() );
- Map<String, Map<String, Object>> originalUserOrgs =
- (Map<String, Map<String, Object>>) originalUserOrgData.get( "organizations" );
-
- // loop through duplicate user's orgs and give orgs to original user
-
- List<Object> organizationsList = (List<Object>) metadata.get("organizations");
- for (Object orgObject : organizationsList) {
-
- Map<String, Object> orgMap = (Map<String, Object>) orgObject;
- UUID orgUuid = UUID.fromString( (String) orgMap.get( "uuid" ) );
- String orgName = (String) orgMap.get( "name" );
-
- if (originalUserOrgs.get( orgName ) == null) { // original user does not have this org
-
- OrganizationInfo orgInfo = managementService.getOrganizationByUuid( orgUuid );
-
- if (orgInfo == null) { // org does not exist yet, create it and add original user to it
- try {
- managementService.createOrganization( orgUuid, orgName, originalUserInfo, false );
- orgInfo = managementService.getOrganizationByUuid( orgUuid );
-
- logger.debug( "Created new org {} for user {}:{}:{} from duplicate user {}:{}",
- new Object[]{
- orgInfo.getName(),
- originalUser.getUsername(),
- originalUser.getEmail(),
- originalUser.getUuid(),
- dup.username, dup.email
- });
-
- } catch (DuplicateUniquePropertyExistsException dpee) {
- logger.debug( "Org {} already exists", orgName );
- }
- } else { // org exists so add original user to it
- try {
- managementService.addAdminUserToOrganization( originalUserInfo, orgInfo, false );
- logger.debug( "Added to org user {}:{}:{} from duplicate user {}:{}",
- new Object[]{
- orgInfo.getName(),
- originalUser.getUsername(),
- originalUser.getEmail(),
- originalUser.getUuid(),
- dup.username, dup.email
- });
-
- } catch (Exception e) {
- logger.error( "Error Adding user {} to org {}",
- new Object[]{originalUserInfo.getEmail(), orgName} );
- }
- }
-
- } // else original user already has this org
-
- }
- }
- }
-
-
- /**
- * Open up the import directory based on <code>importDir</code>
- */
- private void openImportDirectory(CommandLine line) {
-
- boolean hasInputDir = line.hasOption(INPUT_DIR);
-
- if (hasInputDir) {
- importDir = new File(line.getOptionValue(INPUT_DIR));
- } else {
- importDir = new File(DEFAULT_INPUT_DIR);
- }
-
- logger.info("Importing from:" + importDir.getAbsolutePath());
- logger.info("Status. Exists: " + importDir.exists() + " - Readable: " + importDir.canRead());
- }
-
-
- interface Stoppable {
- void setDone(boolean done);
- }
-
- class AuditWorker implements Runnable, Stoppable {
- private BlockingQueue<Map<String, Object>> workQueue;
- private boolean done;
-
- public AuditWorker(BlockingQueue<Map<String, Object>> workQueue) {
- this.workQueue = workQueue;
- }
-
- @Override
- public void setDone(boolean done) {
- this.done = done;
- }
-
- @Override
- public void run() {
- int count = 0;
-
- EntityManager em = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
-
- long durationSum = 0;
-
- while (!done) {
- try {
- Map<String, Object> entityProps = this.workQueue.poll(30, TimeUnit.SECONDS);
-
- if (entityProps == null) {
- logger.warn("Reading from AUDIT queue was null!");
- auditEmptyCount.getAndIncrement();
- Thread.sleep(1000);
- continue;
- }
- auditEmptyCount.set(0);
-
- count++;
- long startTime = System.currentTimeMillis();
-
- UUID uuid = (UUID) entityProps.get(PROPERTY_UUID);
- String type = getType(entityProps);
-
- if (em.get(uuid) == null) {
- logger.error( "FATAL ERROR: wrote an entity {}:{} and it's missing", uuid, type );
- System.exit(1);
- }
-
- echo(entityProps);
-
- long stopTime = System.currentTimeMillis();
-
- long duration = stopTime - startTime;
- durationSum += duration;
-
- //logger.debug( "Audited {}th admin", userCount );
-
- if ( count % 100 == 0 ) {
- logger.info( "Audited {}. Average Audit Rate: {}(ms)", count, durationSum / count );
- }
-
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
-
-
- class ImportMetadataTask {
- public EntityRef entityRef;
- public Map<String, Object> metadata;
-
- public ImportMetadataTask(EntityRef entityRef, Map<String, Object> metadata) {
- this.entityRef = entityRef;
- this.metadata = metadata;
- }
- }
-
- class ImportMetadataWorker implements Runnable, Stoppable {
- private BlockingQueue<ImportMetadataTask> workQueue;
- private boolean done = false;
-
- public ImportMetadataWorker(final BlockingQueue<ImportMetadataTask> workQueue) {
- this.workQueue = workQueue;
-
- }
-
- @Override
- public void setDone(boolean done) {
- this.done = done;
- }
-
- @Override
- public void run() {
- int count = 0;
-
- EntityManager em = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
-
- long durationSum = 0;
-
- while (!done) {
- try {
- ImportMetadataTask task = this.workQueue.poll( 30, TimeUnit.SECONDS );
-
- if (task == null) {
- logger.warn("Reading from metadata queue was null!");
- metadataEmptyCount.getAndIncrement();
- Thread.sleep(1000);
- continue;
- }
- metadataEmptyCount.set( 0 );
-
- long startTime = System.currentTimeMillis();
-
- importEntityMetadata( em, task.entityRef, task.metadata );
-
- long stopTime = System.currentTimeMillis();
- long duration = stopTime - startTime;
- durationSum += duration;
- metadataCount.getAndIncrement();
- count++;
-
- if ( count % 30 == 0 ) {
- logger.info( "Imported {} metadata of total {} expected. " +
- "Average metadata Imported Rate: {}(ms)",
- new Object[] { metadataCount.get(), userCount.get(), durationSum / count });
- }
-
- } catch (Exception e) {
- logger.debug("Error reading writing metadata", e);
- }
- }
- }
- }
-
-
- class ImportAdminWorker implements Runnable, Stoppable {
-
- private BlockingQueue<Map<String, Object>> workQueue;
- private BlockingQueue<Map<String, Object>> auditQueue;
- private boolean done = false;
-
-
- public ImportAdminWorker(final BlockingQueue<Map<String, Object>> workQueue,
- final BlockingQueue<Map<String, Object>> auditQueue) {
- this.workQueue = workQueue;
- this.auditQueue = auditQueue;
- }
-
- @Override
- public void setDone(boolean done) {
- this.done = done;
- }
-
- @Override
- public void run() {
- int count = 0;
-
- EntityManager em = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
-
- long durationSum = 0;
-
- while (!done) {
-
- try {
-
- Map<String, Object> entityProps = this.workQueue.poll(30, TimeUnit.SECONDS);
-
- if (entityProps == null) {
- logger.warn("Reading from admin import queue was null!");
- writeEmptyCount.getAndIncrement();
- Thread.sleep( 1000 );
- continue;
- }
- writeEmptyCount.set(0);
-
- // Import/create the entity
- UUID uuid = getId(entityProps);
- String type = getType( entityProps );
-
- try {
- long startTime = System.currentTimeMillis();
-
- em.create(uuid, type, entityProps);
-
- logger.debug( "Imported admin user {}:{}:{}",
- new Object[] { entityProps.get( "username" ), entityProps.get("email"), uuid } );
-
- userCount.getAndIncrement();
- auditQueue.put(entityProps);
- long stopTime = System.currentTimeMillis();
- long duration = stopTime - startTime;
- durationSum += duration;
-
- count++;
- if (count % 30 == 0) {
- logger.info( "This worked has imported {} users of total {} imported so far. " +
- "Average Creation Rate: {}ms",
- new Object[] { count, userCount.get(), durationSum / count });
- }
-
- } catch (DuplicateUniquePropertyExistsException de) {
- String dupProperty = de.getPropertyName();
- handleDuplicateAccount( em, dupProperty, entityProps );
- continue;
-
-
- } catch (Exception e) {
- logger.error("Error", e);
- }
- } catch (InterruptedException e) {
- logger.error( "Error", e );
- }
-
- }
- }
-
-
- private void handleDuplicateAccount(EntityManager em, String dupProperty, Map<String, Object> entityProps ) {
-
- logger.info( "Processing duplicate user {}:{}:{} with duplicate {}", new Object[]{
- entityProps.get( "username" ),
- entityProps.get( "email" ),
- entityProps.get( "uuid" ),
- dupProperty} );
-
- UUID dupUuid = UUID.fromString( entityProps.get("uuid").toString() );
- try {
- dupsByDupUuid.put( dupUuid, new DuplicateUser( dupProperty, entityProps ) );
-
- } catch (Exception e) {
- logger.error("Error processing dup user {}:{}:{}",
- new Object[] {entityProps.get( "username" ), entityProps.get("email"), dupUuid});
- return;
- }
-
- }
- }
+ // private static final Logger logger = LoggerFactory.getLogger(ImportAdmins.class);
+//
+// /**
+// * Input directory where the .json export files are
+// */
+// static final String INPUT_DIR = "inputDir";
+// static final String WRITE_THREAD_COUNT = "writeThreads";
+// static final String AUDIT_THREAD_COUNT = "auditThreads";
+//
+// static File importDir;
+//
+// static final String DEFAULT_INPUT_DIR = "export";
+//
+// private Map<Stoppable, Thread> adminWriteThreads = new HashMap<Stoppable, Thread>();
+// private Map<Stoppable, Thread> adminAuditThreads = new HashMap<Stoppable, Thread>();
+// private Map<Stoppable, Thread> metadataWorkerThreadMap = new HashMap<Stoppable, Thread>();
+//
+// Map<UUID, DuplicateUser> dupsByDupUuid = new HashMap<UUID, DuplicateUser>(200);
+//
+// JsonFactory jsonFactory = new JsonFactory();
+//
+// AtomicInteger userCount = new AtomicInteger( 0 );
+// AtomicInteger metadataCount = new AtomicInteger( 0 );
+//
+// AtomicInteger writeEmptyCount = new AtomicInteger( 0 );
+// AtomicInteger auditEmptyCount = new AtomicInteger( 0 );
+// AtomicInteger metadataEmptyCount = new AtomicInteger( 0 );
+//
+//
+// static class DuplicateUser {
+// String email;
+// String username;
+// public DuplicateUser( String propName, Map<String, Object> user ) {
+// if ( "email".equals(propName)) {
+// email = user.get("email").toString();
+// } else {
+// username = user.get("username").toString();
+// }
+// }
+// }
+//
+//
+//
+// @Override
+// @SuppressWarnings("static-access")
+// public Options createOptions() {
+//
+// Option hostOption = OptionBuilder.withArgName("host")
+// .hasArg()
+// .withDescription("Cassandra host").create("host");
+//
+// Option inputDir = OptionBuilder
+// .hasArg()
+// .withDescription("input directory -inputDir").create(INPUT_DIR);
+//
+// Option writeThreads = OptionBuilder
+// .hasArg()
+// .withDescription("Write Threads -writeThreads").create(WRITE_THREAD_COUNT);
+//
+// Option auditThreads = OptionBuilder
+// .hasArg()
+// .withDescription("Audit Threads -auditThreads").create(AUDIT_THREAD_COUNT);
+//
+// Option verbose = OptionBuilder
+// .withDescription("Print on the console an echo of the content written to the file")
+// .create(VERBOSE);
+//
+// Options options = new Options();
+// options.addOption(hostOption);
+// options.addOption(writeThreads);
+// options.addOption(auditThreads);
+// options.addOption( inputDir );
+// options.addOption( verbose );
+//
+// return options;
+// }
+//
+//
+// @Override
+// public void runTool(CommandLine line) throws Exception {
+//
+// startSpring();
+//
+// setVerbose(line);
+//
+// openImportDirectory(line);
+//
+// int auditThreadCount = 1;
+// int writeThreadCount = 1;
+//
+// if (line.hasOption(AUDIT_THREAD_COUNT)) {
+// auditThreadCount = Integer.parseInt(line.getOptionValue(AUDIT_THREAD_COUNT));
+// }
+//
+// if (line.hasOption(WRITE_THREAD_COUNT)) {
+// writeThreadCount = Integer.parseInt( line.getOptionValue(WRITE_THREAD_COUNT));
+// }
+//
+// importAdminUsers( writeThreadCount, auditThreadCount );
+//
+// importMetadata( writeThreadCount );
+// }
+//
+//
+// /**
+// * Import admin users.
+// */
+// private void importAdminUsers(int writeThreadCount, int auditThreadCount) throws Exception {
+//
+// String[] fileNames = importDir.list(new PrefixFileFilter(ExportAdmins.ADMIN_USERS_PREFIX + "."));
+//
+// logger.info( "Applications to read: " + fileNames.length );
+//
+// for (String fileName : fileNames) {
+// try {
+// importAdminUsers(fileName, writeThreadCount, auditThreadCount);
+// } catch (Exception e) {
+// logger.warn("Unable to import application: " + fileName, e);
+// }
+// }
+// }
+//
+//
+// /**
+// * Imports admin users.
+// *
+// * @param fileName Name of admin user data file.
+// */
+// private void importAdminUsers(final String fileName,
+// final int writeThreadCount,
+// final int auditThreadCount) throws Exception {
+//
+// int count = 0;
+//
+// File adminUsersFile = new File(importDir, fileName);
+//
+// logger.info("----- Loading file: " + adminUsersFile.getAbsolutePath());
+// JsonParser jp = getJsonParserForFile(adminUsersFile);
+//
+// int loopCounter = 0;
+//
+// BlockingQueue<Map<String, Object>> workQueue = new LinkedBlockingQueue<Map<String, Object>>();
+// BlockingQueue<Map<String, Object>> auditQueue = new LinkedBlockingQueue<Map<String, Object>>();
+//
+// startAdminWorkers(workQueue, auditQueue, writeThreadCount);
+// startAdminAuditors(auditQueue, auditThreadCount);
+//
+// JsonToken token = jp.nextToken();
+// validateStartArray(token);
+//
+// while (jp.nextValue() != JsonToken.END_ARRAY) {
+// loopCounter += 1;
+//
+// @SuppressWarnings("unchecked")
+// Map<String, Object> entityProps = jp.readValueAs(HashMap.class);
+// if (loopCounter % 1000 == 0) {
+// logger.debug( "Publishing to queue... counter=" + loopCounter );
+// }
+//
+// workQueue.add( entityProps );
+// }
+//
+// waitForQueueAndMeasure(workQueue, writeEmptyCount, adminWriteThreads, "Admin Write");
+// waitForQueueAndMeasure(auditQueue, auditEmptyCount, adminAuditThreads, "Admin Audit");
+//
+// logger.info("----- End: Imported {} admin users from file {}",
+// count, adminUsersFile.getAbsolutePath());
+//
+// jp.close();
+// }
+//
+// private static void waitForQueueAndMeasure(final BlockingQueue workQueue,
+// final AtomicInteger emptyCounter,
+// final Map<Stoppable, Thread> threadMap,
+// final String identifier) throws InterruptedException {
+// double rateAverageSum = 0;
+// int iterations = 0;
+//
+// while ( emptyCounter.get() < threadMap.size() ) {
+// iterations += 1;
+//
+// int sizeLast = workQueue.size();
+// long lastTime = System.currentTimeMillis();
+// logger.info("Queue {} is not empty, remaining size={}, waiting...", identifier, sizeLast);
+// Thread.sleep(10000);
+//
+// long timeNow = System.currentTimeMillis();
+// int sizeNow = workQueue.size();
+//
+// int processed = sizeLast - sizeNow;
+//
+// long timeDelta = timeNow - lastTime;
+//
+// double rateLast = (double) processed / (timeDelta / 1000);
+// rateAverageSum += rateLast;
+//
+// long timeRemaining = (long) ( sizeLast / (rateAverageSum / iterations) );
+//
+// logger.info("++PROGRESS ({}): sizeLast={} nowSize={} processed={} rateLast={}/s rateAvg={}/s timeRemaining={}s",
+// new Object[] {
+// identifier, sizeLast, sizeNow, processed, rateLast, (rateAverageSum / iterations), timeRemaining } );
+// }
+//
+// for (Stoppable worker : threadMap.keySet()) {
+// worker.setDone(true);
+// }
+// }
+//
+// private void startAdminAuditors(BlockingQueue<Map<String, Object>> auditQueue, int workerCount) {
+// for (int x = 0; x < workerCount; x++) {
+// AuditWorker worker = new AuditWorker(auditQueue);
+// Thread workerThread = new Thread(worker, "AdminAuditor-" + x);
+// workerThread.start();
+// adminAuditThreads.put(worker, workerThread);
+// }
+// logger.info("Started {} admin auditors", workerCount);
+//
+// }
+//
+//
+// private void startAdminWorkers(BlockingQueue<Map<String, Object>> workQueue,
+// BlockingQueue<Map<String, Object>> auditQueue,
+// int workerCount) {
+//
+// for (int x = 0; x < workerCount; x++) {
+// ImportAdminWorker worker = new ImportAdminWorker(workQueue, auditQueue);
+// Thread workerThread = new Thread(worker, "AdminWriter-" + x);
+// workerThread.start();
+// adminWriteThreads.put(worker, workerThread);
+// }
+//
+// logger.info("Started {} admin workers", workerCount);
+// }
+//
+//
+// private String getType(Map<String, Object> entityProps) {
+// return (String) entityProps.get(PROPERTY_TYPE);
+// }
+//
+//
+// private UUID getId(Map<String, Object> entityProps) {
+// return UUID.fromString((String) entityProps.get(PROPERTY_UUID));
+// }
+//
+//
+// private void validateStartArray(JsonToken token) {
+// if (token != JsonToken.START_ARRAY) {
+// throw new RuntimeException("Token should be START ARRAY but it is:" + token.asString());
+// }
+// }
+//
+//
+// private JsonParser getJsonParserForFile(File organizationFile) throws Exception {
+// JsonParser jp = jsonFactory.createJsonParser( organizationFile );
+// jp.setCodec( new ObjectMapper() );
+// return jp;
+// }
+//
+//
+// /**
+// * Import collections. Collections files are named: collections.<application_name>.Timestamp.json
+// */
+// private void importMetadata(int writeThreadCount) throws Exception {
+//
+// String[] fileNames = importDir.list(
+// new PrefixFileFilter( ExportAdmins.ADMIN_USER_METADATA_PREFIX + "." ) );
+// logger.info( "Metadata files to read: " + fileNames.length );
+//
+// for (String fileName : fileNames) {
+// try {
+// importMetadata(fileName, writeThreadCount);
+// } catch (Exception e) {
+// logger.warn("Unable to import metadata file: " + fileName, e);
+// }
+// }
+// }
+//
+// private void startMetadataWorkers(BlockingQueue<ImportMetadataTask> workQueue, int writeThreadCount) {
+//
+// for (int x = 0; x < writeThreadCount; x++) {
+// ImportMetadataWorker worker = new ImportMetadataWorker(workQueue);
+// Thread workerThread = new Thread(worker, "ImportMetadataTask-" + x );
+// workerThread.start();
+// metadataWorkerThreadMap.put(worker, workerThread);
+// }
+//
+// logger.info( "Started {} metadata workers", writeThreadCount );
+// }
+//
+//
+// @SuppressWarnings("unchecked")
+// private void importMetadata(String fileName, int writeThreads) throws Exception {
+//
+// EntityManager em = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+//
+// File metadataFile = new File(importDir, fileName);
+//
+// logger.info("----- Loading metadata file: " + metadataFile.getAbsolutePath());
+//
+// JsonParser jp = getJsonParserForFile(metadataFile);
+//
+// JsonToken jsonToken = null; // jp.nextToken();// START_OBJECT this is the outer hashmap
+//
+// int depth = 1;
+//
+// BlockingQueue<ImportMetadataTask> workQueue = new LinkedBlockingQueue<ImportMetadataTask>();
+// startMetadataWorkers(workQueue, writeThreads);
+//
+// while (depth > 0) {
+//
+// jsonToken = jp.nextToken();
+//
+// if (jsonToken == null) {
+// logger.info("token is null, breaking");
+// break;
+// }
+//
+// if (jsonToken.equals(JsonToken.START_OBJECT)) {
+// depth++;
+// } else if (jsonToken.equals(JsonToken.END_OBJECT)) {
+// depth--;
+// }
+//
+// if (jsonToken.equals(JsonToken.FIELD_NAME) && depth == 2) {
+//
+// jp.nextToken();
+// String entityOwnerId = jp.getCurrentName();
+//
+// try {
+// EntityRef entityRef = new SimpleEntityRef( "user", UUID.fromString( entityOwnerId ) );
+// Map<String, Object> metadata = (Map<String, Object>) jp.readValueAs( Map.class );
+//
+// workQueue.put( new ImportMetadataTask( entityRef, metadata ) );
+// logger.debug( "Put user {} in metadata queue", entityRef.getUuid() );
+//
+// } catch ( Exception e ) {
+// logger.debug( "Error with user {}, not putting in metadata queue", entityOwnerId );
+// }
+// }
+// }
+//
+// waitForQueueAndMeasure(workQueue, metadataEmptyCount, metadataWorkerThreadMap, "Metadata Load");
+//
+// logger.info("----- End of metadata -----");
+// jp.close();
+// }
+//
+//
+// /**
+// * Imports the entity's connecting references (collections and connections)
+// */
+// @SuppressWarnings("unchecked")
+// private void importEntityMetadata(
+// EntityManager em, EntityRef entityRef, Map<String, Object> metadata) throws Exception {
+//
+// DuplicateUser dup = dupsByDupUuid.get( entityRef.getUuid() );
+//
+// if ( dup == null ) { // not a duplicate
+//
+// User user = em.get( entityRef, User.class );
+// final UserInfo userInfo = managementService.getAdminUserByEmail( user.getEmail() );
+//
+// if (user == null || userInfo == null) {
+// logger.error( "User {} does not exist, not processing metadata", entityRef.getUuid() );
+// return;
+// }
+//
+// List<Object> organizationsList = (List<Object>) metadata.get("organizations");
+// if (organizationsList != null && !organizationsList.isEmpty()) {
+//
+// for (Object orgObject : organizationsList) {
+//
+// Map<String, Object> orgMap = (Map<String, Object>) orgObject;
+// UUID orgUuid = UUID.fromString( (String) orgMap.get( "uuid" ) );
+// String orgName = (String) orgMap.get( "name" );
+//
+// OrganizationInfo orgInfo = managementService.getOrganizationByUuid( orgUuid );
+//
+// if (orgInfo == null) { // org does not exist yet, create it and add user
+// try {
+// managementService.createOrganization( orgUuid, orgName, userInfo, false );
+// orgInfo = managementService.getOrganizationByUuid( orgUuid );
+//
+// logger.debug( "Created new org {} for user {}",
+// new Object[]{orgInfo.getName(), user.getEmail()} );
+//
+// } catch (DuplicateUniquePropertyExistsException dpee) {
+// logger.debug( "Org {} already exists", orgName );
+// }
+// } else { // org exists, add original user to it
+// try {
+// managementService.addAdminUserToOrganization( userInfo, orgInfo, false );
+// logger.debug( "Added to org user {}:{}:{}",
+// new Object[]{
+// orgInfo.getName(),
+// user.getUsername(),
+// user.getEmail(),
+// user.getUuid()
+// });
+//
+// } catch (Exception e) {
+// logger.error( "Error Adding user {} to org {}", new Object[]{user.getEmail(), orgName} );
+// }
+// }
+// }
+// }
+//
+// Map<String, Object> dictionariesMap = (Map<String, Object>) metadata.get("dictionaries");
+// if (dictionariesMap != null && !dictionariesMap.isEmpty()) {
+// for (String name : dictionariesMap.keySet()) {
+// try {
+// Map<String, Object> dictionary = (Map<String, Object>) dictionariesMap.get(name);
+// em.addMapToDictionary( entityRef, name, dictionary);
+//
+// logger.debug( "Creating dictionary for {} name {}",
+// new Object[]{entityRef, name} );
+//
+// } catch (Exception e) {
+// if (logger.isDebugEnabled()) {
+// logger.error("Error importing dictionary name "
+// + name + " for user " + entityRef.getUuid(), e);
+// } else {
+// logger.error("Error importing dictionary name "
+// + name + " for user " + entityRef.getUuid());
+// }
+// }
+// }
+//
+// } else {
+// logger.warn("User {} has no dictionaries", entityRef.getUuid() );
+// }
+//
+// } else { // this is a duplicate user, so merge orgs
+//
+// logger.info("Processing duplicate username={} email={}", dup.email, dup.username );
+//
+// Identifier identifier = dup.email != null ?
+// Identifier.fromEmail( dup.email ) : Identifier.from( dup.username );
+// User originalUser = em.get( em.getUserByIdentifier(identifier), User.class );
+//
+// // get map of original user's orgs
+//
+// UserInfo originalUserInfo = managementService.getAdminUserByEmail( originalUser.getEmail() );
+// Map<String, Object> originalUserOrgData =
+// managementService.getAdminUserOrganizationData( originalUser.getUuid() );
+// Map<String, Map<String, Object>> originalUserOrgs =
+// (Map<String, Map<String, Object>>) originalUserOrgData.get( "organizations" );
+//
+// // loop through duplicate user's orgs and give orgs to original user
+//
+// List<Object> organizationsList = (List<Object>) metadata.get("organizations");
+// for (Object orgObject : organizationsList) {
+//
+// Map<String, Object> orgMap = (Map<String, Object>) orgObject;
+// UUID orgUuid = UUID.fromString( (String) orgMap.get( "uuid" ) );
+// String orgName = (String) orgMap.get( "name" );
+//
+// if (originalUserOrgs.get( orgName ) == null) { // original user does not have this org
+//
+// OrganizationInfo orgInfo = managementService.getOrganizationByUuid( orgUuid );
+//
+// if (orgInfo == null) { // org does not exist yet, create it and add original user to it
+// try {
+// managementService.createOrganization( orgUuid, orgName, originalUserInfo, false );
+// orgInfo = managementService.getOrganizationByUuid( orgUuid );
+//
+// logger.debug( "Created new org {} for user {}:{}:{} from duplicate user {}:{}",
+// new Object[]{
+// orgInfo.getName(),
+// originalUser.getUsername(),
+// originalUser.getEmail(),
+// originalUser.getUuid(),
+// dup.username, dup.email
+// });
+//
+// } catch (DuplicateUniquePropertyExistsException dpee) {
+// logger.debug( "Org {} already exists", orgName );
+// }
+// } else { // org exists so add original user to it
+// try {
+// managementService.addAdminUserToOrganization( originalUserInfo, orgInfo, false );
+// logger.debug( "Added to org user {}:{}:{} from duplicate user {}:{}",
+// new Object[]{
+// orgInfo.getName(),
+// originalUser.getUsername(),
+// originalUser.getEmail(),
+// originalUser.getUuid(),
+// dup.username, dup.email
+// });
+//
+// } catch (Exception e) {
+// logger.error( "Error Adding user {} to org {}",
+// new Object[]{originalUserInfo.getEmail(), orgName} );
+// }
+// }
+//
+// } // else original user already has this org
+//
+// }
+// }
+// }
+//
+//
+// /**
+// * Open up the import directory based on <code>importDir</code>
+// */
+// private void openImportDirectory(CommandLine line) {
+//
+// boolean hasInputDir = line.hasOption(INPUT_DIR);
+//
+// if (hasInputDir) {
+// importDir = new File(line.getOptionValue(INPUT_DIR));
+// } else {
+// importDir = new File(DEFAULT_INPUT_DIR);
+// }
+//
+// logger.info("Importing from:" + importDir.getAbsolutePath());
+// logger.info("Status. Exists: " + importDir.exists() + " - Readable: " + importDir.canRead());
+// }
+//
+//
+// interface Stoppable {
+// void setDone(boolean done);
+// }
+//
+// class AuditWorker implements Runnable, Stoppable {
+// private BlockingQueue<Map<String, Object>> workQueue;
+// private boolean done;
+//
+// public AuditWorker(BlockingQueue<Map<String, Object>> workQueue) {
+// this.workQueue = workQueue;
+// }
+//
+// @Override
+// public void setDone(boolean done) {
+// this.done = done;
+// }
+//
+// @Override
+// public void run() {
+// int count = 0;
+//
+// EntityManager em = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+//
+// long durationSum = 0;
+//
+// while (!done) {
+// try {
+// Map<String, Object> entityProps = this.workQueue.poll(30, TimeUnit.SECONDS);
+//
+// if (entityProps == null) {
+// logger.warn("Reading from AUDIT queue was null!");
+// auditEmptyCount.getAndIncrement();
+// Thread.sleep(1000);
+// continue;
+// }
+// auditEmptyCount.set(0);
+//
+// count++;
+// long startTime = System.currentTimeMillis();
+//
+// UUID uuid = (UUID) entityProps.get(PROPERTY_UUID);
+// String type = getType(entityProps);
+//
+// if (em.get(uuid) == null) {
+// logger.error( "FATAL ERROR: wrote an entity {}:{} and it's missing", uuid, type );
+// System.exit(1);
+// }
+//
+// echo(entityProps);
+//
+// long stopTime = System.currentTimeMillis();
+//
+// long duration = stopTime - startTime;
+// durationSum += duration;
+//
+// //logger.debug( "Audited {}th admin", userCount );
+//
+// if ( count % 100 == 0 ) {
+// logger.info( "Audited {}. Average Audit Rate: {}(ms)", count, durationSum / count );
+// }
+//
+// } catch (InterruptedException e) {
+// e.printStackTrace();
+// } catch (Exception e) {
+// e.printStackTrace();
+// }
+// }
+// }
+// }
+//
+//
+// class ImportMetadataTask {
+// public EntityRef entityRef;
+// public Map<String, Object> metadata;
+//
+// public ImportMetadataTask(EntityRef entityRef, Map<String, Object> metadata) {
+// this.entityRef = entityRef;
+// this.metadata = metadata;
+// }
+// }
+//
+// class ImportMetadataWorker implements Runnable, Stoppable {
+// private BlockingQueue<ImportMetadataTask> workQueue;
+// private boolean done = false;
+//
+// public ImportMetadataWorker(final BlockingQueue<ImportMetadataTask> workQueue) {
+// this.workQueue = workQueue;
+//
+// }
+//
+// @Override
+// public void setDone(boolean done) {
+// this.done = done;
+// }
+//
+// @Override
+// public void run() {
+// int count = 0;
+//
+// EntityManager em = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+//
+// long durationSum = 0;
+//
+// while (!done) {
+// try {
+// ImportMetadataTask task = this.workQueue.poll( 30, TimeUnit.SECONDS );
+//
+// if (task == null) {
+// logger.warn("Reading from metadata queue was null!");
+// metadataEmptyCount.getAndIncrement();
+// Thread.sleep(1000);
+// continue;
+// }
+// metadataEmptyCount.set( 0 );
+//
+// long startTime = System.currentTimeMillis();
+//
+// importEntityMetadata( em, task.entityRef, task.metadata );
+//
+// long stopTime = System.currentTimeMillis();
+// long duration = stopTime - startTime;
+// durationSum += duration;
+// metadataCount.getAndIncrement();
+// count++;
+//
+// if ( count % 30 == 0 ) {
+// logger.info( "Imported {} metadata of total {} expected. " +
+// "Average metadata Imported Rate: {}(ms)",
+// new Object[] { metadataCount.get(), userCount.get(), durationSum / count });
+// }
+//
+// } catch (Exception e) {
+// logger.debug("Error reading writing metadata", e);
+// }
+// }
+// }
+// }
+//
+//
+// class ImportAdminWorker implements Runnable, Stoppable {
+//
+// private BlockingQueue<Map<String, Object>> workQueue;
+// private BlockingQueue<Map<String, Object>> auditQueue;
+// private boolean done = false;
+//
+//
+// public ImportAdminWorker(final BlockingQueue<Map<String, Object>> workQueue,
+// final BlockingQueue<Map<String, Object>> auditQueue) {
+// this.workQueue = workQueue;
+// this.auditQueue = auditQueue;
+// }
+//
+// @Override
+// public void setDone(boolean done) {
+// this.done = done;
+// }
+//
+// @Override
+// public void run() {
+// int count = 0;
+//
+// EntityManager em = emf.getEntityManager(MANAGEMENT_APPLICATION_ID);
+//
+// long durationSum = 0;
+//
+// while (!done) {
+//
+// try {
+//
+// Map<String, Object> entityProps = this.workQueue.poll(30, TimeUnit.SECONDS);
+//
+// if (entityProps == null) {
+// logger.warn("Reading from admin import queue was null!");
+// writeEmptyCount.getAndIncrement();
+// Thread.sleep( 1000 );
+// continue;
+// }
+// writeEmptyCount.set(0);
+//
+// // Import/create the entity
+// UUID uuid = getId(entityProps);
+// String type = getType( entityProps );
+//
+// try {
+// long startTime = System.currentTimeMillis();
+//
+// em.create(uuid, type, entityProps);
+//
+// logger.debug( "Imported admin user {}:{}:{}",
+// new Object[] { entityProps.get( "username" ), entityProps.get("email"), uuid } );
+//
+// userCount.getAndIncrement();
+// auditQueue.put(entityProps);
+// long stopTime = System.currentTimeMillis();
+// long duration = stopTime - startTime;
+// durationSum += duration;
+//
+// count++;
+// if (count % 30 == 0) {
+// logger.info( "This worked has imported {} users of total {} imported so far. " +
+// "Average Creation Rate: {}ms",
+// new Object[] { count, userCount.get(), durationSum / count });
+// }
+//
+// } catch (DuplicateUniquePropertyExistsException de) {
+// String dupProperty = de.getPropertyName();
+// handleDuplicateAccount( em, dupProperty, entityProps );
+// continue;
+//
+//
+// } catch (Exception e) {
+// logger.error("Error", e);
+// }
+// } catch (InterruptedException e) {
+// logger.error( "Error", e );
+// }
+//
+// }
+// }
+//
+//
+// private void handleDuplicateAccount(EntityManager em, String dupProperty, Map<String, Object> entityProps ) {
+//
+// logger.info( "Processing duplicate user {}:{}:{} with duplicate {}", new Object[]{
+// entityProps.get( "username" ),
+// entityProps.get( "email" ),
+// entityProps.get( "uuid" ),
+// dupProperty} );
+//
+// UUID dupUuid = UUID.fromString( entityProps.get("uuid").toString() );
+// try {
+// dupsByDupUuid.put( dupUuid, new DuplicateUser( dupProperty, entityProps ) );
+//
+// } catch (Exception e) {
+// logger.error("Error processing dup user {}:{}:{}",
+// new Object[] {entityProps.get( "username" ), entityProps.get("email"), dupUuid});
+// return;
+// }
+//
+// }
+// }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/be483819/stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java b/stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java
index 5c35e85..9c73807 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/IndexRebuild.java
@@ -38,7 +38,7 @@ import org.apache.usergrid.persistence.EntityRef;
/**
- * Index rebuild utility for Usergrid. Can be used to rebuild the index for a specific
+ * Index rebuild utility for Usergrid. Can be used to rebuild the index for a specific
* application, a specific application's collection or for an entire Usergrid system.
*/
public class IndexRebuild extends ToolBase {
@@ -92,7 +92,7 @@ public class IndexRebuild extends ToolBase {
/*
* (non-Javadoc)
- *
+ *
* @see
* org.apache.usergrid.tools.ToolBase#runTool(org.apache.commons.cli.CommandLine)
*/
@@ -103,19 +103,19 @@ public class IndexRebuild extends ToolBase {
logger.info( "Starting index rebuild" );
EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
-
+
@Override
public void onProgress(EntityRef entity) {
logger.info("Indexing entity {}:{}", entity.getType(), entity.getUuid());
}
- @Override
+ //@Override
public long getWriteDelayTime() {
return 100;
}
};
- emf.rebuildInternalIndexes( po );
+ emf.rebuildInternalIndexes( po );
emf.refreshIndex();
if ( line.getOptionValue("all") != null && line.getOptionValue("all").equalsIgnoreCase("true") ) {
@@ -130,7 +130,7 @@ public class IndexRebuild extends ToolBase {
Set<String> collections = getCollections( line, appId );
for ( String collection : collections ) {
- emf.rebuildCollectionIndex( appId, collection, po );
+ emf.rebuildCollectionIndex( appId, collection, false ,po );
emf.refreshIndex();
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/be483819/stack/tools/src/main/java/org/apache/usergrid/tools/RepairingMismatchedApplicationMetadata.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/RepairingMismatchedApplicationMetadata.java b/stack/tools/src/main/java/org/apache/usergrid/tools/RepairingMismatchedApplicationMetadata.java
index 7b18ccc..6adb5d6 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/RepairingMismatchedApplicationMetadata.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/RepairingMismatchedApplicationMetadata.java
@@ -95,7 +95,8 @@ public class RepairingMismatchedApplicationMetadata extends ToolBase {
UUID applicationId = emf.lookupApplication( app.getValue() );
if ( applicationId == null ) {
String appName = app.getValue();
- Keyspace ko = cass.getSystemKeyspace();
+ //TODO: this is broken please fix if needed in the future.
+ Keyspace ko = null; //cass.getSystemKeyspace();
Mutator<ByteBuffer> m = createMutator( ko, be );
long timestamp = cass.createTimestamp();
addInsertToMutator( m, APPLICATIONS_CF, appName, PROPERTY_UUID, app.getKey(), timestamp );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/be483819/stack/tools/src/main/java/org/apache/usergrid/tools/UserCollectionsDuplicateFix.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/UserCollectionsDuplicateFix.java b/stack/tools/src/main/java/org/apache/usergrid/tools/UserCollectionsDuplicateFix.java
new file mode 100644
index 0000000..7107b12
--- /dev/null
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/UserCollectionsDuplicateFix.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.tools;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+
+
+/**
+ * Fixes in 2.0 the issues where we have two unique entities when there should only exist one. Does indepth check
+ * on user to verify the right one to delete then deletes it from the keyspace!
+ *
+ *
+ * java -jar usergrid-tools.jar UserCollectionDuplicateFix -host <cassandra_host> -col <Collection to evaluate for merging or deletion>
+ */
+public class UserCollectionsDuplicateFix extends ToolBase {
+
+ private static final int PAGE_SIZE = 100;
+
+ private static final Logger logger = LoggerFactory.getLogger( UniqueIndexCleanup.class );
+
+
+ private static final String COLLECTION_ARG = "col";
+
+ @Override
+ @SuppressWarnings("static-access")
+ public Options createOptions() {
+ Options options = new Options();
+
+ Option hostOption =
+ OptionBuilder.withArgName( "host" ).hasArg().isRequired( true ).withDescription( "Cassandra host" )
+ .create( "host" );
+
+
+ options.addOption( hostOption );
+
+ Option collectionOption = OptionBuilder.withArgName( COLLECTION_ARG ).hasArg().isRequired( true )
+ .withDescription( "collection name" ).create( COLLECTION_ARG );
+
+ options.addOption( collectionOption );
+
+ return options;
+ }
+
+
+ @Override
+ public void runTool( final CommandLine line ) throws Exception {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/be483819/stack/tools/src/main/java/org/apache/usergrid/tools/UserManager.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/UserManager.java b/stack/tools/src/main/java/org/apache/usergrid/tools/UserManager.java
index 3b5383d..bfb6a64 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/UserManager.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/UserManager.java
@@ -18,6 +18,8 @@ package org.apache.usergrid.tools;
import com.google.common.collect.BiMap;
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.management.UserInfo;
import org.apache.commons.cli.CommandLine;
@@ -65,7 +67,7 @@ public class UserManager extends ToolBase {
logger.info( mapToFormattedJsonString( orgs ) );
logger.info("--- User dictionaries:");
- EntityManager em = emf.getEntityManager( CassandraService.MANAGEMENT_APPLICATION_ID );
+ EntityManager em = emf.getEntityManager( CpNamingUtils.MANAGEMENT_APPLICATION_ID);
User user = em.get( userInfo.getUuid(), User.class );
Set<String> dictionaries = em.getDictionaries( user );
for (String dictionary : dictionaries) {