You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2016/04/08 22:21:03 UTC
[16/36] usergrid git commit: Comments with changes to make to
selective indexing. Changes to reindexing to add a delay given the reindexing
call.
Comments with changes to make to selective indexing.
Changes to reindexing to add a delay given the reindexing call.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/82ff53b1
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/82ff53b1
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/82ff53b1
Branch: refs/heads/release-2.1.1
Commit: 82ff53b145d84a7fbf198f0eb63153af0d2b6633
Parents: 1ba12b8
Author: George Reyes <gr...@apache.org>
Authored: Fri Mar 25 09:06:10 2016 -0700
Committer: George Reyes <gr...@apache.org>
Committed: Fri Mar 25 09:06:10 2016 -0700
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 4 +-
.../corepersistence/CpManagerCache.java | 1 +
.../corepersistence/index/IndexServiceImpl.java | 23 +++++-
.../index/ReIndexRequestBuilder.java | 7 ++
.../index/ReIndexRequestBuilderImpl.java | 32 ++++++++
.../index/ReIndexServiceImpl.java | 23 ++++--
.../usergrid/persistence/map/MapManager.java | 2 +
.../rest/applications/CollectionResource.java | 14 +++-
.../collection/CollectionsResourceIT.java | 77 +++++++++++++++++++-
9 files changed, 169 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 1fbacbe..bb3abcb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -1739,9 +1739,11 @@ public class CpEntityManager implements EntityManager {
//haven't decided which one I should base off of which, maybe from epoch to utc
+
+ //TODO: change timeservice as below then use timeservice.
+ //TODO: only allow a single reindex in elasticsearch at a time. akka.
Instant timeInstance = Instant.now();
- Date date = Date.from(timeInstance);
Long epoch = timeInstance.toEpochMilli();
Map<String,Object> schemaMap = new HashMap<>( );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
index 0408bbd..0b7bb43 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
@@ -37,6 +37,7 @@ import com.google.inject.Inject;
/**
* Cache for managing our other managers. Now just a delegate. Needs to be refactored away
*/
+
public class CpManagerCache implements ManagerCache {
private final EntityCollectionManagerFactory ecmf;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index c0f2e1b..5f50a32 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -121,8 +121,10 @@ public class IndexServiceImpl implements IndexService {
//do our observable for batching
//try to send a whole batch if we can
+
+ //TODO: extract the below and call a single method.
final Observable<IndexOperationMessage> batches = sourceEdgesToIndex
- .buffer(250, TimeUnit.MILLISECONDS, indexFig.getIndexBatchSize() )
+ .buffer(250, TimeUnit.MILLISECONDS, indexFig.getIndexBatchSize() ) //TODO: change to delay. maybe. at least to the before buffer.
//map into batches based on our buffer size
.flatMap( buffer -> Observable.from( buffer )
@@ -202,10 +204,9 @@ public class IndexServiceImpl implements IndexService {
*/
private Map getFilteredStringObjectMap( final ApplicationScope applicationScope,
final Entity entity, final IndexEdge indexEdge ) {
- IndexOperation indexOperation = new IndexOperation();
-
- indexEdge.getNodeId().getUuid();
+ //look into this.
+ IndexOperation indexOperation = new IndexOperation();
Id mapOwner = new SimpleId( indexEdge.getNodeId().getUuid(), TYPE_APPLICATION );
@@ -217,6 +218,7 @@ public class IndexServiceImpl implements IndexService {
Set<String> defaultProperties;
ArrayList fieldsToKeep;
+ //TODO: extract collection name using other classes than the split.
String jsonSchemaMap = mm.getString( indexEdge.getEdgeName().split( "\\|" )[1] );
//If we do have a schema then parse it and add it to a list of properties we want to keep.Otherwise return.
@@ -226,6 +228,8 @@ public class IndexServiceImpl implements IndexService {
Schema schema = Schema.getDefaultSchema();
defaultProperties = schema.getRequiredProperties( indexEdge.getEdgeName().split( "\\|" )[1] );
fieldsToKeep = ( ArrayList ) jsonMapData.get( "fields" );
+ //TODO: add method here to update the relevant fields in the map manager when it was accessed.
+
defaultProperties.addAll( fieldsToKeep );
}
else {
@@ -233,6 +237,7 @@ public class IndexServiceImpl implements IndexService {
}
//Returns the flattened map of the entity.
+ //TODO: maybe instead pass the fields to keep to the flattening.
Map map = indexOperation.convertedEntityToBeIndexed( applicationScope, indexEdge, entity );
HashSet mapFields = ( HashSet ) map.get( "fields" );
@@ -245,6 +250,12 @@ public class IndexServiceImpl implements IndexService {
//Checks to see if the fieldname is a default property. If it is then keep it, otherwise send it to
//be verified by the aptly named method
+
+ //one.two.three
+ //one.two.four
+ //one.two3.five
+ //one.two
+ //fields { one.two }
if ( !defaultProperties.contains( fieldName ) ) {
iterateThroughMapForFieldsToBeIndexed( fieldsToKeep, collectionIterator, fieldName );
}
@@ -290,10 +301,14 @@ public class IndexServiceImpl implements IndexService {
//Then if that check passes we go to check that both parts are equal. If they are ever not equal
// e.g one.two.three and one.three.two then it shouldn't be included
+ //TODO: regex.
for ( int index = 0; index < flattedRequirementString.length; index++ ) {
//if the array contains a string that it is equals to then set the remove flag to true
//otherwise remain false.
+ //one.three
+ //one.two
+ //one
if ( flattedStringArray.length <= index ) {
toRemoveFlag = true;
break;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilder.java
index 0863a63..139f832 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilder.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.corepersistence.index;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -52,6 +53,8 @@ public interface ReIndexRequestBuilder {
ReIndexRequestBuilder withCursor(final String cursor);
+ ReIndexRequestBuilder withDelay( int delayTimer, TimeUnit timeUnit );
+
/**
* Set the timestamp to re-index entities updated >= this timestamp
* @param timestamp
@@ -60,6 +63,10 @@ public interface ReIndexRequestBuilder {
ReIndexRequestBuilder withStartTimestamp(final Long timestamp);
+ Optional<Integer> getDelayTimer();
+
+ Optional<TimeUnit> getTimeUnitOptional();
+
/**
* Get the application scope
* @return
http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilderImpl.java
index e93ccf1..d3e7ecf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilderImpl.java
@@ -21,6 +21,9 @@ package org.apache.usergrid.corepersistence.index;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.auth.IAuthenticator;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -37,6 +40,8 @@ public class ReIndexRequestBuilderImpl implements ReIndexRequestBuilder {
private Optional<String> withCollectionName = Optional.absent();
private Optional<String> cursor = Optional.absent();
private Optional<Long> updateTimestamp = Optional.absent();
+ private Optional<Integer> delayTimer = Optional.absent();
+ private Optional<TimeUnit> timeUnitOptional = Optional.absent();
/***
@@ -81,6 +86,22 @@ public class ReIndexRequestBuilderImpl implements ReIndexRequestBuilder {
/**
+ * Determines whether we should tack on a delay for reindexing and for how long if we do. Also
+ * allowed to specify how throttled back it should be.
+ * @param delayTimer
+ * @param timeUnit
+ * @return
+ */
+ @Override
+ public ReIndexRequestBuilder withDelay( final int delayTimer, final TimeUnit timeUnit ){
+ this.delayTimer = Optional.fromNullable( delayTimer );
+ this.timeUnitOptional = Optional.fromNullable( timeUnit );
+
+ return this;
+ }
+
+
+ /**
* Set start timestamp in epoch time. Only entities updated since this time will be processed for indexing
* @param timestamp
* @return
@@ -93,6 +114,17 @@ public class ReIndexRequestBuilderImpl implements ReIndexRequestBuilder {
@Override
+ public Optional<Integer> getDelayTimer() {
+ return delayTimer;
+ }
+
+ @Override
+ public Optional<TimeUnit> getTimeUnitOptional() {
+ return timeUnitOptional;
+ }
+
+
+ @Override
public Optional<ApplicationScope> getApplicationScope() {
if ( this.withApplicationId.isPresent() ) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index fc1c97f..558a18c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -99,6 +99,7 @@ public class ReIndexServiceImpl implements ReIndexService {
}
+ //TODO: optional delay, param.
@Override
public ReIndexStatus rebuildIndex( final ReIndexRequestBuilder reIndexRequestBuilder ) {
@@ -111,6 +112,10 @@ public class ReIndexServiceImpl implements ReIndexService {
final Optional<ApplicationScope> appId = reIndexRequestBuilder.getApplicationScope();
+ final Optional<Integer> delayTimer = reIndexRequestBuilder.getDelayTimer();
+
+ final Optional<TimeUnit> timeUnitOptional = reIndexRequestBuilder.getTimeUnitOptional();
+
Preconditions.checkArgument( !(cursor.isPresent() && appId.isPresent()),
"You cannot specify an app id and a cursor. When resuming with cursor you must omit the appid" );
@@ -123,16 +128,24 @@ public class ReIndexServiceImpl implements ReIndexService {
// create an observable that loads a batch to be indexed
- final Observable<List<EdgeScope>> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes,
+ Observable<List<EdgeScope>> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes,
reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
- .buffer( indexProcessorFig.getReindexBufferSize())
- //.delay( 50, TimeUnit.MILLISECONDS )
- .doOnNext(edges -> {
+ .buffer( indexProcessorFig.getReindexBufferSize());
+ if(delayTimer.isPresent()){
+ if(timeUnitOptional.isPresent()){
+ runningReIndex = runningReIndex.delay( delayTimer.get(),timeUnitOptional.get() );
+ }
+ else{
+ runningReIndex = runningReIndex.delay( delayTimer.get(), TimeUnit.MILLISECONDS );
+ }
+ }
+
+ runningReIndex = runningReIndex.doOnNext(edges -> {
logger.info("Sending batch of {} to be indexed.", edges.size());
indexService.indexBatch(edges, modifiedSince);
- });
+ });
//start our sampler and state persistence
http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
index 80e2d17..ca8fd9a 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
@@ -27,6 +27,8 @@ import java.util.UUID;
/**
* Generator of a map manager instance
*/
+//TODO: This should be a singleton, otherwise cache could be out of sync and would need to be invalidated everywhere
+ //TODO: make manager cache injectable everywhere.
public interface MapManager {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
index b674ac4..fb660fa 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.rest.applications;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
@@ -47,6 +48,7 @@ import org.apache.usergrid.rest.AbstractContextResource;
import org.apache.usergrid.rest.ApiResponse;
import org.apache.usergrid.rest.RootResource;
import org.apache.usergrid.rest.security.annotations.RequireApplicationAccess;
+import org.apache.usergrid.rest.security.annotations.RequireSystemAccess;
import org.apache.usergrid.services.AbstractCollectionService;
import org.apache.usergrid.services.ServiceAction;
import org.apache.usergrid.services.ServiceParameter;
@@ -91,6 +93,9 @@ public class CollectionResource extends ServiceResource {
if(logger.isTraceEnabled()){
logger.trace( "ServiceResource.executePostOnIndexes" );
}
+ /**
+
+ */
Object json;
if ( StringUtils.isEmpty( body ) ) {
@@ -135,10 +140,12 @@ public class CollectionResource extends ServiceResource {
return response;
}
+ //TODO: this can't be controlled and until it can be controlled we shouldn't allow muggles to do this. So system access only.
+ //TODO: use scheduler here to get around people sending a reindex call 30 times.
@POST
@Path("_reindex")
@Produces({ MediaType.APPLICATION_JSON,"application/javascript"})
- @RequireApplicationAccess
+ @RequireSystemAccess
@JSONP
public ApiResponse executePostForReindexing( @Context UriInfo ui, String body,
@QueryParam("callback") @DefaultValue("callback") String callback )
@@ -146,8 +153,8 @@ public class CollectionResource extends ServiceResource {
final ReIndexRequestBuilder request =
createRequest().withApplicationId( services.getApplicationId() ).withCollection(
- String.valueOf( getServiceParameters().get( 0 ) ) );
-//
+ String.valueOf( getServiceParameters().get( 0 ) ) ).withDelay( 1, TimeUnit.SECONDS );
+
return executeAndCreateResponse( request, callback );
}
@@ -170,6 +177,7 @@ public class CollectionResource extends ServiceResource {
}
+ //TODO: change this to {itemName}/_indexes and that should do what we already have. Then we don't have this overridden method.
@Override
@Path("{itemName}")
public AbstractContextResource addNameParameter( @Context UriInfo ui, @PathParam("itemName") PathSegment itemName )
http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
index 5900dd5..109bb6f 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
@@ -168,9 +168,84 @@ public class CollectionsResourceIT extends AbstractRestIT {
//Reindex and verify that the entity only has field one index.
this.app().collection( "testCollection" ).collection( "_reindex" ).post();
- Thread.sleep( 10000 );
+ // Thread.sleep( 10000 );
+ //refreshIndex();
+
+ for(int i = 0; i < 10; i++) {
+ String query = "one ='value"+ i + "'";
+ QueryParameters queryParameters = new QueryParameters().setQuery( query );
+
+ //having a name breaks it. Need to get rid of the stack trace and also
+ Collection tempEntity = this.app().collection( "testCollection" ).get( queryParameters, true );
+ Entity reindexedEntity = tempEntity.getResponse().getEntity();
+ assertEquals( "value"+i, reindexedEntity.get( "one" ) );
+
+ //Verify if you can query on an entity that was not indexed and that no entities are returned.
+ query = "two = 'valuetwo1"+ i + "'";
+ queryParameters = new QueryParameters().setQuery( query );
+ tempEntity = this.app().collection( "testCollection" ).get( queryParameters, true );
+ assertEquals( 0, tempEntity.getResponse().getEntities().size() );
+ }
+ }
+
+
+ @Test
+ public void postToCollectionSchemaAndVerifyFieldsAreUpdated() throws Exception {
+
+ //Create test collection with test entity that is full text indexed.
+ Entity testEntity = new Entity();
+
+
+ for(int i = 0; i < 10; i++){
+ testEntity.put( "one","value"+i );
+ testEntity.put( "two","valuetwo"+i );
+ this.app().collection( "testCollection" ).post( testEntity );
+ }
+
+
+ //Creating schema.
+ //this could be changed to a hashmap.
+ ArrayList<String> indexingArray = new ArrayList<>( );
+ indexingArray.add( "one" );
+
+
+ //field "fields" is required.
+ Entity payload = new Entity();
+ payload.put( "fields", indexingArray);
+
+ //Post index to the collection metadata
+ Entity thing = this.app().collection( "testCollection" ).collection( "_indexes" ).post( payload );
refreshIndex();
+ //TODO: write a test to verify the data below.
+
+ Collection collection = this.app().collection( "testCollection" ).collection( "_index" ).get();
+
+ LinkedHashMap testCollectionSchema = (LinkedHashMap)collection.getResponse().getData();
+ //TODO: the below will have to be replaced by the values that I deem correct.
+ assertEquals( ( thing ).get( "lastUpdated" ), testCollectionSchema.get( "lastUpdated" ));
+ assertEquals( ( thing ).get( "lastUpdateBy" ),testCollectionSchema.get( "lastUpdateBy" ) );
+ assertEquals( ( thing ).get( "lastReindexed" ),testCollectionSchema.get( "lastReindexed" ) );
+
+ //TODO: this test doesn't check to see if create checks the schema. Only that the reindex removes what's already there.
+ ArrayList<String> schema = ( ArrayList<String> ) testCollectionSchema.get( "fields" );
+ assertEquals( "one",schema.get( 0 ) );
+
+
+ //Reindex and verify that the entity only has field one index.
+ this.app().collection( "testCollection" ).collection( "_reindex" ).post();
+
+ collection = this.app().collection( "testCollection" ).collection( "_index" ).get();
+
+ testCollectionSchema = (LinkedHashMap)collection.getResponse().getData();
+ assertEquals( ( thing ).get( "lastUpdated" ), testCollectionSchema.get( "lastUpdated" ));
+ assertEquals( ( thing ).get( "lastUpdateBy" ),testCollectionSchema.get( "lastUpdateBy" ) );
+ assertNotEquals( ( thing ).get( "lastReindexed" ),testCollectionSchema.get( "lastReindexed" ) );
+
+ schema = ( ArrayList<String> ) testCollectionSchema.get( "fields" );
+ assertEquals( "one",schema.get( 0 ) );
+
+
for(int i = 0; i < 10; i++) {
String query = "one ='value"+ i + "'";
QueryParameters queryParameters = new QueryParameters().setQuery( query );