You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/20 15:46:40 UTC

[12/50] [abbrv] git commit: Work in progress on refactoring visitor to only update entities since connections are handled internally

Work in progress: refactoring the visitor to update only entities, since connections are handled internally.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/163fa9ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/163fa9ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/163fa9ad

Branch: refs/heads/two-dot-o-events
Commit: 163fa9adf163226521d5724dce1cdac974de3b38
Parents: 9da529b
Author: Todd Nine <to...@apache.org>
Authored: Thu Oct 16 11:39:12 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Oct 16 11:39:12 2014 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  83 ++--------
 .../corepersistence/CpEntityManagerFactory.java |  22 +--
 .../usergrid/corepersistence/CpVisitor.java     |  11 +-
 .../usergrid/corepersistence/CpWalker.java      | 155 +++++++------------
 .../persistence/EntityManagerFactory.java       |   8 +-
 .../org/apache/usergrid/CoreApplication.java    |   1 +
 .../PerformanceEntityRebuildIndexTest.java      |  20 ++-
 .../apache/usergrid/rest/SystemResource.java    |  47 +++---
 8 files changed, 135 insertions(+), 212 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/163fa9ad/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 8753670..4015014 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -109,6 +109,7 @@ import static org.apache.usergrid.persistence.cassandra.Serializers.ue;
 import org.apache.usergrid.persistence.cassandra.util.TraceParticipant;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.exception.WriteOptimisticVerifyException;
 import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -2869,94 +2870,32 @@ public class CpEntityManager implements EntityManager {
      */
     public void reindex( final EntityManagerFactory.ProgressObserver po ) throws Exception {
 
-        CpWalker walker = new CpWalker();
+        CpWalker walker = new CpWalker(po.getWriteDelayTime());
 
         walker.walkCollections( this, application, new CpVisitor() {
 
             @Override
             public void visitCollectionEntry(
-                    EntityManager em, String collName, Entity source, Entity target) {
+                    EntityManager em, String collName, Entity entity) {
 
                 try {
-                    em.update( target);
-                    po.onProgress(source, target, collName);
-
-                } catch (Exception ex) {
-                    logger.error("Error repersisting entity", ex);
+                    em.update( entity);
+                    po.onProgress(entity);
                 }
-            }
-
-            @Override
-            public void visitConnectionEntry(
-                    EntityManager em, String connType, Entity source, Entity target) {
-
-                try {
-                    em.update( target);
-                    po.onProgress(source, target, connType);
-
-                } catch (Exception ex) {
+                catch(WriteOptimisticVerifyException wo ){
+                    //swallow this, it just means this was already updated, which accomplishes our task.  Just ignore.
+                    logger.warn( "Someone beat us to updating entity {} in collection {}.  Ignoring.", entity.getName(), collName );
+                }
+                catch (Exception ex) {
                     logger.error("Error repersisting entity", ex);
                 }
             }
 
-        });
-    }
-
-
-    private void indexEntityIntoCollections( 
-            org.apache.usergrid.persistence.model.entity.Entity collectionEntity, 
-            org.apache.usergrid.persistence.model.entity.Entity memberEntity, 
-            String collName, 
-            boolean connectBack ) {
-
-        logger.debug("Indexing into collections {} {}:{} member {}:{}", new Object[] { 
-            collName, collectionEntity.getId().getType(), collectionEntity.getId().getUuid(),
-            memberEntity.getId().getType(), memberEntity.getId().getUuid() });
-
-        indexEntityIntoCollection( collectionEntity, memberEntity, collName);
 
-        CollectionInfo collection = getDefaultSchema()
-                .getCollection( memberEntity.getId().getType(), collName);
-
-        if (connectBack && collection != null && collection.getLinkedCollection() != null) {
-
-            logger.debug("Linking back from entity in collection {} to collection {}", 
-                collection.getName(), collection.getLinkedCollection());
-
-            indexEntityIntoCollections( 
-                memberEntity, collectionEntity, collection.getLinkedCollection(), false );
-        }
+        });
     }
 
 
