You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/24 13:06:56 UTC
[71/83] [abbrv] usergrid git commit: Add a utility queue to the async event service for things like re-index.
Add a utility queue to the async event service for things like re-index.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f8c92f6e
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f8c92f6e
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f8c92f6e
Branch: refs/heads/asf-site
Commit: f8c92f6eff376a58394bca3be2340ace6d0de02c
Parents: 5db402d
Author: Michael Russo <mr...@apigee.com>
Authored: Mon Oct 3 22:48:49 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Mon Oct 3 22:48:49 2016 -0700
----------------------------------------------------------------------
.../asyncevents/AsyncEventService.java | 3 +-
.../asyncevents/AsyncEventServiceImpl.java | 114 +++++++++++++++----
.../index/IndexProcessorFig.java | 9 ++
.../corepersistence/index/ReIndexAction.java | 5 +-
.../index/ReIndexServiceImpl.java | 4 +-
.../read/traverse/AbstractReadGraphFilter.java | 2 +-
.../AbstractReadReverseGraphFilter.java | 2 +-
7 files changed, 110 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index d833cf7..ec84a0a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -81,8 +81,9 @@ public interface AsyncEventService extends ReIndexAction {
/**
*
* @param indexOperationMessage
+ * @param forUtilityQueue
*/
- void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage );
+ void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, boolean forUtilityQueue);
/**
* @param applicationScope
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index a108e40..bc5d139 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -78,6 +78,8 @@ import rx.Subscriber;
import rx.Subscription;
import rx.schedulers.Schedulers;
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
+
/**
* TODO, this whole class is becoming a nightmare. We need to remove all consume from this class and refactor it into the following manner.
@@ -103,8 +105,11 @@ public class AsyncEventServiceImpl implements AsyncEventService {
// SQS maximum receive messages is 10
public int MAX_TAKE = 10;
public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
+ public static final String QUEUE_NAME_UTILITY = "utility"; //keep this short as AWS limits queue name size to 80 chars
+
private final QueueManager queue;
+ private final QueueManager utilityQueue;
private final IndexProcessorFig indexProcessorFig;
private final QueueFig queueFig;
private final IndexProducer indexProducer;
@@ -125,6 +130,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
private final Counter indexErrorCounter;
private final AtomicLong counter = new AtomicLong();
+ private final AtomicLong counterUtility = new AtomicLong();
private final AtomicLong inFlight = new AtomicLong();
private final Histogram messageCycle;
private final MapManager esMapPersistence;
@@ -160,7 +166,9 @@ public class AsyncEventServiceImpl implements AsyncEventService {
this.rxTaskScheduler = rxTaskScheduler;
QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
+ QueueScope utilityQueueScope = new QueueScopeImpl(QUEUE_NAME_UTILITY, QueueScope.RegionImplementation.ALL);
this.queue = queueManagerFactory.getQueueManager(queueScope);
+ this.utilityQueue = queueManagerFactory.getQueueManager(utilityQueueScope);
this.indexProcessorFig = indexProcessorFig;
this.queueFig = queueFig;
@@ -201,12 +209,16 @@ public class AsyncEventServiceImpl implements AsyncEventService {
}
- private void offerTopic( final Serializable operation ) {
+ private void offerTopic(final Serializable operation, boolean forUtilityQueue) {
final Timer.Context timer = this.writeTimer.time();
try {
//signal to SQS
- this.queue.sendMessageToTopic( operation );
+ if (forUtilityQueue) {
+ this.utilityQueue.sendMessageToTopic(operation);
+ } else {
+ this.queue.sendMessageToTopic(operation);
+ }
}
catch ( IOException e ) {
throw new RuntimeException( "Unable to queue message", e );
@@ -216,12 +228,16 @@ public class AsyncEventServiceImpl implements AsyncEventService {
}
}
- private void offerBatch(final List operations){
- final Timer.Context timer = this.writeTimer.time();
+ private void offerBatch(final List operations, boolean forUtilityQueue){
+ final Timer.Context timer = this.writeTimer.time();
try {
//signal to SQS
- this.queue.sendMessages(operations);
+ if( forUtilityQueue ){
+ this.utilityQueue.sendMessages(operations);
+ }else{
+ this.queue.sendMessages(operations);
+ }
} catch (IOException e) {
throw new RuntimeException("Unable to queue message", e);
} finally {
@@ -229,6 +245,15 @@ public class AsyncEventServiceImpl implements AsyncEventService {
}
}
+ private void offerBatchToUtilityQueue(final List operations){
+ try {
+ //signal to SQS
+ this.utilityQueue.sendMessages(operations);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to queue message", e);
+ }
+ }
+
/**
* Take message from SQS
@@ -246,6 +271,22 @@ public class AsyncEventServiceImpl implements AsyncEventService {
}
}
+ /**
+ * Take message from SQS utility queue
+ */
+ private List<QueueMessage> takeFromUtilityQueue() {
+
+ final Timer.Context timer = this.readTimer.time();
+
+ try {
+ return utilityQueue.getMessages(MAX_TAKE, AsyncEvent.class);
+ }
+ finally {
+ //stop our timer
+ timer.stop();
+ }
+ }
+
/**
@@ -271,6 +312,17 @@ public class AsyncEventServiceImpl implements AsyncEventService {
}
/**
+ * Ack message in SQS
+ */
+ public void ackUtilityQueue(final List<QueueMessage> messages) {
+ try{
+ utilityQueue.commitMessages( messages );
+ }catch(Exception e){
+ throw new RuntimeException("Unable to ack messages", e);
+ }
+ }
+
+ /**
* calls the event handlers and returns a result with information on whether it needs to be ack'd and whether it needs to be indexed
* @param messages
* @return
@@ -401,7 +453,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
applicationScope);
offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(),
- new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) );
+ new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ), false);
}
@@ -418,7 +470,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
final IndexOperationMessage indexMessage =
eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null);
- queueIndexOperationMessage( indexMessage );
+ queueIndexOperationMessage( indexMessage, false);
}
@@ -515,8 +567,9 @@ public class AsyncEventServiceImpl implements AsyncEventService {
/**
* Queue up an indexOperationMessage for multi region execution
* @param indexOperationMessage
+ * @param forUtilityQueue
*/
- public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) {
+ public void queueIndexOperationMessage(final IndexOperationMessage indexOperationMessage, boolean forUtilityQueue) {
// don't try to produce something with nothing
if(indexOperationMessage == null || indexOperationMessage.isEmpty()){
@@ -533,8 +586,6 @@ public class AsyncEventServiceImpl implements AsyncEventService {
//write to the map in ES
esMapPersistence.putString( newMessageId.toString(), jsonValue, expirationTimeInSeconds );
-
-
//now queue up the index message
final ElasticsearchIndexEvent elasticsearchIndexEvent =
@@ -542,7 +593,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
//send to the topic so all regions index the batch
- offerTopic( elasticsearchIndexEvent );
+ offerTopic( elasticsearchIndexEvent, forUtilityQueue );
}
private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent) throws IndexDocNotFoundException {
@@ -607,7 +658,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
// queue the de-index of old versions to the topic so cleanup happens in all regions
offerTopic( new DeIndexOldVersionsEvent( queueFig.getPrimaryRegion(),
- new EntityIdScope( applicationScope, entityId), markedVersion) );
+ new EntityIdScope( applicationScope, entityId), markedVersion), false);
}
@@ -717,9 +768,14 @@ public class AsyncEventServiceImpl implements AsyncEventService {
*/
public void start() {
final int count = indexProcessorFig.getWorkerCount();
+ final int utilityCount = indexProcessorFig.getWorkerCountUtility();
for (int i = 0; i < count; i++) {
- startWorker();
+ startWorker(QUEUE_NAME);
+ }
+
+ for (int i = 0; i < utilityCount; i++) {
+ startWorker(QUEUE_NAME_UTILITY);
}
}
@@ -738,22 +794,31 @@ public class AsyncEventServiceImpl implements AsyncEventService {
}
- private void startWorker() {
+ private void startWorker(final String type) {
+ Preconditions.checkNotNull(type, "Worker type required");
synchronized (mutex) {
+ boolean isUtilityQueue = isNotEmpty(type) && type.toLowerCase().contains(QUEUE_NAME_UTILITY.toLowerCase());
+
Observable<List<QueueMessage>> consumer =
Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() {
@Override
public void call( final Subscriber<? super List<QueueMessage>> subscriber ) {
//name our thread so it's easy to see
- Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
+ long threadNum = isUtilityQueue ? counterUtility.incrementAndGet() : counter.incrementAndGet();
+ Thread.currentThread().setName( "QueueConsumer_" + type+ "_" + threadNum );
- List<QueueMessage> drainList = null;
+ List<QueueMessage> drainList = null;
do {
try {
- drainList = take();
+ if ( isUtilityQueue ){
+ drainList = takeFromUtilityQueue();
+ }else{
+ drainList = take();
+
+ }
//emit our list in it's entity to hand off to a worker pool
subscriber.onNext(drainList);
@@ -808,7 +873,12 @@ public class AsyncEventServiceImpl implements AsyncEventService {
// ack each message if making it to this point
if( messagesToAck.size() > 0 ){
- ack( messagesToAck );
+
+ if ( isUtilityQueue ){
+ ackUtilityQueue( messagesToAck );
+ }else{
+ ack( messagesToAck );
+ }
}
return messagesToAck;
@@ -861,7 +931,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
// collect into a list of QueueMessages that can be ack'd later
.collect(Collectors.toList());
- queueIndexOperationMessage(combined);
+ queueIndexOperationMessage(combined, false);
return queueMessages;
}
@@ -871,10 +941,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
EntityIndexOperation entityIndexOperation =
new EntityIndexOperation( applicationScope, id, updatedSince);
- queueIndexOperationMessage(eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null));
+ queueIndexOperationMessage(eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null), false);
}
- public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
+ public void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean forUtilityQueue) {
final List<EntityIndexEvent> batch = new ArrayList<>();
edges.forEach(e -> {
@@ -884,7 +954,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
});
- offerBatch( batch );
+ offerBatch( batch, forUtilityQueue );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 1038408..45dff1c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -36,6 +36,8 @@ public interface IndexProcessorFig extends GuicyFig {
String ELASTICSEARCH_WORKER_COUNT = "elasticsearch.worker_count";
+ String ELASTICSEARCH_WORKER_COUNT_UTILITY = "elasticsearch.worker_count_utility";
+
String EVENT_CONCURRENCY_FACTOR = "event.concurrency.factor";
String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
@@ -82,6 +84,13 @@ public interface IndexProcessorFig extends GuicyFig {
int getWorkerCount();
/**
+ * The number of worker threads used to read utility requests from the queue ( mostly re-index ).
+ */
+ @Default("2")
+ @Key(ELASTICSEARCH_WORKER_COUNT_UTILITY)
+ int getWorkerCountUtility();
+
+ /**
* Set the implementation to use for queuing.
* Valid values: TEST, LOCAL, SQS, SNS
* NOTE: SQS and SNS equate to the same implementation of Amazon queue services.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
index 5e201fb..2b3573e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
@@ -39,9 +39,10 @@ public interface ReIndexAction {
void index( final ApplicationScope applicationScope, final Id id, final long updatedSince );
/**
- * Index a batch list of entities.
+ * Index a batch list of entities. Goes to the utility queue.
* @param edges
* @param updatedSince
+ * @param forUtilityQueue
*/
- void indexBatch ( final List<EdgeScope> edges, final long updatedSince);
+ void indexBatch(final List<EdgeScope> edges, final long updatedSince, boolean forUtilityQueue);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index 19fbcfa..b292005 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -169,7 +169,7 @@ public class ReIndexServiceImpl implements ReIndexService {
.buffer( indexProcessorFig.getReindexBufferSize())
.doOnNext( edgeScopes -> {
logger.info("Sending batch of {} to be indexed.", edgeScopes.size());
- indexService.indexBatch(edgeScopes, modifiedSince);
+ indexService.indexBatch(edgeScopes, modifiedSince, true);
count.addAndGet(edgeScopes.size() );
if( edgeScopes.size() > 0 ) {
writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));
@@ -178,7 +178,7 @@ public class ReIndexServiceImpl implements ReIndexService {
.doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() ))
.subscribeOn( Schedulers.io() ).subscribe();
-
+
return new ReIndexStatus( jobId, Status.STARTED, 0, 0 );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 62f6548..83f4c8b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -258,7 +258,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
.collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
.filter(msg -> !msg.isEmpty())
.doOnNext(indexOperation -> {
- asyncEventService.queueIndexOperationMessage(indexOperation);
+ asyncEventService.queueIndexOperationMessage(indexOperation, false);
});
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c92f6e/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
index dcda98f..1afb524 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
@@ -251,7 +251,7 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
.collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
.filter(msg -> !msg.isEmpty())
.doOnNext(indexOperation -> {
- asyncEventService.queueIndexOperationMessage(indexOperation);
+ asyncEventService.queueIndexOperationMessage(indexOperation, false);
});
}