You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2018/10/23 03:17:46 UTC
[08/16] usergrid git commit: Create AppDeleter tool to delete data
for an application
Create AppDeleter tool to delete data for an application
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/59e7a24e
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/59e7a24e
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/59e7a24e
Branch: refs/heads/master
Commit: 59e7a24e9ebd577bd892812bd36f2626fa011677
Parents: 57afad2
Author: Mike Dunker <md...@google.com>
Authored: Fri Apr 6 16:08:41 2018 -0700
Committer: Keyur Karnik <ke...@gmail.com>
Committed: Tue Aug 28 16:41:44 2018 -0700
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 9 +-
.../usergrid/persistence/EntityManager.java | 4 +
.../usergrid/management/ManagementService.java | 2 +
.../cassandra/ManagementServiceImpl.java | 53 ++
stack/tools/pom.xml | 6 +
.../org/apache/usergrid/tools/AppDeleter.java | 656 +++++++++++++++++++
.../org/apache/usergrid/tools/ExportApp.java | 2 +-
7 files changed, 730 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/59e7a24e/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index ed33201..0681b7a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -1246,8 +1246,15 @@ public class CpEntityManager implements EntityManager {
@Override
public Map<Object, Object> getDictionaryAsMap( EntityRef entity, String dictionaryName ) throws Exception {
+ return getDictionaryAsMap(entity, dictionaryName, true);
+ }
+
+
+ @Override
+ public Map<Object, Object> getDictionaryAsMap( EntityRef entity, String dictionaryName,
+ boolean forceVerification) throws Exception {
- entity = validate( entity );
+ entity = validate( entity, forceVerification);
Map<Object, Object> dictionary = new LinkedHashMap<Object, Object>();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/59e7a24e/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
index c9752c3..c0e64a6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
@@ -276,6 +276,10 @@ public interface EntityManager {
public Map<Object, Object> getDictionaryAsMap( EntityRef entityRef, String dictionaryName )
throws Exception;
+ public Map<Object, Object> getDictionaryAsMap( EntityRef entityRef, String dictionaryName,
+ boolean forceVerification )
+ throws Exception;
+
public Object getDictionaryElementValue( EntityRef entityRef, String dictionaryName,
String elementName ) throws Exception;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/59e7a24e/stack/services/src/main/java/org/apache/usergrid/management/ManagementService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/ManagementService.java b/stack/services/src/main/java/org/apache/usergrid/management/ManagementService.java
index 3d0c9fb..e0ab95e 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/ManagementService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/ManagementService.java
@@ -110,6 +110,8 @@ public interface ManagementService {
UUID addApplicationToOrganization(UUID organizationId, Entity appInfo) throws Exception;
+ boolean deleteAdminUser( UUID userId ) throws Exception;
+
void deleteOrganizationApplication( UUID organizationId, UUID applicationId ) throws Exception;
void disableAdminUser( UUID userId ) throws Exception;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/59e7a24e/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
index ab93563..7e114e5 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
@@ -934,6 +934,36 @@ public class ManagementServiceImpl implements ManagementService {
}
+ // reverse doCreateAdmin + creation of user entity
+ @Override
+ public boolean deleteAdminUser( UUID userId ) throws Exception {
+
+ // make sure user is not attached to any orgs
+ BiMap<UUID, String> orgMap = getOrganizationsForAdminUser(userId);
+ if (!orgMap.isEmpty()) {
+ // cannot delete admin user that is attached to orgs
+ logger.info("Cannot delete admin user {} -- admin user is attached to {} orgs", userId, orgMap.size());
+ return false;
+ }
+
+ EntityRef userRef = new SimpleEntityRef(User.ENTITY_TYPE, userId);
+
+ // delete mongo password
+ deleteUserMongoPassword(smf.getManagementAppId(), userRef);
+
+ // delete user password
+ deleteUserPassword(smf.getManagementAppId(), userRef);
+
+ // delete user token
+ deleteUserToken(smf.getManagementAppId(), userRef);
+
+ // delete user entity
+ emf.getEntityManager(smf.getManagementAppId()).delete(userRef);
+
+ return true;
+ }
+
+
@Override
public UserInfo createAdminFromPrexistingPassword( UUID organizationId, User user, CredentialsInfo ci )
throws Exception {
@@ -3283,6 +3313,12 @@ public class ManagementServiceImpl implements ManagementService {
}
+ /** Delete the user's password credentials info */
+ protected void deleteUserPassword( UUID appId, EntityRef owner ) throws Exception {
+ deleteCreds( appId, owner, USER_PASSWORD );
+ }
+
+
/** read the user password credential's info */
protected CredentialsInfo readUserPasswordCredentials( UUID appId, UUID ownerId, String ownerType )
throws Exception {
@@ -3302,11 +3338,22 @@ public class ManagementServiceImpl implements ManagementService {
}
+ /** Delete the user's token */
+ protected void deleteUserToken( UUID appId, EntityRef owner ) throws Exception {
+ deleteCreds( appId, owner, USER_TOKEN );
+ }
+
+
/** Write the mongo password */
protected void writeUserMongoPassword( UUID appId, EntityRef owner, CredentialsInfo password ) throws Exception {
writeCreds( appId, owner, password, USER_MONGO_PASSWORD );
}
+ /** Delete the mongo password */
+ private void deleteUserMongoPassword( UUID appId, EntityRef owner) throws Exception {
+ deleteCreds( appId, owner, USER_MONGO_PASSWORD );
+ }
+
/** Read the mongo password */
protected CredentialsInfo readUserMongoPassword( UUID appId, UUID ownerId, String ownerType ) throws Exception {
@@ -3339,6 +3386,12 @@ public class ManagementServiceImpl implements ManagementService {
}
+ private void deleteCreds( UUID appId, EntityRef owner, String key ) throws Exception {
+ EntityManager em = emf.getEntityManager( appId );
+ em.removeFromDictionary(owner, DICTIONARY_CREDENTIALS, key);
+ }
+
+
private Set<CredentialsInfo> readUserPasswordHistory( UUID appId, UUID ownerId ) throws Exception {
EntityManager em = emf.getEntityManager( appId );
Entity owner = em.get( new SimpleEntityRef("user", ownerId ));
http://git-wip-us.apache.org/repos/asf/usergrid/blob/59e7a24e/stack/tools/pom.xml
----------------------------------------------------------------------
diff --git a/stack/tools/pom.xml b/stack/tools/pom.xml
index b34b068..991ff7c 100644
--- a/stack/tools/pom.xml
+++ b/stack/tools/pom.xml
@@ -262,6 +262,12 @@
<version>${aws.version}</version>
</dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>2.8.1</version>
+ </dependency>
+
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
http://git-wip-us.apache.org/repos/asf/usergrid/blob/59e7a24e/stack/tools/src/main/java/org/apache/usergrid/tools/AppDeleter.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/AppDeleter.java b/stack/tools/src/main/java/org/apache/usergrid/tools/AppDeleter.java
new file mode 100644
index 0000000..911e43f
--- /dev/null
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/AppDeleter.java
@@ -0,0 +1,656 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.tools;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.BiMap;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
+import org.apache.usergrid.management.OrganizationInfo;
+import org.apache.usergrid.management.UserInfo;
+import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.entities.Application;
+import org.apache.usergrid.persistence.index.impl.EsProvider;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.services.assets.data.BinaryStore;
+import org.apache.usergrid.tools.export.ExportEntity;
+import org.apache.usergrid.utils.StringUtils;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Scheduler;
+import rx.Subscriber;
+import rx.functions.Action0;
+import rx.functions.Action1;
+import rx.schedulers.Schedulers;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Delete all entities and connections of a Usergrid app.
+ */
+public class AppDeleter extends ExportingToolBase {
+ static final Logger logger = LoggerFactory.getLogger( AppDeleter.class );
+
+ private static final String ORGANIZATION_NAME = "organizationName";
+ private static final String APPLICATION_NAME = "applicationName";
+ private static final String DELETE_THREAD_COUNT = "deleteThreads";
+ private static final String PERFORM_DELETE = "performDelete";
+ private static final String LOG_EACH_ITEM = "logEachItem";
+
+ private static final String ACCESS_ID_PROPNAME = "AWS_ACCESS_KEY_ID";
+ private static final String SECRET_KEY_PROPNAME = "AWS_SECRET_KEY";
+ private static final String BUCKET_NAME_PROPNAME = "usergrid.binary.bucketname";
+
+ private static final String ALL_INDEXES = "*";
+ private static final String SCROLL_TIMEOUT = "5m";
+ private static final int SCROLL_SIZE = 10;
+
+ String applicationName;
+ String organizationName;
+
+ AtomicInteger entitiesFound = new AtomicInteger(0);
+ AtomicInteger entityDictionaryEntriesFound = new AtomicInteger(0);
+ AtomicInteger appDictionaryEntriesFound = new AtomicInteger(0);
+ AtomicInteger assetsFound = new AtomicInteger(0);
+ AtomicInteger esDocsFound = new AtomicInteger(0);
+ AtomicInteger orgAdminsFound = new AtomicInteger(0);
+
+ Scheduler deleteScheduler;
+ AmazonS3Client s3Client;
+ EsProvider esProvider;
+ IndexLocationStrategyFactory ilsf;
+
+ ObjectMapper mapper = new ObjectMapper();
+ Map<Thread, JsonGenerator> entityGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
+ Map<Thread, JsonGenerator> connectionGeneratorsByThread = new HashMap<Thread, JsonGenerator>();
+
+ int deleteThreadCount = 10; // set via CLI option
+
+ BinaryStore binaryStore;
+
+ String logLineSeparator = "-------------------------------------------------------------------";
+
+
+ @Override
+ @SuppressWarnings("static-access")
+ public Options createOptions() {
+
+ Options options = super.createOptions();
+
+ Option orgNameOption = OptionBuilder.hasArg().isRequired(true).withType("")
+ .withDescription( "Organization Name -" + ORGANIZATION_NAME ).create( ORGANIZATION_NAME );
+ options.addOption( orgNameOption );
+
+ Option appNameOption = OptionBuilder.hasArg().isRequired(false).withType("")
+ .withDescription( "Application Name -" + APPLICATION_NAME ).create( APPLICATION_NAME );
+ options.addOption( appNameOption );
+
+ Option performDeleteOption = OptionBuilder.hasArg().isRequired(false)
+ .withDescription("Perform Delete -" + PERFORM_DELETE).create(PERFORM_DELETE);
+ options.addOption( performDeleteOption );
+
+ Option deleteThreadsOption = OptionBuilder.hasArg().isRequired(false)
+ .withType("")
+ .withDescription( "Delete Threads -" + DELETE_THREAD_COUNT).create(DELETE_THREAD_COUNT);
+ options.addOption( deleteThreadsOption );
+
+ Option logEachItemOption = OptionBuilder.hasArg().isRequired(false)
+ .withDescription("Log each item -" + LOG_EACH_ITEM).create(LOG_EACH_ITEM);
+ options.addOption( logEachItemOption );
+
+ return options;
+ }
+
+
+ /**
+ * Tool entry point.
+ */
+ @Override
+ public void runTool(CommandLine line) throws Exception {
+
+ organizationName = line.getOptionValue( ORGANIZATION_NAME );
+ applicationName = line.getOptionValue( APPLICATION_NAME );
+ final boolean allApps = StringUtils.isEmpty(applicationName);
+
+ String performDeleteOption = line.getOptionValue(PERFORM_DELETE);
+ final boolean performDelete = StringUtils.isNotEmpty(performDeleteOption) && performDeleteOption.toLowerCase().equals("yes");
+
+ String logEachItemOption = line.getOptionValue(LOG_EACH_ITEM);
+ final boolean logEachItem = StringUtils.isNotEmpty(logEachItemOption) && logEachItemOption.toLowerCase().equals("yes");
+
+ if (StringUtils.isNotEmpty( line.getOptionValue(DELETE_THREAD_COUNT) )) {
+ try {
+ deleteThreadCount = Integer.parseInt( line.getOptionValue(DELETE_THREAD_COUNT) );
+ } catch (NumberFormatException nfe) {
+ logger.error( "-" + DELETE_THREAD_COUNT + " must be specified as an integer. Aborting..." );
+ return;
+ }
+ }
+
+ startSpring();
+
+ // S3 asset store
+ String accessId = (String)properties.get(ACCESS_ID_PROPNAME);
+ String secretKey = (String)properties.get(SECRET_KEY_PROPNAME);
+ String bucketName = (String)properties.get(BUCKET_NAME_PROPNAME);
+
+ Properties overrides = new Properties();
+ overrides.setProperty( "s3" + ".identity", accessId );
+ overrides.setProperty( "s3" + ".credential", secretKey );
+
+ AWSCredentials credentials = new BasicAWSCredentials(accessId, secretKey);
+ ClientConfiguration clientConfig = new ClientConfiguration();
+ clientConfig.setProtocol(Protocol.HTTP);
+
+ s3Client = new AmazonS3Client(credentials, clientConfig);
+
+ // ES
+ ilsf = injector.getInstance(IndexLocationStrategyFactory.class);
+ esProvider = injector.getInstance(EsProvider.class);
+
+ ExecutorService deleteThreadPoolExecutor = Executors.newFixedThreadPool(deleteThreadCount);
+ deleteScheduler = Schedulers.from( deleteThreadPoolExecutor );
+
+ setVerbose( line );
+
+ logger.info(logLineSeparator);
+
+ boolean singleApp = false;
+ String matchingAppPrefix = organizationName + "/";
+ if (StringUtils.isNotEmpty(applicationName)) {
+ singleApp = true;
+ matchingAppPrefix += applicationName;
+ logger.info("APPLICATION:");
+ } else {
+ logger.info("APPLICATIONS FOR ORG " + organizationName + ":");
+ }
+
+ boolean foundApps = false;
+ Map<String, UUID> activeAppMap = new HashMap<>();
+ for (Map.Entry<String, UUID> entry : emf.getApplications().entrySet()) {
+ if (entry.getKey().startsWith(matchingAppPrefix)) {
+ foundApps = true;
+ logger.info("ACTIVE APP: {} - {}", entry.getKey(), entry.getValue().toString());
+ activeAppMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ Map<String, UUID> deletedAppMap = new HashMap<>();
+ for (Map.Entry<String, UUID> entry : emf.getDeletedApplications().entrySet()) {
+ if (entry.getKey().startsWith(matchingAppPrefix)) {
+ foundApps = true;
+ logger.info("DELETED APP: {} - {}", entry.getKey(), entry.getValue().toString());
+ deletedAppMap.put(entry.getKey(), entry.getValue());
+ }
+ }
+ logger.info(logLineSeparator);
+
+ if (!foundApps) {
+ if (singleApp) {
+ throw new RuntimeException( "Cannot find application " + organizationName + "/" + applicationName );
+ } else {
+ throw new RuntimeException( "Cannot find applications for org " + organizationName );
+ }
+ }
+
+ for (String name : activeAppMap.keySet()) {
+ UUID applicationId = activeAppMap.get(name);
+ final EntityManager em = emf.getEntityManager( applicationId );
+ handleApp(applicationId, name, false, em, performDelete, bucketName, logEachItem);
+ }
+ for (String name : deletedAppMap.keySet()) {
+ UUID applicationId = deletedAppMap.get(name);
+ final EntityManager em = emf.getEntityManager( applicationId );
+ handleApp(applicationId, name, true, em, performDelete, bucketName, logEachItem);
+ }
+
+ if (!singleApp) {
+ // handle org
+ handleOrg(organizationName, performDelete);
+ }
+
+ }
+
+
+ private void handleOrg(String organizationName, boolean performDelete) throws Exception {
+ OrganizationInfo orgInfo = managementService.getOrganizationByName(organizationName);
+ UUID orgUUID = orgInfo.getUuid();
+
+ logger.info(logLineSeparator);
+ logger.info("ORGANIZATION: {}({})", organizationName, orgUUID);
+ logger.info(logLineSeparator);
+
+ if (performDelete) {
+ try {
+ String clientId = managementService.getClientIdForOrganization(orgUUID);
+ String oldClientSecret = managementService.getClientSecretForOrganization(orgUUID);
+ logger.info(logLineSeparator);
+ logger.info("OLD ORG CLIENT ID: {}", clientId);
+ logger.info("OLD ORG CLIENT SECRET: {}", oldClientSecret);
+ String newClientSecret = managementService.newClientSecretForOrganization(orgInfo.getUuid());
+ logger.info("NEW ORG CLIENT SECRET: {}", newClientSecret);
+ logger.info(logLineSeparator);
+ } catch (Exception e) {
+ logger.error("FAILED TO CHANGE CREDENTIALS FOR ORG " + organizationName + ": " + e.getMessage(), e);
+ }
+ }
+
+ List<UserInfo> userList = managementService.getAdminUsersForOrganization(orgInfo.getUuid());
+
+ logger.info(logLineSeparator);
+ logger.info("ORGANIZATION ADMINS: {}({})", organizationName, orgInfo.getUuid());
+ logger.info(logLineSeparator);
+ orgAdminsFound.set(0);
+ for (UserInfo user : userList) {
+ orgAdminsFound.incrementAndGet();
+ BiMap<UUID, String> adminOrgs = managementService.getOrganizationsForAdminUser(user.getUuid());
+ int numOrgs = adminOrgs.size();
+ logger.info("ORGADMIN: {} ({}) - number of other orgs: {}", user.getUsername(), user.getEmail(), numOrgs - 1);
+ if (performDelete) {
+ managementService.removeAdminUserFromOrganization(user.getUuid(), orgInfo.getUuid(), true);
+ if (numOrgs <= 1) {
+ logger.info("ORGADMIN {} is in no other orgs -- deleting", user.getUsername());
+ try {
+ boolean success = managementService.deleteAdminUser(user.getUuid());
+ if (!success) {
+ logger.info("ORGADMIN {} - failed to delete", user.getUsername());
+ }
+ } catch (Exception e) {
+ logger.info("ORGADMIN " + user.getUsername() + " - exception while deleting: " + e.getMessage(), e);
+ }
+ }
+ }
+ }
+ logger.info(logLineSeparator);
+ logger.info("ORGANIZATION ADMINS {} DONE! OrgAdmins: {}", performDelete ? "DELETE" : "LIST", orgAdminsFound.get());
+ logger.info(logLineSeparator);
+
+ }
+
+
+ private void handleApp(UUID appId, String orgAppName, boolean deletedApp,
+ EntityManager em, boolean performDelete, String bucketName,
+ boolean logEachItem) {
+ logger.info(logLineSeparator);
+ logger.info("APPLICATION: {}({}){}", orgAppName, appId.toString(), deletedApp ? " - DELETED" : "");
+ logger.info(logLineSeparator);
+
+ if (performDelete) {
+ try {
+ String clientId = managementService.getClientIdForApplication(appId);
+ String oldClientSecret = managementService.getClientSecretForApplication(appId);
+ logger.info(logLineSeparator);
+ logger.info("OLD APP CLIENT ID: {}", clientId);
+ logger.info("OLD APP CLIENT SECRET: {}", oldClientSecret);
+ String newClientSecret = managementService.newClientSecretForApplication(appId);
+ logger.info("NEW APP CLIENT SECRET: {}", newClientSecret);
+ logger.info(logLineSeparator);
+ } catch (Exception e) {
+ logger.error("FAILED TO CHANGE CREDENTIALS FOR APP " + orgAppName + ": " + e.getMessage(), e);
+ }
+ }
+
+ logger.info(logLineSeparator);
+ logger.info("FINDING APP DICTIONARIES");
+ logger.info(logLineSeparator);
+ // check for entity dictionaries
+ try {
+ EntityManager rootEm = emf.getEntityManager( emf.getManagementAppId() );
+
+ Application application = em.getApplication();
+ //logger.info("APP: {}", application.toString());
+
+ for ( String dictionary : rootEm.getDictionaries( application ) ) {
+ try {
+ //logger.info("DICTIONARY NAME: {}", dictionary);
+ Map<Object, Object> dictMap = rootEm.getDictionaryAsMap(application, dictionary, false);
+ for (Object key : dictMap.keySet()) {
+ appDictionaryEntriesFound.incrementAndGet();
+ if (logEachItem) {
+ logger.info("APP DICTIONARY {} ENTRY: ({})", dictionary, key.toString());
+ }
+ }
+ }
+ catch (Exception e) {
+ // ignore
+ }
+ }
+ }
+ catch (Exception e) {
+ logger.error("APP DICTIONARY CHECK FOR APP " + orgAppName + " FAILED: " + e.getMessage(), e);
+ }
+ logger.info(logLineSeparator);
+ logger.info("APP DICTIONARIES {} DONE! App Dictionary Entries found: {}", performDelete ? "DELETE" : "LIST", appDictionaryEntriesFound.get());
+ logger.info(logLineSeparator);
+
+ logger.info(logLineSeparator);
+ logger.info("FINDING ENTITIES");
+ logger.info(logLineSeparator);
+ entitiesFound.set(0);
+ Observable<String> collectionsObservable = Observable.create( new CollectionsObservable( em ) );
+
+ collectionsObservable.flatMap( collection -> {
+
+ return Observable.create( new EntityObservable( em, collection ) )
+ .doOnNext( new EntityDeleteAction(em, performDelete, logEachItem) ).subscribeOn(deleteScheduler);
+
+
+ } ).doOnCompleted( new EntityDeleteWrapUpAction(performDelete) ).toBlocking().lastOrDefault(null);
+
+
+ logger.info(logLineSeparator);
+ logger.info("FINDING ASSETS");
+ logger.info(logLineSeparator);
+ assetsFound.set(0);
+
+ ObjectListing listing = null;
+ try {
+ listing = s3Client.listObjects(bucketName, appId.toString() + "/");
+ }
+ catch (Exception e) {
+ logger.error("FAILED TO RETRIEVE ASSETS: ", e);
+ }
+ if (listing != null) {
+ for (S3ObjectSummary summary : listing.getObjectSummaries()) {
+ String assetKey = summary.getKey();
+ assetsFound.getAndIncrement();
+ if (logEachItem) {
+ logger.info("ASSET: {}", assetKey);
+ }
+ if (performDelete) {
+ try {
+ s3Client.deleteObject(bucketName, assetKey);
+ }
+ catch (Exception e) {
+ logger.error("FAILED TO DELETE ASSET: " + assetKey, e);
+ }
+ }
+ }
+ }
+ logger.info(logLineSeparator);
+ logger.info("Asset {} DONE! Assets: {}", performDelete ? "DELETE" : "LIST", assetsFound.get());
+ logger.info(logLineSeparator);
+
+ // Elasticsearch docs
+ logger.info(logLineSeparator);
+ logger.info("FINDING ES DOCS");
+ logger.info(logLineSeparator);
+ esDocsFound.set(0);
+
+ ApplicationScope applicationScope = new ApplicationScopeImpl(new SimpleId(appId, "application"));
+ // IndexLocationStrategy strategy = ilsf.getIndexLocationStrategy(applicationScope);
+
+ QueryBuilder qb = QueryBuilders.matchQuery("applicationId", "appId(" + appId.toString() + ",application)");
+ SearchResponse scrollResponse = esProvider.getClient()
+ .prepareSearch(ALL_INDEXES)
+ .setScroll(SCROLL_TIMEOUT)
+ .setSearchType(SearchType.SCAN)
+ .setQuery(qb)
+ .setSize(SCROLL_SIZE)
+ .setNoFields()
+ .execute().actionGet();
+
+ //logger.info(scrollResponse.toString());
+
+ while (true) {
+ BulkRequestBuilder bulkRequest = null;
+ if (performDelete) {
+ bulkRequest = esProvider.getClient().prepareBulk();
+ }
+ boolean docsToDelete = false;
+ for (SearchHit hit : scrollResponse.getHits().getHits()) {
+ esDocsFound.getAndIncrement();
+ if (logEachItem) {
+ logger.info("ES DOC: {}", hit.getId());
+ }
+ if (performDelete) {
+ docsToDelete = true;
+ bulkRequest.add(esProvider.getClient()
+ .prepareDelete(hit.getIndex(), hit.getType(), hit.getId()));
+ }
+ }
+
+ if (docsToDelete) {
+ BulkResponse bulkResponse = bulkRequest.execute().actionGet();
+ if (bulkResponse.hasFailures()) {
+ throw new RuntimeException(bulkResponse.buildFailureMessage());
+ }
+ }
+
+ scrollResponse = esProvider.getClient().prepareSearchScroll(scrollResponse.getScrollId())
+ .setScroll(SCROLL_TIMEOUT).execute().actionGet();
+
+ //logger.info(scrollResponse.toString());
+
+ if (scrollResponse.getHits().getHits().length == 0) {
+ break;
+ }
+ }
+ logger.info(logLineSeparator);
+ logger.info("ES Doc {} DONE! ES Docs: {}", performDelete ? "DELETE" : "LIST", esDocsFound.get());
+ logger.info(logLineSeparator);
+
+ }
+
+
+
+ // ----------------------------------------------------------------------------------------
+ // reading data
+
+
+ /**
+ * Emits collection names found in application.
+ */
+ private class CollectionsObservable implements Observable.OnSubscribe<String> {
+ EntityManager em;
+
+ public CollectionsObservable(EntityManager em) {
+ this.em = em;
+ }
+
+ public void call(Subscriber<? super String> subscriber) {
+
+ int count = 0;
+ try {
+ Map<String, Object> collectionMetadata = em.getApplicationCollectionMetadata();
+
+ logger.debug( "Emitting {} collection names for application {}",
+ collectionMetadata.size(), em.getApplication().getName() );
+
+ for ( String collection : collectionMetadata.keySet() ) {
+ subscriber.onNext( collection );
+ count++;
+ }
+
+ } catch (Exception e) {
+ subscriber.onError( e );
+ }
+
+ subscriber.onCompleted();
+ logger.info( "Completed. Read {} collection names", count );
+ }
+ }
+
+
+ /**
+ * Emits entities of collection.
+ */
+ private class EntityObservable implements Observable.OnSubscribe<ExportEntity> {
+ EntityManager em;
+ String collection;
+
+ public EntityObservable(EntityManager em, String collection) {
+ this.em = em;
+ this.collection = collection;
+ }
+
+ public void call(Subscriber<? super ExportEntity> subscriber) {
+
+ logger.info("Starting to fetch entities of collection {}", collection);
+
+ //subscriber.onStart();
+
+ try {
+ int count = 0;
+
+ Query query = new Query();
+ query.setLimit( MAX_ENTITY_FETCH );
+
+ Results results = em.searchCollection( em.getApplicationRef(), collection, query );
+
+ while (results.size() > 0) {
+ for (Entity entity : results.getEntities()) {
+ try {
+ Set<String> dictionaries = em.getDictionaries( entity );
+ Map dictionariesByName = new HashMap<String, Map<Object, Object>>();
+ for (String dictionary : dictionaries) {
+ Map<Object, Object> dict = em.getDictionaryAsMap( entity, dictionary );
+ if (dict.isEmpty()) {
+ continue;
+ }
+ dictionariesByName.put( dictionary, dict );
+ }
+
+ ExportEntity exportEntity = new ExportEntity(
+ organizationName,
+ applicationName,
+ entity,
+ dictionariesByName );
+
+ subscriber.onNext( exportEntity );
+ count++;
+
+ } catch (Exception e) {
+ logger.error("Error reading entity " + entity.getUuid() +" from collection " + collection);
+ }
+ }
+ if (results.getCursor() == null) {
+ break;
+ }
+ query.setCursor( results.getCursor() );
+ results = em.searchCollection( em.getApplicationRef(), collection, query );
+ }
+
+ subscriber.onCompleted();
+ logger.info("Completed collection {}. Read {} entities", collection, count);
+
+ } catch ( Exception e ) {
+ subscriber.onError(e);
+ }
+ }
+ }
+
+
+ // ----------------------------------------------------------------------------------------
+ // writing data
+
+
+ /**
+ * Delete entities.
+ */
+ private class EntityDeleteAction implements Action1<ExportEntity> {
+
+ EntityManager em;
+ boolean performDelete;
+ boolean logEachEntity;
+
+ public EntityDeleteAction(EntityManager em, boolean performDelete, boolean logEachEntity) {
+ this.em = em;
+ this.performDelete = performDelete;
+ this.logEachEntity = logEachEntity;
+ }
+
+ public void call(ExportEntity entity) {
+
+ try {
+ entitiesFound.getAndIncrement();
+ if (logEachEntity) {
+ logger.info("ENTITY: {}", entity.getEntity().asId().toString());
+ }
+
+ // check for entity dictionaries
+ if (entity.getDictionaries().size() > 0) {
+ for (String dictionaryName : entity.getDictionaries().keySet()) {
+ Map<Object, Object> dictMap = em.getDictionaryAsMap(entity.getEntity(), dictionaryName);
+ for (Object key : dictMap.keySet()) {
+ entityDictionaryEntriesFound.incrementAndGet();
+ if (logEachEntity) {
+ logger.info("ENTITY DICTIONARY ENTRY ({}-{}): ({}): ({})",
+ entity.getEntity().asId().toString(),
+ dictionaryName, key.toString(),
+ dictMap.get(key).toString());
+ }
+ }
+ }
+ }
+
+ if (performDelete) {
+ em.delete(entity.getEntity());
+ }
+
+ } catch (IOException e) {
+ throw new RuntimeException("Error deleting entity (IOException): " + e.getMessage(), e);
+ } catch (Exception e) {
+ throw new RuntimeException("Error deleting entity: " + e.getMessage(), e);
+ }
+ }
+ }
+
+
+ private class EntityDeleteWrapUpAction implements Action0 {
+
+ boolean performDelete;
+
+ public EntityDeleteWrapUpAction(boolean performDelete) {
+ this.performDelete = performDelete;
+ }
+
+ @Override
+ public void call() {
+
+ logger.info(logLineSeparator);
+ logger.info("Entity {} DONE! Entities: {}", performDelete ? "DELETE" : "LIST", entitiesFound.get());
+ logger.info(logLineSeparator);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/59e7a24e/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
index c46f4b7..53e5ef3 100644
--- a/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ExportApp.java
@@ -293,8 +293,8 @@ public class ExportApp extends ExportingToolBase {
try {
ExportConnection connection = new ExportConnection(
- applicationName,
organizationName,
+ applicationName,
connectionType,
exportEntity.getEntity().getUuid(),
connectedEntity.getUuid());