-    void indexEntityIntoConnection(
-            org.apache.usergrid.persistence.model.entity.Entity sourceEntity,
-            org.apache.usergrid.persistence.model.entity.Entity targetEntity,
-            String connType) {
-
-        logger.debug("Indexing into connection {} source {}:{} target {}:{}", new Object[] { 
-            connType, sourceEntity.getId().getType(), sourceEntity.getId().getUuid(),
-            targetEntity.getId().getType(), targetEntity.getId().getUuid() });
-
-
-        final EntityIndex ei = getManagerCache().getEntityIndex(getApplicationScope());
-        final EntityIndexBatch batch = ei.createBatch();
-
-        // Index the new connection in app|source|type context
-        IndexScope indexScope = new IndexScopeImpl(
-                sourceEntity.getId(),
-                CpNamingUtils.getConnectionScopeName( targetEntity.getId().getType(), connType ));
-        batch.index(indexScope, targetEntity);
-        
-        // Index the new connection in app|scope|all-types context
-        IndexScope allTypesIndexScope = new IndexScopeImpl(
-                sourceEntity.getId(),
-                CpNamingUtils.ALL_TYPES);
-
-        batch.index(allTypesIndexScope, targetEntity);
-
-        batch.execute();
-    }
 
 
     void indexEntityIntoCollection(

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/163fa9ad/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 2abe83f..8ab0b7f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -101,7 +101,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     public static final  UUID DEFAULT_APPLICATION_ID =
             UUID.fromString("b6768a08-b5d5-11e3-a495-11ddb1de66c9");
 
-    private static AtomicBoolean INIT_SYSTEM = new AtomicBoolean(  );
+    private AtomicBoolean init_indexes = new AtomicBoolean(  );
 
 
     // cache of already instantiated entity managers
@@ -201,6 +201,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     private EntityManager _getEntityManager( UUID applicationId ) {
         EntityManager em = new CpEntityManager();
         em.init( this, applicationId );
+        //TODO T.N. Can we remove this?  Seems like we should fix our lifecycle instead...
+        em.createIndex();
         return em;
     }
 
@@ -628,7 +630,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
 
     private void maybeCreateIndexes() {
         // system app
-        if ( INIT_SYSTEM.getAndSet( true ) ) {
+        if ( init_indexes.getAndSet( true ) ) {
             return;
         }
 
@@ -641,16 +643,16 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     private List<EntityIndex> getManagementIndexes() {
 
         return Arrays.asList(
-                getManagerCache().getEntityIndex( new ApplicationScopeImpl(
-                        new SimpleId( SYSTEM_APP_ID, "application" ) ) ),
+                getManagerCache().getEntityIndex(
+                        new ApplicationScopeImpl( new SimpleId( SYSTEM_APP_ID, "application" ) ) ),
 
                 // default app
-               getManagerCache().getEntityIndex( new ApplicationScopeImpl(
-                        new SimpleId( getManagementAppId(), "application" ) ) ),
+               getManagerCache().getEntityIndex(
+                       new ApplicationScopeImpl( new SimpleId( getManagementAppId(), "application" ) ) ),
 
                 // management app
-               getManagerCache().getEntityIndex( new ApplicationScopeImpl(
-                        new SimpleId( getDefaultAppId(), "application" ) ) ) );
+               getManagerCache().getEntityIndex(
+                       new ApplicationScopeImpl( new SimpleId( getDefaultAppId(), "application" ) ) ) );
     }
 
 
@@ -682,8 +684,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         EntityManager em = getEntityManager( appId );
         Application app = em.getApplication();
 
-        ((CpEntityManager)em).reindex( po );
-        em.refreshIndex();
+        em.reindex( po );
+//        em.refreshIndex();
 
         logger.info("\n\nRebuilt index for application {} id {}\n", app.getName(), appId );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/163fa9ad/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpVisitor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpVisitor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpVisitor.java
index 5d32235..aa06744 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpVisitor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpVisitor.java
@@ -24,9 +24,12 @@ import org.apache.usergrid.persistence.EntityManager;
  */
 public interface CpVisitor {
 
+    /**
+     * Visit the entity as we're walking the structure
+     * @param em
+     * @param collName
+     * @param visitedEntity
+     */
     public void visitCollectionEntry( 
-        EntityManager em, String collName, Entity sourceEntity, Entity targetEntity );
-
-    public void visitConnectionEntry( 
-        EntityManager em, String connType, Entity sourceEntity, Entity targetEntity );
+        EntityManager em, String collName, Entity visitedEntity );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/163fa9ad/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
index b507edd..d491d3d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
@@ -15,12 +15,15 @@
  */
 package org.apache.usergrid.corepersistence;
 
+
 import java.util.Stack;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityRef;
-import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
 import org.apache.usergrid.persistence.SimpleEntityRef;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
@@ -30,10 +33,12 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
 import rx.Observable;
 import rx.functions.Action1;
+import rx.functions.Func1;
+
+import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
 
 
 /**
@@ -43,125 +48,83 @@ public class CpWalker {
 
     private static final Logger logger = LoggerFactory.getLogger( CpWalker.class );
 
-    private long writeDelayMs = 100;
+    private final long writeDelayMs;
+
 
+    /**
+     * Wait the set amount of time between successive writes.
+     * @param writeDelayMs
+     */
+    public CpWalker(final long writeDelayMs){
+         this.writeDelayMs = writeDelayMs;
+    }
 
-    public void walkCollections( final CpEntityManager em, final EntityRef start, 
-            final CpVisitor visitor ) throws Exception {
 
-        Stack stack = new Stack();
-        Id appId = new SimpleId( em.getApplicationId(), TYPE_APPLICATION );
-        stack.push( appId );
+    public void walkCollections( final CpEntityManager em, final EntityRef start, final CpVisitor visitor )
+            throws Exception {
 
-        doWalkCollections(em, new SimpleId(start.getUuid(), start.getType()), visitor, new Stack());
+        doWalkCollections( em, new SimpleId( start.getUuid(), start.getType() ), visitor );
     }
 
 
-    private void doWalkCollections( final CpEntityManager em, final Id start, 
-            final CpVisitor visitor, final Stack stack ) {
+    private void doWalkCollections( final CpEntityManager em, final Id start, final CpVisitor visitor ) {
 
         final Id fromEntityId = new SimpleId( start.getUuid(), start.getType() );
 
         final ApplicationScope applicationScope = em.getApplicationScope();
 
-        final GraphManager gm = em.getManagerCache().getGraphManager(applicationScope);
-
-        logger.debug("Loading edges types from {}:{}\n   scope {}:{}",
-            new Object[] { start.getType(), start.getUuid(),
-                applicationScope.getApplication().getType(),
-                applicationScope.getApplication().getUuid()
-            });
+        final GraphManager gm = em.getManagerCache().getGraphManager( applicationScope );
 
-        Observable<String> edgeTypes = gm.getEdgeTypesFromSource( 
-                new SimpleSearchEdgeType( fromEntityId, null , null ));
+        logger.debug( "Loading edges types from {}:{}\n   scope {}:{}", new Object[] {
+                        start.getType(), start.getUuid(), applicationScope.getApplication().getType(),
+                        applicationScope.getApplication().getUuid()
+                } );
 
-        edgeTypes.forEach( new Action1<String>() {
+        Observable<String> edgeTypes = gm.getEdgeTypesFromSource(
+                new SimpleSearchEdgeType( fromEntityId, CpNamingUtils.EDGE_COLL_SUFFIX, null ) );
 
+        edgeTypes.flatMap( new Func1<String, Observable<Edge>>() {
             @Override
-            public void call( final String edgeType ) {
-
-                try {
-                    Thread.sleep( writeDelayMs );
-                } catch ( InterruptedException ignored ) {}
+            public Observable<Edge> call( final String edgeType ) {
 
-                logger.debug("Loading edges of edgeType {} from {}:{}\n   scope {}:{}",
-                    new Object[] { edgeType, start.getType(), start.getUuid(),
-                        applicationScope.getApplication().getType(),
+                logger.debug( "Loading edges of edgeType {} from {}:{}\n   scope {}:{}", new Object[] {
+                        edgeType, start.getType(), start.getUuid(), applicationScope.getApplication().getType(),
                         applicationScope.getApplication().getUuid()
-                });
+                } );
 
+                return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( fromEntityId, edgeType, Long.MAX_VALUE,
+                                SearchByEdgeType.Order.DESCENDING, null ) );
+            }
+        } ).doOnNext( new Action1<Edge>() {
 
-                Observable<String> edgeTypes = gm.getEdgeTypesFromSource( new SimpleSearchEdgeType(fromEntityId, CpNamingUtils.EDGE_COLL_SUFFIX, null ));
-
-
-
-                Observable<Edge> edges = gm.loadEdgesFromSource( new SimpleSearchByEdgeType( 
-                        fromEntityId, edgeType, Long.MAX_VALUE, 
-                        SearchByEdgeType.Order.DESCENDING, null ));
-
-                edges.forEach( new Action1<Edge>() {
-
-                    @Override
-                    public void call( Edge edge ) {
-
-                        EntityRef sourceEntityRef = new SimpleEntityRef( 
-                            edge.getSourceNode().getType(), edge.getSourceNode().getUuid());
-                        Entity sourceEntity;
-                        try {
-                            sourceEntity = em.get( sourceEntityRef );
-                        } catch (Exception ex) {
-                            logger.error( "Error getting sourceEntity {}:{}, continuing", 
-                                    sourceEntityRef.getType(), sourceEntityRef.getUuid());
-                            return;
-                        }
-
-                        EntityRef targetEntityRef = new SimpleEntityRef( 
-                            edge.getTargetNode().getType(), edge.getTargetNode().getUuid());
-                        Entity targetEntity;
-                        try {
-                            targetEntity = em.get( targetEntityRef );
-                        } catch (Exception ex) {
-                            logger.error( "Error getting sourceEntity {}:{}, continuing", 
-                                    sourceEntityRef.getType(), sourceEntityRef.getUuid());
-                            return;
-                        }
-                            
-                        if ( CpNamingUtils.isCollectionEdgeType( edge.getType() )) {
-
-                            String collName = CpNamingUtils.getCollectionName( edgeType );
-
-                            visitor.visitCollectionEntry( 
-                                    em, collName, sourceEntity, targetEntity );
-
-                            // recursion
-                            if ( !stack.contains( targetEntity.getUuid() )) {
-                                stack.push( targetEntity.getUuid() );
-                                doWalkCollections( em, edge.getSourceNode(), visitor, stack );
-                                stack.pop(); 
-                            }
+            @Override
+            public void call( Edge edge ) {
 
-                        } else {
+                EntityRef sourceEntityRef =
+                        new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
 
-                            String collName = CpNamingUtils.getConnectionType(edgeType);
+                Entity entity;
+                try {
+                    entity = em.get( sourceEntityRef );
+                }
+                catch ( Exception ex ) {
+                    logger.error( "Error getting sourceEntity {}:{}, continuing", sourceEntityRef.getType(),
+                            sourceEntityRef.getUuid() );
+                    return;
+                }
 
-                            visitor.visitConnectionEntry( 
-                                    em, collName, sourceEntity, targetEntity );
 
-                            // recursion
-                            if ( !stack.contains( targetEntity.getUuid() )) {
-                                stack.push( targetEntity.getUuid() );
-                                doWalkCollections( em, edge.getTargetNode(), visitor, stack );
-                                stack.pop(); 
-                            }
-                        }
-                    }
+                String collName = CpNamingUtils.getCollectionName( edge.getType() );
 
-                }); // end foreach on edges
+                visitor.visitCollectionEntry( em, collName, entity );
 
+                try {
+                    Thread.sleep( writeDelayMs );
+                }
+                catch ( InterruptedException e ) {
+                    throw new RuntimeException( "Unable to wait" );
+                }
             }
-
-        }); // end foreach on edgeTypes
-
+        } ).toBlocking().lastOrDefault( null ); // end foreach on edges
     }
-    
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/163fa9ad/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
index 06c3114..e57aa69 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
@@ -131,6 +131,12 @@ public interface EntityManagerFactory {
     public void rebuildCollectionIndex(UUID appId, String collection, ProgressObserver object);
 
     public interface ProgressObserver {
-        public void onProgress( EntityRef source, EntityRef target, String edgeType );
+        public void onProgress( EntityRef entity);
+
+        /**
+         * Get the write delay time from the progress observer.  Used to throttle writes
+         * @return
+         */
+        public long getWriteDelayTime();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/163fa9ad/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
index c790f64..51e825b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
+++ b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
@@ -150,6 +150,7 @@ public class CoreApplication implements Application, TestRule {
         id = setup.createApplication( orgName, appName );
         assertNotNull( id );
 
+        setup.getEmf().refreshIndex();
         em = setup.getEmf().getEntityManager( id );
         em.createIndex();
         assertNotNull( em );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/163fa9ad/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index 8de520a..d2f7fef 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -60,14 +60,13 @@ import static org.junit.Assert.fail;
 //@RunWith(JukitoRunner.class)
 //@UseModules({ GuiceModule.class })
 @Concurrent()
-@Ignore("Temporary ignore")
 public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
     private static final Logger logger = LoggerFactory.getLogger(PerformanceEntityRebuildIndexTest.class );
 
     private static final MetricRegistry registry = new MetricRegistry();
     private Slf4jReporter reporter;
 
-    private static final long RUNTIME = TimeUnit.MINUTES.toMillis( 1 );
+    private static final long RUNTIME = TimeUnit.SECONDS.toMillis( 5 );
 
     private static final long writeDelayMs = 100;
     //private static final long readDelayMs = 7;
@@ -192,18 +191,26 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
         EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
             int counter = 0;
             @Override
-            public void onProgress( EntityRef s, EntityRef t, String etype ) {
+               public void onProgress( final EntityRef entity ) {
+
                 meter.mark();
-                logger.debug("Indexing from {}:{} to {}:{} edgeType {}", new Object[] {
-                    s.getType(), s.getUuid(), t.getType(), t.getUuid(), etype });
+                logger.debug("Indexing from {}:{}", entity.getType(), entity.getUuid());
                 if ( !logger.isDebugEnabled() && counter % 100 == 0 ) {
                     logger.info("Reindexed {} entities", counter );
                 }
                 counter++;
             }
+
+
+
+            @Override
+            public long getWriteDelayTime() {
+                return 0;
+            }
         };
 
         try {
+//            setup.getEmf().refreshIndex();
             setup.getEmf().rebuildAllIndexes( po );
 
             registry.remove( meterName );
@@ -236,9 +243,6 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
         eeii.deleteIndex();
     }
 
-    private int readData( String collectionName ) throws Exception {
-        return readData( collectionName, -1 );
-    }
 
     private int readData( String collectionName, int expected ) throws Exception {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/163fa9ad/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java
index ceec656..28a7120 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/SystemResource.java
@@ -124,11 +124,17 @@ public class SystemResource extends AbstractContextResource {
 
 
         final ProgressObserver po = new ProgressObserver() {
+
+
+            @Override
+            public void onProgress( final EntityRef entity ) {
+                logger.info( "Indexing from {}:{} ", entity.getType(), entity.getUuid() );
+            }
+
+
             @Override
-            public void onProgress( EntityRef s, EntityRef t, String etype ) {
-                logger.info( "Indexing from {}:{} to {}:{} edgeType {}", new Object[] {
-                        s.getType(), s.getUuid(), t.getType(), t.getUuid(), etype
-                } );
+            public long getWriteDelayTime() {
+                return 0;
             }
         };
 
@@ -168,7 +174,8 @@ public class SystemResource extends AbstractContextResource {
     public JSONWithPadding rebuildIndexes( 
                 @Context UriInfo ui, 
                 @PathParam( "applicationId" ) String applicationIdStr,
-                @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+                @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback,
+                @QueryParam( "delay" ) @DefaultValue( "10" ) final long delay)
 
             throws Exception {
 
@@ -177,14 +184,6 @@ public class SystemResource extends AbstractContextResource {
         response.setAction( "rebuild indexes" );
 
 
-        final ProgressObserver po = new ProgressObserver() {
-            @Override
-            public void onProgress( EntityRef s, EntityRef t, String etype ) {
-                logger.info( "Indexing from {}:{} to {}:{} edgeType {}", new Object[] {
-                        s.getType(), s.getUuid(), t.getType(), t.getUuid(), etype
-                } );
-            }
-        };
 
 
         final EntityManager em = emf.getEntityManager( appId );
@@ -199,7 +198,7 @@ public class SystemResource extends AbstractContextResource {
 
 
                 {
-                    rebuildCollection( appId, collectionName );
+                    rebuildCollection( appId, collectionName, delay );
                 }
             }
         };
@@ -221,7 +220,8 @@ public class SystemResource extends AbstractContextResource {
                 @Context UriInfo ui,
                 @PathParam( "applicationId" ) final String applicationIdStr,
                 @PathParam( "collectionName" ) final String collectionName,
-                @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+                @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback,
+                @QueryParam( "delay" ) @DefaultValue( "10" ) final long delay )
             throws Exception {
 
         final UUID appId = UUIDUtils.tryExtractUUID( applicationIdStr );
@@ -232,7 +232,7 @@ public class SystemResource extends AbstractContextResource {
 
             public void run() {
 
-                rebuildCollection( appId, collectionName );
+                rebuildCollection( appId, collectionName, delay );
             }
         };
 
@@ -247,13 +247,18 @@ public class SystemResource extends AbstractContextResource {
     }
 
 
-    private void rebuildCollection( final UUID applicationId, final String collectionName ) {
+    private void rebuildCollection( final UUID applicationId, final String collectionName, final long delay ) {
         EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
+
+            @Override
+            public void onProgress( final EntityRef entity ) {
+                logger.info( "Indexing from {}:{} to {}:{} edgeType {}", entity.getType(), entity.getUuid());
+            }
+
+
             @Override
-            public void onProgress( EntityRef s, EntityRef t, String etype ) {
-                logger.info( "Indexing from {}:{} to {}:{} edgeType {}", new Object[] {
-                        s.getType(), s.getUuid(), t.getType(), t.getUuid(), etype
-                } );
+            public long getWriteDelayTime() {
+                return delay;
             }
         };