You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/31 15:46:12 UTC

[17/37] usergrid git commit: Add a utility queue to the async event service for things like re-index.

Add a utility queue to the async event service for things like re-index.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f8c92f6e
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f8c92f6e
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f8c92f6e

Branch: refs/heads/usergrid-1318-queue
Commit: f8c92f6eff376a58394bca3be2340ace6d0de02c
Parents: 5db402d
Author: Michael Russo <mr...@apigee.com>
Authored: Mon Oct 3 22:48:49 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Mon Oct 3 22:48:49 2016 -0700

----------------------------------------------------------------------
 .../asyncevents/AsyncEventService.java          |   3 +-
 .../asyncevents/AsyncEventServiceImpl.java      | 114 +++++++++++++++----
 .../index/IndexProcessorFig.java                |   9 ++
 .../corepersistence/index/ReIndexAction.java    |   5 +-
 .../index/ReIndexServiceImpl.java               |   4 +-
 .../read/traverse/AbstractReadGraphFilter.java  |   2 +-
 .../AbstractReadReverseGraphFilter.java         |   2 +-
 7 files changed, 110 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index d833cf7..ec84a0a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -81,8 +81,9 @@ public interface AsyncEventService extends ReIndexAction {
     /**
      *
      * @param indexOperationMessage
+     * @param forUtilityQueue true to send the message to the utility queue instead of the index queue
      */
-    void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage );
+    void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, boolean forUtilityQueue);
 
     /**
      * @param applicationScope

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index a108e40..bc5d139 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -78,6 +78,8 @@ import rx.Subscriber;
 import rx.Subscription;
 import rx.schedulers.Schedulers;
 
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
+
 
 /**
  * TODO, this whole class is becoming a nightmare.  We need to remove all consume from this class and refactor it into the following manner.
@@ -103,8 +105,11 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     // SQS maximum receive messages is 10
     public int MAX_TAKE = 10;
     public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
+    public static final String QUEUE_NAME_UTILITY = "utility"; //keep this short as AWS limits queue name size to 80 chars
+
 
     private final QueueManager queue;
+    private final QueueManager utilityQueue;
     private final IndexProcessorFig indexProcessorFig;
     private final QueueFig queueFig;
     private final IndexProducer indexProducer;
@@ -125,6 +130,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
     private final Counter indexErrorCounter;
     private final AtomicLong counter = new AtomicLong();
+    private final AtomicLong counterUtility = new AtomicLong();
     private final AtomicLong inFlight = new AtomicLong();
     private final Histogram messageCycle;
     private final MapManager esMapPersistence;
@@ -160,7 +166,9 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         this.rxTaskScheduler = rxTaskScheduler;
 
         QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
+        QueueScope utilityQueueScope = new QueueScopeImpl(QUEUE_NAME_UTILITY, QueueScope.RegionImplementation.ALL);
         this.queue = queueManagerFactory.getQueueManager(queueScope);
+        this.utilityQueue = queueManagerFactory.getQueueManager(utilityQueueScope);
 
         this.indexProcessorFig = indexProcessorFig;
         this.queueFig = queueFig;
@@ -201,12 +209,16 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     }
 
 
-    private void offerTopic( final Serializable operation ) {
+    private void offerTopic(final Serializable operation, boolean forUtilityQueue) {
         final Timer.Context timer = this.writeTimer.time();
 
         try {
             //signal to SQS
-            this.queue.sendMessageToTopic( operation );
+            if (forUtilityQueue) {
+                this.utilityQueue.sendMessageToTopic(operation);
+            } else {
+                this.queue.sendMessageToTopic(operation);
+            }
         }
         catch ( IOException e ) {
             throw new RuntimeException( "Unable to queue message", e );
@@ -216,12 +228,16 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         }
     }
 
-    private void offerBatch(final List operations){
-        final Timer.Context timer = this.writeTimer.time();
 
+    private void offerBatch(final List operations, boolean forUtilityQueue){
+        final Timer.Context timer = this.writeTimer.time();
         try {
             //signal to SQS
-            this.queue.sendMessages(operations);
+            if( forUtilityQueue ){
+                this.utilityQueue.sendMessages(operations);
+            }else{
+                this.queue.sendMessages(operations);
+            }
         } catch (IOException e) {
             throw new RuntimeException("Unable to queue message", e);
         } finally {
@@ -229,6 +245,15 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         }
     }
 
+    private void offerBatchToUtilityQueue(final List operations){
+        try {
+            //signal to SQS
+            this.utilityQueue.sendMessages(operations);
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to queue message", e);
+        }
+    }
+
 
     /**
      * Take message from SQS
@@ -246,6 +271,22 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         }
     }
 
+    /**
+     * Take message from SQS utility queue
+     */
+    private List<QueueMessage> takeFromUtilityQueue() {
+
+        final Timer.Context timer = this.readTimer.time();
+
+        try {
+            return utilityQueue.getMessages(MAX_TAKE, AsyncEvent.class);
+        }
+        finally {
+            //stop our timer
+            timer.stop();
+        }
+    }
+
 
 
     /**
@@ -271,6 +312,17 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     }
 
     /**
+     * Ack messages in the SQS utility queue
+     */
+    public void ackUtilityQueue(final List<QueueMessage> messages) {
+        try{
+            utilityQueue.commitMessages( messages );
+        }catch(Exception e){
+            throw new RuntimeException("Unable to ack messages", e);
+        }
+    }
+
+    /**
      * calls the event handlers and returns a result with information on whether it needs to be ack'd and whether it needs to be indexed
      * @param messages
      * @return
@@ -401,7 +453,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
             applicationScope);
         offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(),
-            new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) );
+            new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), false);
     }
 
 
@@ -418,7 +470,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         final IndexOperationMessage indexMessage =
             eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null);
 
-        queueIndexOperationMessage( indexMessage );
+        queueIndexOperationMessage( indexMessage, false);
 
     }
 
@@ -515,8 +567,9 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     /**
      * Queue up an indexOperationMessage for multi region execution
      * @param indexOperationMessage
+     * @param forUtilityQueue true to send the message to the utility queue instead of the index queue
      */
