You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/06/19 20:35:40 UTC
incubator-usergrid git commit: Added appName line value to exporting
toolbase. Added changes to tool that will allow it to take in applications.
Repository: incubator-usergrid
Updated Branches:
refs/heads/MigrationTool [created] 7f3111ae3
Added appName line value to exporting toolbase.
Added changes to tool that will allow it to take in applications.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7f3111ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7f3111ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7f3111ae
Branch: refs/heads/MigrationTool
Commit: 7f3111ae35d185cb762e13f1f4d520fd158fb1c8
Parents: 107a465
Author: GERey <gr...@apigee.com>
Authored: Fri Jun 19 11:35:38 2015 -0700
Committer: GERey <gr...@apigee.com>
Committed: Fri Jun 19 11:35:38 2015 -0700
----------------------------------------------------------------------
.../usergrid/tools/ExportingToolBase.java | 6 +
.../org/apache/usergrid/tools/Migration.java | 511 +++++++++++++++++++
2 files changed, 517 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7f3111ae/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
index 3de220c..63a3b4e 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportingToolBase.java
@@ -52,6 +52,7 @@ public abstract class ExportingToolBase extends ToolBase {
protected String baseOutputDirName = "export";
protected UUID orgId;
+ protected String appName;
JsonFactory jsonFactory = new JsonFactory();
protected long startTime = System.currentTimeMillis();
@@ -81,6 +82,11 @@ public abstract class ExportingToolBase extends ToolBase {
}
}
+    /**
+     * Reads the {@code -appName} command-line value, lower-cases it and stores it in {@code appName}.
+     * Leaves {@code appName} untouched when the option is absent or was given without a value.
+     *
+     * @param line the parsed command line
+     */
+    protected void applyAppName( CommandLine line ) {
+        if ( !line.hasOption( "appName" ) ) {
+            return;
+        }
+        String value = ConversionUtils.string( line.getOptionValue( "appName" ) );
+        // An option passed without a value yields null here; skip it instead of failing on toLowerCase().
+        if ( value != null ) {
+            appName = value.toLowerCase();
+        }
+    }
protected void applyOrgId( CommandLine line ) {
if ( line.hasOption( "orgId" ) ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7f3111ae/stack/tools/src/main/java/org/apache/usergrid/tools/Migration.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/Migration.java b/stack/tools/src/main/java/org/apache/usergrid/tools/Migration.java
new file mode 100644
index 0000000..9bc27bf
--- /dev/null
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/Migration.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.tools;
+
+
+import com.google.common.collect.BiMap;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.Results.Level;
+import org.apache.usergrid.persistence.cassandra.CassandraService;
+import org.apache.usergrid.utils.JsonUtils;
+import org.apache.usergrid.utils.StringUtils;
+import org.codehaus.jackson.JsonGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Exports a single application and all of the entities inside it.
+ *
+ * Usage: java -jar usergrid-tools.jar Migration -appName APPLICATION_NAME [-readThreads COUNT]
+ */
+public class Migration extends ExportingToolBase {
+
+ static final Logger logger = LoggerFactory.getLogger( Migration.class );
+ public static final String ADMIN_USERS_PREFIX = "admin-users";
+ public static final String ADMIN_USER_METADATA_PREFIX = "admin-user-metadata";
+ private static final String READ_THREAD_COUNT = "readThreads";
+ private int readThreadCount;
+
+
+ /**
+ * Holds one entity that a reader worker has loaded, together with the metadata gathered for it,
+ * so that the writer worker can export it.
+ */
+ class AdminUserWriteTask {
+ // The entity loaded by the reader worker for one queued UUID.
+ Entity adminUser;
+ // Collection name -> UUIDs of the members of that collection; left null by the reader when the entity has no collections.
+ Map<String, List<UUID>> collectionsByName;
+ // Connection type -> connection references of that type.
+ Map<String, List<ConnectionRef>> connectionsByType;
+ // Dictionary name -> contents of that dictionary.
+ Map<String, Map<Object, Object>> dictionariesByName;
+ // Organization UUID -> organization name; only set by addOrganizationsToTask, whose call is currently commented out.
+ BiMap<UUID, String> orgNamesByUuid;
+ }
+
+
+ /**
+ * Export admin users using multiple threads.
+ * <p/>
+ * How it works:
+ * In main thread we query for IDs of all admin users, add each ID to read queue.
+ * Read-queue workers read admin user data, add data to write queue.
+ * One write-queue worker reads data writes to file.
+ */
+ @Override
+ public void runTool(CommandLine line) throws Exception {
+ startSpring();
+
+ setVerbose( line );
+
+ //Checks for the application name that we want to export.
+ applyAppName( line );
+ if ( appName == null ){
+ logger.error( "Must include a application name using -appName. Aborting..." );
+ return;
+ }
+ prepareBaseOutputFileName( line );
+ outputDir = createOutputParentDir();
+ logger.info( "Export directory: " + outputDir.getAbsolutePath() );
+
+ //Define how many threads we should use
+ if (StringUtils.isNotEmpty( line.getOptionValue( READ_THREAD_COUNT ) )) {
+ try {
+ readThreadCount = Integer.parseInt( line.getOptionValue( READ_THREAD_COUNT ) );
+ } catch (NumberFormatException nfe) {
+ logger.error( "-" + READ_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+ return;
+ }
+ } else {
+ readThreadCount = 20;
+ }
+
+ // start write queue worker
+
+ BlockingQueue<AdminUserWriteTask> writeQueue = new LinkedBlockingQueue<AdminUserWriteTask>();
+ AdminUserWriter adminUserWriter = new AdminUserWriter( writeQueue );
+ Thread writeThread = new Thread( adminUserWriter );
+ writeThread.start();
+ logger.debug( "Write thread started" );
+
+ // start read queue workers
+
+ BlockingQueue<UUID> readQueue = new LinkedBlockingQueue<UUID>();
+ List<AdminUserReader> readers = new ArrayList<AdminUserReader>();
+ for (int i = 0; i < readThreadCount; i++) {
+ AdminUserReader worker = new AdminUserReader( readQueue, writeQueue );
+ Thread readerThread = new Thread( worker, "AdminUserReader-" + i );
+ readerThread.start();
+ readers.add( worker );
+ }
+ logger.debug( readThreadCount + " read worker threads started" );
+
+ // query for IDs, add each to read queue
+
+ Query query = new Query();
+ query.setLimit( MAX_ENTITY_FETCH );
+ query.setResultsLevel( Level.IDS );
+ //EntityManager em = emf.getEntityManager( CassandraService.MANAGEMENT_APPLICATION_ID );
+
+
+ //Retrieve application that was given to the exporter
+ UUID applicationUuid = emf.lookupApplication( "applicationName" );
+ EntityManager em = emf.getEntityManager( applicationUuid );
+ //get all the collection names in an application
+ Iterator<String> collections = em.getApplicationCollections().iterator();
+// while(collections.hasNext()){
+// String collectionName = collections.next();
+// managementService.get
+// em.getCollection
+// }
+
+ Map<String, Object> metadata = em.getApplicationCollectionMetadata();
+ echo( JsonUtils.mapToFormattedJsonString( metadata ) );
+
+ // Loop through the collections. This is the only way to loop
+ // through the entities in the application (former namespace).
+ for ( String collectionName : metadata.keySet() ) {
+
+ query = new Query();
+ query.setLimit( MAX_ENTITY_FETCH );
+ query.setResultsLevel( Level.IDS );
+
+ Results ids = em.searchCollection( em.getApplicationRef(), collectionName, query );
+ for (UUID uuid : ids.getIds()) {
+ readQueue.add( uuid );
+ logger.debug( "Added uuid to readQueue: " + uuid );
+ }
+ if (ids.getCursor() == null) {
+ break;
+ }
+
+ query.setCursor( ids.getCursor() );
+
+ ids = em.searchCollection( em.getApplicationRef(), collectionName, query );
+ }
+
+ adminUserWriter.setDone( true );
+ for (AdminUserReader aur : readers) {
+ aur.setDone( true );
+ }
+
+ logger.debug( "Waiting for write thread to complete" );
+ writeThread.join();
+ }
+
+    /**
+     * Looks up the application named by {@code appName} and returns the entity manager for it.
+     *
+     * @return the entity manager obtained from {@code emf} for the looked-up application UUID
+     * @throws Exception if the lookup or the entity manager creation fails
+     */
+    public EntityManager applicationEntityManagerCreator() throws Exception {
+        return emf.getEntityManager( emf.lookupApplication( appName ) );
+    }
+
+    /**
+     * Adds the options specific to this tool to the options inherited from the base class:
+     * {@code -readThreads} (number of reader threads) and {@code -appName} (application to export).
+     *
+     * @return the inherited options plus the options of this tool
+     */
+    @Override
+    @SuppressWarnings("static-access")
+    public Options createOptions() {
+
+        Options options = super.createOptions();
+
+        Option readThreads = OptionBuilder
+                .hasArg().withType(0).withDescription("Read Threads -" + READ_THREAD_COUNT).create(READ_THREAD_COUNT);
+
+        // runTool requires -appName, so it has to be a known option for the command line to parse.
+        Option applicationName = OptionBuilder
+                .hasArg().withDescription("Name of the application to export -appName").create("appName");
+
+        options.addOption( readThreads );
+        options.addOption( applicationName );
+        return options;
+    }
+
+
+ public class AdminUserReader implements Runnable {
+
+ private boolean done = true;
+
+ private final BlockingQueue<UUID> readQueue;
+ private final BlockingQueue<AdminUserWriteTask> writeQueue;
+
+ public AdminUserReader( BlockingQueue<UUID> readQueue, BlockingQueue<AdminUserWriteTask> writeQueue ) {
+ this.readQueue = readQueue;
+ this.writeQueue = writeQueue;
+ }
+
+
+ @Override
+ public void run() {
+ try {
+ readAndQueueAdminUsers();
+ } catch (Exception e) {
+ logger.error("Error reading data for export", e);
+ }
+ }
+
+
+ private void readAndQueueAdminUsers() throws Exception {
+
+ EntityManager em = applicationEntityManagerCreator();
+
+ while ( true ) {
+
+ UUID uuid = null;
+ try {
+ uuid = readQueue.poll( 30, TimeUnit.SECONDS );
+ logger.debug("Got item from entityId queue: " + uuid );
+
+ if ( uuid == null && done ) {
+ break;
+ }
+
+ Entity entity = em.get( uuid );
+
+ AdminUserWriteTask task = new AdminUserWriteTask();
+ task.adminUser = entity;
+
+ addCollectionsToTask( task, entity );
+ addDictionariesToTask( task, entity );
+ addConnectionsToTask( task, entity );
+ // addOrganizationsToTask( task, entity );
+
+ writeQueue.add( task );
+
+ } catch ( Exception e ) {
+ logger.error("Error reading data for user " + uuid, e );
+ }
+ }
+ }
+
+
+ private void addCollectionsToTask(AdminUserWriteTask task, Entity entity) throws Exception {
+
+ EntityManager em = applicationEntityManagerCreator();
+ Set<String> collections = em.getCollections( entity );
+ if ((collections == null) || collections.isEmpty()) {
+ return;
+ }
+
+ task.collectionsByName = new HashMap<String, List<UUID>>();
+
+ for (String collectionName : collections) {
+
+ List<UUID> uuids = task.collectionsByName.get( collectionName );
+ if ( uuids == null ) {
+ uuids = new ArrayList<UUID>();
+ task.collectionsByName.put( collectionName, uuids );
+ }
+
+ Results collectionMembers = em.getCollection( entity, collectionName, null, 100000, Level.IDS, false );
+
+ List<UUID> entityIds = collectionMembers.getIds();
+
+ if ((entityIds != null) && !entityIds.isEmpty()) {
+ for (UUID childEntityUUID : entityIds) {
+ uuids.add( childEntityUUID );
+ }
+ }
+ }
+ }
+
+
+ private void addDictionariesToTask(AdminUserWriteTask task, Entity entity) throws Exception {
+ EntityManager em = applicationEntityManagerCreator();
+
+ Set<String> dictionaries = em.getDictionaries( entity );
+
+ task.dictionariesByName = new HashMap<String, Map<Object, Object>>();
+
+ for (String dictionary : dictionaries) {
+ Map<Object, Object> dict = em.getDictionaryAsMap( entity, dictionary );
+ task.dictionariesByName.put( dictionary, dict );
+ }
+ }
+
+
+ private void addConnectionsToTask(AdminUserWriteTask task, Entity entity) throws Exception {
+ EntityManager em = applicationEntityManagerCreator();
+
+ task.connectionsByType = new HashMap<String, List<ConnectionRef>>();
+
+ Set<String> connectionTypes = em.getConnectionTypes( entity );
+ for (String connectionType : connectionTypes) {
+
+ List<ConnectionRef> connRefs = task.connectionsByType.get( connectionType );
+ if ( connRefs == null ) {
+ connRefs = new ArrayList<ConnectionRef>();
+ }
+
+ Results results = em.getConnectedEntities( entity.getUuid(), connectionType, null, Level.IDS );
+ List<ConnectionRef> connections = results.getConnections();
+
+ for (ConnectionRef connectionRef : connections) {
+ connRefs.add( connectionRef );
+ }
+ }
+ }
+
+
+ private void addOrganizationsToTask(AdminUserWriteTask task, Entity entity) throws Exception {
+ task.orgNamesByUuid = managementService.getOrganizationsForAdminUser( entity.getUuid() );
+ }
+
+ public void setDone(boolean done) {
+ this.done = done;
+ }
+ }
+
+    /**
+     * Worker that takes finished tasks from the write queue and writes each entity to the
+     * entities file and its collections, connections and dictionaries to the metadata file.
+     */
+    class AdminUserWriter implements Runnable {
+
+        // Set through setDone once no more tasks will be queued; volatile because another thread sets it.
+        private volatile boolean done = false;
+
+        private final BlockingQueue<AdminUserWriteTask> taskQueue;
+
+        public AdminUserWriter( BlockingQueue<AdminUserWriteTask> taskQueue ) {
+            this.taskQueue = taskQueue;
+        }
+
+
+        @Override
+        public void run() {
+            try {
+                writeEntities();
+            } catch (Exception e) {
+                logger.error("Error writing export data", e);
+            }
+        }
+
+
+        /**
+         * Polls the task queue until it is empty and this worker has been marked done, writing
+         * every task to the two output files. Both files are closed even when writing fails.
+         */
+        private void writeEntities() throws Exception {
+            EntityManager em = applicationEntityManagerCreator();
+            String applicationName = em.getApplication().getName();
+
+            //TODO: could do one file for collections and one file for applications or multiple.
+            // write one JSON file for the entities of the application
+            JsonGenerator usersFile = getJsonGenerator( createOutputFile( ADMIN_USERS_PREFIX, applicationName ) );
+            JsonGenerator metadataFile = null;
+
+            int count = 0;
+
+            try {
+                usersFile.writeStartArray();
+
+                // write one JSON file for metadata: collections, connections and dictionaries of those entities
+                metadataFile = getJsonGenerator( createOutputFile( ADMIN_USER_METADATA_PREFIX, applicationName ) );
+                metadataFile.writeStartObject();
+
+                while ( true ) {
+
+                    try {
+                        AdminUserWriteTask task = taskQueue.poll( 30, TimeUnit.SECONDS );
+                        if ( task == null ) {
+                            if ( done ) {
+                                break;
+                            }
+                            // The poll timed out but the readers are still working; keep waiting.
+                            continue;
+                        }
+
+                        // write user to application file
+                        usersFile.writeObject( task.adminUser );
+                        echo( task.adminUser );
+
+                        // write metadata to metadata file
+                        // NOTE(review): saveCollections closes the per-entity object, so the "connections"
+                        // and "dictionaries" fields written next end up in the top-level metadata object
+                        // once per entity. Confirm whether they were meant to be nested per entity.
+                        saveCollections( metadataFile, task );
+                        saveConnections( metadataFile, task );
+                        //saveOrganizations( metadataFile, task );
+                        saveDictionaries( metadataFile, task );
+
+                        logger.debug("Exported user {}", task.adminUser.getProperty( "email" ));
+
+                        count++;
+                        if ( count % 1000 == 0 ) {
+                            logger.info("Exported {} admin users", count);
+                        }
+
+                    } catch (InterruptedException e) {
+                        // Keep the interrupt status visible to whoever handles the exception.
+                        Thread.currentThread().interrupt();
+                        throw new Exception("Interrupted", e);
+                    }
+                }
+
+                metadataFile.writeEndObject();
+                usersFile.writeEndArray();
+
+            } finally {
+                try {
+                    if ( metadataFile != null ) {
+                        metadataFile.close();
+                    }
+                } finally {
+                    usersFile.close();
+                }
+            }
+
+            logger.info("Exported TOTAL {} admin users", count);
+        }
+
+
+        /** Writes one object, keyed by the entity UUID, that maps each collection name to its member UUIDs. */
+        private void saveCollections( JsonGenerator jg, AdminUserWriteTask task ) throws Exception {
+
+            jg.writeFieldName( task.adminUser.getUuid().toString() );
+            jg.writeStartObject();
+
+            // The reader leaves collectionsByName null for an entity without collections.
+            if ( task.collectionsByName != null ) {
+                for (Map.Entry<String, List<UUID>> collection : task.collectionsByName.entrySet() ) {
+
+                    jg.writeFieldName( collection.getKey() );
+                    jg.writeStartArray();
+
+                    List<UUID> entityIds = collection.getValue();
+
+                    if ((entityIds != null) && !entityIds.isEmpty()) {
+                        for (UUID childEntityUUID : entityIds) {
+                            jg.writeObject( childEntityUUID.toString() );
+                        }
+                    }
+
+                    jg.writeEndArray();
+                }
+            }
+
+            jg.writeEndObject();
+        }
+
+
+        /** Writes a "dictionaries" object holding every non-empty dictionary of the task. */
+        private void saveDictionaries( JsonGenerator jg, AdminUserWriteTask task ) throws Exception {
+
+            jg.writeFieldName( "dictionaries" );
+            jg.writeStartObject();
+
+            if ( task.dictionariesByName != null ) {
+                for (Map.Entry<String, Map<Object, Object>> dictionary : task.dictionariesByName.entrySet() ) {
+
+                    Map<Object, Object> dict = dictionary.getValue();
+
+                    if ((dict == null) || dict.isEmpty()) {
+                        continue;
+                    }
+
+                    jg.writeFieldName( dictionary.getKey() );
+
+                    jg.writeStartObject();
+
+                    for (Map.Entry<Object, Object> entry : dict.entrySet()) {
+                        jg.writeFieldName( entry.getKey().toString() );
+                        jg.writeObject( entry.getValue() );
+                    }
+
+                    jg.writeEndObject();
+                }
+            }
+            jg.writeEndObject();
+        }
+
+
+        /** Writes a "connections" object that maps each connection type to the UUIDs of the connected entities. */
+        private void saveConnections( JsonGenerator jg, AdminUserWriteTask task ) throws Exception {
+
+            jg.writeFieldName( "connections" );
+            jg.writeStartObject();
+
+            if ( task.connectionsByType != null ) {
+                for (Map.Entry<String, List<ConnectionRef>> connectionType : task.connectionsByType.entrySet() ) {
+
+                    jg.writeFieldName( connectionType.getKey() );
+                    jg.writeStartArray();
+
+                    List<ConnectionRef> connections = connectionType.getValue();
+                    if ( connections != null ) {
+                        for (ConnectionRef connectionRef : connections) {
+                            jg.writeObject( connectionRef.getConnectedEntity().getUuid() );
+                        }
+                    }
+
+                    jg.writeEndArray();
+                }
+            }
+            jg.writeEndObject();
+        }
+
+
+        /** Writes an "organizations" array of uuid/name objects; currently not called. */
+        private void saveOrganizations( JsonGenerator jg, AdminUserWriteTask task ) throws Exception {
+
+            final BiMap<UUID, String> orgs = task.orgNamesByUuid;
+
+            jg.writeFieldName( "organizations" );
+
+            jg.writeStartArray();
+
+            for (UUID uuid : orgs.keySet()) {
+
+                jg.writeStartObject();
+
+                jg.writeFieldName( "uuid" );
+                jg.writeObject( uuid );
+
+                jg.writeFieldName( "name" );
+                jg.writeObject( orgs.get( uuid ) );
+
+                jg.writeEndObject();
+            }
+
+            jg.writeEndArray();
+        }
+
+        /** Marks whether all tasks have been queued; once true the worker exits when the task queue is empty. */
+        public void setDone(boolean done) {
+            this.done = done;
+        }
+    }
+}
+