You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2017/07/14 18:14:11 UTC
usergrid git commit: Initial commit for exposing collection re-index
to non sysadmin users.
Repository: usergrid
Updated Branches:
refs/heads/expose-reindex [created] 148337ec8
Initial commit for exposing collection re-index to non sysadmin users.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/148337ec
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/148337ec
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/148337ec
Branch: refs/heads/expose-reindex
Commit: 148337ec848b2077c14967a138242e39476b2ef6
Parents: a6fee78
Author: Michael Russo <ru...@google.com>
Authored: Wed Jun 28 11:07:15 2017 -0700
Committer: Michael Russo <ru...@google.com>
Committed: Wed Jun 28 11:07:15 2017 -0700
----------------------------------------------------------------------
.../corepersistence/index/ReIndexService.java | 33 +++++-
.../index/ReIndexServiceImpl.java | 106 +++++++++++++------
.../rest/applications/CollectionResource.java | 28 +++--
.../usergrid/rest/system/IndexResource.java | 78 ++++++++++++--
4 files changed, 194 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/148337ec/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
index b9238e5..d37f117 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
@@ -20,6 +20,8 @@
package org.apache.usergrid.corepersistence.index;
+import org.apache.usergrid.utils.StringUtils;
+
/**
* An interface for re-indexing all entities in an application
*/
@@ -47,6 +49,13 @@ public interface ReIndexService {
*/
ReIndexStatus getStatus( final String jobId );
+ /**
+ * Get the status of a collection job
+ * @param collectionName The collectionName for the rebuild index
+ * @return
+ */
+ ReIndexStatus getStatusForCollection( final String appIdString, final String collectionName );
+
/**
* The response when requesting a re-index operation
@@ -56,14 +65,27 @@ public interface ReIndexService {
final Status status;
final long numberProcessed;
final long lastUpdated;
+ final String collectionName;
public ReIndexStatus( final String jobId, final Status status, final long numberProcessed,
- final long lastUpdated ) {
- this.jobId = jobId;
+ final long lastUpdated, final String collectionName ) {
+
+ if(StringUtils.isNotEmpty(jobId)){
+ this.jobId = jobId;
+ }else {
+ this.jobId = "";
+ }
+
this.status = status;
this.numberProcessed = numberProcessed;
this.lastUpdated = lastUpdated;
+
+ if(StringUtils.isNotEmpty(collectionName)){
+ this.collectionName = collectionName;
+ }else {
+ this.collectionName = "";
+ }
}
@@ -74,6 +96,13 @@ public interface ReIndexService {
return jobId;
}
+ /**
+ * Get the jobId used to resume this operation
+ */
+ public String getCollectionName() {
+ return collectionName;
+ }
+
/**
* Get the last updated time, as a long
http://git-wip-us.apache.org/repos/asf/usergrid/blob/148337ec/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index 0660d5e..0731af7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -74,6 +74,7 @@ public class ReIndexServiceImpl implements ReIndexService {
private static final String MAP_COUNT_KEY = "count";
private static final String MAP_STATUS_KEY = "status";
private static final String MAP_UPDATED_KEY = "lastUpdated";
+ private static final String MAP_SEPARATOR = "|||";
private final AllApplicationsObservable allApplicationsObservable;
@@ -139,7 +140,9 @@ public class ReIndexServiceImpl implements ReIndexService {
// create an observable that loads a batch to be indexed
- if(reIndexRequestBuilder.getCollectionName().isPresent()) {
+ final boolean isForCollection = reIndexRequestBuilder.getCollectionName().isPresent();
+
+ if(isForCollection) {
String collectionName = InflectionUtils.pluralize(
CpNamingUtils.getNameFromEdgeType(reIndexRequestBuilder.getCollectionName().get() ));
@@ -174,12 +177,36 @@ public class ReIndexServiceImpl implements ReIndexService {
if( edgeScopes.size() > 0 ) {
writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));
}
- writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() ); })
- .doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() ))
+ if( isForCollection ){
+ writeStateMetaForCollection(
+ appId.get().getApplication().getUuid().toString(),
+ reIndexRequestBuilder.getCollectionName().get(),
+ Status.INPROGRESS, count.get(),
+ System.currentTimeMillis() );
+ }else{
+ writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() );
+ }
+ })
+ .doOnCompleted(() ->{
+ if( isForCollection ){
+ writeStateMetaForCollection(
+ appId.get().getApplication().getUuid().toString(),
+ reIndexRequestBuilder.getCollectionName().get(),
+ Status.COMPLETE, count.get(),
+ System.currentTimeMillis() );
+ }else {
+ writeStateMeta(jobId, Status.COMPLETE, count.get(), System.currentTimeMillis());
+ }
+ })
.subscribeOn( Schedulers.io() ).subscribe();
+ if(isForCollection){
+ return new ReIndexStatus( "", Status.STARTED, 0, 0, reIndexRequestBuilder.getCollectionName().get() );
+
+ }
+
- return new ReIndexStatus( jobId, Status.STARTED, 0, 0 );
+ return new ReIndexStatus( jobId, Status.STARTED, 0, 0, "" );
}
@@ -195,38 +222,15 @@ public class ReIndexServiceImpl implements ReIndexService {
return getIndexResponse( jobId );
}
-
- /**
- * Simple collector that counts state, then flushed every time a buffer is provided. Writes final state when complete
- */
- private class FlushingCollector {
-
- private final String jobId;
- private long count;
-
-
- private FlushingCollector( final String jobId ) {
- this.jobId = jobId;
- }
-
-
- public void flushBuffer( final List<EdgeScope> buffer ) {
- count += buffer.size();
-
- //write our cursor state
- if ( buffer.size() > 0 ) {
- writeCursorState( jobId, buffer.get( buffer.size() - 1 ) );
- }
-
- writeStateMeta( jobId, Status.INPROGRESS, count, System.currentTimeMillis() );
- }
-
- public void complete(){
- writeStateMeta( jobId, Status.COMPLETE, count, System.currentTimeMillis() );
- }
+ @Override
+ public ReIndexStatus getStatusForCollection( final String appIdString, final String collectionName ) {
+ Preconditions.checkNotNull( collectionName, "appIdString must not be null" );
+ Preconditions.checkNotNull( collectionName, "collectionName must not be null" );
+ return getIndexResponseForCollection( appIdString, collectionName );
}
+
/**
* Get the resume edge scope
*
@@ -345,7 +349,7 @@ public class ReIndexServiceImpl implements ReIndexService {
final String stringStatus = mapManager.getString( jobId+MAP_STATUS_KEY );
if(stringStatus == null){
- return new ReIndexStatus( jobId, Status.UNKNOWN, 0, 0 );
+ return new ReIndexStatus( jobId, Status.UNKNOWN, 0, 0, "" );
}
final Status status = Status.valueOf( stringStatus );
@@ -353,7 +357,39 @@ public class ReIndexServiceImpl implements ReIndexService {
final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY );
final long lastUpdated = mapManager.getLong( jobId + MAP_COUNT_KEY );
- return new ReIndexStatus( jobId, status, processedCount, lastUpdated );
+ return new ReIndexStatus( jobId, status, processedCount, lastUpdated, "" );
+ }
+
+
+ private void writeStateMetaForCollection(final String appIdString, final String collectionName,
+ final Status status, final long processedCount, final long lastUpdated ) {
+
+ if(logger.isDebugEnabled()) {
+ logger.debug( "Flushing state for collection {}, status {}, processedCount {}, lastUpdated {}",
+ collectionName, status, processedCount, lastUpdated);
+ }
+
+ mapManager.putString( appIdString + MAP_SEPARATOR + collectionName + MAP_STATUS_KEY, status.name() );
+ mapManager.putLong( appIdString + MAP_SEPARATOR + collectionName + MAP_COUNT_KEY, processedCount );
+ mapManager.putLong( appIdString + MAP_SEPARATOR + collectionName + MAP_UPDATED_KEY, lastUpdated );
+ }
+
+
+ private ReIndexStatus getIndexResponseForCollection( final String appIdString, final String collectionName ) {
+
+ final String stringStatus =
+ mapManager.getString( appIdString + MAP_SEPARATOR + collectionName + MAP_STATUS_KEY );
+
+ if(stringStatus == null){
+ return new ReIndexStatus( "", Status.UNKNOWN, 0, 0, collectionName );
+ }
+
+ final Status status = Status.valueOf( stringStatus );
+
+ final long processedCount = mapManager.getLong( appIdString + MAP_SEPARATOR + collectionName + MAP_COUNT_KEY );
+ final long lastUpdated = mapManager.getLong( appIdString + MAP_SEPARATOR + collectionName + MAP_UPDATED_KEY );
+
+ return new ReIndexStatus( "", status, processedCount, lastUpdated, collectionName );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/148337ec/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
index b8c1caa..b32a718 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java
@@ -48,6 +48,8 @@ import org.apache.usergrid.services.ServicePayload;
import com.fasterxml.jackson.jaxrs.json.annotation.JSONP;
+import java.util.Map;
+
/**
* A collection resource that stands before the Service Resource. If it cannot find
@@ -190,24 +192,38 @@ public class CollectionResource extends ServiceResource {
}
- // TODO: this can't be controlled and until it can be controlled we shouldn' allow muggles to do this.
- // So system access only.
- // TODO: use scheduler here to get around people sending a reindex call 30 times.
@POST
@Path("{itemName}/_reindex")
@Produces({ MediaType.APPLICATION_JSON,"application/javascript"})
- @RequireSystemAccess
+ @RequireApplicationAccess
@JSONP
public ApiResponse executePostForReindexing(
- @Context UriInfo ui, String body,
+ @Context UriInfo ui, final Map<String, Object> payload,
@PathParam("itemName") PathSegment itemName,
@QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception {
addItemToServiceContext( ui, itemName );
IndexResource indexResource = new IndexResource(injector);
- return indexResource.rebuildIndexesPost(
+ return indexResource.rebuildIndexCollectionPost(payload,
services.getApplicationId().toString(),itemName.getPath(),false,callback );
}
+ @GET
+ @Path("{itemName}/_reindex")
+ @Produces({ MediaType.APPLICATION_JSON,"application/javascript"})
+ @RequireApplicationAccess
+ @JSONP
+ public ApiResponse executeGetForReindexStatus(
+ @Context UriInfo ui, final Map<String, Object> payload,
+ @PathParam("itemName") PathSegment itemName,
+ @QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception {
+
+ addItemToServiceContext( ui, itemName );
+
+ IndexResource indexResource = new IndexResource(injector);
+ return indexResource.rebuildIndexCollectionGet(services.getApplicationId().toString(), itemName.getPath(),
+ callback );
+ }
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/148337ec/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
index be60177..ec86750 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
@@ -28,13 +28,16 @@ import com.google.inject.Injector;
import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilderImpl;
import org.apache.usergrid.corepersistence.index.ReIndexService;
+import org.apache.usergrid.exception.ConflictException;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.index.utils.ConversionUtils;
import org.apache.usergrid.persistence.index.utils.UUIDUtils;
import org.apache.usergrid.rest.AbstractContextResource;
import org.apache.usergrid.rest.ApiResponse;
import org.apache.usergrid.rest.RootResource;
+import org.apache.usergrid.rest.security.annotations.RequireOrganizationAccess;
import org.apache.usergrid.rest.security.annotations.RequireSystemAccess;
+import org.apache.usergrid.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Scope;
@@ -182,26 +185,78 @@ public class IndexResource extends AbstractContextResource {
return executeResumeAndCreateResponse( payload, request, callback );
}
+ @RequireOrganizationAccess
+ @GET
+ @Path( "rebuild/" + RootResource.APPLICATION_ID_PATH + "/{collectionName}" )
+ @JSONP
+ @Produces({ MediaType.APPLICATION_JSON, "application/javascript" })
+ public ApiResponse rebuildIndexCollectionGet( @PathParam( "applicationId" ) final String applicationIdStr,
+ @PathParam( "collectionName" ) final String collectionName,
+ @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
- @RequireSystemAccess
+
+ throws Exception {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Getting re-index status for app: {}, collection: {}", applicationIdStr, collectionName);
+ }
+
+
+ ReIndexService.ReIndexStatus status = getReIndexService().getStatusForCollection(applicationIdStr, collectionName);
+
+ final ApiResponse response = createApiResponse();
+
+ response.setAction( "get rebuild index status" );
+ response.setProperty( "collection", status.getCollectionName() );
+ response.setProperty( "status", status.getStatus() );
+ response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() );
+ response.setProperty( "numberQueued", status.getNumberProcessed() );
+ response.setSuccess();
+
+ return response;
+ }
+
+ @RequireOrganizationAccess
@POST
@Path( "rebuild/" + RootResource.APPLICATION_ID_PATH + "/{collectionName}" )
@JSONP
@Produces({MediaType.APPLICATION_JSON, "application/javascript"})
- public ApiResponse rebuildIndexesPost( @PathParam( "applicationId" ) final String applicationIdStr,
- @PathParam( "collectionName" ) final String collectionName,
- @QueryParam( "reverse" ) @DefaultValue( "false" ) final Boolean reverse,
- @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+ public ApiResponse rebuildIndexCollectionPost(final Map<String, Object> payload,
+ @PathParam( "applicationId" ) final String applicationIdStr,
+ @PathParam( "collectionName" ) final String collectionName,
+ @QueryParam( "reverse" ) @DefaultValue( "false" ) final Boolean reverse,
+ @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
throws Exception {
+ ReIndexService.ReIndexStatus existingStatus =
+ getReIndexService().getStatusForCollection(applicationIdStr, collectionName);
- logger.info( "Rebuilding collection {} in application {}", collectionName, applicationIdStr );
+ if(existingStatus.getStatus().equals(ReIndexService.Status.INPROGRESS)){
+ throw new ConflictException("Re-index for collection currently in progress");
+ }
+
+ logger.info( "Re-indexing collection {} in application {}", collectionName, applicationIdStr );
final UUID appId = UUIDUtils.tryExtractUUID( applicationIdStr );
+
final ReIndexRequestBuilder request =
createRequest().withApplicationId( appId ).withCollection( collectionName );
+ Map<String,Object> newPayload = payload;
+ if(newPayload == null || !payload.containsKey( UPDATED_FIELD )){
+ newPayload = new HashMap<>(1);
+ newPayload.put(UPDATED_FIELD,0);
+ }
+
+ Preconditions.checkArgument(newPayload.get(UPDATED_FIELD) instanceof Number,
+ "Property \"updated\" in the payload must be a number in unix timestamp millis format" );
+
+ //add our updated timestamp to the request
+ if ( newPayload.containsKey( UPDATED_FIELD ) ) {
+ final long timestamp = ConversionUtils.getLong(newPayload.get(UPDATED_FIELD));
+ request.withStartTimestamp( timestamp );
+ }
+
return executeAndCreateResponse( request, callback );
}
@@ -214,7 +269,6 @@ public class IndexResource extends AbstractContextResource {
public ApiResponse rebuildIndexesPut( final Map<String, Object> payload,
@PathParam( "applicationId" ) final String applicationIdStr,
@PathParam( "collectionName" ) final String collectionName,
- @QueryParam( "reverse" ) @DefaultValue( "false" ) final Boolean reverse,
@QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
throws Exception {
@@ -350,7 +404,15 @@ public class IndexResource extends AbstractContextResource {
final ApiResponse response = createApiResponse();
response.setAction( "rebuild indexes" );
- response.setProperty( "jobId", status.getJobId() );
+
+ if(StringUtils.isNotEmpty(status.getJobId())){
+ response.setProperty( "jobId", status.getJobId() );
+ }
+
+ if(StringUtils.isNotEmpty(status.getCollectionName())){
+ response.setProperty( "collection", status.getCollectionName() );
+ }
+
response.setProperty( "status", status.getStatus() );
response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() );
response.setProperty( "numberQueued", status.getNumberProcessed() );