You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/22 02:22:05 UTC

[07/19] incubator-usergrid git commit: Fixes bugs and cleans up tests

Fixes bugs and cleans up tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/48be894f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/48be894f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/48be894f

Branch: refs/heads/USERGRID-641
Commit: 48be894fb8af857f33700fcc2717357936d12913
Parents: a767bb6
Author: Todd Nine <tn...@apigee.com>
Authored: Thu May 14 19:23:04 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu May 14 19:23:04 2015 -0600

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    |   4 +
 .../corepersistence/CpRelationManager.java      |   1 -
 .../index/IndexProcessorFig.java                |   2 +-
 .../index/IndexServiceRequestBuilderImpl.java   |  74 ++---
 .../corepersistence/index/ReIndexService.java   |  16 +-
 .../index/ReIndexServiceImpl.java               |  48 ++-
 .../cursor/AbstractCursorSerializer.java        |   2 +-
 .../PerformanceEntityRebuildIndexTest.java      | 318 +++++++++----------
 8 files changed, 225 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index a02bffd..b22a7cb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -25,6 +25,8 @@ import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
 import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
 import org.apache.usergrid.corepersistence.index.IndexService;
 import org.apache.usergrid.corepersistence.index.IndexServiceImpl;
+import org.apache.usergrid.corepersistence.index.ReIndexService;
+import org.apache.usergrid.corepersistence.index.ReIndexServiceImpl;
 import org.apache.usergrid.corepersistence.migration.AppInfoMigrationPlugin;
 import org.apache.usergrid.corepersistence.migration.CoreMigration;
 import org.apache.usergrid.corepersistence.migration.CoreMigrationPlugin;
