You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2017/07/14 18:14:11 UTC

usergrid git commit: Initial commit for exposing collection re-index to non sysadmin users.

Repository: usergrid
Updated Branches:
  refs/heads/expose-reindex [created] 148337ec8


Initial commit for exposing collection re-index to non sysadmin users.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/148337ec
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/148337ec
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/148337ec

Branch: refs/heads/expose-reindex
Commit: 148337ec848b2077c14967a138242e39476b2ef6
Parents: a6fee78
Author: Michael Russo <ru...@google.com>
Authored: Wed Jun 28 11:07:15 2017 -0700
Committer: Michael Russo <ru...@google.com>
Committed: Wed Jun 28 11:07:15 2017 -0700

----------------------------------------------------------------------
 .../corepersistence/index/ReIndexService.java   |  33 +++++-
 .../index/ReIndexServiceImpl.java               | 106 +++++++++++++------
 .../rest/applications/CollectionResource.java   |  28 +++--
 .../usergrid/rest/system/IndexResource.java     |  78 ++++++++++++--
 4 files changed, 194 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/148337ec/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
index b9238e5..d37f117 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.corepersistence.index;
 
 
+import org.apache.usergrid.utils.StringUtils;
+
 /**
  * An interface for re-indexing all entities in an application
  */
@@ -47,6 +49,13 @@ public interface ReIndexService {
      */
     ReIndexStatus getStatus( final String jobId );
 
+    /**
+     * Get the status of a collection job
+     * @param collectionName The collectionName for the rebuild index
+     * @return
+     */
+    ReIndexStatus getStatusForCollection( final String appIdString, final String collectionName );
+
 
     /**
      * The response when requesting a re-index operation
@@ -56,14 +65,27 @@ public interface ReIndexService {
         final Status status;
         final long numberProcessed;
         final long lastUpdated;
+        final String collectionName;
 
 
         public ReIndexStatus( final String jobId, final Status status, final long numberProcessed,
-                              final long lastUpdated ) {
-            this.jobId = jobId;
+                              final long lastUpdated, final String collectionName ) {
+
+            if(StringUtils.isNotEmpty(jobId)){
+                this.jobId = jobId;
+            }else {
+                this.jobId = "";
+            }
+
             this.status = status;
             this.numberProcessed = numberProcessed;
             this.lastUpdated = lastUpdated;
+
+            if(StringUtils.isNotEmpty(collectionName)){
+                this.collectionName = collectionName;
+            }else {
+                this.collectionName = "";
+            }
         }
 
 
@@ -74,6 +96,13 @@ public interface ReIndexService {
             return jobId;
         }
 
+        /**
+         * Get the jobId used to resume this operation
+         */
+        public String getCollectionName() {
+            return collectionName;
+        }
+
 
         /**
          * Get the last updated time, as a long

http://git-wip-us.apache.org/repos/asf/usergrid/blob/148337ec/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index 0660d5e..0731af7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -74,6 +74,7 @@ public class ReIndexServiceImpl implements ReIndexService {
     private static final String MAP_COUNT_KEY = "count";
     private static final String MAP_STATUS_KEY = "status";
     private static final String MAP_UPDATED_KEY = "lastUpdated";
+    private static final String MAP_SEPARATOR = "|||";
 
 
     private final AllApplicationsObservable allApplicationsObservable;
@@ -139,7 +140,9 @@ public class ReIndexServiceImpl implements ReIndexService {
 
         // create an observable that loads a batch to be indexed
 
-        if(reIndexRequestBuilder.getCollectionName().isPresent()) {
+        final boolean isForCollection = reIndexRequestBuilder.getCollectionName().isPresent();
+
+        if(isForCollection) {
 
             String collectionName =  InflectionUtils.pluralize(
                 CpNamingUtils.getNameFromEdgeType(reIndexRequestBuilder.getCollectionName().get() ));
@@ -174,12 +177,36 @@ public class ReIndexServiceImpl implements ReIndexService {
                 if( edgeScopes.size() > 0 ) {
                     writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));
                 }
-                writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() ); })
-            .doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() ))
+                if( isForCollection ){
+                    writeStateMetaForCollection(
+                        appId.get().getApplication().getUuid().toString(),
+                        reIndexRequestBuilder.getCollectionName().get(),
+                        Status.INPROGRESS, count.get(),
+                        System.currentTimeMillis() );
+                }else{
+                    writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() );
+                }
+            })
+            .doOnCompleted(() ->{
+                if( isForCollection ){
+                    writeStateMetaForCollection(
+                        appId.get().getApplication().getUuid().toString(),
+                        reIndexRequestBuilder.getCollectionName().get(),
+                        Status.COMPLETE, count.get(),
+                        System.currentTimeMillis() );
+                }else {
+                    writeStateMeta(jobId, Status.COMPLETE, count.get(), System.currentTimeMillis());
+                }
+            })
             .subscribeOn( Schedulers.io() ).subscribe();
 
+        if(isForCollection){
+            return new ReIndexStatus( "", Status.STARTED, 0, 0, reIndexRequestBuilder.getCollectionName().get() );
+
+        }
+
 
-        return new ReIndexStatus( jobId, Status.STARTED, 0, 0 );
+        return new ReIndexStatus( jobId, Status.STARTED, 0, 0, "" );
     }
 
 
@@ -195,38 +222,15 @@ public class ReIndexServiceImpl implements ReIndexService {
         return getIndexResponse( jobId );
     }
 
-
-    /**
-     * Simple collector that counts state, then flushed every time a buffer is provided.  Writes final state when complete
-     */
-    private class FlushingCollector {
-
-        private final String jobId;
-        private long count;
-
-
-        private FlushingCollector( final String jobId ) {
-            this.jobId = jobId;
-        }
-
-
-        public void flushBuffer( final List<EdgeScope> buffer ) {
-            count += buffer.size();
-
-            //write our cursor state
-            if ( buffer.size() > 0 ) {
-                writeCursorState( jobId, buffer.get( buffer.size() - 1 ) );
-            }
-
-            writeStateMeta( jobId, Status.INPROGRESS, count, System.currentTimeMillis() );
-        }
-
-        public void complete(){
-            writeStateMeta( jobId, Status.COMPLETE, count, System.currentTimeMillis() );
-        }
+    @Override
+    public ReIndexStatus getStatusForCollection( final String appIdString, final String collectionName ) {
+        Preconditions.checkNotNull( collectionName, "appIdString must not be null" );
+        Preconditions.checkNotNull( collectionName, "collectionName must not be null" );
+        return getIndexResponseForCollection( appIdString, collectionName );
     }
 
 
+
     /**
      * Get the resume edge scope
      *
@@ -345,7 +349,7 @@ public class ReIndexServiceImpl implements ReIndexService {
         final String stringStatus = mapManager.getString( jobId+MAP_STATUS_KEY );
 
         if(stringStatus == null){
-           return new ReIndexStatus( jobId, Status.UNKNOWN, 0, 0 );
+           return new ReIndexStatus( jobId, Status.UNKNOWN, 0, 0, "" );
         }
 
         final Status status = Status.valueOf( stringStatus );
@@ -353,7 +357,39 @@ public class ReIndexServiceImpl implements ReIndexService {
         final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY );
         final long lastUpdated = mapManager.getLong( jobId + MAP_COUNT_KEY );
 
-        return new ReIndexStatus( jobId, status, processedCount, lastUpdated );
+        return new ReIndexStatus( jobId, status, processedCount, lastUpdated, "" );
+    }
+
+
+    private void writeStateMetaForCollection(final String appIdString, final String collectionName,
+                                             final Status status, final long processedCount, final long lastUpdated ) {
+
+        if(logger.isDebugEnabled()) {
+            logger.debug( "Flushing state for collection {}, status {}, processedCount {}, lastUpdated {}",
+                collectionName, status, processedCount, lastUpdated);
+        }
+
+        mapManager.putString( appIdString + MAP_SEPARATOR + collectionName + MAP_STATUS_KEY, status.name() );
+        mapManager.putLong( appIdString + MAP_SEPARATOR + collectionName + MAP_COUNT_KEY, processedCount );
+        mapManager.putLong( appIdString + MAP_SEPARATOR + collectionName + MAP_UPDATED_KEY, lastUpdated );
+    }
+
+
+    private ReIndexStatus getIndexResponseForCollection( final String appIdString, final String collectionName ) {
+
+        final String stringStatus =
+            mapManager.getString( appIdString + MAP_SEPARATOR + collectionName + MAP_STATUS_KEY );
+
+        if(stringStatus == null){
+            return new ReIndexStatus( "", Status.UNKNOWN, 0, 0, collectionName );
+        }
+
+        final Status status = Status.valueOf( stringStatus );
+
+        final long processedCount = mapManager.getLong( appIdString + MAP_SEPARATOR + collectionName + MAP_COUNT_KEY );
+        final long lastUpdated = mapManager.getLong( appIdString + MAP_SEPARATOR + collectionName + MAP_UPDATED_KEY );
+
+        return new ReIndexStatus( "", status, processedCount, lastUpdated, collectionName );
     }
 }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/148337ec/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
index b8c1caa..b32a718 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
@@ -48,6 +48,8 @@ import org.apache.usergrid.services.ServicePayload;
 
 import com.fasterxml.jackson.jaxrs.json.annotation.JSONP;
 
+import java.util.Map;
+
 
 /**
  * A collection resource that stands before the Service Resource. If it cannot find
@@ -190,24 +192,38 @@ public class CollectionResource extends ServiceResource {
     }
 
 
-    // TODO: this can't be controlled and until it can be controlled we shouldn' allow muggles to do this.
-    // So system access only.
-    // TODO: use scheduler here to get around people sending a reindex call 30 times.
     @POST
     @Path("{itemName}/_reindex")
     @Produces({ MediaType.APPLICATION_JSON,"application/javascript"})
-    @RequireSystemAccess
+    @RequireApplicationAccess
     @JSONP
     public ApiResponse executePostForReindexing(
-        @Context UriInfo ui, String body,
+        @Context UriInfo ui, final Map<String, Object> payload,
         @PathParam("itemName") PathSegment itemName,
         @QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception {
 
         addItemToServiceContext( ui, itemName );
 
         IndexResource indexResource = new IndexResource(injector);
-        return indexResource.rebuildIndexesPost(
+        return indexResource.rebuildIndexCollectionPost(payload,
             services.getApplicationId().toString(),itemName.getPath(),false,callback );
     }
 
+    @GET
+    @Path("{itemName}/_reindex")
+    @Produces({ MediaType.APPLICATION_JSON,"application/javascript"})
+    @RequireApplicationAccess
+    @JSONP
+    public ApiResponse executeGetForReindexStatus(
+        @Context UriInfo ui, final Map<String, Object> payload,
+        @PathParam("itemName") PathSegment itemName,
+        @QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception {
+
+        addItemToServiceContext( ui, itemName );
+
+        IndexResource indexResource = new IndexResource(injector);
+        return indexResource.rebuildIndexCollectionGet(services.getApplicationId().toString(), itemName.getPath(),
+            callback );
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/148337ec/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
index be60177..ec86750 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
@@ -28,13 +28,16 @@ import com.google.inject.Injector;
 import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
 import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilderImpl;
 import org.apache.usergrid.corepersistence.index.ReIndexService;
+import org.apache.usergrid.exception.ConflictException;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.index.utils.ConversionUtils;
 import org.apache.usergrid.persistence.index.utils.UUIDUtils;
 import org.apache.usergrid.rest.AbstractContextResource;
 import org.apache.usergrid.rest.ApiResponse;
 import org.apache.usergrid.rest.RootResource;
+import org.apache.usergrid.rest.security.annotations.RequireOrganizationAccess;
 import org.apache.usergrid.rest.security.annotations.RequireSystemAccess;
+import org.apache.usergrid.utils.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.annotation.Scope;
@@ -182,26 +185,78 @@ public class IndexResource extends AbstractContextResource {
         return executeResumeAndCreateResponse( payload, request, callback );
     }
 
+    @RequireOrganizationAccess
+    @GET
+    @Path( "rebuild/" + RootResource.APPLICATION_ID_PATH + "/{collectionName}" )
+    @JSONP
+    @Produces({ MediaType.APPLICATION_JSON, "application/javascript" })
+    public ApiResponse rebuildIndexCollectionGet( @PathParam( "applicationId" ) final String applicationIdStr,
+                                        @PathParam( "collectionName" ) final String collectionName,
+                                        @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
 
-    @RequireSystemAccess
+
+        throws Exception {
+        if (logger.isTraceEnabled()) {
+            logger.trace("Getting re-index status for app: {}, collection: {}", applicationIdStr, collectionName);
+        }
+
+
+        ReIndexService.ReIndexStatus status = getReIndexService().getStatusForCollection(applicationIdStr, collectionName);
+
+        final ApiResponse response = createApiResponse();
+
+        response.setAction( "get rebuild index status" );
+        response.setProperty( "collection", status.getCollectionName() );
+        response.setProperty( "status", status.getStatus() );
+        response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() );
+        response.setProperty( "numberQueued", status.getNumberProcessed() );
+        response.setSuccess();
+
+        return response;
+    }
+
+    @RequireOrganizationAccess
     @POST
     @Path( "rebuild/" + RootResource.APPLICATION_ID_PATH + "/{collectionName}" )
     @JSONP
     @Produces({MediaType.APPLICATION_JSON, "application/javascript"})
-    public ApiResponse rebuildIndexesPost( @PathParam( "applicationId" ) final String applicationIdStr,
-                                               @PathParam( "collectionName" ) final String collectionName,
-                                               @QueryParam( "reverse" ) @DefaultValue( "false" ) final Boolean reverse,
-                                               @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+    public ApiResponse rebuildIndexCollectionPost(final Map<String, Object> payload,
+                                                  @PathParam( "applicationId" ) final String applicationIdStr,
+                                                  @PathParam( "collectionName" ) final String collectionName,
+                                                  @QueryParam( "reverse" ) @DefaultValue( "false" ) final Boolean reverse,
+                                                  @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
         throws Exception {
 
+        ReIndexService.ReIndexStatus existingStatus =
+            getReIndexService().getStatusForCollection(applicationIdStr, collectionName);
 
-        logger.info( "Rebuilding collection {} in  application {}", collectionName, applicationIdStr );
+        if(existingStatus.getStatus().equals(ReIndexService.Status.INPROGRESS)){
+            throw new ConflictException("Re-index for collection currently in progress");
+        }
+
+        logger.info( "Re-indexing collection {} in  application {}", collectionName, applicationIdStr );
 
         final UUID appId = UUIDUtils.tryExtractUUID( applicationIdStr );
 
+
         final ReIndexRequestBuilder request =
             createRequest().withApplicationId( appId ).withCollection( collectionName );
 
+        Map<String,Object> newPayload = payload;
+        if(newPayload == null ||  !payload.containsKey( UPDATED_FIELD )){
+            newPayload = new HashMap<>(1);
+            newPayload.put(UPDATED_FIELD,0);
+        }
+
+        Preconditions.checkArgument(newPayload.get(UPDATED_FIELD) instanceof Number,
+            "Property \"updated\" in the payload must be a number in unix timestamp millis format" );
+
+        //add our updated timestamp to the request
+        if ( newPayload.containsKey( UPDATED_FIELD ) ) {
+            final long timestamp = ConversionUtils.getLong(newPayload.get(UPDATED_FIELD));
+            request.withStartTimestamp( timestamp );
+        }
+
         return executeAndCreateResponse( request, callback );
     }
 
@@ -214,7 +269,6 @@ public class IndexResource extends AbstractContextResource {
     public ApiResponse rebuildIndexesPut( final Map<String, Object> payload,
                                               @PathParam( "applicationId" ) final String applicationIdStr,
                                               @PathParam( "collectionName" ) final String collectionName,
-                                              @QueryParam( "reverse" ) @DefaultValue( "false" ) final Boolean reverse,
                                               @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
         throws Exception {
 
@@ -350,7 +404,15 @@ public class IndexResource extends AbstractContextResource {
         final ApiResponse response = createApiResponse();
 
         response.setAction( "rebuild indexes" );
-        response.setProperty( "jobId", status.getJobId() );
+
+        if(StringUtils.isNotEmpty(status.getJobId())){
+            response.setProperty( "jobId", status.getJobId() );
+        }
+
+        if(StringUtils.isNotEmpty(status.getCollectionName())){
+            response.setProperty( "collection", status.getCollectionName() );
+        }
+
         response.setProperty( "status", status.getStatus() );
         response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() );
         response.setProperty( "numberQueued", status.getNumberProcessed() );