You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/10/10 17:50:44 UTC
[37/43] git commit: Added a tool to re-index and re-persist all
entities.
Added a tool to re-index and re-persist all entities.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cb7cfadc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cb7cfadc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cb7cfadc
Branch: refs/heads/two-dot-o-events
Commit: cb7cfadc3dbe5bbe7fee6659ab0baf8da04818ad
Parents: 3fdc74f
Author: Dave Johnson <dm...@apigee.com>
Authored: Thu Oct 9 20:38:44 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Thu Oct 9 20:38:44 2014 -0400
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 306 ++++++-------------
.../corepersistence/CpEntityManagerFactory.java | 35 ++-
.../usergrid/corepersistence/CpVisitor.java | 32 ++
.../usergrid/corepersistence/CpWalker.java | 236 ++++++++++++++
.../HybridEntityManagerFactory.java | 10 +
.../usergrid/persistence/EntityManager.java | 5 +
.../persistence/EntityManagerFactory.java | 4 +
.../cassandra/EntityManagerFactoryImpl.java | 10 +
.../cassandra/EntityManagerImpl.java | 10 +
.../PerformanceEntityRepersistTest.java | 240 +++++++++++++++
.../org/apache/usergrid/tools/RepersistAll.java | 98 ++++++
11 files changed, 774 insertions(+), 212 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb7cfadc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 0cf89b8..724aa80 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -33,7 +33,6 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.Stack;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
@@ -119,11 +118,6 @@ import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsE
import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
import org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexBatch;
import org.apache.usergrid.persistence.index.IndexScope;
@@ -155,7 +149,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import rx.Observable;
-import rx.functions.Action1;
@@ -215,6 +208,9 @@ public class CpEntityManager implements EntityManager {
}
+ public ApplicationScope getApplicationScope() {
+ return applicationScope;
+ }
@Override
@@ -308,7 +304,7 @@ public class CpEntityManager implements EntityManager {
String collectionName = CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() );
CollectionScope collectionScope = new CollectionScopeImpl(
- applicationScope.getApplication(), applicationScope.getApplication(), collectionName );
+ getApplicationScope().getApplication(), getApplicationScope().getApplication(), collectionName );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
@@ -395,7 +391,7 @@ public class CpEntityManager implements EntityManager {
String collectionName = CpNamingUtils.getCollectionScopeNameFromEntityType( type );
CollectionScope collectionScope = new CollectionScopeImpl(
- applicationScope.getApplication(), applicationScope.getApplication(), collectionName );
+ getApplicationScope().getApplication(), getApplicationScope().getApplication(), collectionName );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
@@ -461,8 +457,8 @@ public class CpEntityManager implements EntityManager {
// first, update entity index in its own collection scope
CollectionScope collectionScope = new CollectionScopeImpl(
- applicationScope.getApplication(),
- applicationScope.getApplication(),
+ getApplicationScope().getApplication(),
+ getApplicationScope().getApplication(),
CpNamingUtils.getCollectionScopeNameFromEntityType( entity.getType() ) );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
@@ -526,8 +522,8 @@ public class CpEntityManager implements EntityManager {
private Observable deleteAsync( EntityRef entityRef ) throws Exception {
CollectionScope collectionScope = new CollectionScopeImpl(
- applicationScope.getApplication(),
- applicationScope.getApplication(),
+ getApplicationScope().getApplication(),
+ getApplicationScope().getApplication(),
CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() ) );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
@@ -552,7 +548,7 @@ public class CpEntityManager implements EntityManager {
logger.debug( "Deleting indexes of all {} collections owning the entity",
owners.keySet().size() );
- final EntityIndex ei = managerCache.getEntityIndex( applicationScope );
+ final EntityIndex ei = managerCache.getEntityIndex(getApplicationScope());
final EntityIndexBatch batch = ei.createBatch();
@@ -579,13 +575,13 @@ public class CpEntityManager implements EntityManager {
// deindex from default index scope
IndexScope defaultIndexScope = new IndexScopeImpl(
- applicationScope.getApplication(),
+ getApplicationScope().getApplication(),
CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() ) );
batch.deindex(defaultIndexScope, entity );
IndexScope allTypesIndexScope = new IndexScopeImpl(
- applicationScope.getApplication(),
+ getApplicationScope().getApplication(),
CpNamingUtils.ALL_TYPES);
batch.deindex( allTypesIndexScope, entity );
@@ -967,16 +963,16 @@ public class CpEntityManager implements EntityManager {
String collectionName = CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() );
CollectionScope collectionScope = new CollectionScopeImpl(
- applicationScope.getApplication(),
- applicationScope.getApplication(),
+ getApplicationScope().getApplication(),
+ getApplicationScope().getApplication(),
collectionName );
IndexScope defaultIndexScope = new IndexScopeImpl(
- applicationScope.getApplication(),
+ getApplicationScope().getApplication(),
CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() ) );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
- EntityIndex ei = managerCache.getEntityIndex( applicationScope );
+ EntityIndex ei = managerCache.getEntityIndex(getApplicationScope());
Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
@@ -2506,8 +2502,8 @@ public class CpEntityManager implements EntityManager {
// prepare to write and index Core Persistence Entity into default scope
CollectionScope collectionScope = new CollectionScopeImpl(
- applicationScope.getApplication(),
- applicationScope.getApplication(),
+ getApplicationScope().getApplication(),
+ getApplicationScope().getApplication(),
CpNamingUtils.getCollectionScopeNameFromEntityType( eType ) );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
@@ -2772,7 +2768,7 @@ public class CpEntityManager implements EntityManager {
emf.refreshIndex();
// refresh this Entity Manager's application's index
- EntityIndex ei = managerCache.getEntityIndex( applicationScope );
+ EntityIndex ei = managerCache.getEntityIndex(getApplicationScope());
ei.refresh();
}
@@ -2795,6 +2791,7 @@ public class CpEntityManager implements EntityManager {
return cpEntity;
}
+
@Override
public void flushManagerCaches() {
managerCache.flush();
@@ -2804,192 +2801,94 @@ public class CpEntityManager implements EntityManager {
/**
* Completely reindex the application associated with this EntityManager.
*/
- public void reindex( EntityManagerFactory.ProgressObserver po ) throws Exception {
-
- Stack stack = new Stack();
- stack.push( getApplication() );
-
- indexEntityConnectionsAndCollections( getApplication(), po, stack );
- }
-
-
- /**
- * Recursively index (or reindex) all of the collections and connections of a
- * specified entity, and all of the collected and connected entities as well.
- */
- private void indexEntityConnectionsAndCollections( final EntityRef entity,
- final EntityManagerFactory.ProgressObserver po, final Stack stack ) {
-
- final GraphManager gm = managerCache.getGraphManager(applicationScope);
+ public void reindex( final EntityManagerFactory.ProgressObserver po ) throws Exception {
- final Id fromEntityId = new SimpleId( entity.getUuid(), entity.getType() );
+ CpWalker walker = new CpWalker();
- logger.debug("Loading edges types from {}:{}\n scope {}:{}",
- new Object[] { entity.getType(), entity.getUuid(),
- applicationScope.getApplication().getType(),
- applicationScope.getApplication().getUuid() } );
-
- Observable<String> edgeTypes = gm.getEdgeTypesFromSource(
- new SimpleSearchEdgeType( fromEntityId, null , null ));
-
- edgeTypes.forEach( new Action1<String>() {
+ walker.walkCollections( this, application, new CpVisitor() {
@Override
- public void call( final String edgeType ) {
-
- logger.debug("Loading edges of edgeType {} from {}:{}\n scope {}:{}",
- new Object[] { edgeType, entity.getType(), entity.getUuid(),
- applicationScope.getApplication().getType(),
- applicationScope.getApplication().getUuid() } );
-
- Observable<Edge> edges = gm.loadEdgesFromSource( new SimpleSearchByEdgeType(
- fromEntityId, edgeType, Long.MAX_VALUE,
- SearchByEdgeType.Order.DESCENDING, null ));
-
- edges.forEach( new Action1<Edge>() {
-
- @Override
- public void call( Edge edge ) {
-
- if ( CpNamingUtils.isCollectionEdgeType( edge.getType() )) {
-
- String collName = CpNamingUtils.getCollectionName( edgeType );
- String memberType = edge.getTargetNode().getType();
-
- CollectionScope collScope = new CollectionScopeImpl(
- applicationScope.getApplication(),
- applicationScope.getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( entity.getType()));
- EntityCollectionManager collMgr =
- managerCache.getEntityCollectionManager(collScope);
-
- org.apache.usergrid.persistence.model.entity.Entity collEntity =
- collMgr.load( edge.getSourceNode() ).toBlockingObservable().last();
-
- if (collEntity == null) {
- logger.warn("(Empty collection?) Failed to load collection entity "
- + "{}:{} from scope\n app {}\n owner {}\n name {}",
- new Object[]{
- edge.getSourceNode().getType(),
- edge.getSourceNode().getUuid(),
- collScope.getApplication(),
- collScope.getOwner(),
- collScope.getName()
- });
- return;
- }
-
- CollectionScope memberScope = new CollectionScopeImpl(
- applicationScope.getApplication(),
- applicationScope.getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( memberType ));
- EntityCollectionManager memberMgr =
- managerCache.getEntityCollectionManager(memberScope);
-
- org.apache.usergrid.persistence.model.entity.Entity memberEntity =
- memberMgr.load( edge.getTargetNode()).toBlockingObservable().last();
-
- if (memberEntity == null) {
- logger.warn("(Empty collection?) Failed to load member entity "
- + "{}:{} from scope\n app {}\n owner {}\n name {}",
- new Object[]{
- edge.getTargetNode().getType(),
- edge.getTargetNode().getUuid(),
- memberScope.getApplication(),
- memberScope.getOwner(),
- memberScope.getName()
- });
- return;
- }
-
- indexEntityIntoCollections( collEntity, memberEntity, collName, true );
-
- EntityRef ref = new SimpleEntityRef(
- memberEntity.getId().getType(), memberEntity.getId().getUuid());
- po.onProgress( entity, ref, edge.getType());
-
- // recursion
- if ( !stack.contains( ref )) {
- stack.push( ref );
- indexEntityConnectionsAndCollections( ref, po, stack );
- stack.pop();
- }
-
-
- } else if ( CpNamingUtils.isConnectionEdgeType( edge.getType() )) {
-
- String connType = CpNamingUtils.getConnectionType( edgeType );
- String targetEntityType = edge.getTargetNode().getType();
- String sourceEntityType = entity.getType();
-
- CollectionScope sourceScope = new CollectionScopeImpl(
- applicationScope.getApplication(),
- applicationScope.getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( sourceEntityType ));
- EntityCollectionManager sourceEcm =
- managerCache.getEntityCollectionManager(sourceScope);
-
- org.apache.usergrid.persistence.model.entity.Entity sourceEntity =
- sourceEcm.load( edge.getSourceNode() ).toBlockingObservable().last();
-
- if (sourceEntity == null) {
- logger.warn("(Empty connection?) Failed to load source entity "
- + "{}:{} from scope\n app {}\n owner {}\n name {}",
- new Object[]{
- edge.getSourceNode().getType(),
- edge.getSourceNode().getUuid(),
- sourceScope.getApplication(),
- sourceScope.getOwner(),
- sourceScope.getName()
- });
- return;
- }
+ public void visitCollectionEntry(
+ EntityCollectionManager ecm,
+ String collectionName,
+ org.apache.usergrid.persistence.model.entity.Entity collectionEntity,
+ org.apache.usergrid.persistence.model.entity.Entity memberEntity) {
+
+ EntityRef source = new SimpleEntityRef(
+ collectionEntity.getId().getType(), collectionEntity.getId().getUuid() );
+ EntityRef target = new SimpleEntityRef(
+ memberEntity.getId().getType(), memberEntity.getId().getUuid() );
+ po.onProgress(source, target, "dummy");
+
+ indexEntityIntoCollection(
+ collectionEntity, memberEntity, collectionName );
+ }
- CollectionScope targetScope = new CollectionScopeImpl(
- applicationScope.getApplication(),
- applicationScope.getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( targetEntityType ));
- EntityCollectionManager targetEcm =
- managerCache.getEntityCollectionManager(targetScope);
-
- org.apache.usergrid.persistence.model.entity.Entity targetEntity =
- targetEcm.load( edge.getTargetNode() ).toBlockingObservable().last();
-
- if (targetEntity == null) {
- logger.warn("(Empty connection?) Failed to load target entity "
- + "{}:{} from scope\n app {}\n owner {}\n name {}",
- new Object[]{
- edge.getTargetNode().getType(),
- edge.getTargetNode().getUuid(),
- targetScope.getApplication(),
- targetScope.getOwner(),
- targetScope.getName()
- });
- return;
- }
+ @Override
+ public void visitConnectionEntry(
+ EntityCollectionManager ecm,
+ String connectionType,
+ org.apache.usergrid.persistence.model.entity.Entity sourceEntity,
+ org.apache.usergrid.persistence.model.entity.Entity targetEntity) {
+
+ EntityRef source = new SimpleEntityRef(
+ sourceEntity.getId().getType(), sourceEntity.getId().getUuid() );
+ EntityRef target = new SimpleEntityRef(
+ targetEntity.getId().getType(), targetEntity.getId().getUuid() );
+ po.onProgress(source, target, "dummy");
+
+ indexEntityIntoConnection(
+ sourceEntity, targetEntity, connectionType );
+ }
+ });
+ }
- indexEntityIntoConnection(
- sourceEntity, targetEntity, targetEntityType, connType );
- EntityRef ref = new SimpleEntityRef(
- targetEntity.getId().getType(), targetEntity.getId().getUuid());
- po.onProgress( entity, ref, edge.getType());
+ /**
+ * Completely repersist the application associated with this EntityManager.
+ */
+ public void repersistApplication(
+ final UUID appId, final EntityManagerFactory.ProgressObserver po ) throws Exception {
- // recursion
- if ( !stack.contains( ref )) {
- stack.push( ref );
- indexEntityConnectionsAndCollections( ref, po, stack );
- stack.pop();
- }
- }
- }
+ CpWalker walker = new CpWalker();
- }); // end foreach on edges
+ walker.walkCollections( this, application, new CpVisitor() {
+ @Override
+ public void visitCollectionEntry(
+ EntityCollectionManager ecm,
+ String collectionName,
+ org.apache.usergrid.persistence.model.entity.Entity collectionEntity,
+ org.apache.usergrid.persistence.model.entity.Entity memberEntity) {
+
+ EntityRef source = new SimpleEntityRef(
+ collectionEntity.getId().getType(), collectionEntity.getId().getUuid() );
+ EntityRef target = new SimpleEntityRef(
+ memberEntity.getId().getType(), memberEntity.getId().getUuid() );
+ po.onProgress(source, target, "dummy");
+
+ ecm.write( memberEntity).toBlocking().last();
+
+ indexEntityIntoCollection(
+ collectionEntity, memberEntity, collectionName );
}
- }); // end foreach on edgeTypes
-
+ @Override
+ public void visitConnectionEntry(
+ EntityCollectionManager ecm,
+ String connectionType,
+ org.apache.usergrid.persistence.model.entity.Entity sourceEntity,
+ org.apache.usergrid.persistence.model.entity.Entity targetEntity) {
+
+ EntityRef source = new SimpleEntityRef(
+ sourceEntity.getId().getType(), sourceEntity.getId().getUuid() );
+ EntityRef target = new SimpleEntityRef(
+ targetEntity.getId().getType(), targetEntity.getId().getUuid() );
+ po.onProgress(source, target, "dummy");
+
+ ecm.write( targetEntity).toBlocking().last();
+ }
+ });
}
@@ -3022,7 +2921,6 @@ public class CpEntityManager implements EntityManager {
void indexEntityIntoConnection(
org.apache.usergrid.persistence.model.entity.Entity sourceEntity,
org.apache.usergrid.persistence.model.entity.Entity targetEntity,
- String targetEntityType,
String connType) {
logger.debug("Indexing into connection {} source {}:{} target {}:{}", new Object[] {
@@ -3030,16 +2928,13 @@ public class CpEntityManager implements EntityManager {
targetEntity.getId().getType(), targetEntity.getId().getUuid() });
- final EntityIndex ei = getManagerCache().getEntityIndex( applicationScope );
+ final EntityIndex ei = getManagerCache().getEntityIndex(getApplicationScope());
final EntityIndexBatch batch = ei.createBatch();
-
-
-
// Index the new connection in app|source|type context
IndexScope indexScope = new IndexScopeImpl(
sourceEntity.getId(),
- CpNamingUtils.getConnectionScopeName( targetEntityType, connType ));
+ CpNamingUtils.getConnectionScopeName( targetEntity.getId().getType(), connType ));
batch.index(indexScope, targetEntity);
// Index the new connection in app|scope|all-types context
@@ -3058,7 +2953,7 @@ public class CpEntityManager implements EntityManager {
org.apache.usergrid.persistence.model.entity.Entity memberEntity,
String collName) {
- final EntityIndex ei = getManagerCache().getEntityIndex( applicationScope );
+ final EntityIndex ei = getManagerCache().getEntityIndex(getApplicationScope());
final EntityIndexBatch batch = ei.createBatch();
// index member into entity collection | type scope
@@ -3077,13 +2972,14 @@ public class CpEntityManager implements EntityManager {
// index member into application | all-types scope
IndexScope appAllTypesScope = new IndexScopeImpl(
- applicationScope.getApplication(),
+ getApplicationScope().getApplication(),
CpNamingUtils.ALL_TYPES);
batch.index(appAllTypesScope, memberEntity);
batch.execute();
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb7cfadc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index f44eac3..02c2c24 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -560,16 +560,16 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
// system app
- managerCache.getEntityIndex( new ApplicationScopeImpl( new SimpleId( SYSTEM_APP_ID, "application" ) ) )
- .refresh();
+ managerCache.getEntityIndex( new ApplicationScopeImpl(
+ new SimpleId( SYSTEM_APP_ID, "application" ) ) ).refresh();
// default app
- managerCache.getEntityIndex( new ApplicationScopeImpl( new SimpleId( getManagementAppId(), "application" ) ) )
- .refresh();
+ managerCache.getEntityIndex( new ApplicationScopeImpl(
+ new SimpleId( getManagementAppId(), "application" ) ) ).refresh();
// management app
- managerCache.getEntityIndex( new ApplicationScopeImpl( new SimpleId( getDefaultAppId(), "application" ) ) )
- .refresh();
+ managerCache.getEntityIndex( new ApplicationScopeImpl(
+ new SimpleId( getDefaultAppId(), "application" ) ) ).refresh();
}
@@ -590,7 +590,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
@Override
- public void rebuildInternalIndexes(ProgressObserver po) throws Exception {
+ public void rebuildInternalIndexes( ProgressObserver po ) throws Exception {
rebuildApplicationIndexes(SYSTEM_APP_ID, po);
}
@@ -622,4 +622,25 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
throw new UnsupportedOperationException( "Not supported yet." );
}
+ @Override
+ public void repersistApplication( UUID appId, ProgressObserver po ) throws Exception {
+ EntityManager em = getEntityManager( appId );
+ em.repersistApplication( appId, po );
+ }
+
+ @Override
+ public void repersistAll( ProgressObserver po ) throws Exception {
+
+ logger.info("\n\nRepersisting all Entities\n");
+
+ Map<String, UUID> appMap = getApplications();
+
+ logger.info("About to repersist entities for {} applications", appMap.keySet().size());
+
+ for ( UUID appUuid : appMap.values() ) {
+ repersistApplication( appUuid, po );
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb7cfadc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpVisitor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpVisitor.java
new file mode 100644
index 0000000..02d13c1
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpVisitor.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2014 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.corepersistence;
+
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+
+/**
+ * Interface for classes that need to visit all collections, connections and entities.
+ */
+public interface CpVisitor {
+
+ public void visitCollectionEntry(
+ EntityCollectionManager ecm, String collName, Entity collEntity, Entity memberEntity );
+
+ public void visitConnectionEntry(
+ EntityCollectionManager ecm, String connType, Entity sourceEntity, Entity targetEntity );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb7cfadc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
new file mode 100644
index 0000000..584f9a8
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
@@ -0,0 +1,236 @@
+/*
+ * Copyright 2014 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.corepersistence;
+
+import java.util.Stack;
+import java.util.logging.Level;
+import org.apache.usergrid.persistence.EntityRef;
+import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.functions.Action1;
+
+
+/**
+ * Takes a visitor to all collections and entities.
+ */
+public class CpWalker {
+
+ private static final Logger logger = LoggerFactory.getLogger( CpWalker.class );
+
+ private long writeDelayMs = 100;
+
+
+ public void walkCollections( final CpEntityManager em, final EntityRef start,
+ final CpVisitor visitor ) throws Exception {
+
+ Stack stack = new Stack();
+ Id appId = new SimpleId( em.getApplicationId(), TYPE_APPLICATION );
+ stack.push( appId );
+
+ doWalkCollections(em, new SimpleId(start.getUuid(), start.getType()), visitor, new Stack());
+ }
+
+
+ private void doWalkCollections( final CpEntityManager em, final Id start,
+ final CpVisitor visitor, final Stack stack ) {
+
+ final Id fromEntityId = new SimpleId( start.getUuid(), start.getType() );
+
+ final ApplicationScope applicationScope = em.getApplicationScope();
+
+ final GraphManager gm = em.getManagerCache().getGraphManager(applicationScope);
+
+ logger.debug("Loading edges types from {}:{}\n scope {}:{}",
+ new Object[] { start.getType(), start.getUuid(),
+ applicationScope.getApplication().getType(),
+ applicationScope.getApplication().getUuid()
+ });
+
+ Observable<String> edgeTypes = gm.getEdgeTypesFromSource(
+ new SimpleSearchEdgeType( fromEntityId, null , null ));
+
+ edgeTypes.forEach( new Action1<String>() {
+
+ @Override
+ public void call( final String edgeType ) {
+
+ try {
+ Thread.sleep( writeDelayMs );
+ } catch ( InterruptedException ignored ) {}
+
+ logger.debug("Loading edges of edgeType {} from {}:{}\n scope {}:{}",
+ new Object[] { edgeType, start.getType(), start.getUuid(),
+ applicationScope.getApplication().getType(),
+ applicationScope.getApplication().getUuid()
+ });
+
+ Observable<Edge> edges = gm.loadEdgesFromSource( new SimpleSearchByEdgeType(
+ fromEntityId, edgeType, Long.MAX_VALUE,
+ SearchByEdgeType.Order.DESCENDING, null ));
+
+ edges.forEach( new Action1<Edge>() {
+
+ @Override
+ public void call( Edge edge ) {
+
+ if ( CpNamingUtils.isCollectionEdgeType( edge.getType() )) {
+
+ String collName = CpNamingUtils.getCollectionName( edgeType );
+ String memberType = edge.getTargetNode().getType();
+
+ CollectionScope collScope = new CollectionScopeImpl(
+ applicationScope.getApplication(),
+ applicationScope.getApplication(),
+ CpNamingUtils.getCollectionScopeNameFromEntityType( start.getType()));
+ EntityCollectionManager collMgr =
+ em.getManagerCache().getEntityCollectionManager(collScope);
+
+ org.apache.usergrid.persistence.model.entity.Entity collEntity =
+ collMgr.load( edge.getSourceNode() ).toBlockingObservable().last();
+
+ if (collEntity == null) {
+ logger.warn("(Empty collection?) Failed to load collection entity "
+ + "{}:{} from scope\n app {}\n owner {}\n name {}",
+ new Object[]{
+ edge.getSourceNode().getType(),
+ edge.getSourceNode().getUuid(),
+ collScope.getApplication(),
+ collScope.getOwner(),
+ collScope.getName()
+ });
+ return;
+ }
+
+ CollectionScope memberScope = new CollectionScopeImpl(
+ applicationScope.getApplication(),
+ applicationScope.getApplication(),
+ CpNamingUtils.getCollectionScopeNameFromEntityType( memberType ));
+ EntityCollectionManager memberMgr =
+ em.getManagerCache().getEntityCollectionManager(memberScope);
+
+ org.apache.usergrid.persistence.model.entity.Entity memberEntity =
+ memberMgr.load( edge.getTargetNode()).toBlockingObservable().last();
+
+ if (memberEntity == null) {
+ logger.warn("(Empty collection?) Failed to load member entity "
+ + "{}:{} from scope\n app {}\n owner {}\n name {}",
+ new Object[]{
+ edge.getTargetNode().getType(),
+ edge.getTargetNode().getUuid(),
+ memberScope.getApplication(),
+ memberScope.getOwner(),
+ memberScope.getName()
+ });
+ return;
+ }
+
+ visitor.visitCollectionEntry(
+ memberMgr, collName, collEntity, memberEntity );
+
+ // recursion
+ if ( !stack.contains( memberEntity.getId() )) {
+ stack.push( memberEntity.getId() );
+ doWalkCollections( em, memberEntity.getId(), visitor, stack );
+ stack.pop();
+ }
+
+
+ } else if ( CpNamingUtils.isConnectionEdgeType( edge.getType() )) {
+
+ String connType = CpNamingUtils.getConnectionType( edgeType );
+ String targetEntityType = edge.getTargetNode().getType();
+ String sourceEntityType = start.getType();
+
+ CollectionScope sourceScope = new CollectionScopeImpl(
+ applicationScope.getApplication(),
+ applicationScope.getApplication(),
+ CpNamingUtils.getCollectionScopeNameFromEntityType(sourceEntityType));
+ EntityCollectionManager sourceEcm =
+ em.getManagerCache().getEntityCollectionManager(sourceScope);
+
+ org.apache.usergrid.persistence.model.entity.Entity sourceEntity =
+ sourceEcm.load( edge.getSourceNode() ).toBlockingObservable().last();
+
+ if (sourceEntity == null) {
+ logger.warn("(Empty connection?) Failed to load source entity "
+ + "{}:{} from scope\n app {}\n owner {}\n name {}",
+ new Object[]{
+ edge.getSourceNode().getType(),
+ edge.getSourceNode().getUuid(),
+ sourceScope.getApplication(),
+ sourceScope.getOwner(),
+ sourceScope.getName()
+ });
+ return;
+ }
+
+ CollectionScope targetScope = new CollectionScopeImpl(
+ applicationScope.getApplication(),
+ applicationScope.getApplication(),
+ CpNamingUtils.getCollectionScopeNameFromEntityType(targetEntityType));
+ EntityCollectionManager targetEcm =
+ em.getManagerCache().getEntityCollectionManager(targetScope);
+
+ org.apache.usergrid.persistence.model.entity.Entity targetEntity =
+ targetEcm.load( edge.getTargetNode() ).toBlockingObservable().last();
+
+ if (targetEntity == null) {
+ logger.warn("(Empty connection?) Failed to load target entity "
+ + "{}:{} from scope\n app {}\n owner {}\n name {}",
+ new Object[]{
+ edge.getTargetNode().getType(),
+ edge.getTargetNode().getUuid(),
+ targetScope.getApplication(),
+ targetScope.getOwner(),
+ targetScope.getName()
+ });
+ return;
+ }
+
+ visitor.visitConnectionEntry(
+ targetEcm, connType, sourceEntity, targetEntity );
+
+ // recursion
+ if ( !stack.contains( targetEntity.getId() )) {
+ stack.push( targetEntity.getId() );
+ doWalkCollections( em, targetEntity.getId(), visitor, stack );
+ stack.pop();
+ }
+ }
+ }
+
+ }); // end foreach on edges
+
+ }
+
+ }); // end foreach on edgeTypes
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb7cfadc/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java
index 54a5dee..adc0945 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java
@@ -163,4 +163,14 @@ public class HybridEntityManagerFactory implements EntityManagerFactory, Applica
public void rebuildCollectionIndex(UUID appId, String collection, ProgressObserver po) {
factory.rebuildCollectionIndex(appId, collection, po);
}
+
+ @Override
+ public void repersistAll( ProgressObserver po ) throws Exception {
+ factory.repersistAll( po );
+ }
+
+ @Override
+ public void repersistApplication(UUID appId, ProgressObserver po) throws Exception {
+ factory.repersistApplication( appId, po );
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb7cfadc/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
index cf53e50..32595ff 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
@@ -688,4 +688,9 @@ public interface EntityManager {
/** For testing purposes */
public void flushManagerCaches();
+
+ public void repersistApplication(
+ final UUID appId, final EntityManagerFactory.ProgressObserver po ) throws Exception;
+
+ public void reindex( final EntityManagerFactory.ProgressObserver po ) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb7cfadc/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
index 06c3114..714db51 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
@@ -130,6 +130,10 @@ public interface EntityManagerFactory {
public void rebuildCollectionIndex(UUID appId, String collection, ProgressObserver object);
+ public void repersistAll( ProgressObserver po ) throws Exception;
+
+ public void repersistApplication( UUID appId, ProgressObserver po ) throws Exception;
+
public interface ProgressObserver {
public void onProgress( EntityRef source, EntityRef target, String edgeType );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb7cfadc/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
index 399bccd..26e55aa 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
@@ -438,4 +438,14 @@ public class EntityManagerFactoryImpl implements EntityManagerFactory, Applicati
public void rebuildCollectionIndex(UUID appId, String collection, ProgressObserver po) {
throw new UnsupportedOperationException("Not supported.");
}
+
+ @Override
+ public void repersistAll( ProgressObserver po ) {
+ throw new UnsupportedOperationException("Not supported.");
+ }
+
+ @Override
+ public void repersistApplication(UUID appId, ProgressObserver po) throws Exception {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb7cfadc/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerImpl.java
index df201f9..9c42c47 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerImpl.java
@@ -2917,4 +2917,14 @@ public class EntityManagerImpl implements EntityManager {
// no-op
}
+ @Override
+ public void repersistApplication(UUID appId, EntityManagerFactory.ProgressObserver po) throws Exception {
+ throw new UnsupportedOperationException("Not supported.");
+ }
+
+ @Override
+ public void reindex(EntityManagerFactory.ProgressObserver po) throws Exception {
+ throw new UnsupportedOperationException("Not supported.");
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb7cfadc/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRepersistTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRepersistTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRepersistTest.java
new file mode 100644
index 0000000..f1be245
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRepersistTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence;
+
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.AbstractCoreIT;
+import org.apache.usergrid.Application;
+import org.apache.usergrid.CoreApplication;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Slf4jReporter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.usergrid.cassandra.Concurrent;
+import org.apache.usergrid.persistence.index.query.Query;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+
+//@RunWith(JukitoRunner.class)
+//@UseModules({ GuiceModule.class })
+@Concurrent()
+public class PerformanceEntityRepersistTest extends AbstractCoreIT {
+ private static final Logger logger =
+ LoggerFactory.getLogger(PerformanceEntityRepersistTest.class );
+
+ private static final MetricRegistry registry = new MetricRegistry();
+ private Slf4jReporter reporter;
+
+ private static final long RUNTIME = TimeUnit.MINUTES.toMillis( 1 );
+
+ private static final long writeDelayMs = 100;
+ //private static final long readDelayMs = 7;
+
+ @Rule
+ public Application app = new CoreApplication( setup );
+
+
+ @Before
+ public void startReporting() {
+
+ logger.debug("Starting metrics reporting");
+ reporter = Slf4jReporter.forRegistry( registry ).outputTo( logger )
+ .convertRatesTo( TimeUnit.SECONDS )
+ .convertDurationsTo( TimeUnit.MILLISECONDS ).build();
+
+ reporter.start( 10, TimeUnit.SECONDS );
+ }
+
+
+ @After
+ public void printReport() {
+
+ logger.debug("Printing metrics report");
+ reporter.report();
+ reporter.stop();
+ }
+
+
+ @Test
+ public void repersistAll() throws Exception {
+
+ logger.info("Started repersistAll()");
+
+ final EntityManager em = app.getEntityManager();
+
+ // ----------------- create a bunch of entities
+
+ final long stopTime = System.currentTimeMillis() + RUNTIME;
+
+ Map<String, Object> entityMap = new HashMap<String, Object>() {{
+ put("key1", 1000 );
+ put("key2", 2000 );
+ put("key3", "Some value");
+ }};
+ Map<String, Object> cat1map = new HashMap<String, Object>() {{
+ put("name", "enzo");
+ put("color", "orange");
+ }};
+ Map<String, Object> cat2map = new HashMap<String, Object>() {{
+ put("name", "marquee");
+ put("color", "grey");
+ }};
+ Map<String, Object> cat3map = new HashMap<String, Object>() {{
+ put("name", "bertha");
+ put("color", "tabby");
+ }};
+
+ Entity cat1 = em.create("cat", cat1map );
+ Entity cat2 = em.create("cat", cat2map );
+ Entity cat3 = em.create("cat", cat3map );
+
+ List<EntityRef> entityRefs = new ArrayList<EntityRef>();
+ int entityCount = 0;
+ while ( System.currentTimeMillis() < stopTime ) {
+
+ final Entity entity;
+
+ try {
+ entityMap.put("key", entityCount );
+ entity = em.create("testType", entityMap );
+
+ em.refreshIndex();
+
+ em.createConnection(entity, "herds", cat1);
+ em.createConnection(entity, "herds", cat2);
+ em.createConnection(entity, "herds", cat3);
+
+ } catch (Exception ex) {
+ throw new RuntimeException("Error creating entity", ex);
+ }
+
+ entityRefs.add(new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
+ if ( entityCount % 100 == 0 ) {
+ logger.info("Created {} entities", entityCount );
+ }
+
+ entityCount++;
+ try { Thread.sleep( writeDelayMs ); } catch (InterruptedException ignored ) {}
+ }
+
+ logger.info("Created {} entities", entityCount);
+ em.refreshIndex();
+
+ // ----------------- test that we can read them, should work fine
+
+ logger.debug("Read the data");
+ readData("testTypes", entityCount );
+
+ // ----------------- repersist all
+
+ logger.debug("Preparing to repersist all");;
+
+ final String meterName = this.getClass().getSimpleName() + ".repersist";
+ final Meter meter = registry.meter( meterName );
+
+ EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
+ int counter = 0;
+ @Override
+ public void onProgress( EntityRef s, EntityRef t, String etype ) {
+ meter.mark();
+ logger.debug("Repersisting from {}:{} to {}:{} ", new Object[] {
+ s.getType(), s.getUuid(), t.getType(), t.getUuid(), etype });
+ if ( !logger.isDebugEnabled() && counter % 100 == 0 ) {
+ logger.info("Repersisted {} entities", counter );
+ }
+ counter++;
+ }
+ };
+
+ try {
+ setup.getEmf().repersistAll(po );
+
+ registry.remove( meterName );
+ logger.info("Repersist complete");
+
+ } catch (Exception ex) {
+ logger.error("Error repersisting", ex);
+ fail();
+ }
+ em.refreshIndex();
+
+ // ----------------- test that we can read them
+
+ readData( "testTypes", entityCount );
+ }
+
+
+ private int readData( String collectionName ) throws Exception {
+ return readData( collectionName, -1 );
+ }
+
+
+ private int readData( String collectionName, int expected ) throws Exception {
+
+ EntityManager em = app.getEntityManager();
+
+ Query q = Query.fromQL("select * where key1=1000");
+ q.setLimit(40);
+ Results results = em.searchCollection( em.getApplicationRef(), collectionName, q );
+
+ int count = 0;
+ while ( true ) {
+
+ for ( Entity e : results.getEntities() ) {
+
+ assertEquals( 2000, e.getProperty("key2"));
+
+ Results catResults = em.searchConnectedEntities(e, Query.fromQL("select *"));
+ assertEquals( 3, catResults.size() );
+
+ if ( count % 100 == 0 ) {
+ logger.info( "read {} entities", count);
+ }
+ count++;
+ }
+
+ if ( results.hasCursor() ) {
+ logger.info( "Counted {} : query again with cursor", count);
+ q.setCursor( results.getCursor() );
+ results = em.searchCollection( em.getApplicationRef(), collectionName, q );
+
+ } else {
+ break;
+ }
+ }
+
+ if ( expected != -1 && expected != count ) {
+ throw new RuntimeException("Did not get expected "
+ + expected + " entities, instead got " + count );
+ }
+ return count;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb7cfadc/stack/tools/src/main/java/org/apache/usergrid/tools/RepersistAll.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/RepersistAll.java b/stack/tools/src/main/java/org/apache/usergrid/tools/RepersistAll.java
new file mode 100644
index 0000000..4c63f61
--- /dev/null
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/RepersistAll.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.tools;
+
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.utils.UUIDUtils;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.EntityRef;
+
+
+/**
+ * Index rebuild utility for Usergrid. Can be used to rebuild the index for a specific
+ * application, a specific application's collection or for an entire Usergrid system.
+ */
+public class RepersistAll extends ToolBase {
+
+ private static final int PAGE_SIZE = 100;
+
+
+ private static final Logger logger = LoggerFactory.getLogger(RepersistAll.class );
+
+
+ @Override
+ @SuppressWarnings("static-access")
+ public Options createOptions() {
+
+ Option hostOpt = OptionBuilder.withArgName( "host" ).hasArg().isRequired( true )
+ .withDescription( "Cassandra host" ).create( "host" );
+
+ Option esHostsOpt = OptionBuilder.withArgName( "host" ).hasArg().isRequired( true )
+ .withDescription( "ElasticSearch host" ).create( "eshost" );
+
+ Option esClusterOpt = OptionBuilder.withArgName( "host" ).hasArg().isRequired( true )
+ .withDescription( "ElasticSearch cluster name" ).create( "escluster" );
+
+ Options options = new Options();
+ options.addOption( hostOpt );
+ options.addOption( esHostsOpt );
+ options.addOption( esClusterOpt );
+
+ return options;
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.usergrid.tools.ToolBase#runTool(org.apache.commons.cli.CommandLine)
+ */
+ @Override
+ public void runTool( CommandLine line ) throws Exception {
+ startSpring();
+
+ logger.info( "Starting index rebuild" );
+
+ EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
+ @Override
+ public void onProgress(EntityRef s, EntityRef t, String etype) {
+ logger.info("Repersisting from {}:{} to {}:{}", new Object[] {
+ s.getType(), s.getUuid(), t.getType(), t.getUuid(), etype });
+ }
+ };
+
+ emf.rebuildAllIndexes( po );
+
+ logger.info( "Finished index rebuild" );
+ }
+}