@@ -142,6 +144,8 @@ public class CoreModule  extends AbstractModule {
         bind( AsyncEventService.class ).toProvider( AsyncIndexProvider.class );
 
 
+        bind( ReIndexService.class).to( ReIndexServiceImpl.class );
+
 
         install( new GuicyFigModule( IndexProcessorFig.class ) );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 4993d88..cba1a07 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -661,7 +661,6 @@ public class CpRelationManager implements RelationManager {
         }while (!found && length <= maxLength);
         if(logger.isInfoEnabled()){
             logger.info(String.format("Consistent Search finished in %s,  results=%s, expected=%s...dumping stack",length, results.size(),expectedResults));
-            Thread.dumpStack();
         }
         return results;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 8e835e2..e4b2329 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -78,7 +78,7 @@ public interface IndexProcessorFig extends GuicyFig {
     String getQueueImplementation();
 
 
-    @Default("10000")
+    @Default("1000")
     @Key("elasticsearch.reindex.flush.interval")
     int getUpdateInterval();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
index 3466674..4017b6e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
@@ -22,70 +22,57 @@ package org.apache.usergrid.corepersistence.index;
 
 import java.util.UUID;
 
-
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 
 import com.google.common.base.Optional;
 
 
+/**
+ * Index service request builder
+ */
 public class IndexServiceRequestBuilderImpl implements IndexServiceRequestBuilder {
 
-
-    /**
-     *
-     final Observable<ApplicationScope>  applicationScopes = appId.isPresent()? Observable.just( getApplicationScope(appId.get()) ) : allApplicationsObservable.getData();
-
-     final String newCursor = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
-
-     //create an observable that loads each entity and indexes it, start it running with publish
-     final ConnectableObservable<EdgeScope> runningReIndex =
-         allEntityIdsObservable.getEdgesToEntities( applicationScopes, collection, startTimestamp )
-
-             //for each edge, create our scope and index on it
-             .doOnNext( edge -> indexService.index( new EntityIdScope( edge.getApplicationScope(), edge.getEdge().getTargetNode() ) ) ).publish();
-
-
-
-     //start our sampler and state persistence
-     //take a sample every sample interval to allow us to resume state with minimal loss
-     runningReIndex.sample( indexProcessorFig.getReIndexSampleInterval(), TimeUnit.MILLISECONDS,
-         rxTaskScheduler.getAsyncIOScheduler() )
-         .doOnNext( edge -> {
-
-             final String serializedState = SerializableMapper.asString( edge );
-
-             mapManager.putString( newCursor, serializedState, INDEX_TTL );
-         } ).subscribe();
-
-
-     */
-
-    private Optional<UUID> withApplicationId;
-    private Optional<String> withCollectionName;
-    private Optional<String> cursor;
-    private Optional<Long> updateTimestamp;
+    private Optional<UUID> withApplicationId = Optional.absent();
+    private Optional<String> withCollectionName = Optional.absent();
+    private Optional<String> cursor = Optional.absent();
+    private Optional<Long> updateTimestamp = Optional.absent();
 
 
     /***
      *
-     * @param applicationId
+     * @param applicationId The application id
      * @return
      */
     @Override
     public IndexServiceRequestBuilder withApplicationId( final UUID applicationId ) {
-        this.withApplicationId = Optional.fromNullable(applicationId);
+        this.withApplicationId = Optional.fromNullable( applicationId );
         return this;
     }
 
 
+    /**
+     * the colleciton name
+     * @param collectionName
+     * @return
+     */
     @Override
     public IndexServiceRequestBuilder withCollection( final String collectionName ) {
-        this.withCollectionName = Optional.fromNullable( collectionName );
+        if(collectionName == null){
+            this.withCollectionName = Optional.absent();
+        }
+        else {
+            this.withCollectionName = Optional.fromNullable( CpNamingUtils.getEdgeTypeFromCollectionName( collectionName ) );
+        }
         return this;
     }
 
 
+    /**
+     * The cursor
+     * @param cursor
+     * @return
+     */
     @Override
     public IndexServiceRequestBuilder withCursor( final String cursor ) {
         this.cursor = Optional.fromNullable( cursor );
@@ -93,9 +80,14 @@ public class IndexServiceRequestBuilderImpl implements IndexServiceRequestBuilde
     }
 
 
+    /**
+     * Set start timestamp in epoch time.  Only entities updated since this time will be processed for indexing
+     * @param timestamp
+     * @return
+     */
     @Override
     public IndexServiceRequestBuilder withStartTimestamp( final Long timestamp ) {
-        this.updateTimestamp = Optional.fromNullable(timestamp  );
+        this.updateTimestamp = Optional.fromNullable( timestamp );
         return this;
     }
 
@@ -103,8 +95,8 @@ public class IndexServiceRequestBuilderImpl implements IndexServiceRequestBuilde
     @Override
     public Optional<ApplicationScope> getApplicationScope() {
 
-        if(this.withApplicationId.isPresent()){
-            return Optional.of(  CpNamingUtils.getApplicationScope( withApplicationId.get()));
+        if ( this.withApplicationId.isPresent() ) {
+            return Optional.of( CpNamingUtils.getApplicationScope( withApplicationId.get() ) );
         }
 
         return Optional.absent();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
index f8955dd..af3615e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
@@ -31,7 +31,7 @@ public interface ReIndexService {
      *
      * @param indexServiceRequestBuilder The builder to build the request
      */
-    IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder );
+    ReIndexStatus rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder );
 
 
     /**
@@ -45,20 +45,20 @@ public interface ReIndexService {
      * @param jobId The jobId returned during the rebuild index
      * @return
      */
-    IndexResponse getStatus( final String jobId );
+    ReIndexStatus getStatus( final String jobId );
 
 
     /**
      * The response when requesting a re-index operation
      */
-    class IndexResponse {
+    class ReIndexStatus {
         final String jobId;
-        final String status;
+        final Status status;
         final long numberProcessed;
         final long lastUpdated;
 
 
-        public IndexResponse( final String jobId, final String status, final long numberProcessed,
+        public ReIndexStatus( final String jobId, final Status status, final long numberProcessed,
                               final long lastUpdated ) {
             this.jobId = jobId;
             this.status = status;
@@ -97,8 +97,12 @@ public interface ReIndexService {
          * Get the status
          * @return
          */
-        public String getStatus() {
+        public Status getStatus() {
             return status;
         }
     }
+
+    enum Status{
+        STARTED, INPROGRESS, COMPLETE;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index d828fc2..f44113b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -22,6 +22,10 @@ package org.apache.usergrid.corepersistence.index;
 
 import java.util.List;
 
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializerUtil;
 import org.apache.usergrid.corepersistence.pipeline.read.CursorSeek;
@@ -51,6 +55,8 @@ import rx.schedulers.Schedulers;
 @Singleton
 public class ReIndexServiceImpl implements ReIndexService {
 
+    private static final Logger logger = LoggerFactory.getLogger( ReIndexServiceImpl.class );
+
     private static final MapScope RESUME_MAP_SCOPE =
         new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "reindexresume" );
 
@@ -85,7 +91,7 @@ public class ReIndexServiceImpl implements ReIndexService {
 
 
     @Override
-    public IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder ) {
+    public ReIndexStatus rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder ) {
 
         //load our last emitted Scope if a cursor is present
 
@@ -97,7 +103,7 @@ public class ReIndexServiceImpl implements ReIndexService {
         final Optional<ApplicationScope> appId = indexServiceRequestBuilder.getApplicationScope();
 
 
-        Preconditions.checkArgument( cursor.isPresent() && appId.isPresent(),
+        Preconditions.checkArgument( !(cursor.isPresent() && appId.isPresent()),
             "You cannot specify an app id and a cursor.  When resuming with cursor you must omit the appid" );
 
         final Observable<ApplicationScope> applicationScopes = getApplications( cursor, appId );
@@ -112,9 +118,14 @@ public class ReIndexServiceImpl implements ReIndexService {
             indexServiceRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
 
             //for each edge, create our scope and index on it
-            .doOnNext( edge -> indexService.index(
-                new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(),
-                    modifiedSince ) ) );
+            .doOnNext( edge -> {
+                final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(), modifiedSince );
+
+                logger.info( "Queueing {}", entityIndexOperation );
+
+                indexService.index(entityIndexOperation);
+
+            } );
 
 
         //start our sampler and state persistence
@@ -127,7 +138,7 @@ public class ReIndexServiceImpl implements ReIndexService {
             .subscribeOn( Schedulers.io() ).subscribe();
 
 
-        return new IndexResponse( jobId, "Started", 0, 0 );
+        return new ReIndexStatus( jobId, Status.STARTED, 0, 0 );
     }
 
 
@@ -138,7 +149,7 @@ public class ReIndexServiceImpl implements ReIndexService {
 
 
     @Override
-    public IndexResponse getStatus( final String jobId ) {
+    public ReIndexStatus getStatus( final String jobId ) {
         Preconditions.checkNotNull( jobId, "jobId must not be null" );
         return getIndexResponse( jobId );
     }
@@ -166,11 +177,11 @@ public class ReIndexServiceImpl implements ReIndexService {
                 writeCursorState( jobId, buffer.get( buffer.size() - 1 ) );
             }
 
-            writeStateMeta( jobId, "InProgress", count, System.currentTimeMillis() );
+            writeStateMeta( jobId, Status.INPROGRESS, count, System.currentTimeMillis() );
         }
 
         public void complete(){
-            writeStateMeta( jobId, "Complete", count, System.currentTimeMillis() );
+            writeStateMeta( jobId, Status.COMPLETE, count, System.currentTimeMillis() );
         }
     }
 
@@ -257,10 +268,15 @@ public class ReIndexServiceImpl implements ReIndexService {
      * @param processedCount
      * @param lastUpdated
      */
-    private void writeStateMeta( final String jobId, final String status, final long processedCount,
+    private void writeStateMeta( final String jobId, final Status status, final long processedCount,
                                  final long lastUpdated ) {
 
-        mapManager.putString( jobId + MAP_STATUS_KEY, status );
+        if(logger.isDebugEnabled()) {
+            logger.debug( "Flushing state for jobId {}, status {}, processedCount {}, lastUpdated {}",
+                new Object[] { jobId, status, processedCount, lastUpdated } );
+        }
+
+        mapManager.putString( jobId + MAP_STATUS_KEY, status.name() );
         mapManager.putLong( jobId + MAP_COUNT_KEY, processedCount );
         mapManager.putLong( jobId + MAP_UPDATED_KEY, lastUpdated );
     }
@@ -271,18 +287,20 @@ public class ReIndexServiceImpl implements ReIndexService {
      * @param jobId
      * @return
      */
-    private IndexResponse getIndexResponse( final String jobId ) {
+    private ReIndexStatus getIndexResponse( final String jobId ) {
 
-        final String status = mapManager.getString( jobId+MAP_STATUS_KEY );
+        final String stringStatus = mapManager.getString( jobId+MAP_STATUS_KEY );
 
-        if(status == null){
+        if(stringStatus == null){
             throw new IllegalArgumentException( "Could not find a job with id " + jobId );
         }
 
+        final Status status = Status.valueOf( stringStatus );
+
         final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY );
         final long lastUpdated = mapManager.getLong( jobId + MAP_COUNT_KEY );
 
-        return new IndexResponse( jobId, status, processedCount, lastUpdated );
+        return new ReIndexStatus( jobId, status, processedCount, lastUpdated );
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java
index 23bb99a..e770a77 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java
@@ -44,7 +44,7 @@ public abstract class AbstractCursorSerializer<T> implements CursorSerializer<T>
         try {
             final Class<? extends T> classType = getType();
 
-            return objectMapper.treeToValue( node, classType );
+             return objectMapper.treeToValue( node, classType );
         }
         catch ( JsonProcessingException e ) {
             throw new CursorParseException( "Unable to deserialize value", e );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index 55c4846..8d54043 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -22,78 +22,67 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.Optional;
-import org.apache.commons.lang.RandomStringUtils;
-
-import org.apache.usergrid.corepersistence.index.IndexServiceRequestBuilder;
-import org.apache.usergrid.corepersistence.index.ReIndexService;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang.RandomStringUtils;
+
 import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.cassandra.SpringResource;
+import org.apache.usergrid.corepersistence.index.IndexServiceRequestBuilder;
+import org.apache.usergrid.corepersistence.index.ReIndexService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 
-import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Slf4jReporter;
 import com.google.inject.Injector;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
 
 //@RunWith(JukitoRunner.class)
 //@UseModules({ GuiceModule.class })
 
+
 public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
-    private static final Logger logger = LoggerFactory.getLogger(PerformanceEntityRebuildIndexTest.class );
+    private static final Logger logger = LoggerFactory.getLogger( PerformanceEntityRebuildIndexTest.class );
 
     private static final MetricRegistry registry = new MetricRegistry();
-    private Slf4jReporter reporter;
-
-    private static final int ENTITIES_TO_INDEX = 2000;
 
 
+    private static final int ENTITIES_TO_INDEX = 1000;
 
 
     @Before
     public void startReporting() {
 
-        logger.debug("Starting metrics reporting");
-        reporter = Slf4jReporter.forRegistry( registry ).outputTo( logger )
-                .convertRatesTo( TimeUnit.SECONDS )
-                .convertDurationsTo( TimeUnit.MILLISECONDS ).build();
-
-        reporter.start( 10, TimeUnit.SECONDS );
+        logger.debug( "Starting metrics reporting" );
     }
 
 
     @After
     public void printReport() {
-        logger.debug("Printing metrics report");
-        reporter.report();
-        reporter.stop();
+        logger.debug( "Printing metrics report" );
     }
 
 
-    @Test
+    @Test( timeout = 120000 )
     public void rebuildOneCollectionIndex() throws Exception {
 
-        logger.info("Started rebuildIndex()");
+        logger.info( "Started rebuildIndex()" );
 
-        String rand = RandomStringUtils.randomAlphanumeric(5);
-        final UUID appId = setup.createApplication("org_" + rand, "app_" + rand);
+        String rand = RandomStringUtils.randomAlphanumeric( 5 );
+        final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );
 
         final EntityManager em = setup.getEmf().getEntityManager( appId );
 
@@ -102,109 +91,80 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
         // ----------------- create a bunch of entities
 
         Map<String, Object> entityMap = new HashMap<String, Object>() {{
-            put("key1", 1000 );
-            put("key2", 2000 );
-            put("key3", "Some value");
+            put( "key1", 1000 );
+            put( "key2", 2000 );
+            put( "key3", "Some value" );
         }};
 
 
         List<EntityRef> entityRefs = new ArrayList<EntityRef>();
-        int herderCount  = 0;
+        int herderCount = 0;
         int shepardCount = 0;
-        for (int i = 0; i < ENTITIES_TO_INDEX; i++) {
+        for ( int i = 0; i < ENTITIES_TO_INDEX; i++ ) {
 
             final Entity entity;
 
             try {
-                entityMap.put("key", i );
+                entityMap.put( "key", i );
 
                 if ( i % 2 == 0 ) {
-                    entity = em.create("catherder", entityMap);
+                    entity = em.create( "catherder", entityMap );
                     herderCount++;
-                } else {
-                    entity = em.create("catshepard", entityMap);
+                }
+                else {
+                    entity = em.create( "catshepard", entityMap );
                     shepardCount++;
                 }
-
-                app.refreshIndex();
-
-//                em.createConnection(entity, "herds", cat1);
-//                em.createConnection(entity, "herds", cat2);
-//                em.createConnection(entity, "herds", cat3);
-
-            } catch (Exception ex) {
-                throw new RuntimeException("Error creating entity", ex);
+            }
+            catch ( Exception ex ) {
+                throw new RuntimeException( "Error creating entity", ex );
             }
 
-            entityRefs.add(new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
+            entityRefs.add( new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
             if ( i % 10 == 0 ) {
-                logger.info("Created {} entities", i );
+                logger.info( "Created {} entities", i );
             }
-
         }
 
-        logger.info("Created {} entities", ENTITIES_TO_INDEX);
+        logger.info( "Created {} entities", ENTITIES_TO_INDEX );
         app.refreshIndex();
 
         // ----------------- test that we can read them, should work fine
 
-        logger.debug("Read the data");
-        readData( em, "catherders", herderCount, 0);
-        readData( em, "catshepards", shepardCount, 0);
+        logger.debug( "Read the data" );
+        readData( em, "catherders", herderCount, 0 );
+        readData( em, "catshepards", shepardCount, 0 );
 
         // ----------------- delete the system and application indexes
 
-        logger.debug("Deleting apps");
+        logger.debug( "Deleting apps" );
         deleteIndex( em.getApplicationId() );
 
         // ----------------- test that we can read them, should fail
 
-        logger.debug("Reading data, should fail this time ");
-        try {
-            readData( em,  "testTypes", ENTITIES_TO_INDEX, 0 );
-            fail("should have failed to read data");
+        logger.debug( "Reading data, should fail this time " );
 
-        } catch (Exception expected) {}
+        //should be no data
+        readData( em, "testTypes", 0, 0 );
 
-//        ----------------- rebuild index for catherders only
 
-        logger.debug("Preparing to rebuild all indexes");;
+        //        ----------------- rebuild index for catherders only
 
-        final String meterName = this.getClass().getSimpleName() + ".rebuildIndex";
-        final Meter meter = registry.meter( meterName );
+        logger.debug( "Preparing to rebuild all indexes" );
 
-        EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
-            int counter = 0;
 
-            @Override
-            public void onProgress( final EntityRef entity ) {
-
-                meter.mark();
-                logger.debug("Indexing {}:{}", entity.getType(), entity.getUuid());
-                if ( counter % 100 == 0 ) {
-                    logger.info("Reindexed {} entities", counter );
-                }
-                counter++;
-            }
+        final IndexServiceRequestBuilder builder =
+            reIndexService.getBuilder().withApplicationId( em.getApplicationId() ).withCollection( "catherders" );
 
+        ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex( builder );
 
+        assertNotNull( status.getJobId(), "JobId is present" );
 
-        };
-
-        try {
+        logger.info( "Rebuilt index" );
 
-            final IndexServiceRequestBuilder builder = reIndexService.getBuilder().withApplicationId( em.getApplicationId() ).withCollection( "catherders" );
 
-            reIndexService.rebuildIndex(builder );
+        waitForRebuild( status, reIndexService );
 
-            reporter.report();
-            registry.remove( meterName );
-            logger.info("Rebuilt index");
-
-        } catch (Exception ex) {
-            logger.error("Error rebuilding index", ex);
-            fail();
-        }
 
         // ----------------- test that we can read the catherder collection and not the catshepard
 
@@ -213,78 +173,79 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
     }
 
 
-    @Test
+    @Test( timeout = 120000 )
     public void rebuildIndex() throws Exception {
 
-        logger.info("Started rebuildIndex()");
+        logger.info( "Started rebuildIndex()" );
+
+        String rand = RandomStringUtils.randomAlphanumeric( 5 );
+        final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );
 
-        String rand = RandomStringUtils.randomAlphanumeric(5);
-        final UUID appId = setup.createApplication("org_" + rand, "app_" + rand);
+        final EntityManager em = setup.getEmf().getEntityManager( appId );
 
-        final EntityManager em = setup.getEmf().getEntityManager(appId);
+        final ReIndexService reIndexService = setup.getInjector().getInstance( ReIndexService.class );
 
         // ----------------- create a bunch of entities
 
         Map<String, Object> entityMap = new HashMap<String, Object>() {{
-            put("key1", 1000 );
-            put("key2", 2000 );
-            put("key3", "Some value");
+            put( "key1", 1000 );
+            put( "key2", 2000 );
+            put( "key3", "Some value" );
         }};
         Map<String, Object> cat1map = new HashMap<String, Object>() {{
-            put("name", "enzo");
-            put("color", "orange");
+            put( "name", "enzo" );
+            put( "color", "orange" );
         }};
         Map<String, Object> cat2map = new HashMap<String, Object>() {{
-            put("name", "marquee");
-            put("color", "grey");
+            put( "name", "marquee" );
+            put( "color", "grey" );
         }};
         Map<String, Object> cat3map = new HashMap<String, Object>() {{
-            put("name", "bertha");
-            put("color", "tabby");
+            put( "name", "bertha" );
+            put( "color", "tabby" );
         }};
 
-        Entity cat1 = em.create("cat", cat1map );
-        Entity cat2 = em.create("cat", cat2map );
-        Entity cat3 = em.create("cat", cat3map );
+        Entity cat1 = em.create( "cat", cat1map );
+        Entity cat2 = em.create( "cat", cat2map );
+        Entity cat3 = em.create( "cat", cat3map );
 
-        List<EntityRef> entityRefs = new ArrayList<EntityRef>();
-        int entityCount = 0;
-        for (int i = 0; i < ENTITIES_TO_INDEX; i++) {
+        List<EntityRef> entityRefs = new ArrayList<>();
+
+        for ( int i = 0; i < ENTITIES_TO_INDEX; i++ ) {
 
             final Entity entity;
 
             try {
-                entityMap.put("key", entityCount );
-                entity = em.create("testType", entityMap );
+                entityMap.put( "key", i );
+                entity = em.create( "testType", entityMap );
 
 
-                em.createConnection(entity, "herds", cat1);
-                em.createConnection(entity, "herds", cat2);
-                em.createConnection(entity, "herds", cat3);
-
-            } catch (Exception ex) {
-                throw new RuntimeException("Error creating entity", ex);
+                em.createConnection( entity, "herds", cat1 );
+                em.createConnection( entity, "herds", cat2 );
+                em.createConnection( entity, "herds", cat3 );
             }
-
-            entityRefs.add(new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
-            if ( entityCount % 10 == 0 ) {
-                logger.info("Created {} entities", entityCount );
+            catch ( Exception ex ) {
+                throw new RuntimeException( "Error creating entity", ex );
             }
 
+            entityRefs.add( new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
+            if ( i % 10 == 0 ) {
+                logger.info( "Created {} entities", i );
+            }
         }
 
-        logger.info("Created {} entities", entityCount);
+        logger.info( "Created {} entities", ENTITIES_TO_INDEX );
         app.refreshIndex();
-        Thread.sleep(10000);
 
         // ----------------- test that we can read them, should work fine
 
-        logger.debug("Read the data");
-        readData( em, "testType", entityCount, 3 );
+        logger.debug( "Read the data" );
+        final String collectionName = "testtypes";
+        readData( em, collectionName, ENTITIES_TO_INDEX, 3 );
 
         // ----------------- delete the system and application indexes
 
-        logger.debug("Deleting app index");
+        logger.debug( "Deleting app index" );
 
         deleteIndex( em.getApplicationId() );
 
@@ -295,61 +256,73 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
 
         // ----------------- test that we can read them, should fail
 
-        logger.debug("Reading data, should fail this time ");
-        try {
-            readData( em, "testTypes", entityCount, 3 );
-            fail("should have failed to read data");
+        logger.debug( "Reading data, should fail this time " );
+
+        readData( em, collectionName, 0, 0 );
+
 
-        } catch (Exception expected) {}
 
         // ----------------- rebuild index
 
-        logger.debug("Preparing to rebuild all indexes");;
+        logger.debug( "Preparing to rebuild all indexes" );
+        ;
 
-        final String meterName = this.getClass().getSimpleName() + ".rebuildIndex";
-        final Meter meter = registry.meter( meterName );
 
-        EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
-            int counter = 0;
+        try {
 
-            @Override
-            public void onProgress( final EntityRef entity ) {
+            final IndexServiceRequestBuilder builder =
+                reIndexService.getBuilder().withApplicationId( em.getApplicationId() );
 
-                meter.mark();
-                logger.debug("Indexing {}:{}", entity.getType(), entity.getUuid());
-                if ( counter % 100 == 0 ) {
-                    logger.info("Reindexed {} entities", counter );
-                }
-                counter++;
-            }
+            ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex( builder );
 
+            assertNotNull( status.getJobId(), "JobId is present" );
 
-        };
+            logger.info( "Rebuilt index" );
 
-        try {
 
-            fail( "Implement index rebuild" );
-//            setup.getEmf().rebuildInternalIndexes( po );
-//
-//            setup.getEmf().rebuildApplicationIndexes( em.getApplicationId(), po );
+            waitForRebuild( status, reIndexService );
 
-            reporter.report();
-            registry.remove( meterName );
-            logger.info("Rebuilt index");
 
-            app.refreshIndex();
+            logger.info( "Rebuilt index" );
 
-        } catch (Exception ex) {
-            logger.error("Error rebuilding index", ex);
+            app.refreshIndex();
+        }
+        catch ( Exception ex ) {
+            logger.error( "Error rebuilding index", ex );
             fail();
         }
 
         // ----------------- test that we can read them
 
-        Thread.sleep(2000);
-        readData( em, "testTypes", entityCount, 3 );
+        Thread.sleep( 2000 );
+        readData( em, collectionName, ENTITIES_TO_INDEX, 3 );
+    }
+
+
+    /**
+     * Wait for the rebuild to occur
+     */
+    private void waitForRebuild( final ReIndexService.ReIndexStatus status, final ReIndexService reIndexService )
+        throws InterruptedException {
+        while ( true ) {
+
+            try {
+                final ReIndexService.ReIndexStatus updatedStatus = reIndexService.getStatus( status.getJobId() );
+
+                if ( updatedStatus.getStatus() == ReIndexService.Status.COMPLETE ) {
+                    break;
+                }
+            }
+            catch ( IllegalArgumentException iae ) {
+                //swallow
+            }
+
+
+            Thread.sleep( 1000 );
+        }
     }
 
+
     /**
      * Delete app index
      */
@@ -360,54 +333,49 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
 
         Id appId = new SimpleId( appUuid, Schema.TYPE_APPLICATION );
         ApplicationScope scope = new ApplicationScopeImpl( appId );
-        ApplicationEntityIndex ei = eif.createApplicationEntityIndex(scope);
+        ApplicationEntityIndex ei = eif.createApplicationEntityIndex( scope );
 
-        ei.deleteApplication().toBlocking().lastOrDefault(null);
+        ei.deleteApplication().toBlocking().lastOrDefault( null );
         app.refreshIndex();
-
     }
 
 
-    private int readData( EntityManager em,
-        String collectionName, int expectedEntities, int expectedConnections ) throws Exception {
+    private int readData( EntityManager em, String collectionName, int expectedEntities, int expectedConnections )
+        throws Exception {
 
         app.refreshIndex();
 
-        Query q = Query.fromQL("select * where key1=1000");
-        q.setLimit(40);
-        Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q,expectedEntities );
+        Query q = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 );
+        Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, expectedEntities );
 
         int count = 0;
         while ( true ) {
 
             for ( Entity e : results.getEntities() ) {
 
-                assertEquals( 2000, e.getProperty("key2"));
+                assertEquals( 2000, e.getProperty( "key2" ) );
 
-                Results catResults = em.searchTargetEntities(e,
-                    Query.fromQL("select *").setConnectionType("herds"));
+                Results catResults =
+                    em.searchTargetEntities( e, Query.fromQL( "select *" ).setConnectionType( "herds" ) );
                 assertEquals( expectedConnections, catResults.size() );
 
                 if ( count % 100 == 0 ) {
-                    logger.info( "read {} entities", count);
+                    logger.info( "read {} entities", count );
                 }
                 count++;
             }
 
             if ( results.hasCursor() ) {
-                logger.info( "Counted {} : query again with cursor", count);
+                logger.info( "Counted {} : query again with cursor", count );
                 q.setCursor( results.getCursor() );
                 results = em.searchCollection( em.getApplicationRef(), collectionName, q );
-
-            } else {
+            }
+            else {
                 break;
             }
         }
 
-        if ( expectedEntities != -1 && expectedEntities != count ) {
-            throw new RuntimeException("Did not get expected "
-                + expectedEntities + " entities, instead got " + count );
-        }
+        assertEquals("Did not get expected entities", expectedEntities, count);
         return count;
     }
 }