You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2016/04/08 22:21:03 UTC

[16/36] usergrid git commit: Comments with changes to make to selective indexing. Changes to reindexing to add an optional delay to the reindexing call.

Add TODO comments describing changes to make to selective indexing.
Change reindexing to support an optional delay on the reindexing call.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/82ff53b1
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/82ff53b1
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/82ff53b1

Branch: refs/heads/release-2.1.1
Commit: 82ff53b145d84a7fbf198f0eb63153af0d2b6633
Parents: 1ba12b8
Author: George Reyes <gr...@apache.org>
Authored: Fri Mar 25 09:06:10 2016 -0700
Committer: George Reyes <gr...@apache.org>
Committed: Fri Mar 25 09:06:10 2016 -0700

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  4 +-
 .../corepersistence/CpManagerCache.java         |  1 +
 .../corepersistence/index/IndexServiceImpl.java | 23 +++++-
 .../index/ReIndexRequestBuilder.java            |  7 ++
 .../index/ReIndexRequestBuilderImpl.java        | 32 ++++++++
 .../index/ReIndexServiceImpl.java               | 23 ++++--
 .../usergrid/persistence/map/MapManager.java    |  2 +
 .../rest/applications/CollectionResource.java   | 14 +++-
 .../collection/CollectionsResourceIT.java       | 77 +++++++++++++++++++-
 9 files changed, 169 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 1fbacbe..bb3abcb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -1739,9 +1739,11 @@ public class CpEntityManager implements EntityManager {
 
 
         //haven't decided which one I should base off of which, maybe from epoch to utc
+
+        //TODO: change timeservice as below then use timeservice.
+        //TODO: only allow a single reindex in elasticsearch at a time. akka.
         Instant timeInstance = Instant.now();
 
-        Date date = Date.from(timeInstance);
         Long epoch = timeInstance.toEpochMilli();
 
         Map<String,Object> schemaMap = new HashMap<>(  );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
index 0408bbd..0b7bb43 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
@@ -37,6 +37,7 @@ import com.google.inject.Inject;
 /**
  * Cache for managing our other managers.  Now just a delegate.  Needs refactored away
  */
+
 public class CpManagerCache implements ManagerCache {
 
     private final EntityCollectionManagerFactory ecmf;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index c0f2e1b..5f50a32 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -121,8 +121,10 @@ public class IndexServiceImpl implements IndexService {
 
         //do our observable for batching
         //try to send a whole batch if we can
+
+        //TODO: extract the below and call a single method.
         final Observable<IndexOperationMessage>  batches =  sourceEdgesToIndex
-            .buffer(250, TimeUnit.MILLISECONDS, indexFig.getIndexBatchSize() )
+            .buffer(250, TimeUnit.MILLISECONDS, indexFig.getIndexBatchSize() ) //TODO: change to delay. maybe. at least to the before buffer.
 
             //map into batches based on our buffer size
             .flatMap( buffer -> Observable.from( buffer )
@@ -202,10 +204,9 @@ public class IndexServiceImpl implements IndexService {
      */
     private Map getFilteredStringObjectMap( final ApplicationScope applicationScope,
                                             final Entity entity, final IndexEdge indexEdge ) {
-        IndexOperation indexOperation = new IndexOperation();
-
 
-        indexEdge.getNodeId().getUuid();
+        //look into this.
+        IndexOperation indexOperation = new IndexOperation();
 
         Id mapOwner = new SimpleId( indexEdge.getNodeId().getUuid(), TYPE_APPLICATION );
 
@@ -217,6 +218,7 @@ public class IndexServiceImpl implements IndexService {
         Set<String> defaultProperties;
         ArrayList fieldsToKeep;
 
+        //TODO: extract collection name using other classes than the split.
         String jsonSchemaMap = mm.getString( indexEdge.getEdgeName().split( "\\|" )[1] );
 
         //If we do have a schema then parse it and add it to a list of properties we want to keep.Otherwise return.
@@ -226,6 +228,8 @@ public class IndexServiceImpl implements IndexService {
             Schema schema = Schema.getDefaultSchema();
             defaultProperties = schema.getRequiredProperties( indexEdge.getEdgeName().split( "\\|" )[1] );
             fieldsToKeep = ( ArrayList ) jsonMapData.get( "fields" );
+            //TODO: add method here to update the relevant fields in the map manager when it was accessed.
+
             defaultProperties.addAll( fieldsToKeep );
         }
         else {
@@ -233,6 +237,7 @@ public class IndexServiceImpl implements IndexService {
         }
 
         //Returns the flattened map of the entity.
+        //TODO: maybe instead pass the fields to keep to the flattening.
         Map map = indexOperation.convertedEntityToBeIndexed( applicationScope, indexEdge, entity );
 
         HashSet mapFields = ( HashSet ) map.get( "fields" );
@@ -245,6 +250,12 @@ public class IndexServiceImpl implements IndexService {
 
             //Checks to see if the fieldname is a default property. If it is then keep it, otherwise send it to
             //be verified the aptly named method
+
+            //one.two.three
+            //one.two.four
+            //one.two3.five
+            //one.two
+            //fields { one.two }
             if ( !defaultProperties.contains( fieldName ) ) {
                 iterateThroughMapForFieldsToBeIndexed( fieldsToKeep, collectionIterator, fieldName );
             }
@@ -290,10 +301,14 @@ public class IndexServiceImpl implements IndexService {
 
             //Then if that check passes we go to check that both parts are equal. If they are ever not equal
             // e.g one.two.three and one.three.two then it shouldn't be included
+            //TODO: regex.
             for ( int index = 0; index < flattedRequirementString.length; index++ ) {
                 //if the array contains a string that it is equals to then set the remove flag to true
                 //otherwise remain false.
 
+                //one.three
+                //one.two
+                //one
                 if ( flattedStringArray.length <= index ) {
                     toRemoveFlag = true;
                     break;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilder.java
index 0863a63..139f832 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilder.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.corepersistence.index;
 
 
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 
@@ -52,6 +53,8 @@ public interface ReIndexRequestBuilder {
     ReIndexRequestBuilder withCursor(final String cursor);
 
 
+    ReIndexRequestBuilder withDelay( int delayTimer, TimeUnit timeUnit );
+
     /**
      * Set the timestamp to re-index entities updated >= this timestamp
      * @param timestamp
@@ -60,6 +63,10 @@ public interface ReIndexRequestBuilder {
     ReIndexRequestBuilder withStartTimestamp(final Long timestamp);
 
 
+    Optional<Integer> getDelayTimer();
+
+    Optional<TimeUnit> getTimeUnitOptional();
+
     /**
      * Get the application scope
      * @return

http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilderImpl.java
index e93ccf1..d3e7ecf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilderImpl.java
@@ -21,6 +21,9 @@ package org.apache.usergrid.corepersistence.index;
 
 
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.auth.IAuthenticator;
 
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -37,6 +40,8 @@ public class ReIndexRequestBuilderImpl implements ReIndexRequestBuilder {
     private Optional<String> withCollectionName = Optional.absent();
     private Optional<String> cursor = Optional.absent();
     private Optional<Long> updateTimestamp = Optional.absent();
+    private Optional<Integer> delayTimer = Optional.absent();
+    private Optional<TimeUnit> timeUnitOptional = Optional.absent();
 
 
     /***
@@ -81,6 +86,22 @@ public class ReIndexRequestBuilderImpl implements ReIndexRequestBuilder {
 
 
     /**
+     * Determines whether we should tack on a delay for reindexing and for how long if we do. Also
+     * allowed to specify how throttled back it should be.
+     * @param delayTimer
+     * @param timeUnit
+     * @return
+     */
+    @Override
+    public ReIndexRequestBuilder withDelay( final int delayTimer, final TimeUnit timeUnit ){
+        this.delayTimer = Optional.fromNullable( delayTimer );
+        this.timeUnitOptional = Optional.fromNullable( timeUnit );
+
+        return this;
+    }
+
+
+    /**
      * Set start timestamp in epoch time.  Only entities updated since this time will be processed for indexing
      * @param timestamp
      * @return
@@ -93,6 +114,17 @@ public class ReIndexRequestBuilderImpl implements ReIndexRequestBuilder {
 
 
     @Override
+    public Optional<Integer> getDelayTimer() {
+        return delayTimer;
+    }
+
+    @Override
+    public Optional<TimeUnit> getTimeUnitOptional() {
+        return timeUnitOptional;
+    }
+
+
+    @Override
     public Optional<ApplicationScope> getApplicationScope() {
 
         if ( this.withApplicationId.isPresent() ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index fc1c97f..558a18c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -99,6 +99,7 @@ public class ReIndexServiceImpl implements ReIndexService {
     }
 
 
+    //TODO: optional delay, param.
     @Override
     public ReIndexStatus rebuildIndex( final ReIndexRequestBuilder reIndexRequestBuilder ) {
 
@@ -111,6 +112,10 @@ public class ReIndexServiceImpl implements ReIndexService {
 
         final Optional<ApplicationScope> appId = reIndexRequestBuilder.getApplicationScope();
 
+        final Optional<Integer> delayTimer = reIndexRequestBuilder.getDelayTimer();
+
+        final Optional<TimeUnit> timeUnitOptional = reIndexRequestBuilder.getTimeUnitOptional();
+
         Preconditions.checkArgument( !(cursor.isPresent() && appId.isPresent()),
             "You cannot specify an app id and a cursor.  When resuming with cursor you must omit the appid" );
 
@@ -123,16 +128,24 @@ public class ReIndexServiceImpl implements ReIndexService {
 
         // create an observable that loads a batch to be indexed
 
-        final Observable<List<EdgeScope>> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes,
+        Observable<List<EdgeScope>> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes,
             reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
-            .buffer( indexProcessorFig.getReindexBufferSize())
-            //.delay( 50, TimeUnit.MILLISECONDS )
-            .doOnNext(edges -> {
+            .buffer( indexProcessorFig.getReindexBufferSize());
 
+        if(delayTimer.isPresent()){
+            if(timeUnitOptional.isPresent()){
+                runningReIndex = runningReIndex.delay( delayTimer.get(),timeUnitOptional.get() );
+            }
+            else{
+                runningReIndex = runningReIndex.delay( delayTimer.get(), TimeUnit.MILLISECONDS );
+            }
+        }
+
+        runningReIndex = runningReIndex.doOnNext(edges -> {
                 logger.info("Sending batch of {} to be indexed.", edges.size());
                 indexService.indexBatch(edges, modifiedSince);
 
-            });
+        });
 
 
         //start our sampler and state persistence

http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
index 80e2d17..ca8fd9a 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
@@ -27,6 +27,8 @@ import java.util.UUID;
 /**
  * Generator of a map manager instance
  */
+//TODO: This should be a singleton; otherwise the cache could get out of sync and would need to be invalidated everywhere.
+    //TODO: make manager cache injectable everywhere.
 public interface MapManager {
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
index b674ac4..fb660fa 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.rest.applications;
 
 
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
@@ -47,6 +48,7 @@ import org.apache.usergrid.rest.AbstractContextResource;
 import org.apache.usergrid.rest.ApiResponse;
 import org.apache.usergrid.rest.RootResource;
 import org.apache.usergrid.rest.security.annotations.RequireApplicationAccess;
+import org.apache.usergrid.rest.security.annotations.RequireSystemAccess;
 import org.apache.usergrid.services.AbstractCollectionService;
 import org.apache.usergrid.services.ServiceAction;
 import org.apache.usergrid.services.ServiceParameter;
@@ -91,6 +93,9 @@ public class CollectionResource extends ServiceResource {
         if(logger.isTraceEnabled()){
             logger.trace( "ServiceResource.executePostOnIndexes" );
         }
+        /**
+
+         */
 
         Object json;
         if ( StringUtils.isEmpty( body ) ) {
@@ -135,10 +140,12 @@ public class CollectionResource extends ServiceResource {
         return response;
     }
 
+    //TODO: this can't be controlled yet, and until it can be we shouldn't allow regular users (muggles) to do this. So system access only.
+    //TODO: use scheduler here to get around people sending a reindex call 30 times.
     @POST
     @Path("_reindex")
     @Produces({ MediaType.APPLICATION_JSON,"application/javascript"})
-    @RequireApplicationAccess
+    @RequireSystemAccess
     @JSONP
     public ApiResponse executePostForReindexing( @Context UriInfo ui, String body,
                                              @QueryParam("callback") @DefaultValue("callback") String callback )
@@ -146,8 +153,8 @@ public class CollectionResource extends ServiceResource {
 
         final ReIndexRequestBuilder request =
             createRequest().withApplicationId( services.getApplicationId() ).withCollection(
-                String.valueOf( getServiceParameters().get( 0 ) ) );
-//
+                String.valueOf( getServiceParameters().get( 0 ) ) ).withDelay( 1, TimeUnit.SECONDS );
+
         return executeAndCreateResponse( request, callback );
     }
 
@@ -170,6 +177,7 @@ public class CollectionResource extends ServiceResource {
     }
 
 
+    //TODO: change this to {itemName}/_indexes, which should do what we already have. Then we won't need this overridden method.
     @Override
     @Path("{itemName}")
     public AbstractContextResource addNameParameter( @Context UriInfo ui, @PathParam("itemName") PathSegment itemName )

http://git-wip-us.apache.org/repos/asf/usergrid/blob/82ff53b1/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
index 5900dd5..109bb6f 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/CollectionsResourceIT.java
@@ -168,9 +168,84 @@ public class CollectionsResourceIT extends AbstractRestIT {
 
         //Reindex and verify that the entity only has field one index.
         this.app().collection( "testCollection" ).collection( "_reindex" ).post();
-        Thread.sleep( 10000 );
+       // Thread.sleep( 10000 );
+        //refreshIndex();
+
+        for(int i = 0; i < 10; i++) {
+            String query = "one ='value"+ i + "'";
+            QueryParameters queryParameters = new QueryParameters().setQuery( query );
+
+            //having a name breaks it. Need to get rid of the stack trace and also
+            Collection tempEntity = this.app().collection( "testCollection" ).get( queryParameters, true );
+            Entity reindexedEntity = tempEntity.getResponse().getEntity();
+            assertEquals( "value"+i, reindexedEntity.get( "one" ) );
+
+            //Verify if you can query on an entity that was not indexed and that no entities are returned.
+            query = "two = 'valuetwo1"+ i + "'";
+            queryParameters = new QueryParameters().setQuery( query );
+            tempEntity = this.app().collection( "testCollection" ).get( queryParameters, true );
+            assertEquals( 0, tempEntity.getResponse().getEntities().size() );
+        }
+    }
+
+
+    @Test
+    public void postToCollectionSchemaAndVerifyFieldsAreUpdated() throws Exception {
+
+        //Create test collection with test entity that is full text indexed.
+        Entity testEntity = new Entity();
+
+
+        for(int i = 0; i < 10; i++){
+            testEntity.put( "one","value"+i );
+            testEntity.put( "two","valuetwo"+i );
+            this.app().collection( "testCollection" ).post( testEntity );
+        }
+
+
+        //Creating schema.
+        //this could be changed to a hashmap.
+        ArrayList<String> indexingArray = new ArrayList<>(  );
+        indexingArray.add( "one" );
+
+
+        //field "fields" is required.
+        Entity payload = new Entity();
+        payload.put( "fields", indexingArray);
+
+        //Post index to the collection metadata
+        Entity thing = this.app().collection( "testCollection" ).collection( "_indexes" ).post( payload );
         refreshIndex();
 
+        //TODO: write a test to verify the data below.
+
+        Collection collection = this.app().collection( "testCollection" ).collection( "_index" ).get();
+
+        LinkedHashMap testCollectionSchema = (LinkedHashMap)collection.getResponse().getData();
+        //TODO: the below will have to be replaced by the values that I deem correct.
+        assertEquals( ( thing ).get( "lastUpdated" ), testCollectionSchema.get( "lastUpdated" ));
+        assertEquals( ( thing ).get( "lastUpdateBy" ),testCollectionSchema.get( "lastUpdateBy" ) );
+        assertEquals( ( thing ).get( "lastReindexed" ),testCollectionSchema.get( "lastReindexed" ) );
+
+        //TODO: this test doesn't check to see if create checks the schema. Only that the reindex removes whats already there.
+        ArrayList<String> schema = ( ArrayList<String> ) testCollectionSchema.get( "fields" );
+        assertEquals( "one",schema.get( 0 ) );
+
+
+        //Reindex and verify that the entity only has field one index.
+        this.app().collection( "testCollection" ).collection( "_reindex" ).post();
+
+        collection = this.app().collection( "testCollection" ).collection( "_index" ).get();
+
+        testCollectionSchema = (LinkedHashMap)collection.getResponse().getData();
+        assertEquals( ( thing ).get( "lastUpdated" ), testCollectionSchema.get( "lastUpdated" ));
+        assertEquals( ( thing ).get( "lastUpdateBy" ),testCollectionSchema.get( "lastUpdateBy" ) );
+        assertNotEquals( ( thing ).get( "lastReindexed" ),testCollectionSchema.get( "lastReindexed" ) );
+
+        schema = ( ArrayList<String> ) testCollectionSchema.get( "fields" );
+        assertEquals( "one",schema.get( 0 ) );
+
+
         for(int i = 0; i < 10; i++) {
             String query = "one ='value"+ i + "'";
             QueryParameters queryParameters = new QueryParameters().setQuery( query );