You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/15 03:23:38 UTC
[9/9] incubator-usergrid git commit: Fixes bugs and cleans up tests
Fixes bugs and cleans up tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/48be894f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/48be894f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/48be894f
Branch: refs/heads/USERGRID-643
Commit: 48be894fb8af857f33700fcc2717357936d12913
Parents: a767bb6
Author: Todd Nine <tn...@apigee.com>
Authored: Thu May 14 19:23:04 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu May 14 19:23:04 2015 -0600
----------------------------------------------------------------------
.../usergrid/corepersistence/CoreModule.java | 4 +
.../corepersistence/CpRelationManager.java | 1 -
.../index/IndexProcessorFig.java | 2 +-
.../index/IndexServiceRequestBuilderImpl.java | 74 ++---
.../corepersistence/index/ReIndexService.java | 16 +-
.../index/ReIndexServiceImpl.java | 48 ++-
.../cursor/AbstractCursorSerializer.java | 2 +-
.../PerformanceEntityRebuildIndexTest.java | 318 +++++++++----------
8 files changed, 225 insertions(+), 240 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index a02bffd..b22a7cb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -25,6 +25,8 @@ import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
import org.apache.usergrid.corepersistence.index.IndexService;
import org.apache.usergrid.corepersistence.index.IndexServiceImpl;
+import org.apache.usergrid.corepersistence.index.ReIndexService;
+import org.apache.usergrid.corepersistence.index.ReIndexServiceImpl;
import org.apache.usergrid.corepersistence.migration.AppInfoMigrationPlugin;
import org.apache.usergrid.corepersistence.migration.CoreMigration;
import org.apache.usergrid.corepersistence.migration.CoreMigrationPlugin;
@@ -142,6 +144,8 @@ public class CoreModule extends AbstractModule {
bind( AsyncEventService.class ).toProvider( AsyncIndexProvider.class );
+ bind( ReIndexService.class).to( ReIndexServiceImpl.class );
+
install( new GuicyFigModule( IndexProcessorFig.class ) );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 4993d88..cba1a07 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -661,7 +661,6 @@ public class CpRelationManager implements RelationManager {
}while (!found && length <= maxLength);
if(logger.isInfoEnabled()){
logger.info(String.format("Consistent Search finished in %s, results=%s, expected=%s...dumping stack",length, results.size(),expectedResults));
- Thread.dumpStack();
}
return results;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 8e835e2..e4b2329 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -78,7 +78,7 @@ public interface IndexProcessorFig extends GuicyFig {
String getQueueImplementation();
- @Default("10000")
+ @Default("1000")
@Key("elasticsearch.reindex.flush.interval")
int getUpdateInterval();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
index 3466674..4017b6e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
@@ -22,70 +22,57 @@ package org.apache.usergrid.corepersistence.index;
import java.util.UUID;
-
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import com.google.common.base.Optional;
+/**
+ * Index service request builder
+ */
public class IndexServiceRequestBuilderImpl implements IndexServiceRequestBuilder {
-
- /**
- *
- final Observable<ApplicationScope> applicationScopes = appId.isPresent()? Observable.just( getApplicationScope(appId.get()) ) : allApplicationsObservable.getData();
-
- final String newCursor = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
-
- //create an observable that loads each entity and indexes it, start it running with publish
- final ConnectableObservable<EdgeScope> runningReIndex =
- allEntityIdsObservable.getEdgesToEntities( applicationScopes, collection, startTimestamp )
-
- //for each edge, create our scope and index on it
- .doOnNext( edge -> indexService.index( new EntityIdScope( edge.getApplicationScope(), edge.getEdge().getTargetNode() ) ) ).publish();
-
-
-
- //start our sampler and state persistence
- //take a sample every sample interval to allow us to resume state with minimal loss
- runningReIndex.sample( indexProcessorFig.getReIndexSampleInterval(), TimeUnit.MILLISECONDS,
- rxTaskScheduler.getAsyncIOScheduler() )
- .doOnNext( edge -> {
-
- final String serializedState = SerializableMapper.asString( edge );
-
- mapManager.putString( newCursor, serializedState, INDEX_TTL );
- } ).subscribe();
-
-
- */
-
- private Optional<UUID> withApplicationId;
- private Optional<String> withCollectionName;
- private Optional<String> cursor;
- private Optional<Long> updateTimestamp;
+ private Optional<UUID> withApplicationId = Optional.absent();
+ private Optional<String> withCollectionName = Optional.absent();
+ private Optional<String> cursor = Optional.absent();
+ private Optional<Long> updateTimestamp = Optional.absent();
/***
*
- * @param applicationId
+ * @param applicationId The application id
* @return
*/
@Override
public IndexServiceRequestBuilder withApplicationId( final UUID applicationId ) {
- this.withApplicationId = Optional.fromNullable(applicationId);
+ this.withApplicationId = Optional.fromNullable( applicationId );
return this;
}
+ /**
+ * the colleciton name
+ * @param collectionName
+ * @return
+ */
@Override
public IndexServiceRequestBuilder withCollection( final String collectionName ) {
- this.withCollectionName = Optional.fromNullable( collectionName );
+ if(collectionName == null){
+ this.withCollectionName = Optional.absent();
+ }
+ else {
+ this.withCollectionName = Optional.fromNullable( CpNamingUtils.getEdgeTypeFromCollectionName( collectionName ) );
+ }
return this;
}
+ /**
+ * The cursor
+ * @param cursor
+ * @return
+ */
@Override
public IndexServiceRequestBuilder withCursor( final String cursor ) {
this.cursor = Optional.fromNullable( cursor );
@@ -93,9 +80,14 @@ public class IndexServiceRequestBuilderImpl implements IndexServiceRequestBuilde
}
+ /**
+ * Set start timestamp in epoch time. Only entities updated since this time will be processed for indexing
+ * @param timestamp
+ * @return
+ */
@Override
public IndexServiceRequestBuilder withStartTimestamp( final Long timestamp ) {
- this.updateTimestamp = Optional.fromNullable(timestamp );
+ this.updateTimestamp = Optional.fromNullable( timestamp );
return this;
}
@@ -103,8 +95,8 @@ public class IndexServiceRequestBuilderImpl implements IndexServiceRequestBuilde
@Override
public Optional<ApplicationScope> getApplicationScope() {
- if(this.withApplicationId.isPresent()){
- return Optional.of( CpNamingUtils.getApplicationScope( withApplicationId.get()));
+ if ( this.withApplicationId.isPresent() ) {
+ return Optional.of( CpNamingUtils.getApplicationScope( withApplicationId.get() ) );
}
return Optional.absent();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
index f8955dd..af3615e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
@@ -31,7 +31,7 @@ public interface ReIndexService {
*
* @param indexServiceRequestBuilder The builder to build the request
*/
- IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder );
+ ReIndexStatus rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder );
/**
@@ -45,20 +45,20 @@ public interface ReIndexService {
* @param jobId The jobId returned during the rebuild index
* @return
*/
- IndexResponse getStatus( final String jobId );
+ ReIndexStatus getStatus( final String jobId );
/**
* The response when requesting a re-index operation
*/
- class IndexResponse {
+ class ReIndexStatus {
final String jobId;
- final String status;
+ final Status status;
final long numberProcessed;
final long lastUpdated;
- public IndexResponse( final String jobId, final String status, final long numberProcessed,
+ public ReIndexStatus( final String jobId, final Status status, final long numberProcessed,
final long lastUpdated ) {
this.jobId = jobId;
this.status = status;
@@ -97,8 +97,12 @@ public interface ReIndexService {
* Get the status
* @return
*/
- public String getStatus() {
+ public Status getStatus() {
return status;
}
}
+
+ enum Status{
+ STARTED, INPROGRESS, COMPLETE;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index d828fc2..f44113b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -22,6 +22,10 @@ package org.apache.usergrid.corepersistence.index;
import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializerUtil;
import org.apache.usergrid.corepersistence.pipeline.read.CursorSeek;
@@ -51,6 +55,8 @@ import rx.schedulers.Schedulers;
@Singleton
public class ReIndexServiceImpl implements ReIndexService {
+ private static final Logger logger = LoggerFactory.getLogger( ReIndexServiceImpl.class );
+
private static final MapScope RESUME_MAP_SCOPE =
new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "reindexresume" );
@@ -85,7 +91,7 @@ public class ReIndexServiceImpl implements ReIndexService {
@Override
- public IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder ) {
+ public ReIndexStatus rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder ) {
//load our last emitted Scope if a cursor is present
@@ -97,7 +103,7 @@ public class ReIndexServiceImpl implements ReIndexService {
final Optional<ApplicationScope> appId = indexServiceRequestBuilder.getApplicationScope();
- Preconditions.checkArgument( cursor.isPresent() && appId.isPresent(),
+ Preconditions.checkArgument( !(cursor.isPresent() && appId.isPresent()),
"You cannot specify an app id and a cursor. When resuming with cursor you must omit the appid" );
final Observable<ApplicationScope> applicationScopes = getApplications( cursor, appId );
@@ -112,9 +118,14 @@ public class ReIndexServiceImpl implements ReIndexService {
indexServiceRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
//for each edge, create our scope and index on it
- .doOnNext( edge -> indexService.index(
- new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(),
- modifiedSince ) ) );
+ .doOnNext( edge -> {
+ final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(), modifiedSince );
+
+ logger.info( "Queueing {}", entityIndexOperation );
+
+ indexService.index(entityIndexOperation);
+
+ } );
//start our sampler and state persistence
@@ -127,7 +138,7 @@ public class ReIndexServiceImpl implements ReIndexService {
.subscribeOn( Schedulers.io() ).subscribe();
- return new IndexResponse( jobId, "Started", 0, 0 );
+ return new ReIndexStatus( jobId, Status.STARTED, 0, 0 );
}
@@ -138,7 +149,7 @@ public class ReIndexServiceImpl implements ReIndexService {
@Override
- public IndexResponse getStatus( final String jobId ) {
+ public ReIndexStatus getStatus( final String jobId ) {
Preconditions.checkNotNull( jobId, "jobId must not be null" );
return getIndexResponse( jobId );
}
@@ -166,11 +177,11 @@ public class ReIndexServiceImpl implements ReIndexService {
writeCursorState( jobId, buffer.get( buffer.size() - 1 ) );
}
- writeStateMeta( jobId, "InProgress", count, System.currentTimeMillis() );
+ writeStateMeta( jobId, Status.INPROGRESS, count, System.currentTimeMillis() );
}
public void complete(){
- writeStateMeta( jobId, "Complete", count, System.currentTimeMillis() );
+ writeStateMeta( jobId, Status.COMPLETE, count, System.currentTimeMillis() );
}
}
@@ -257,10 +268,15 @@ public class ReIndexServiceImpl implements ReIndexService {
* @param processedCount
* @param lastUpdated
*/
- private void writeStateMeta( final String jobId, final String status, final long processedCount,
+ private void writeStateMeta( final String jobId, final Status status, final long processedCount,
final long lastUpdated ) {
- mapManager.putString( jobId + MAP_STATUS_KEY, status );
+ if(logger.isDebugEnabled()) {
+ logger.debug( "Flushing state for jobId {}, status {}, processedCount {}, lastUpdated {}",
+ new Object[] { jobId, status, processedCount, lastUpdated } );
+ }
+
+ mapManager.putString( jobId + MAP_STATUS_KEY, status.name() );
mapManager.putLong( jobId + MAP_COUNT_KEY, processedCount );
mapManager.putLong( jobId + MAP_UPDATED_KEY, lastUpdated );
}
@@ -271,18 +287,20 @@ public class ReIndexServiceImpl implements ReIndexService {
* @param jobId
* @return
*/
- private IndexResponse getIndexResponse( final String jobId ) {
+ private ReIndexStatus getIndexResponse( final String jobId ) {
- final String status = mapManager.getString( jobId+MAP_STATUS_KEY );
+ final String stringStatus = mapManager.getString( jobId+MAP_STATUS_KEY );
- if(status == null){
+ if(stringStatus == null){
throw new IllegalArgumentException( "Could not find a job with id " + jobId );
}
+ final Status status = Status.valueOf( stringStatus );
+
final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY );
final long lastUpdated = mapManager.getLong( jobId + MAP_COUNT_KEY );
- return new IndexResponse( jobId, status, processedCount, lastUpdated );
+ return new ReIndexStatus( jobId, status, processedCount, lastUpdated );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java
index 23bb99a..e770a77 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java
@@ -44,7 +44,7 @@ public abstract class AbstractCursorSerializer<T> implements CursorSerializer<T>
try {
final Class<? extends T> classType = getType();
- return objectMapper.treeToValue( node, classType );
+ return objectMapper.treeToValue( node, classType );
}
catch ( JsonProcessingException e ) {
throw new CursorParseException( "Unable to deserialize value", e );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index 55c4846..8d54043 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -22,78 +22,67 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import com.google.common.base.Optional;
-import org.apache.commons.lang.RandomStringUtils;
-
-import org.apache.usergrid.corepersistence.index.IndexServiceRequestBuilder;
-import org.apache.usergrid.corepersistence.index.ReIndexService;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.lang.RandomStringUtils;
+
import org.apache.usergrid.AbstractCoreIT;
import org.apache.usergrid.cassandra.SpringResource;
+import org.apache.usergrid.corepersistence.index.IndexServiceRequestBuilder;
+import org.apache.usergrid.corepersistence.index.ReIndexService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
-import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Slf4jReporter;
import com.google.inject.Injector;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
//@RunWith(JukitoRunner.class)
//@UseModules({ GuiceModule.class })
+
public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
- private static final Logger logger = LoggerFactory.getLogger(PerformanceEntityRebuildIndexTest.class );
+ private static final Logger logger = LoggerFactory.getLogger( PerformanceEntityRebuildIndexTest.class );
private static final MetricRegistry registry = new MetricRegistry();
- private Slf4jReporter reporter;
-
- private static final int ENTITIES_TO_INDEX = 2000;
+ private static final int ENTITIES_TO_INDEX = 1000;
@Before
public void startReporting() {
- logger.debug("Starting metrics reporting");
- reporter = Slf4jReporter.forRegistry( registry ).outputTo( logger )
- .convertRatesTo( TimeUnit.SECONDS )
- .convertDurationsTo( TimeUnit.MILLISECONDS ).build();
-
- reporter.start( 10, TimeUnit.SECONDS );
+ logger.debug( "Starting metrics reporting" );
}
@After
public void printReport() {
- logger.debug("Printing metrics report");
- reporter.report();
- reporter.stop();
+ logger.debug( "Printing metrics report" );
}
- @Test
+ @Test( timeout = 120000 )
public void rebuildOneCollectionIndex() throws Exception {
- logger.info("Started rebuildIndex()");
+ logger.info( "Started rebuildIndex()" );
- String rand = RandomStringUtils.randomAlphanumeric(5);
- final UUID appId = setup.createApplication("org_" + rand, "app_" + rand);
+ String rand = RandomStringUtils.randomAlphanumeric( 5 );
+ final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );
final EntityManager em = setup.getEmf().getEntityManager( appId );
@@ -102,109 +91,80 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
// ----------------- create a bunch of entities
Map<String, Object> entityMap = new HashMap<String, Object>() {{
- put("key1", 1000 );
- put("key2", 2000 );
- put("key3", "Some value");
+ put( "key1", 1000 );
+ put( "key2", 2000 );
+ put( "key3", "Some value" );
}};
List<EntityRef> entityRefs = new ArrayList<EntityRef>();
- int herderCount = 0;
+ int herderCount = 0;
int shepardCount = 0;
- for (int i = 0; i < ENTITIES_TO_INDEX; i++) {
+ for ( int i = 0; i < ENTITIES_TO_INDEX; i++ ) {
final Entity entity;
try {
- entityMap.put("key", i );
+ entityMap.put( "key", i );
if ( i % 2 == 0 ) {
- entity = em.create("catherder", entityMap);
+ entity = em.create( "catherder", entityMap );
herderCount++;
- } else {
- entity = em.create("catshepard", entityMap);
+ }
+ else {
+ entity = em.create( "catshepard", entityMap );
shepardCount++;
}
-
- app.refreshIndex();
-
-// em.createConnection(entity, "herds", cat1);
-// em.createConnection(entity, "herds", cat2);
-// em.createConnection(entity, "herds", cat3);
-
- } catch (Exception ex) {
- throw new RuntimeException("Error creating entity", ex);
+ }
+ catch ( Exception ex ) {
+ throw new RuntimeException( "Error creating entity", ex );
}
- entityRefs.add(new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
+ entityRefs.add( new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
if ( i % 10 == 0 ) {
- logger.info("Created {} entities", i );
+ logger.info( "Created {} entities", i );
}
-
}
- logger.info("Created {} entities", ENTITIES_TO_INDEX);
+ logger.info( "Created {} entities", ENTITIES_TO_INDEX );
app.refreshIndex();
// ----------------- test that we can read them, should work fine
- logger.debug("Read the data");
- readData( em, "catherders", herderCount, 0);
- readData( em, "catshepards", shepardCount, 0);
+ logger.debug( "Read the data" );
+ readData( em, "catherders", herderCount, 0 );
+ readData( em, "catshepards", shepardCount, 0 );
// ----------------- delete the system and application indexes
- logger.debug("Deleting apps");
+ logger.debug( "Deleting apps" );
deleteIndex( em.getApplicationId() );
// ----------------- test that we can read them, should fail
- logger.debug("Reading data, should fail this time ");
- try {
- readData( em, "testTypes", ENTITIES_TO_INDEX, 0 );
- fail("should have failed to read data");
+ logger.debug( "Reading data, should fail this time " );
- } catch (Exception expected) {}
+ //should be no data
+ readData( em, "testTypes", 0, 0 );
-// ----------------- rebuild index for catherders only
- logger.debug("Preparing to rebuild all indexes");;
+ // ----------------- rebuild index for catherders only
- final String meterName = this.getClass().getSimpleName() + ".rebuildIndex";
- final Meter meter = registry.meter( meterName );
+ logger.debug( "Preparing to rebuild all indexes" );
- EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
- int counter = 0;
- @Override
- public void onProgress( final EntityRef entity ) {
-
- meter.mark();
- logger.debug("Indexing {}:{}", entity.getType(), entity.getUuid());
- if ( counter % 100 == 0 ) {
- logger.info("Reindexed {} entities", counter );
- }
- counter++;
- }
+ final IndexServiceRequestBuilder builder =
+ reIndexService.getBuilder().withApplicationId( em.getApplicationId() ).withCollection( "catherders" );
+ ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex( builder );
+ assertNotNull( status.getJobId(), "JobId is present" );
- };
-
- try {
+ logger.info( "Rebuilt index" );
- final IndexServiceRequestBuilder builder = reIndexService.getBuilder().withApplicationId( em.getApplicationId() ).withCollection( "catherders" );
- reIndexService.rebuildIndex(builder );
+ waitForRebuild( status, reIndexService );
- reporter.report();
- registry.remove( meterName );
- logger.info("Rebuilt index");
-
- } catch (Exception ex) {
- logger.error("Error rebuilding index", ex);
- fail();
- }
// ----------------- test that we can read the catherder collection and not the catshepard
@@ -213,78 +173,79 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
}
- @Test
+ @Test( timeout = 120000 )
public void rebuildIndex() throws Exception {
- logger.info("Started rebuildIndex()");
+ logger.info( "Started rebuildIndex()" );
+
+ String rand = RandomStringUtils.randomAlphanumeric( 5 );
+ final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );
- String rand = RandomStringUtils.randomAlphanumeric(5);
- final UUID appId = setup.createApplication("org_" + rand, "app_" + rand);
+ final EntityManager em = setup.getEmf().getEntityManager( appId );
- final EntityManager em = setup.getEmf().getEntityManager(appId);
+ final ReIndexService reIndexService = setup.getInjector().getInstance( ReIndexService.class );
// ----------------- create a bunch of entities
Map<String, Object> entityMap = new HashMap<String, Object>() {{
- put("key1", 1000 );
- put("key2", 2000 );
- put("key3", "Some value");
+ put( "key1", 1000 );
+ put( "key2", 2000 );
+ put( "key3", "Some value" );
}};
Map<String, Object> cat1map = new HashMap<String, Object>() {{
- put("name", "enzo");
- put("color", "orange");
+ put( "name", "enzo" );
+ put( "color", "orange" );
}};
Map<String, Object> cat2map = new HashMap<String, Object>() {{
- put("name", "marquee");
- put("color", "grey");
+ put( "name", "marquee" );
+ put( "color", "grey" );
}};
Map<String, Object> cat3map = new HashMap<String, Object>() {{
- put("name", "bertha");
- put("color", "tabby");
+ put( "name", "bertha" );
+ put( "color", "tabby" );
}};
- Entity cat1 = em.create("cat", cat1map );
- Entity cat2 = em.create("cat", cat2map );
- Entity cat3 = em.create("cat", cat3map );
+ Entity cat1 = em.create( "cat", cat1map );
+ Entity cat2 = em.create( "cat", cat2map );
+ Entity cat3 = em.create( "cat", cat3map );
- List<EntityRef> entityRefs = new ArrayList<EntityRef>();
- int entityCount = 0;
- for (int i = 0; i < ENTITIES_TO_INDEX; i++) {
+ List<EntityRef> entityRefs = new ArrayList<>();
+
+ for ( int i = 0; i < ENTITIES_TO_INDEX; i++ ) {
final Entity entity;
try {
- entityMap.put("key", entityCount );
- entity = em.create("testType", entityMap );
+ entityMap.put( "key", i );
+ entity = em.create( "testType", entityMap );
- em.createConnection(entity, "herds", cat1);
- em.createConnection(entity, "herds", cat2);
- em.createConnection(entity, "herds", cat3);
-
- } catch (Exception ex) {
- throw new RuntimeException("Error creating entity", ex);
+ em.createConnection( entity, "herds", cat1 );
+ em.createConnection( entity, "herds", cat2 );
+ em.createConnection( entity, "herds", cat3 );
}
-
- entityRefs.add(new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
- if ( entityCount % 10 == 0 ) {
- logger.info("Created {} entities", entityCount );
+ catch ( Exception ex ) {
+ throw new RuntimeException( "Error creating entity", ex );
}
+ entityRefs.add( new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
+ if ( i % 10 == 0 ) {
+ logger.info( "Created {} entities", i );
+ }
}
- logger.info("Created {} entities", entityCount);
+ logger.info( "Created {} entities", ENTITIES_TO_INDEX );
app.refreshIndex();
- Thread.sleep(10000);
// ----------------- test that we can read them, should work fine
- logger.debug("Read the data");
- readData( em, "testType", entityCount, 3 );
+ logger.debug( "Read the data" );
+ final String collectionName = "testtypes";
+ readData( em, collectionName, ENTITIES_TO_INDEX, 3 );
// ----------------- delete the system and application indexes
- logger.debug("Deleting app index");
+ logger.debug( "Deleting app index" );
deleteIndex( em.getApplicationId() );
@@ -295,61 +256,73 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
// ----------------- test that we can read them, should fail
- logger.debug("Reading data, should fail this time ");
- try {
- readData( em, "testTypes", entityCount, 3 );
- fail("should have failed to read data");
+ logger.debug( "Reading data, should fail this time " );
+
+ readData( em, collectionName, 0, 0 );
+
- } catch (Exception expected) {}
// ----------------- rebuild index
- logger.debug("Preparing to rebuild all indexes");;
+ logger.debug( "Preparing to rebuild all indexes" );
+ ;
- final String meterName = this.getClass().getSimpleName() + ".rebuildIndex";
- final Meter meter = registry.meter( meterName );
- EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
- int counter = 0;
+ try {
- @Override
- public void onProgress( final EntityRef entity ) {
+ final IndexServiceRequestBuilder builder =
+ reIndexService.getBuilder().withApplicationId( em.getApplicationId() );
- meter.mark();
- logger.debug("Indexing {}:{}", entity.getType(), entity.getUuid());
- if ( counter % 100 == 0 ) {
- logger.info("Reindexed {} entities", counter );
- }
- counter++;
- }
+ ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex( builder );
+ assertNotNull( status.getJobId(), "JobId is present" );
- };
+ logger.info( "Rebuilt index" );
- try {
- fail( "Implement index rebuild" );
-// setup.getEmf().rebuildInternalIndexes( po );
-//
-// setup.getEmf().rebuildApplicationIndexes( em.getApplicationId(), po );
+ waitForRebuild( status, reIndexService );
- reporter.report();
- registry.remove( meterName );
- logger.info("Rebuilt index");
- app.refreshIndex();
+ logger.info( "Rebuilt index" );
- } catch (Exception ex) {
- logger.error("Error rebuilding index", ex);
+ app.refreshIndex();
+ }
+ catch ( Exception ex ) {
+ logger.error( "Error rebuilding index", ex );
fail();
}
// ----------------- test that we can read them
- Thread.sleep(2000);
- readData( em, "testTypes", entityCount, 3 );
+ Thread.sleep( 2000 );
+ readData( em, collectionName, ENTITIES_TO_INDEX, 3 );
+ }
+
+
+ /**
+ * Wait for the rebuild to occur
+ */
+ private void waitForRebuild( final ReIndexService.ReIndexStatus status, final ReIndexService reIndexService )
+ throws InterruptedException {
+ while ( true ) {
+
+ try {
+ final ReIndexService.ReIndexStatus updatedStatus = reIndexService.getStatus( status.getJobId() );
+
+ if ( updatedStatus.getStatus() == ReIndexService.Status.COMPLETE ) {
+ break;
+ }
+ }
+ catch ( IllegalArgumentException iae ) {
+ //swallow
+ }
+
+
+ Thread.sleep( 1000 );
+ }
}
+
/**
* Delete app index
*/
@@ -360,54 +333,49 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
Id appId = new SimpleId( appUuid, Schema.TYPE_APPLICATION );
ApplicationScope scope = new ApplicationScopeImpl( appId );
- ApplicationEntityIndex ei = eif.createApplicationEntityIndex(scope);
+ ApplicationEntityIndex ei = eif.createApplicationEntityIndex( scope );
- ei.deleteApplication().toBlocking().lastOrDefault(null);
+ ei.deleteApplication().toBlocking().lastOrDefault( null );
app.refreshIndex();
-
}
- private int readData( EntityManager em,
- String collectionName, int expectedEntities, int expectedConnections ) throws Exception {
+ private int readData( EntityManager em, String collectionName, int expectedEntities, int expectedConnections )
+ throws Exception {
app.refreshIndex();
- Query q = Query.fromQL("select * where key1=1000");
- q.setLimit(40);
- Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q,expectedEntities );
+ Query q = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 );
+ Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, expectedEntities );
int count = 0;
while ( true ) {
for ( Entity e : results.getEntities() ) {
- assertEquals( 2000, e.getProperty("key2"));
+ assertEquals( 2000, e.getProperty( "key2" ) );
- Results catResults = em.searchTargetEntities(e,
- Query.fromQL("select *").setConnectionType("herds"));
+ Results catResults =
+ em.searchTargetEntities( e, Query.fromQL( "select *" ).setConnectionType( "herds" ) );
assertEquals( expectedConnections, catResults.size() );
if ( count % 100 == 0 ) {
- logger.info( "read {} entities", count);
+ logger.info( "read {} entities", count );
}
count++;
}
if ( results.hasCursor() ) {
- logger.info( "Counted {} : query again with cursor", count);
+ logger.info( "Counted {} : query again with cursor", count );
q.setCursor( results.getCursor() );
results = em.searchCollection( em.getApplicationRef(), collectionName, q );
-
- } else {
+ }
+ else {
break;
}
}
- if ( expectedEntities != -1 && expectedEntities != count ) {
- throw new RuntimeException("Did not get expected "
- + expectedEntities + " entities, instead got " + count );
- }
+ assertEquals("Did not get expected entities", expectedEntities, count);
return count;
}
}