You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/10/04 05:49:11 UTC

[1/2] usergrid git commit: Revert "Add ability to start initial re-index seek with the UNIX timestamp. This will only start seeking from the time provided, rather than seeking all and discarding what doesn't match a filter."

Repository: usergrid
Updated Branches:
  refs/heads/hotfix-20160819 85cc93436 -> f8c92f6ef


Revert "Add ability to start initial re-index seek with the UNIX timestamp.  This will only start seeking from the time provided, rather than seeking all and discarding what doesn't match a filter."

This reverts commit 85cc93436a163c3ba21a7ac1286c6bce3daebeb4.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5db402d5
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5db402d5
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5db402d5

Branch: refs/heads/hotfix-20160819
Commit: 5db402d53e40b99901b2a97894cbdd77e60881b3
Parents: 85cc934
Author: Michael Russo <mr...@apigee.com>
Authored: Mon Oct 3 21:46:46 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Mon Oct 3 21:46:46 2016 -0700

----------------------------------------------------------------------
 .../corepersistence/index/ReIndexServiceImpl.java | 18 ++++--------------
 .../rx/impl/AllEntityIdsObservable.java           |  6 +-----
 .../rx/impl/AllEntityIdsObservableImpl.java       |  7 ++-----
 .../graph/serialization/EdgesObservable.java      |  4 +---
 .../serialization/impl/EdgesObservableImpl.java   | 16 +++-------------
 .../usergrid/rest/system/IndexResource.java       | 12 ++++++------
 6 files changed, 17 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/5db402d5/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index f37f9af..19fbcfa 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -135,17 +135,7 @@ public class ReIndexServiceImpl implements ReIndexService {
 
         final String jobId = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
 
-        final long startTimestamp;
-        if ( reIndexRequestBuilder.getUpdateTimestamp().isPresent() && reIndexRequestBuilder.getUpdateTimestamp().get() > 0 ){
-
-            // edge timestamps are UUID timestamps, we need to convert from UNIX epoch to a UUID timestamp
-            long uuidEpochNanos = 0x01b21dd213814000L; // num 100 nano seconds since uuid epoch
-            startTimestamp = reIndexRequestBuilder.getUpdateTimestamp().get()*10000 + uuidEpochNanos;
-            logger.info("Reindex provided with from timestamp, converted to an Edge timestamp is: {}", startTimestamp);
-        }else{
-            startTimestamp = 0;
-        }
-
+        final long modifiedSince = reIndexRequestBuilder.getUpdateTimestamp().or( Long.MIN_VALUE );
 
         // create an observable that loads a batch to be indexed
 
@@ -175,11 +165,11 @@ public class ReIndexServiceImpl implements ReIndexService {
         }
 
         allEntityIdsObservable.getEdgesToEntities( applicationScopes,
-            reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue(), startTimestamp )
+            reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
             .buffer( indexProcessorFig.getReindexBufferSize())
             .doOnNext( edgeScopes -> {
                 logger.info("Sending batch of {} to be indexed.", edgeScopes.size());
-                indexService.indexBatch(edgeScopes, startTimestamp);
+                indexService.indexBatch(edgeScopes, modifiedSince);
                 count.addAndGet(edgeScopes.size() );
                 if( edgeScopes.size() > 0 ) {
                     writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));
@@ -188,7 +178,7 @@ public class ReIndexServiceImpl implements ReIndexService {
             .doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() ))
             .subscribeOn( Schedulers.io() ).subscribe();
 
-
+        
         return new ReIndexStatus( jobId, Status.STARTED, 0, 0 );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5db402d5/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
index fe7a455..9070609 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
@@ -46,12 +46,8 @@ public interface AllEntityIdsObservable {
      * @param appScopes
      * @param edgeType The edge type to use (if specified)
      * @param lastEdge The edge to resume processing from
-     * @param startTimestamp An optional unix timestamp to start the seek ( it will be converted to an Edge )
      * @return
      */
-    Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes,
-                                              final Optional<String> edgeType,
-                                              Optional<Edge> lastEdge,
-                                              final long startTimestamp );
+    Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Edge> lastEdge);
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5db402d5/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
index e6f3633..0420a32 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
@@ -82,15 +82,12 @@ public class AllEntityIdsObservableImpl implements AllEntityIdsObservable {
 
 
     @Override
-    public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes,
-                                                     final Optional<String> edgeType,
-                                                     final Optional<Edge> lastEdge,
-                                                     final long startTimestamp ) {
+    public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Edge> lastEdge) {
 
         return appScopes.flatMap( applicationScope -> {
             final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
 
-            return edgesObservable.edgesFromSourceDescending( gm, applicationScope.getApplication(), edgeType, lastEdge, startTimestamp )
+            return edgesObservable.edgesFromSourceDescending( gm, applicationScope.getApplication(), edgeType, lastEdge )
                                   .map( edge -> new EdgeScope(applicationScope, edge ));
         } );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5db402d5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
index 7c83207..78a1d4b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
@@ -64,10 +64,8 @@ public interface EdgesObservable {
      * @param sourceNode
      * @param edgeType The edge type if specified.  Otherwise all types will be used
      * @param resume The edge to start seeking after.  Otherwise starts at the most recent
-     * @param startTimestamp A unix timestamp to start seeking from if you don't have the edge cursor
      * @return
      */
     Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode,
-                                                final Optional<String> edgeType, final Optional<Edge> resume,
-                                                final long startTimestamp );
+                                                final Optional<String> edgeType, final Optional<Edge> resume );
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5db402d5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
index 2504e87..20efe42 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
@@ -20,7 +20,6 @@
 package org.apache.usergrid.persistence.graph.serialization.impl;
 
 