-    public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) {
+    public void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, boolean forUtilityQueue) {
 
         // don't try to produce something with nothing
         if(indexOperationMessage == null || indexOperationMessage.isEmpty()){
@@ -533,8 +586,6 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         //write to the map in ES
         esMapPersistence.putString( newMessageId.toString(), jsonValue, expirationTimeInSeconds );
 
-
-
         //now queue up the index message
 
         final ElasticsearchIndexEvent elasticsearchIndexEvent =
@@ -542,7 +593,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
         //send to the topic so all regions index the batch
 
-        offerTopic( elasticsearchIndexEvent );
+        offerTopic( elasticsearchIndexEvent, forUtilityQueue );
     }
 
     private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent) throws IndexDocNotFoundException {
@@ -607,7 +658,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
         // queue the de-index of old versions to the topic so cleanup happens in all regions
         offerTopic( new DeIndexOldVersionsEvent( queueFig.getPrimaryRegion(),
-            new EntityIdScope( applicationScope, entityId), markedVersion) );
+            new EntityIdScope( applicationScope, entityId), markedVersion), false);
 
     }
 
@@ -717,9 +768,14 @@ public class AsyncEventServiceImpl implements AsyncEventService {
      */
     public void start() {
         final int count = indexProcessorFig.getWorkerCount();
+        final int utilityCount = indexProcessorFig.getWorkerCountUtility();
 
         for (int i = 0; i < count; i++) {
-            startWorker();
+            startWorker(QUEUE_NAME);
+        }
+
+        for (int i = 0; i < utilityCount; i++) {
+            startWorker(QUEUE_NAME_UTILITY);
         }
     }
 
@@ -738,22 +794,31 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     }
 
 
-    private void startWorker() {
+    private void startWorker(final String type) {
+        Preconditions.checkNotNull(type, "Worker type required");
         synchronized (mutex) {
 
+            boolean isUtilityQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
+
             Observable<List<QueueMessage>> consumer =
                     Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() {
                         @Override
                         public void call( final Subscriber<? super List<QueueMessage>> subscriber ) {
 
                             //name our thread so it's easy to see
-                            Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
+                            long threadNum = isUtilityQueue ? counterUtility.incrementAndGet() : counter.incrementAndGet();
+                            Thread.currentThread().setName( "QueueConsumer_" + type+ "_" + threadNum );
 
-                            List<QueueMessage> drainList = null;
+                                List<QueueMessage> drainList = null;
 
                             do {
                                 try {
-                                    drainList = take();
+                                    if ( isUtilityQueue ){
+                                        drainList = takeFromUtilityQueue();
+                                    }else{
+                                        drainList = take();
+
+                                    }
                                     //emit our list in it's entity to hand off to a worker pool
                                         subscriber.onNext(drainList);
 
@@ -808,7 +873,12 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
                                                      // ack each message if making it to this point
                                                      if( messagesToAck.size() > 0 ){
-                                                         ack( messagesToAck );
+
+                                                         if ( isUtilityQueue ){
+                                                             ackUtilityQueue( messagesToAck );
+                                                         }else{
+                                                             ack( messagesToAck );
+                                                         }
                                                      }
 
                                                      return messagesToAck;
@@ -861,7 +931,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             // collect into a list of QueueMessages that can be ack'd later
             .collect(Collectors.toList());
 
-       queueIndexOperationMessage(combined);
+       queueIndexOperationMessage(combined, false);
 
         return queueMessages;
     }
@@ -871,10 +941,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         EntityIndexOperation entityIndexOperation =
             new EntityIndexOperation( applicationScope, id, updatedSince);
 
-        queueIndexOperationMessage(eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null));
+        queueIndexOperationMessage(eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null), false);
     }
 
-    public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
+    public void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean forUtilityQueue) {
 
         final List<EntityIndexEvent> batch = new ArrayList<>();
         edges.forEach(e -> {
@@ -884,7 +954,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
         });
 
-        offerBatch( batch );
+        offerBatch( batch, forUtilityQueue );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 1038408..45dff1c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -36,6 +36,8 @@ public interface IndexProcessorFig extends GuicyFig {
 
     String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count";
 
+    String ELASTICSEARCH_WORKER_COUNT_UTILITY = "elasticsearch.worker_count_utility";
+
     String EVENT_CONCURRENCY_FACTOR = "event.concurrency.factor";
 
     String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
@@ -82,6 +84,13 @@ public interface IndexProcessorFig extends GuicyFig {
     int getWorkerCount();
 
     /**
+     * The number of worker threads used to read utility requests from the queue ( mostly re-index ).
+     */
+    @Default("2")
+    @Key(ELASTICSEARCH_WORKER_COUNT_UTILITY)
+    int getWorkerCountUtility();
+
+    /**
      * Set the implementation to use for queuing.
      * Valid values: TEST, LOCAL, SQS, SNS
      * NOTE: SQS and SNS equate to the same implementation of Amazon queue services.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
index 5e201fb..2b3573e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
@@ -39,9 +39,10 @@ public interface ReIndexAction {
     void index( final ApplicationScope applicationScope, final Id id, final long updatedSince );
 
     /**
-     * Index a batch list of entities.
+     * Index a batch list of entities.  Goes to the utility queue when forUtilityQueue is true.
      * @param edges
      * @param updatedSince
+     * @param forUtilityQueue true to send the batch to the utility queue instead of the index queue
      */
-    void indexBatch ( final List<EdgeScope> edges, final long updatedSince);
+    void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean forUtilityQueue);
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index 19fbcfa..b292005 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -169,7 +169,7 @@ public class ReIndexServiceImpl implements ReIndexService {
             .buffer( indexProcessorFig.getReindexBufferSize())
             .doOnNext( edgeScopes -> {
                 logger.info("Sending batch of {} to be indexed.", edgeScopes.size());
-                indexService.indexBatch(edgeScopes, modifiedSince);
+                indexService.indexBatch(edgeScopes, modifiedSince, true);
                 count.addAndGet(edgeScopes.size() );
                 if( edgeScopes.size() > 0 ) {
                     writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));
@@ -178,7 +178,7 @@ public class ReIndexServiceImpl implements ReIndexService {
             .doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() ))
             .subscribeOn( Schedulers.io() ).subscribe();
 
-        
+
         return new ReIndexStatus( jobId, Status.STARTED, 0, 0 );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 62f6548..83f4c8b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -258,7 +258,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
             .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
             .filter(msg -> !msg.isEmpty())
             .doOnNext(indexOperation -> {
-                asyncEventService.queueIndexOperationMessage(indexOperation);
+                asyncEventService.queueIndexOperationMessage(indexOperation, false);
             });
 
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
index dcda98f..1afb524 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
@@ -251,7 +251,7 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
             .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
             .filter(msg -> !msg.isEmpty())
             .doOnNext(indexOperation -> {
-                asyncEventService.queueIndexOperationMessage(indexOperation);
+                asyncEventService.queueIndexOperationMessage(indexOperation, false);
             });
 
     }