-import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,8 +74,8 @@ public class EdgesObservableImpl implements EdgesObservable {
 
     @Override
     public Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode,
-                                                       final Optional<String> edgeTypeInput, final Optional<Edge> resume,
-                                                       final long startTimestamp ) {
+                                                       final Optional<String> edgeTypeInput, final Optional<Edge> resume  ) {
+
 
 
         final Observable<String> edgeTypes = edgeTypeInput.isPresent()? Observable.just( edgeTypeInput.get() ):
@@ -85,22 +84,13 @@ public class EdgesObservableImpl implements EdgesObservable {
 
         return edgeTypes.flatMap(  edgeType -> {
 
-            final Optional<Edge> start;
-
-            if( !resume.isPresent() && startTimestamp > 0 ){
-                // the target node doesn't matter here, the search only looks at the timestamp
-                start = Optional.of(new SimpleEdge(sourceNode, edgeType, sourceNode, startTimestamp));
-            }else{
-                start = resume;
-            }
-
                 if (logger.isTraceEnabled()) {
                     logger.trace("Loading edges of edgeType {} from {}", edgeType, sourceNode);
                 }
 
                 return gm.loadEdgesFromSource(
                     new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
-                       start ) );
+                       resume ) );
         } );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5db402d5/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
index 2be5b87..be60177 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
@@ -61,7 +61,7 @@ import java.util.UUID;
 public class IndexResource extends AbstractContextResource {
 
     private static final Logger logger = LoggerFactory.getLogger( IndexResource.class );
-    private static final String SINCE_FIELD = "since";
+    private static final String UPDATED_FIELD = "updated";
 
 
 
@@ -321,17 +321,17 @@ public class IndexResource extends AbstractContextResource {
                                                             final String callback ) {
 
         Map<String,Object> newPayload = payload;
-        if(newPayload == null ||  !payload.containsKey(SINCE_FIELD)){
+        if(newPayload == null ||  !payload.containsKey( UPDATED_FIELD )){
             newPayload = new HashMap<>(1);
-            newPayload.put(SINCE_FIELD,0);
+            newPayload.put(UPDATED_FIELD,0);
         }
 
-        Preconditions.checkArgument(newPayload.get(SINCE_FIELD) instanceof Number,
+        Preconditions.checkArgument(newPayload.get(UPDATED_FIELD) instanceof Number,
                 "You must specified the field \"updated\" in the payload and it must be a timestamp" );
 
         //add our updated timestamp to the request
-        if ( newPayload.containsKey(SINCE_FIELD) ) {
-            final long timestamp = ConversionUtils.getLong(newPayload.get(SINCE_FIELD));
+        if ( newPayload.containsKey( UPDATED_FIELD ) ) {
+            final long timestamp = ConversionUtils.getLong(newPayload.get(UPDATED_FIELD));
             request.withStartTimestamp( timestamp );
         }
 


[2/2] usergrid git commit: Add a utility queue to the async event service for things like re-index.

Posted by mr...@apache.org.
Add a utility queue to the async event service for things like re-index.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f8c92f6e
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f8c92f6e
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f8c92f6e

Branch: refs/heads/hotfix-20160819
Commit: f8c92f6eff376a58394bca3be2340ace6d0de02c
Parents: 5db402d
Author: Michael Russo <mr...@apigee.com>
Authored: Mon Oct 3 22:48:49 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Mon Oct 3 22:48:49 2016 -0700

----------------------------------------------------------------------
 .../asyncevents/AsyncEventService.java          |   3 +-
 .../asyncevents/AsyncEventServiceImpl.java      | 114 +++++++++++++++----
 .../index/IndexProcessorFig.java                |   9 ++
 .../corepersistence/index/ReIndexAction.java    |   5 +-
 .../index/ReIndexServiceImpl.java               |   4 +-
 .../read/traverse/AbstractReadGraphFilter.java  |   2 +-
 .../AbstractReadReverseGraphFilter.java         |   2 +-
 7 files changed, 110 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index d833cf7..ec84a0a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -81,8 +81,9 @@ public interface AsyncEventService extends ReIndexAction {
     /**
      *
      * @param indexOperationMessage
+     * @param forUtilityQueue
      */
-    void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage );
+    void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, boolean forUtilityQueue);
 
     /**
      * @param applicationScope

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index a108e40..bc5d139 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -78,6 +78,8 @@ import rx.Subscriber;
 import rx.Subscription;
 import rx.schedulers.Schedulers;
 
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
+
 
 /**
  * TODO, this whole class is becoming a nightmare.  We need to remove all consume from this class and refactor it into the following manner.
@@ -103,8 +105,11 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     // SQS maximum receive messages is 10
     public int MAX_TAKE = 10;
     public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
+    public static final String QUEUE_NAME_UTILITY = "utility"; //keep this short as AWS limits queue name size to 80 chars
+
 
     private final QueueManager queue;
+    private final QueueManager utilityQueue;
     private final IndexProcessorFig indexProcessorFig;
     private final QueueFig queueFig;
     private final IndexProducer indexProducer;
@@ -125,6 +130,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
     private final Counter indexErrorCounter;
     private final AtomicLong counter = new AtomicLong();
+    private final AtomicLong counterUtility = new AtomicLong();
     private final AtomicLong inFlight = new AtomicLong();
     private final Histogram messageCycle;
     private final MapManager esMapPersistence;
@@ -160,7 +166,9 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         this.rxTaskScheduler = rxTaskScheduler;
 
         QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
+        QueueScope utilityQueueScope = new QueueScopeImpl(QUEUE_NAME_UTILITY, QueueScope.RegionImplementation.ALL);
         this.queue = queueManagerFactory.getQueueManager(queueScope);
+        this.utilityQueue = queueManagerFactory.getQueueManager(utilityQueueScope);
 
         this.indexProcessorFig = indexProcessorFig;
         this.queueFig = queueFig;
@@ -201,12 +209,16 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     }
 
 
-    private void offerTopic( final Serializable operation ) {
+    private void offerTopic(final Serializable operation, boolean forUtilityQueue) {
         final Timer.Context timer = this.writeTimer.time();
 
         try {
             //signal to SQS
-            this.queue.sendMessageToTopic( operation );
+            if (forUtilityQueue) {
+                this.utilityQueue.sendMessageToTopic(operation);
+            } else {
+                this.queue.sendMessageToTopic(operation);
+            }
         }
         catch ( IOException e ) {
             throw new RuntimeException( "Unable to queue message", e );
@@ -216,12 +228,16 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         }
     }
 
-    private void offerBatch(final List operations){
-        final Timer.Context timer = this.writeTimer.time();
 
+    private void offerBatch(final List operations, boolean forUtilityQueue){
+        final Timer.Context timer = this.writeTimer.time();
         try {
             //signal to SQS
-            this.queue.sendMessages(operations);
+            if( forUtilityQueue ){
+                this.utilityQueue.sendMessages(operations);
+            }else{
+                this.queue.sendMessages(operations);
+            }
         } catch (IOException e) {
             throw new RuntimeException("Unable to queue message", e);
         } finally {
@@ -229,6 +245,15 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         }
     }
 
+    private void offerBatchToUtilityQueue(final List operations){
+        try {
+            //signal to SQS
+            this.utilityQueue.sendMessages(operations);
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to queue message", e);
+        }
+    }
+
 
     /**
      * Take message from SQS
@@ -246,6 +271,22 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         }
     }
 
+    /**
+     * Take message from SQS utility queue
+     */
+    private List<QueueMessage> takeFromUtilityQueue() {
+
+        final Timer.Context timer = this.readTimer.time();
+
+        try {
+            return utilityQueue.getMessages(MAX_TAKE, AsyncEvent.class);
+        }
+        finally {
+            //stop our timer
+            timer.stop();
+        }
+    }
+
 
 
     /**
@@ -271,6 +312,17 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     }
 
     /**
+     * Ack message in SQS
+     */
+    public void ackUtilityQueue(final List<QueueMessage> messages) {
+        try{
+            utilityQueue.commitMessages( messages );
+        }catch(Exception e){
+            throw new RuntimeException("Unable to ack messages", e);
+        }
+    }
+
+    /**
      * calls the event handlers and returns a result with information on whether it needs to be ack'd and whether it needs to be indexed
      * @param messages
      * @return
@@ -401,7 +453,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
             applicationScope);
         offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(),
-            new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) );
+            new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), false);
     }
 
 
@@ -418,7 +470,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         final IndexOperationMessage indexMessage =
             eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null);
 
-        queueIndexOperationMessage( indexMessage );
+        queueIndexOperationMessage( indexMessage, false);
 
     }
 
@@ -515,8 +567,9 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     /**
      * Queue up an indexOperationMessage for multi region execution
      * @param indexOperationMessage
+     * @param forUtilityQueue
      */
-    public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) {
+    public void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, boolean forUtilityQueue) {
 
         // don't try to produce something with nothing
         if(indexOperationMessage == null || indexOperationMessage.isEmpty()){
@@ -533,8 +586,6 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         //write to the map in ES
         esMapPersistence.putString( newMessageId.toString(), jsonValue, expirationTimeInSeconds );
 
-
-
         //now queue up the index message
 
         final ElasticsearchIndexEvent elasticsearchIndexEvent =
@@ -542,7 +593,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
         //send to the topic so all regions index the batch
 
-        offerTopic( elasticsearchIndexEvent );
+        offerTopic( elasticsearchIndexEvent, forUtilityQueue );
     }
 
     private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent) throws IndexDocNotFoundException {
@@ -607,7 +658,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
         // queue the de-index of old versions to the topic so cleanup happens in all regions
         offerTopic( new DeIndexOldVersionsEvent( queueFig.getPrimaryRegion(),
-            new EntityIdScope( applicationScope, entityId), markedVersion) );
+            new EntityIdScope( applicationScope, entityId), markedVersion), false);
 
     }
 
@@ -717,9 +768,14 @@ public class AsyncEventServiceImpl implements AsyncEventService {
      */
     public void start() {
         final int count = indexProcessorFig.getWorkerCount();
+        final int utilityCount = indexProcessorFig.getWorkerCountUtility();
 
         for (int i = 0; i < count; i++) {
-            startWorker();
+            startWorker(QUEUE_NAME);
+        }
+
+        for (int i = 0; i < utilityCount; i++) {
+            startWorker(QUEUE_NAME_UTILITY);
         }
     }
 
@@ -738,22 +794,31 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     }
 
 
-    private void startWorker() {
+    private void startWorker(final String type) {
+        Preconditions.checkNotNull(type, "Worker type required");
         synchronized (mutex) {
 
+            boolean isUtilityQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
+
             Observable<List<QueueMessage>> consumer =
                     Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() {
                         @Override
                         public void call( final Subscriber<? super List<QueueMessage>> subscriber ) {
 
                             //name our thread so it's easy to see
-                            Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
+                            long threadNum = isUtilityQueue ? counterUtility.incrementAndGet() : counter.incrementAndGet();
+                            Thread.currentThread().setName( "QueueConsumer_" + type+ "_" + threadNum );
 
-                            List<QueueMessage> drainList = null;
+                                List<QueueMessage> drainList = null;
 
                             do {
                                 try {
-                                    drainList = take();
+                                    if ( isUtilityQueue ){
+                                        drainList = takeFromUtilityQueue();
+                                    }else{
+                                        drainList = take();
+
+                                    }
                                     //emit our list in it's entity to hand off to a worker pool
                                         subscriber.onNext(drainList);
 
@@ -808,7 +873,12 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
                                                      // ack each message if making it to this point
                                                      if( messagesToAck.size() > 0 ){
-                                                         ack( messagesToAck );
+
+                                                         if ( isUtilityQueue ){
+                                                             ackUtilityQueue( messagesToAck );
+                                                         }else{
+                                                             ack( messagesToAck );
+                                                         }
                                                      }
 
                                                      return messagesToAck;
@@ -861,7 +931,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             // collect into a list of QueueMessages that can be ack'd later
             .collect(Collectors.toList());
 
-       queueIndexOperationMessage(combined);
+       queueIndexOperationMessage(combined, false);
 
         return queueMessages;
     }
@@ -871,10 +941,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         EntityIndexOperation entityIndexOperation =
             new EntityIndexOperation( applicationScope, id, updatedSince);
 
-        queueIndexOperationMessage(eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null));
+        queueIndexOperationMessage(eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null), false);
     }
 
-    public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
+    public void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean forUtilityQueue) {
 
         final List<EntityIndexEvent> batch = new ArrayList<>();
         edges.forEach(e -> {
@@ -884,7 +954,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
         });
 
-        offerBatch( batch );
+        offerBatch( batch, forUtilityQueue );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 1038408..45dff1c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -36,6 +36,8 @@ public interface IndexProcessorFig extends GuicyFig {
 
     String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count";
 
+    String ELASTICSEARCH_WORKER_COUNT_UTILITY = "elasticsearch.worker_count_utility";
+
     String EVENT_CONCURRENCY_FACTOR = "event.concurrency.factor";
 
     String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
@@ -82,6 +84,13 @@ public interface IndexProcessorFig extends GuicyFig {
     int getWorkerCount();
 
     /**
+     * The number of worker threads used to read utility requests from the queue ( mostly re-index ).
+     */
+    @Default("2")
+    @Key(ELASTICSEARCH_WORKER_COUNT_UTILITY)
+    int getWorkerCountUtility();
+
+    /**
      * Set the implementation to use for queuing.
      * Valid values: TEST, LOCAL, SQS, SNS
      * NOTE: SQS and SNS equate to the same implementation of Amazon queue services.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
index 5e201fb..2b3573e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
@@ -39,9 +39,10 @@ public interface ReIndexAction {
     void index( final ApplicationScope applicationScope, final Id id, final long updatedSince );
 
     /**
-     * Index a batch list of entities.
+     * Index a batch list of entities.  Goes to the utility queue.
      * @param edges
      * @param updatedSince
+     * @param forUtilityQueue
      */
-    void indexBatch ( final List<EdgeScope> edges, final long updatedSince);
+    void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean forUtilityQueue);
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index 19fbcfa..b292005 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -169,7 +169,7 @@ public class ReIndexServiceImpl implements ReIndexService {
             .buffer( indexProcessorFig.getReindexBufferSize())
             .doOnNext( edgeScopes -> {
                 logger.info("Sending batch of {} to be indexed.", edgeScopes.size());
-                indexService.indexBatch(edgeScopes, modifiedSince);
+                indexService.indexBatch(edgeScopes, modifiedSince, true);
                 count.addAndGet(edgeScopes.size() );
                 if( edgeScopes.size() > 0 ) {
                     writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));
@@ -178,7 +178,7 @@ public class ReIndexServiceImpl implements ReIndexService {
             .doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() ))
             .subscribeOn( Schedulers.io() ).subscribe();
 
-        
+
         return new ReIndexStatus( jobId, Status.STARTED, 0, 0 );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 62f6548..83f4c8b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -258,7 +258,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
             .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
             .filter(msg -> !msg.isEmpty())
             .doOnNext(indexOperation -> {
-                asyncEventService.queueIndexOperationMessage(indexOperation);
+                asyncEventService.queueIndexOperationMessage(indexOperation, false);
             });
 
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
index dcda98f..1afb524 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
@@ -251,7 +251,7 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
             .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
             .filter(msg -> !msg.isEmpty())
             .doOnNext(indexOperation -> {
-                asyncEventService.queueIndexOperationMessage(indexOperation);
+                asyncEventService.queueIndexOperationMessage(indexOperation, false);
             });
 
     }