You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/02/25 20:56:50 UTC

[04/15] usergrid git commit: Reduce SQS hop for entity write/update indexing events.

Reduce SQS hop for entity write/update indexing events.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b4634dc4
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b4634dc4
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b4634dc4

Branch: refs/heads/master
Commit: b4634dc42f767a982892362bbd4aa66059bf1998
Parents: d4c7a3c
Author: Michael Russo <mi...@gmail.com>
Authored: Fri Feb 19 13:35:48 2016 -0800
Committer: Michael Russo <mi...@gmail.com>
Committed: Fri Feb 19 13:35:48 2016 -0800

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |   4 +-
 .../asyncevents/AmazonAsyncEventService.java    | 844 -------------------
 .../asyncevents/AsyncEventService.java          |   5 +-
 .../asyncevents/AsyncEventServiceImpl.java      | 798 ++++++++++++++++++
 .../asyncevents/AsyncIndexProvider.java         |   4 +-
 .../asyncevents/EventBuilder.java               |  12 +-
 .../asyncevents/EventBuilderImpl.java           |  15 +-
 .../asyncevents/IndexDocNotFoundException.java  |  37 +
 .../corepersistence/index/IndexServiceImpl.java |   4 +-
 .../read/search/CandidateEntityFilter.java      |  10 +-
 .../index/AmazonAsyncEventServiceTest.java      | 103 ---
 .../index/AsyncEventServiceImplTest.java        | 103 +++
 .../index/AsyncIndexServiceTest.java            |   3 +-
 .../index/impl/EsIndexProducerImpl.java         |   5 +-
 .../usergrid/persistence/queue/QueueFig.java    |   4 +
 .../usergrid/services/ServiceManager.java       |   5 +-
 16 files changed, 972 insertions(+), 984 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 6c2ef0b..b677f79 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -590,7 +590,7 @@ public class CpEntityManager implements EntityManager {
 
         // update in all containing collections and connection indexes
 
-        indexService.queueEntityIndexUpdate( applicationScope, cpEntity );
+        indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0);
     }
 
 
@@ -1107,7 +1107,7 @@ public class CpEntityManager implements EntityManager {
 
         //Adding graphite metrics
 
-        indexService.queueEntityIndexUpdate(applicationScope, cpEntity);
+        indexService.queueEntityIndexUpdate(applicationScope, cpEntity, 0);
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
deleted file mode 100644
index 00dc69a..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ /dev/null
@@ -1,844 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.asyncevents;
-
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import org.apache.usergrid.persistence.index.impl.*;
-import org.elasticsearch.action.index.IndexRequest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.EdgeIndexEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent;
-import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
-import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
-import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
-import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy;
-import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.IndexLocationStrategy;
-import org.apache.usergrid.persistence.map.MapManager;
-import org.apache.usergrid.persistence.map.MapManagerFactory;
-import org.apache.usergrid.persistence.map.MapScope;
-import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import org.apache.usergrid.persistence.queue.QueueFig;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueMessage;
-import org.apache.usergrid.persistence.queue.QueueScope;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Timer;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-import rx.Observable;
-import rx.Subscriber;
-import rx.Subscription;
-import rx.schedulers.Schedulers;
-
-
-/**
- * TODO, this whole class is becoming a nightmare.  We need to remove all consume from this class and refactor it into the following manner.
- *
- * 1. Produce.  Keep the code in the handle as is
- * 2. Consume:  Move the code into a refactored system
- * 2.1 A central dispatcher
- * 2.2 An interface that produces an observable of type BatchOperation.  Any handler will be refactored into it's own
- *      impl that will then emit a stream of batch operations to perform
- * 2.3 The central dispatcher will then subscribe to these events and merge them.  Handing them off to a batch handler
- * 2.4 The batch handler will roll up the operations into a batch size, and then queue them
- * 2.5 The receive batch handler will execute the batch operations
- *
- * TODO determine how we error handle?
- *
- */
-@Singleton
-public class AmazonAsyncEventService implements AsyncEventService {
-
-
-    private static final Logger logger = LoggerFactory.getLogger(AmazonAsyncEventService.class);
-
-    // SQS maximum receive messages is 10
-    public int MAX_TAKE = 10;
-    public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
-
-    private final QueueManager queue;
-    private final IndexProcessorFig indexProcessorFig;
-    private final QueueFig queueFig;
-    private final IndexProducer indexProducer;
-    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
-    private final IndexLocationStrategyFactory indexLocationStrategyFactory;
-    private final EntityIndexFactory entityIndexFactory;
-    private final EventBuilder eventBuilder;
-    private final RxTaskScheduler rxTaskScheduler;
-
-    private final Timer readTimer;
-    private final Timer writeTimer;
-    private final Timer ackTimer;
-
-    /**
-     * This mutex is used to start/stop workers to ensure we're not concurrently modifying our subscriptions
-     */
-    private final Object mutex = new Object();
-
-    private final Counter indexErrorCounter;
-    private final AtomicLong counter = new AtomicLong();
-    private final AtomicLong inFlight = new AtomicLong();
-    private final Histogram messageCycle;
-    private final MapManager esMapPersistence;
-
-    //the actively running subscription
-    private List<Subscription> subscriptions = new ArrayList<>();
-
-
-    @Inject
-    public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory,
-                                    final IndexProcessorFig indexProcessorFig,
-                                    final IndexProducer indexProducer,
-                                    final MetricsFactory metricsFactory,
-                                    final EntityCollectionManagerFactory entityCollectionManagerFactory,
-                                    final IndexLocationStrategyFactory indexLocationStrategyFactory,
-                                    final EntityIndexFactory entityIndexFactory,
-                                    final EventBuilder eventBuilder,
-                                    final MapManagerFactory mapManagerFactory,
-                                    final QueueFig queueFig,
-                                    @EventExecutionScheduler
-                                    final RxTaskScheduler rxTaskScheduler ) {
-        this.indexProducer = indexProducer;
-
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-        this.indexLocationStrategyFactory = indexLocationStrategyFactory;
-        this.entityIndexFactory = entityIndexFactory;
-        this.eventBuilder = eventBuilder;
-
-        final MapScope mapScope = new MapScopeImpl( CpNamingUtils.getManagementApplicationId(),  "indexEvents");
-
-        this.esMapPersistence = mapManagerFactory.createMapManager( mapScope );
-
-        this.rxTaskScheduler = rxTaskScheduler;
-
-        QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
-        this.queue = queueManagerFactory.getQueueManager(queueScope);
-
-        this.indexProcessorFig = indexProcessorFig;
-        this.queueFig = queueFig;
-
-        this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.write");
-        this.readTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.read");
-        this.ackTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.ack");
-        this.indexErrorCounter = metricsFactory.getCounter(AmazonAsyncEventService.class, "async_event.error");
-        this.messageCycle = metricsFactory.getHistogram(AmazonAsyncEventService.class, "async_event.message_cycle");
-
-
-        //wire up the gauge of inflight message
-        metricsFactory.addGauge(AmazonAsyncEventService.class, "async-event.inflight", new Gauge<Long>() {
-            @Override
-            public Long getValue() {
-                return inFlight.longValue();
-            }
-        });
-
-        start();
-    }
-
-
-    /**
-     * Offer the EntityIdScope to SQS
-     */
-    private void offer(final Serializable operation) {
-        final Timer.Context timer = this.writeTimer.time();
-
-        try {
-            //signal to SQS
-            this.queue.sendMessage( operation );
-        } catch (IOException e) {
-            throw new RuntimeException("Unable to queue message", e);
-        } finally {
-            timer.stop();
-        }
-    }
-
-
-    private void offerTopic( final Serializable operation ) {
-        final Timer.Context timer = this.writeTimer.time();
-
-        try {
-            //signal to SQS
-            this.queue.sendMessageToTopic( operation );
-        }
-        catch ( IOException e ) {
-            throw new RuntimeException( "Unable to queue message", e );
-        }
-        finally {
-            timer.stop();
-        }
-    }
-
-    private void offerBatch(final List operations){
-        final Timer.Context timer = this.writeTimer.time();
-
-        try {
-            //signal to SQS
-            this.queue.sendMessages(operations);
-        } catch (IOException e) {
-            throw new RuntimeException("Unable to queue message", e);
-        } finally {
-            timer.stop();
-        }
-    }
-
-
-    /**
-     * Take message from SQS
-     */
-    private List<QueueMessage> take() {
-
-        final Timer.Context timer = this.readTimer.time();
-
-        try {
-            return queue.getMessages(MAX_TAKE,
-                    indexProcessorFig.getIndexQueueVisibilityTimeout(),
-                    indexProcessorFig.getIndexQueueTimeout(),
-                    AsyncEvent.class);
-        }
-        //stop our timer
-        finally {
-            timer.stop();
-        }
-    }
-
-
-
-    /**
-     * Ack message in SQS
-     */
-    public void ack(final List<QueueMessage> messages) {
-
-        final Timer.Context timer = this.ackTimer.time();
-
-        try{
-            queue.commitMessages( messages );
-
-            //decrement our in-flight counter
-            inFlight.decrementAndGet();
-
-        }catch(Exception e){
-            throw new RuntimeException("Unable to ack messages", e);
-        }finally {
-            timer.stop();
-        }
-
-
-    }
-
-    /**
-     * calls the event handlers and returns a result with information on whether it needs to be ack'd and whether it needs to be indexed
-     * @param messages
-     * @return
-     */
-    private List<IndexEventResult> callEventHandlers(final List<QueueMessage> messages) {
-
-        if (logger.isDebugEnabled()) {
-            logger.debug("callEventHandlers with {} message", messages.size());
-        }
-
-        Stream<IndexEventResult> indexEventResults = messages.stream().map(message -> {
-            AsyncEvent event = null;
-            try {
-                event = (AsyncEvent) message.getBody();
-            } catch (ClassCastException cce) {
-                logger.error("Failed to deserialize message body", cce);
-            }
-
-            if (event == null) {
-                logger.error("AsyncEvent type or event is null!");
-                return new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(),
-                    System.currentTimeMillis());
-            }
-
-            final AsyncEvent thisEvent = event;
-
-            if (logger.isDebugEnabled()) {
-                logger.debug("Processing {} event", event);
-            }
-
-            try {
-                //check for empty sets if this is true
-                boolean validateEmptySets = true;
-                Observable<IndexOperationMessage> indexoperationObservable;
-                //merge each operation to a master observable;
-                if ( event instanceof EdgeDeleteEvent ) {
-                    indexoperationObservable = handleEdgeDelete( message );
-                }
-                else if ( event instanceof EdgeIndexEvent ) {
-                    indexoperationObservable = handleEdgeIndex( message );
-                }
-                else if ( event instanceof EntityDeleteEvent ) {
-                    indexoperationObservable = handleEntityDelete( message );
-                    validateEmptySets = false; // do not check this one for an empty set b/c it can be empty
-
-                }
-                else if ( event instanceof EntityIndexEvent ) {
-                    indexoperationObservable = handleEntityIndexUpdate( message );
-                }
-                else if ( event instanceof InitializeApplicationIndexEvent ) {
-                    //does not return observable
-                    handleInitializeApplicationIndex(event, message);
-                    indexoperationObservable = Observable.just(new IndexOperationMessage());
-                    validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
-                } else if (event instanceof ElasticsearchIndexEvent) {
-                    handleIndexOperation((ElasticsearchIndexEvent) event);
-                    indexoperationObservable = Observable.just(new IndexOperationMessage());
-                    validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
-                } else {
-                    throw new Exception("Unknown EventType");//TODO: print json instead
-                }
-
-                //collect all of the
-                IndexOperationMessage indexOperationMessage = indexoperationObservable
-                    .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
-                    .toBlocking().lastOrDefault(null);
-
-                if (validateEmptySets && (indexOperationMessage == null || indexOperationMessage.isEmpty())) {
-                    logger.error("Received empty index sequence message:({}), body:({}) ", message.getMessageId(),
-                        message.getStringBody());
-                    throw new Exception("Received empty index sequence.");
-                }
-
-                //return type that can be indexed and ack'd later
-                return new IndexEventResult(Optional.fromNullable(message),
-                    Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime());
-            } catch (Exception e) {
-                logger.error("Failed to index message: {} {}", message.getMessageId(), message.getStringBody(), e);
-                return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(),
-                    event.getCreationTime());
-            }
-        });
-
-
-        return indexEventResults.collect(Collectors.toList());
-    }
-
-    @Override
-    public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
-        IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
-            applicationScope);
-        offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(),
-            new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) );
-    }
-
-
-    @Override
-    public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
-                                       final Entity entity) {
-
-        offer(new EntityIndexEvent(queueFig.getPrimaryRegion(),new EntityIdScope(applicationScope, entity.getId()), 0));
-    }
-
-
-    public Observable<IndexOperationMessage> handleEntityIndexUpdate(final QueueMessage message) {
-
-        Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" );
-
-        final AsyncEvent event = ( AsyncEvent ) message.getBody();
-
-        Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityIndexUpdate");
-        Preconditions.checkArgument(event instanceof EntityIndexEvent, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getClass()));
-
-        final EntityIndexEvent entityIndexEvent = (EntityIndexEvent) event;
-
-
-        //process the entity immediately
-        //only process the same version, otherwise ignore
-        final EntityIdScope entityIdScope = entityIndexEvent.getEntityIdScope();
-        final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
-        final Id entityId = entityIdScope.getId();
-        final long updatedAfter = entityIndexEvent.getUpdatedAfter();
-
-        final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter);
-
-        final Observable<IndexOperationMessage> observable = eventBuilder.buildEntityIndex( entityIndexOperation );
-        return observable;
-    }
-
-
-    @Override
-    public void queueNewEdge(final ApplicationScope applicationScope,
-                             final Entity entity,
-                             final Edge newEdge) {
-
-        EdgeIndexEvent operation = new EdgeIndexEvent(queueFig.getPrimaryRegion(), applicationScope, entity.getId(), newEdge);
-
-        offer( operation );
-    }
-
-    public Observable<IndexOperationMessage> handleEdgeIndex(final QueueMessage message) {
-
-        Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeIndex" );
-
-        final AsyncEvent event = (AsyncEvent) message.getBody();
-
-        Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeIndex" );
-        Preconditions.checkArgument(event instanceof EdgeIndexEvent, String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getClass()));
-
-        final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event;
-
-        final ApplicationScope applicationScope = edgeIndexEvent.getApplicationScope();
-        final Edge edge = edgeIndexEvent.getEdge();
-
-
-
-        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
-
-        final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load( edgeIndexEvent.getEntityId() ).flatMap(
-            entity -> eventBuilder.buildNewEdge(applicationScope, entity, edge));
-        return edgeIndexObservable;
-    }
-
-    @Override
-    public void queueDeleteEdge(final ApplicationScope applicationScope,
-                                final Edge edge) {
-
-        offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) );
-    }
-
-    public Observable<IndexOperationMessage> handleEdgeDelete(final QueueMessage message) {
-
-        Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeDelete" );
-
-        final AsyncEvent event = (AsyncEvent) message.getBody();
-
-        Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeDelete" );
-        Preconditions.checkArgument(event instanceof EdgeDeleteEvent, String.format("Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", event.getClass()));
-
-
-        final EdgeDeleteEvent edgeDeleteEvent = ( EdgeDeleteEvent ) event;
-
-        final ApplicationScope applicationScope = edgeDeleteEvent.getApplicationScope();
-        final Edge edge = edgeDeleteEvent.getEdge();
-
-        if (logger.isDebugEnabled()) {
-            logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
-        }
-
-        return eventBuilder.buildDeleteEdge(applicationScope, edge);
-    }
-
-
-    @Override
-    public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
-
-        offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) );
-    }
-
-
-    /**
-     * Queue up an indexOperationMessage for multi region execution
-     * @param indexOperationMessage
-     */
-    public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) {
-
-        // don't try to produce something with nothing
-        if(indexOperationMessage.isEmpty()){
-            return;
-        }
-
-        final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage );
-
-        final UUID newMessageId = UUIDGenerator.newTimeUUID();
-
-        final int expirationTimeInSeconds =
-            ( int ) TimeUnit.MILLISECONDS.toSeconds( indexProcessorFig.getIndexMessageTtl() );
-
-        //write to the map in ES
-        esMapPersistence.putString( newMessageId.toString(), jsonValue, expirationTimeInSeconds );
-
-
-
-        //now queue up the index message
-
-        final ElasticsearchIndexEvent elasticsearchIndexEvent =
-            new ElasticsearchIndexEvent(queueFig.getPrimaryRegion(), newMessageId );
-
-        //send to the topic so all regions index the batch
-
-        offerTopic( elasticsearchIndexEvent );
-    }
-
-    public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
-         Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
-
-        final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
-
-        Preconditions.checkNotNull( messageId, "messageId must not be null" );
-
-
-        //load the entity
-
-        final String message = esMapPersistence.getString( messageId.toString() );
-
-        final IndexOperationMessage indexOperationMessage;
-
-        if(message == null){
-            logger.warn( "Received message with id {} to process, unable to find it, reading with higher consistency level",
-                messageId);
-
-            final String highConsistency =  esMapPersistence.getStringHighConsistency( messageId.toString() );
-
-            if(highConsistency == null){
-                logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level",
-                    messageId);
-
-                throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId );
-            }
-
-            indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( highConsistency, IndexOperationMessage.class );
-
-        } else{
-            indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
-        }
-
-        initializeEntityIndexes(indexOperationMessage);
-
-        //NOTE that we intentionally do NOT delete from the map.  We can't know when all regions have consumed the message
-        //so we'll let compaction on column expiration handle deletion
-
-        //read the value from the string
-
-        Preconditions.checkNotNull( indexOperationMessage, "indexOperationMessage cannot be null" );
-        Preconditions.checkArgument( !indexOperationMessage.isEmpty() , "queued indexOperationMessage messages should not be empty" );
-
-
-        //now execute it
-        indexProducer.put(indexOperationMessage).toBlocking().last();
-
-    }
-
-    /**
-     *     this method will call initialize for each message, since we are caching the entity indexes,
-     *     we don't worry about aggregating by app id
-     * @param indexOperationMessage
-     */
-    private void initializeEntityIndexes(final IndexOperationMessage indexOperationMessage) {
-
-        // create a set so we can have a unique list of appIds for which we call createEntityIndex
-        Set<UUID> appIds = new HashSet<>();
-
-        // loop through all indexRequests and add the appIds to the set
-        indexOperationMessage.getIndexRequests().forEach(req -> {
-            UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
-            appIds.add(appId);
-        });
-
-        // loop through all deindexRequests and add the appIds to the set
-        indexOperationMessage.getDeIndexRequests().forEach(req -> {
-            UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
-            appIds.add(appId);
-        });
-
-        // for each of the appIds in the unique set, call create entity index to ensure the aliases are created
-        appIds.forEach(appId -> {
-                ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
-                entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
-            }
-        );
-    }
-
-
-    @Override
-    public long getQueueDepth() {
-        return queue.getQueueDepth();
-    }
-
-    public Observable<IndexOperationMessage> handleEntityDelete(final QueueMessage message) {
-
-        Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");
-
-        final AsyncEvent event = (AsyncEvent) message.getBody();
-        Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEntityDelete" );
-        Preconditions.checkArgument( event instanceof EntityDeleteEvent,
-            String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getClass() ) );
-
-
-        final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event;
-        final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope();
-        final Id entityId = entityDeleteEvent.getEntityIdScope().getId();
-
-        if (logger.isDebugEnabled())
-            logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
-
-        final EventBuilderImpl.EntityDeleteResults
-            entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId );
-
-
-        // Delete the entities and remove from graph separately
-        entityDeleteResults.getEntitiesDeleted().toBlocking().lastOrDefault(null);
-
-        entityDeleteResults.getCompactedNode().toBlocking().lastOrDefault(null);
-
-        return entityDeleteResults.getIndexObservable();
-    }
-
-
-    public void handleInitializeApplicationIndex(final AsyncEvent event, final QueueMessage message) {
-        Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex");
-        Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass()));
-
-        final InitializeApplicationIndexEvent initializeApplicationIndexEvent =
-            ( InitializeApplicationIndexEvent ) event;
-
-        final IndexLocationStrategy indexLocationStrategy = initializeApplicationIndexEvent.getIndexLocationStrategy();
-        final EntityIndex index = entityIndexFactory.createEntityIndex( indexLocationStrategy );
-        index.initialize();
-    }
-
-    /**
-     * Loop through and start the workers
-     */
-    public void start() {
-        final int count = indexProcessorFig.getWorkerCount();
-
-        for (int i = 0; i < count; i++) {
-            startWorker();
-        }
-    }
-
-
-    /**
-     * Stop the workers
-     */
-    public void stop() {
-        synchronized (mutex) {
-            //stop consuming
-
-            for (final Subscription subscription : subscriptions) {
-                subscription.unsubscribe();
-            }
-        }
-    }
-
-
-    private void startWorker() {
-        synchronized (mutex) {
-
-            Observable<List<QueueMessage>> consumer =
-                    Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() {
-                        @Override
-                        public void call( final Subscriber<? super List<QueueMessage>> subscriber ) {
-
-                            //name our thread so it's easy to see
-                            Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
-
-                            List<QueueMessage> drainList = null;
-
-                            do {
-                                try {
-                                    drainList = take();
-                                    //emit our list in it's entity to hand off to a worker pool
-                                        subscriber.onNext(drainList);
-
-                                    //take since  we're in flight
-                                    inFlight.addAndGet( drainList.size() );
-                                }
-                                catch ( Throwable t ) {
-                                    final long sleepTime = indexProcessorFig.getFailureRetryTime();
-
-                                    logger.error( "Failed to dequeue.  Sleeping for {} milliseconds", sleepTime, t );
-
-                                    if ( drainList != null ) {
-                                        inFlight.addAndGet( -1 * drainList.size() );
-                                    }
-
-
-                                    try {
-                                        Thread.sleep( sleepTime );
-                                    }
-                                    catch ( InterruptedException ie ) {
-                                        //swallow
-                                    }
-
-                                    indexErrorCounter.inc();
-                                }
-                            }
-                            while ( true );
-                        }
-                    } )        //this won't block our read loop, just reads and proceeds
-                        .flatMap( sqsMessages -> {
-
-                            //do this on a different schedule, and introduce concurrency with flatmap for faster processing
-                            return Observable.just( sqsMessages )
-
-                                             .map( messages -> {
-                                                 if ( messages == null || messages.size() == 0 ) {
-                                                     return null;
-                                                 }
-
-                                                 try {
-                                                     List<IndexEventResult> indexEventResults =
-                                                         callEventHandlers( messages );
-                                                     List<QueueMessage> messagesToAck =
-                                                         submitToIndex( indexEventResults );
-                                                     if ( messagesToAck == null || messagesToAck.size() == 0 ) {
-                                                         logger.error(
-                                                             "No messages came back from the queue operation, should have seen {} messages",
-                                                                 messages.size() );
-                                                         return messagesToAck;
-                                                     }
-                                                     if ( messagesToAck.size() < messages.size() ) {
-                                                         logger.error( "Missing messages from queue post operation",
-                                                             messages, messagesToAck );
-                                                     }
-                                                     //ack each message, but only if we didn't error.
-                                                     ack( messagesToAck );
-                                                     return messagesToAck;
-                                                 }
-                                                 catch ( Exception e ) {
-                                                     logger.error( "failed to ack messages to sqs", e );
-                                                     return null;
-                                                     //do not rethrow so we can process all of them
-                                                 }
-                                             } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
-                            //end flatMap
-                        }, indexProcessorFig.getEventConcurrencyFactor() );
-
-            //start in the background
-
-            final Subscription subscription = consumer.subscribeOn(Schedulers.newThread()).subscribe();
-
-            subscriptions.add(subscription);
-        }
-    }
-
-    /**
-     * Submit results to index and return the queue messages to be ack'd
-     * @param indexEventResults
-     * @return
-     */
-    private List<QueueMessage> submitToIndex( List<IndexEventResult> indexEventResults) {
-        //if nothing came back then return null
-        if(indexEventResults==null){
-            return null;
-        }
-
-        final IndexOperationMessage combined = new IndexOperationMessage();
-
-        //stream and filer the messages
-        List<QueueMessage> messagesToAck = indexEventResults.stream()
-            .map(indexEventResult -> {
-                //collect into the index submission
-                if (indexEventResult.getIndexOperationMessage().isPresent()) {
-                    combined.ingest(indexEventResult.getIndexOperationMessage().get());
-                }
-                return indexEventResult;
-            })
-                //filter out the ones that need to be ack'd
-            .filter(indexEventResult -> indexEventResult.getQueueMessage().isPresent())
-            .map(indexEventResult -> {
-                //record the cycle time
-                messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime());
-                return indexEventResult;
-            })
-                //ack after successful completion of the operation.
-            .map(result -> result.getQueueMessage().get())
-            .collect(Collectors.toList());
-
-            queueIndexOperationMessage( combined );
-
-        return messagesToAck;
-    }
-
-    public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) {
-        //change to id scope to avoid serialization issues
-        offer( new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, id ), updatedSince ) );
-    }
-
-    public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
-
-        List batch = new ArrayList<EdgeScope>();
-        for ( EdgeScope e : edges){
-            //change to id scope to avoid serialization issues
-            batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince));
-        }
-        offerBatch( batch );
-    }
-
-
-    public class IndexEventResult{
-        private final Optional<QueueMessage> queueMessage;
-        private final Optional<IndexOperationMessage> indexOperationMessage;
-        private final long creationTime;
-
-
-        public IndexEventResult(Optional<QueueMessage> queueMessage, Optional<IndexOperationMessage> indexOperationMessage, long creationTime){
-
-            this.queueMessage = queueMessage;
-            this.indexOperationMessage = indexOperationMessage;
-
-            this.creationTime = creationTime;
-        }
-
-
-        public Optional<QueueMessage> getQueueMessage() {
-            return queueMessage;
-        }
-
-        public Optional<IndexOperationMessage> getIndexOperationMessage() {
-            return indexOperationMessage;
-        }
-
-        public long getCreationTime() {
-            return creationTime;
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index dbf8996..288fb12 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -22,9 +22,7 @@ package org.apache.usergrid.corepersistence.asyncevents;
 
 import org.apache.usergrid.corepersistence.index.ReIndexAction;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.entities.Application;
 import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.index.IndexLocationStrategy;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -48,8 +46,9 @@ public interface AsyncEventService extends ReIndexAction {
      * After SQS is removed, the tests should be enhanced to ensure that we're processing our queues correctly.
      * @param applicationScope
      * @param entity The entity to index.  Should be fired when an entity is updated
+     * @param updatedAfter
      */
-    void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity);
+    void queueEntityIndexUpdate(final ApplicationScope applicationScope, final Entity entity, long updatedAfter);
 
 
     /**

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
new file mode 100644
index 0000000..e101761
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -0,0 +1,798 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.usergrid.persistence.index.impl.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent;
+import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
+import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
+import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
+import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.IndexLocationStrategy;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.apache.usergrid.persistence.queue.QueueFig;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+import rx.Subscriber;
+import rx.Subscription;
+import rx.schedulers.Schedulers;
+
+
+/**
+ * TODO: this whole class is becoming a nightmare.  We need to remove all consumer logic from this class and refactor it in the following manner.
+ *
+ * 1. Produce.  Keep the code in the handle as is
+ * 2. Consume:  Move the code into a refactored system
+ * 2.1 A central dispatcher
+ * 2.2 An interface that produces an observable of type BatchOperation.  Any handler will be refactored into its own
+ *      impl that will then emit a stream of batch operations to perform
+ * 2.3 The central dispatcher will then subscribe to these events and merge them.  Handing them off to a batch handler
+ * 2.4 The batch handler will roll up the operations into a batch size, and then queue them
+ * 2.5 The receive batch handler will execute the batch operations
+ *
+ * TODO determine how we error handle?
+ *
+ */
+@Singleton
+public class AsyncEventServiceImpl implements AsyncEventService {
+
+
+    private static final Logger logger = LoggerFactory.getLogger(AsyncEventServiceImpl.class);
+
+    // SQS maximum receive messages is 10
+    public int MAX_TAKE = 10;
+    public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
+
+    private final QueueManager queue;
+    private final IndexProcessorFig indexProcessorFig;
+    private final QueueFig queueFig;
+    private final IndexProducer indexProducer;
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final IndexLocationStrategyFactory indexLocationStrategyFactory;
+    private final EntityIndexFactory entityIndexFactory;
+    private final EventBuilder eventBuilder;
+    private final RxTaskScheduler rxTaskScheduler;
+
+    private final Timer readTimer;
+    private final Timer writeTimer;
+    private final Timer ackTimer;
+
+    /**
+     * This mutex is used to start/stop workers to ensure we're not concurrently modifying our subscriptions
+     */
+    private final Object mutex = new Object();
+
+    private final Counter indexErrorCounter;
+    private final AtomicLong counter = new AtomicLong();
+    private final AtomicLong inFlight = new AtomicLong();
+    private final Histogram messageCycle;
+    private final MapManager esMapPersistence;
+
+    //the actively running subscription
+    private List<Subscription> subscriptions = new ArrayList<>();
+
+
+    /**
+     * Wires up the collaborators, the map persistence used to store index batches, the index queue and
+     * the metrics, and then calls {@code start()}.
+     *
+     * @param queueManagerFactory used to obtain the queue manager for the {@link #QUEUE_NAME} queue
+     * @param indexProcessorFig configuration read by the queue and indexing code in this class
+     * @param indexProducer receives the index operation messages to execute
+     * @param metricsFactory source of the timers, counter, histogram and gauge registered here
+     * @param entityCollectionManagerFactory creates per-application entity collection managers
+     * @param indexLocationStrategyFactory resolves the index location strategy for an application scope
+     * @param entityIndexFactory creates entity indexes from an index location strategy
+     * @param eventBuilder builds the index operations for entity and edge events
+     * @param mapManagerFactory used to create the "indexEvents" map manager
+     * @param queueFig queue configuration (primary region, local quorum timeout)
+     * @param rxTaskScheduler scheduler bound with {@code @EventExecutionScheduler}
+     */
+    @Inject
+    public AsyncEventServiceImpl(final QueueManagerFactory queueManagerFactory,
+                                 final IndexProcessorFig indexProcessorFig,
+                                 final IndexProducer indexProducer,
+                                 final MetricsFactory metricsFactory,
+                                 final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                                 final IndexLocationStrategyFactory indexLocationStrategyFactory,
+                                 final EntityIndexFactory entityIndexFactory,
+                                 final EventBuilder eventBuilder,
+                                 final MapManagerFactory mapManagerFactory,
+                                 final QueueFig queueFig,
+                                 @EventExecutionScheduler
+                                    final RxTaskScheduler rxTaskScheduler ) {
+        this.indexProducer = indexProducer;
+
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.indexLocationStrategyFactory = indexLocationStrategyFactory;
+        this.entityIndexFactory = entityIndexFactory;
+        this.eventBuilder = eventBuilder;
+
+        // the serialized index batches are stored in a map scoped to the management application
+        final MapScope mapScope = new MapScopeImpl( CpNamingUtils.getManagementApplicationId(),  "indexEvents");
+
+        this.esMapPersistence = mapManagerFactory.createMapManager( mapScope );
+
+        this.rxTaskScheduler = rxTaskScheduler;
+
+        // a single queue named QUEUE_NAME, created with the ALL region implementation
+        QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
+        this.queue = queueManagerFactory.getQueueManager(queueScope);
+
+        this.indexProcessorFig = indexProcessorFig;
+        this.queueFig = queueFig;
+
+        this.writeTimer = metricsFactory.getTimer(AsyncEventServiceImpl.class, "async_event.write");
+        this.readTimer = metricsFactory.getTimer(AsyncEventServiceImpl.class, "async_event.read");
+        this.ackTimer = metricsFactory.getTimer(AsyncEventServiceImpl.class, "async_event.ack");
+        this.indexErrorCounter = metricsFactory.getCounter(AsyncEventServiceImpl.class, "async_event.error");
+        this.messageCycle = metricsFactory.getHistogram(AsyncEventServiceImpl.class, "async_event.message_cycle");
+
+
+        //wire up the gauge of inflight message
+        // NOTE(review): this metric name uses a hyphen ("async-event") while the others use an
+        // underscore ("async_event") - confirm whether that is intentional before renaming
+        metricsFactory.addGauge(AsyncEventServiceImpl.class, "async-event.inflight", new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return inFlight.longValue();
+            }
+        });
+
+        // NOTE(review): start() runs before the constructor returns, so it must stay the last
+        // statement - every field it may read has to be assigned above this line
+        start();
+    }
+
+
+    /**
+     * Sends a single operation to the queue, recording the duration of the send in the write timer.
+     *
+     * @param operation the serializable payload to enqueue
+     * @throws RuntimeException wrapping the {@link IOException} if the queue manager fails to send
+     */
+    private void offer(final Serializable operation) {
+        final Timer.Context writeContext = this.writeTimer.time();
+
+        try {
+            // hand the payload to the queue manager
+            this.queue.sendMessage( operation );
+        }
+        catch ( IOException ioe ) {
+            throw new RuntimeException( "Unable to queue message", ioe );
+        }
+        finally {
+            // always record the elapsed time, whether or not the send succeeded
+            writeContext.stop();
+        }
+    }
+
+
+    /**
+     * Sends a single operation to the queue's topic, recording the duration of the send in the write timer.
+     *
+     * @param operation the serializable payload to publish
+     * @throws RuntimeException wrapping the {@link IOException} if the queue manager fails to send
+     */
+    private void offerTopic( final Serializable operation ) {
+        final Timer.Context writeContext = this.writeTimer.time();
+
+        try {
+            // publish through the topic rather than directly to the queue
+            this.queue.sendMessageToTopic( operation );
+        } catch ( IOException ioe ) {
+            throw new RuntimeException( "Unable to queue message", ioe );
+        } finally {
+            // always record the elapsed time, whether or not the send succeeded
+            writeContext.stop();
+        }
+    }
+
+    /**
+     * Sends a batch of operations to the queue in one call, recording the duration in the write timer.
+     *
+     * @param operations the payloads to enqueue
+     * @throws RuntimeException wrapping the {@link IOException} if the queue manager fails to send
+     */
+    private void offerBatch(final List operations){
+        final Timer.Context writeContext = this.writeTimer.time();
+
+        try {
+            // hand the whole batch to the queue manager
+            this.queue.sendMessages( operations );
+        }
+        catch ( IOException ioe ) {
+            throw new RuntimeException( "Unable to queue message", ioe );
+        }
+        finally {
+            // always record the elapsed time, whether or not the send succeeded
+            writeContext.stop();
+        }
+    }
+
+
+    /**
+     * Reads up to {@code MAX_TAKE} messages from the queue, recording the duration in the read timer.
+     *
+     * @return the messages handed back by the queue manager
+     */
+    private List<QueueMessage> take() {
+
+        final Timer.Context readContext = this.readTimer.time();
+
+        try {
+            final List<QueueMessage> taken = queue.getMessages( MAX_TAKE,
+                indexProcessorFig.getIndexQueueVisibilityTimeout(),
+                indexProcessorFig.getIndexQueueTimeout(),
+                AsyncEvent.class );
+
+            return taken;
+        }
+        finally {
+            // always record the elapsed time, whether or not the read succeeded
+            readContext.stop();
+        }
+    }
+
+
+
+    /**
+     * Acks (commits) a batch of messages on the queue and lowers the in-flight counter accordingly.
+     * The duration of the call is recorded in the ack timer.
+     *
+     * @param messages the messages that were processed and can be removed from the queue
+     * @throws RuntimeException if committing the messages fails; the original exception is the cause
+     */
+    public void ack(final List<QueueMessage> messages) {
+
+        final Timer.Context timer = this.ackTimer.time();
+
+        try {
+            queue.commitMessages( messages );
+
+            // decrement by the number of messages acked, not by one per batch, otherwise the
+            // in-flight gauge drifts upwards.
+            // NOTE(review): assumes the consumer loop adds the size of each batch it takes to
+            // inFlight, as the replaced AmazonAsyncEventService did - confirm
+            inFlight.addAndGet( -1L * messages.size() );
+        }
+        catch ( Exception e ) {
+            throw new RuntimeException( "Unable to ack messages", e );
+        }
+        finally {
+            timer.stop();
+        }
+    }
+
+    /**
+     * Runs the matching event handler for every message (in parallel) and reports, per message,
+     * whether it may be ack'd.
+     *
+     * @param messages the messages taken from the queue
+     * @return one result per message; a result without a queue message must not be ack'd
+     */
+    private List<IndexEventResult> callEventHandlers(final List<QueueMessage> messages) {
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("callEventHandlers with {} message", messages.size());
+        }
+
+        return messages.parallelStream()
+            .map( this::buildIndexEventResult )
+            .collect( Collectors.toList() );
+    }
+
+
+    /**
+     * Dispatches a single message to the handler for its event type.
+     *
+     * @param message the queue message to process
+     * @return a result holding the message when it was handled, or an absent message when it
+     *         could not be handled and therefore must stay on the queue
+     */
+    private IndexEventResult buildIndexEventResult( final QueueMessage message ) {
+
+        final AsyncEvent event;
+        try {
+            event = (AsyncEvent) message.getBody();
+        }
+        catch ( ClassCastException cce ) {
+            logger.error("Failed to deserialize message body", cce);
+            return new IndexEventResult(Optional.absent(), System.currentTimeMillis());
+        }
+
+        if ( event == null ) {
+            logger.error("AsyncEvent type or event is null!");
+            return new IndexEventResult(Optional.absent(), System.currentTimeMillis());
+        }
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Processing {} event", event);
+        }
+
+        try {
+
+            if ( event instanceof EdgeDeleteEvent ) {
+                // deletes are 2-part, actual IO to delete data, then queue up a de-index
+                handleEdgeDelete( message );
+            }
+            else if ( event instanceof EntityDeleteEvent ) {
+                // deletes are 2-part, actual IO to delete data, then queue up a de-index
+                handleEntityDelete( message );
+            }
+            else if ( event instanceof InitializeApplicationIndexEvent ) {
+                // application initialization has special logic, therefore a special event type
+                handleInitializeApplicationIndex(event, message);
+            }
+            else if ( event instanceof ElasticsearchIndexEvent ) {
+                // the main event: pulls the index doc from map persistence and hands it to the index producer
+                handleIndexOperation((ElasticsearchIndexEvent) event);
+            }
+            else {
+                throw new Exception("Unknown EventType for message: "+ message.getStringBody());
+            }
+
+            // handled: carry the message along so it can be ack'd later
+            return new IndexEventResult(Optional.of(message), event.getCreationTime());
+        }
+        catch ( IndexDocNotFoundException e ) {
+
+            // thrown while we are still waiting before trying a quorum read on map persistence;
+            // return an empty result so the message does not get ack'd
+            logger.info(e.getMessage());
+            return new IndexEventResult(Optional.absent(), event.getCreationTime());
+        }
+        catch ( Exception e ) {
+
+            // processing failed: log it and return an empty result so the message does not get ack'd
+            logger.error("Failed to process message: {} {}", message.getMessageId(), message.getStringBody(), e);
+            return new IndexEventResult(Optional.absent(), event.getCreationTime());
+        }
+    }
+
+    /**
+     * Publishes an {@link InitializeApplicationIndexEvent} for the application to the topic.
+     *
+     * @param applicationScope the application whose index location strategy is sent with the event
+     */
+    @Override
+    public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
+        final IndexLocationStrategy strategy =
+            indexLocationStrategyFactory.getIndexLocationStrategy( applicationScope );
+
+        // the strategy is wrapped in its replicated form before being put on the event
+        final InitializeApplicationIndexEvent initializeEvent = new InitializeApplicationIndexEvent(
+            queueFig.getPrimaryRegion(), new ReplicatedIndexLocationStrategy( strategy ) );
+
+        offerTopic( initializeEvent );
+    }
+
+
+    /**
+     * Builds the index operations for the entity in the calling thread and queues the resulting
+     * index operation message.
+     *
+     * @param applicationScope the application the entity belongs to
+     * @param entity the entity to index
+     * @param updatedAfter passed through to the {@link EntityIndexOperation}
+     */
+    @Override
+    public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
+                                       final Entity entity, long updatedAfter) {
+
+        final Id entityId = entity.getId();
+
+        // block until the builder has emitted its last message; null when it emits nothing
+        final IndexOperationMessage indexMessage = eventBuilder
+            .buildEntityIndex( new EntityIndexOperation( applicationScope, entityId, updatedAfter ) )
+            .toBlocking()
+            .lastOrDefault( null );
+
+        queueIndexOperationMessage( indexMessage );
+    }
+
+
+    /**
+     * Loads the entity through the collection manager and, for what that load emits, builds the index
+     * operations for the new edge; the last message built is then queued.
+     *
+     * @param applicationScope the application the entity belongs to
+     * @param entity the entity the edge points at; its id is used for the load and the entity
+     *        itself is passed to the event builder
+     * @param newEdge the edge that was created
+     */
+    @Override
+    public void queueNewEdge(final ApplicationScope applicationScope,
+                             final Entity entity,
+                             final Edge newEdge) {
+
+        final EntityCollectionManager collectionManager =
+            entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+        final Observable<IndexOperationMessage> edgeIndexMessages = collectionManager
+            .load( entity.getId() )
+            .flatMap( loadedEntity -> eventBuilder.buildNewEdge(applicationScope, entity, newEdge) );
+
+        // block until the last message has been built; null when nothing was emitted
+        final IndexOperationMessage indexMessage = edgeIndexMessages.toBlocking().lastOrDefault( null );
+
+        queueIndexOperationMessage( indexMessage );
+    }
+
+
+    /**
+     * Queues an {@link EdgeDeleteEvent} for the given edge.
+     *
+     * @param applicationScope the application the edge belongs to
+     * @param edge the edge that was deleted
+     */
+    @Override
+    public void queueDeleteEdge(final ApplicationScope applicationScope,
+                                final Edge edge) {
+
+        final EdgeDeleteEvent deleteEvent =
+            new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge );
+
+        offer( deleteEvent );
+    }
+
+    /**
+     * Handles an edge delete event taken from the queue: builds the index operation message for the
+     * deleted edge via the event builder and queues it.
+     *
+     * @param message the queue message; its body must be an {@link EdgeDeleteEvent}
+     * @throws NullPointerException if the message or its body is null
+     * @throws IllegalArgumentException if the body is not an {@link EdgeDeleteEvent}
+     */
+    public void handleEdgeDelete(final QueueMessage message) {
+
+        Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeDelete" );
+
+        final AsyncEvent event = (AsyncEvent) message.getBody();
+
+        // validate the body (not the message a second time) so a null body fails with this message
+        Preconditions.checkNotNull( event, "QueueMessage Body cannot be null for handleEdgeDelete" );
+
+        // template form: the error message is only formatted when the check actually fails
+        Preconditions.checkArgument( event instanceof EdgeDeleteEvent,
+            "Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", event.getClass() );
+
+
+        final EdgeDeleteEvent edgeDeleteEvent = ( EdgeDeleteEvent ) event;
+
+        final ApplicationScope applicationScope = edgeDeleteEvent.getApplicationScope();
+        final Edge edge = edgeDeleteEvent.getEdge();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
+        }
+
+        // block until the builder has emitted its last message; null when it emits nothing
+        final IndexOperationMessage indexMessage =
+            eventBuilder.buildDeleteEdge(applicationScope, edge).toBlocking().lastOrDefault(null);
+
+        queueIndexOperationMessage(indexMessage);
+
+    }
+
+
+
+    /**
+     * Stores the serialized index operation message in map persistence under a new time UUID and
+     * publishes an {@link ElasticsearchIndexEvent} carrying that id to the topic.  Null or empty
+     * messages are ignored.
+     *
+     * @param indexOperationMessage the batch of index operations to queue; may be null
+     */
+    public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) {
+
+        // nothing to produce for a missing or empty batch
+        final boolean nothingToQueue = indexOperationMessage == null || indexOperationMessage.isEmpty();
+        if ( nothingToQueue ) {
+            return;
+        }
+
+        final String serializedBatch = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage );
+        final UUID batchId = UUIDGenerator.newTimeUUID();
+
+        // the stored batch expires after the configured message ttl (configured in ms, stored in seconds)
+        final long ttlMillis = indexProcessorFig.getIndexMessageTtl();
+        final int ttlSeconds = ( int ) TimeUnit.MILLISECONDS.toSeconds( ttlMillis );
+
+        // persist the batch first so it can be looked up by id when the event is consumed
+        esMapPersistence.putString( batchId.toString(), serializedBatch, ttlSeconds );
+
+        // send to the topic so all regions index the batch
+        offerTopic( new ElasticsearchIndexEvent( queueFig.getPrimaryRegion(), batchId ) );
+    }
+
+    /**
+     * Handles an {@link ElasticsearchIndexEvent}: loads the serialized index batch referenced by the
+     * event from map persistence, calls createEntityIndex for every application referenced by the
+     * batch, and hands the batch to the index producer, blocking until it completes.
+     *
+     * @param elasticsearchIndexEvent the event carrying the id of the index batch to execute
+     * @throws NullPointerException if the event, its batch id or the deserialized batch is null
+     * @throws IllegalArgumentException if the deserialized batch is empty
+     * @throws IndexDocNotFoundException if the batch can't be found and the local quorum timeout has
+     *         not yet elapsed since the event was created
+     * @throws RuntimeException if the batch can't be found with the higher consistency read either
+     */
+    public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
+        Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
+
+        final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
+
+        Preconditions.checkNotNull( messageId, "messageId must not be null" );
+
+
+        //load the serialized batch
+        String message = esMapPersistence.getString( messageId.toString() );
+
+        if ( message == null ) {
+
+            final boolean quorumTimeoutElapsed = System.currentTimeMillis()
+                > elasticsearchIndexEvent.getCreationTime() + queueFig.getLocalQuorumTimeout();
+
+            // too early for the higher consistency read: signal "not found yet" to the caller
+            if ( !quorumTimeoutElapsed ) {
+                throw new IndexDocNotFoundException( messageId );
+            }
+
+            logger.warn("Received message with id {} to process, unable to find it, reading with higher consistency level",
+                messageId);
+
+            message = esMapPersistence.getStringHighConsistency( messageId.toString() );
+
+            if ( message == null ) {
+                logger.error("Unable to find the ES batch with id {} to process at a higher consistency level",
+                    messageId);
+
+                throw new RuntimeException("Unable to find the ES batch to process with message id " + messageId);
+            }
+        }
+
+        final IndexOperationMessage indexOperationMessage =
+            ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
+
+        // validate before the batch is used: initializeEntityIndexes dereferences it, so checking
+        // afterwards could never report a null batch
+        Preconditions.checkNotNull( indexOperationMessage, "indexOperationMessage cannot be null" );
+        Preconditions.checkArgument( !indexOperationMessage.isEmpty() , "queued indexOperationMessage messages should not be empty" );
+
+        initializeEntityIndexes(indexOperationMessage);
+
+        //NOTE that we intentionally do NOT delete from the map.  We can't know when all regions have consumed the message
+        //so we'll let compaction on column expiration handle deletion
+
+        //now execute it
+        indexProducer.put(indexOperationMessage).toBlocking().last();
+
+    }
+
+    /**
+     *     this method will call initialize for each message, since we are caching the entity indexes,
+     *     we don't worry about aggregating by app id
+     * @param indexOperationMessage
+     */
+    private void initializeEntityIndexes(final IndexOperationMessage indexOperationMessage) {
+
+        // create a set so we can have a unique list of appIds for which we call createEntityIndex
+        Set<UUID> appIds = new HashSet<>();
+
+        // loop through all indexRequests and add the appIds to the set
+        indexOperationMessage.getIndexRequests().forEach(req -> {
+            UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
+            appIds.add(appId);
+        });
+
+        // loop through all deindexRequests and add the appIds to the set
+        indexOperationMessage.getDeIndexRequests().forEach(req -> {
+            UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
+            appIds.add(appId);
+        });
+
+        // for each of the appIds in the unique set, call create entity index to ensure the aliases are created
+        appIds.forEach(appId -> {
+                ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
+                entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
+            }
+        );
+    }
+
+
+    /**
+     * Returns the depth reported by the underlying queue.
+     *
+     * @return the value of {@code queue.getQueueDepth()}
+     */
+    @Override
+    public long getQueueDepth() {
+        return queue.getQueueDepth();
+    }
+
+    /**
+     * Offers an {@link EntityDeleteEvent} for the given entity, tagged with the primary region from the
+     * queue configuration.
+     *
+     * @param applicationScope the application that owns the entity
+     * @param entityId the id of the entity to delete
+     */
+    @Override
+    public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
+
+        final EntityIdScope deleteScope = new EntityIdScope( applicationScope, entityId );
+        final EntityDeleteEvent deleteEvent = new EntityDeleteEvent( queueFig.getPrimaryRegion(), deleteScope );
+
+        offer( deleteEvent );
+    }
+
+    /**
+     * Handles an entity delete message: builds the delete results for the entity, blocks until the entity
+     * deletion and the node compaction observables have finished, then queues the resulting index operations.
+     *
+     * @param message the queue message whose body must be an {@link EntityDeleteEvent}
+     * @throws NullPointerException if the message or its body is null
+     * @throws IllegalArgumentException if the body is not an {@link EntityDeleteEvent}
+     */
+    public void handleEntityDelete(final QueueMessage message) {
+
+        Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");
+
+        final AsyncEvent event = (AsyncEvent) message.getBody();
+        //check the body (not the message again) so a missing body fails here with a clear message
+        //instead of a bare NullPointerException from event.getClass() below
+        Preconditions.checkNotNull( event, "QueueMessage Body cannot be null for handleEntityDelete" );
+        Preconditions.checkArgument( event instanceof EntityDeleteEvent,
+            String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getClass() ) );
+
+
+        final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event;
+        final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope();
+        final Id entityId = entityDeleteEvent.getEntityIdScope().getId();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
+        }
+
+        final EventBuilderImpl.EntityDeleteResults
+            entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId );
+
+
+        // Delete the entities and remove from graph separately
+        entityDeleteResults.getEntitiesDeleted().toBlocking().lastOrDefault(null);
+
+        entityDeleteResults.getCompactedNode().toBlocking().lastOrDefault(null);
+
+        // null when the index observable emits nothing; passed on to queueIndexOperationMessage as-is
+        IndexOperationMessage indexMessage = entityDeleteResults.getIndexObservable().toBlocking().lastOrDefault(null);
+
+        queueIndexOperationMessage(indexMessage);
+
+    }
+
+
+    /**
+     * Handles an initialize-application-index event by creating the entity index for the location
+     * strategy carried in the event and initializing it.
+     *
+     * @param event the event, must be an {@link InitializeApplicationIndexEvent}
+     * @param message the queue message the event arrived in, must not be null
+     * @throws NullPointerException if the message or the event is null
+     * @throws IllegalArgumentException if the event is not an {@link InitializeApplicationIndexEvent}
+     */
+    public void handleInitializeApplicationIndex(final AsyncEvent event, final QueueMessage message) {
+        Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex");
+        //event.getClass() below is evaluated eagerly, so reject a null event with a clear message first
+        Preconditions.checkNotNull(event, "Event cannot be null for handleInitializeApplicationIndex");
+        Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass()));
+
+        final InitializeApplicationIndexEvent initializeApplicationIndexEvent =
+            ( InitializeApplicationIndexEvent ) event;
+
+        final IndexLocationStrategy indexLocationStrategy = initializeApplicationIndexEvent.getIndexLocationStrategy();
+        final EntityIndex index = entityIndexFactory.createEntityIndex( indexLocationStrategy );
+        index.initialize();
+    }
+
+    /**
+     * Loop through and start the workers
+     */
+    public void start() {
+        final int count = indexProcessorFig.getWorkerCount();
+
+        for (int i = 0; i < count; i++) {
+            startWorker();
+        }
+    }
+
+
+    /**
+     * Stops consuming by unsubscribing every worker subscription registered so far.  Runs under the
+     * same mutex that guards worker start up.
+     */
+    public void stop() {
+        synchronized (mutex) {
+            subscriptions.forEach( Subscription::unsubscribe );
+        }
+    }
+
+
+    /**
+     * Starts one background worker.  The worker polls the queue on its own thread, emits every batch it
+     * takes, and hands each batch to the async I/O scheduler where the event handlers run and the
+     * successfully handled messages are ack'd.  The resulting subscription is recorded so that
+     * {@link #stop()} can unsubscribe it.
+     */
+    private void startWorker() {
+        synchronized (mutex) {
+
+            Observable<List<QueueMessage>> consumer =
+                    Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() {
+                        @Override
+                        public void call( final Subscriber<? super List<QueueMessage>> subscriber ) {
+
+                            //name our thread so it's easy to see
+                            Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
+
+                            //keep polling until we are unsubscribed, otherwise stop() could never end this loop
+                            while ( !subscriber.isUnsubscribed() ) {
+
+                                //scoped per iteration so a failed take() never sees the previous batch
+                                List<QueueMessage> drainList = null;
+                                //true once this batch has been added to inFlight
+                                boolean counted = false;
+
+                                try {
+                                    drainList = take();
+
+                                    //count the batch as in flight before handing it off
+                                    inFlight.addAndGet( drainList.size() );
+                                    counted = true;
+
+                                    //emit our list in its entirety to hand off to a worker pool
+                                    subscriber.onNext( drainList );
+                                }
+                                catch ( Throwable t ) {
+                                    final long sleepTime = indexProcessorFig.getFailureRetryTime();
+
+                                    logger.error( "Failed to dequeue.  Sleeping for {} milliseconds", sleepTime, t );
+
+                                    //only undo what was actually added for this batch
+                                    if ( counted ) {
+                                        inFlight.addAndGet( -1 * drainList.size() );
+                                    }
+
+
+                                    try {
+                                        Thread.sleep( sleepTime );
+                                    }
+                                    catch ( InterruptedException ie ) {
+                                        //intentionally ignored, the loop condition decides whether we keep polling
+                                    }
+
+                                    indexErrorCounter.inc();
+                                }
+                            }
+                        }
+                    } )        //this won't block our read loop, just reads and proceeds
+                        .flatMap( sqsMessages -> {
+
+                            //do this on a different schedule, and introduce concurrency with flatmap for faster processing
+                            return Observable.just( sqsMessages )
+
+                                             .map( messages -> {
+                                                 if ( messages == null || messages.size() == 0 ) {
+                                                     return null;
+                                                 }
+
+                                                 try {
+                                                     List<IndexEventResult> indexEventResults = callEventHandlers( messages );
+                                                     List<QueueMessage> messagesToAck = ackMessages( indexEventResults );
+
+                                                     if ( messagesToAck == null || messagesToAck.size() == 0 ) {
+                                                         logger.error(
+                                                             "No messages came back from the queue operation, should have seen {} messages",
+                                                                 messages.size() );
+                                                         return messagesToAck;
+                                                     }
+
+                                                     if ( messagesToAck.size() < messages.size() ) {
+                                                         //placeholders are required, otherwise both arguments are dropped
+                                                         logger.error( "Missing messages from queue post operation. "
+                                                                 + "Messages received: {}, messages to ack: {}",
+                                                             messages, messagesToAck );
+                                                     }
+                                                     //ack each message, but only if we didn't error.
+                                                     ack( messagesToAck );
+                                                     return messagesToAck;
+                                                 }
+                                                 catch ( Exception e ) {
+                                                     logger.error( "failed to ack messages to sqs", e );
+                                                     return null;
+                                                     //do not rethrow so we can process all of them
+                                                 }
+                                             } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
+                            //end flatMap
+                        }, indexProcessorFig.getEventConcurrencyFactor() );
+
+            //start in the background
+
+            final Subscription subscription = consumer.subscribeOn(Schedulers.newThread()).subscribe();
+
+            subscriptions.add(subscription);
+        }
+    }
+
+    /**
+     * Submit results to index and return the queue messages to be ack'd
+     * @param indexEventResults
+     * @return
+     */
+    private List<QueueMessage> ackMessages(List<IndexEventResult> indexEventResults) {
+        //if nothing came back then return null
+        if(indexEventResults==null){
+            return null;
+        }
+
+        // stream the messages to record the cycle time
+        return indexEventResults.stream()
+            .map(indexEventResult -> {
+                //record the cycle time
+                messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime());
+                return indexEventResult;
+            })
+            // filter out messages that are not present, they were not processed and put into the results
+            .filter( result -> result.getQueueMessage().isPresent() )
+            .map(result -> result.getQueueMessage().get())
+            // collect
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * Offers an {@link EntityIndexEvent} for the given entity, tagged with the primary region from the
+     * queue configuration.
+     *
+     * @param applicationScope the application that owns the entity
+     * @param id the id of the entity to index
+     * @param updatedSince passed through to the event unchanged
+     */
+    public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) {
+        //wrap in an id scope to avoid serialization issues
+        final EntityIdScope idScope = new EntityIdScope( applicationScope, id );
+        final EntityIndexEvent indexEvent =
+            new EntityIndexEvent( queueFig.getPrimaryRegion(), idScope, updatedSince );
+
+        offer( indexEvent );
+    }
+
+    /**
+     * Builds one {@link EntityIndexEvent} per edge, targeting the edge's target node, and offers them
+     * as a single batch.
+     *
+     * @param edges the edges whose target nodes should be indexed
+     * @param updatedSince passed through to every event unchanged
+     */
+    public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
+
+        //holds EntityIndexEvents, not EdgeScopes; left raw because offerBatch's parameter type
+        //is declared elsewhere in this class
+        final List batch = new ArrayList<>( edges.size() );
+        for ( EdgeScope e : edges){
+            //change to id scope to avoid serialization issues
+            batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince));
+        }
+        offerBatch( batch );
+    }
+
+
+    /**
+     * Outcome of handling a single queue message: the message to ack, if there is one, and the
+     * creation time used to record the message cycle time.
+     */
+    public class IndexEventResult{
+        private final Optional<QueueMessage> queueMessage;
+        private final long creationTime;
+
+        /**
+         * @param message the queue message to ack, absent when there is nothing to ack
+         * @param created the creation time the cycle time is measured from
+         */
+        public IndexEventResult(Optional<QueueMessage> message, long created){
+            this.creationTime = created;
+            this.queueMessage = message;
+        }
+
+        /** @return the creation time given at construction */
+        public long getCreationTime() {
+            return creationTime;
+        }
+
+        /** @return the queue message given at construction, possibly absent */
+        public Optional<QueueMessage> getQueueMessage() {
+            return queueMessage;
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 96da2df..abd4ce1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -104,14 +104,14 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
 
         switch (impl) {
             case LOCAL:
-                AmazonAsyncEventService eventService = new AmazonAsyncEventService(scope -> new LocalQueueManager(), indexProcessorFig, indexProducer, metricsFactory,
+                AsyncEventServiceImpl eventService = new AsyncEventServiceImpl(scope -> new LocalQueueManager(), indexProcessorFig, indexProducer, metricsFactory,
                     entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder,mapManagerFactory, queueFig,rxTaskScheduler);
                 eventService.MAX_TAKE = 1000;
                 return eventService;
             case SQS:
                 throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead with only a single region");
             case SNS:
-                return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
+                return new AsyncEventServiceImpl(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
                     entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler );
             default:
                 throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
index 480756f..a47ec77 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -39,14 +39,6 @@ import rx.Observable;
 public interface EventBuilder {
 
     /**
-     * Return the cold observable of entity index update operations
-     * @param applicationScope
-     * @param entity
-     * @return
-     */
-    Observable<IndexOperationMessage> buildEntityIndexUpdate( ApplicationScope applicationScope, Entity entity );
-
-    /**
      * Return the cold observable of the new edge operation
      * @param applicationScope
      * @param entity
@@ -69,7 +61,9 @@ public interface EventBuilder {
      * @param entityId
      * @return
      */
-    EventBuilderImpl.EntityDeleteResults buildEntityDelete( ApplicationScope applicationScope, Id entityId );
+    EntityDeleteResults buildEntityDelete(ApplicationScope applicationScope, Id entityId );
+
+
 
     /**
      * Re-index an entity in the scope provided

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index 4e476db..2edc668 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -73,19 +73,6 @@ public class EventBuilderImpl implements EventBuilder {
     }
 
 
-    @Override
-    public Observable<IndexOperationMessage> buildEntityIndexUpdate( final ApplicationScope applicationScope,
-                                                                     final Entity entity ) {
-        //process the entity immediately
-        //only process the same version, otherwise ignore
-
-        if (logger.isDebugEnabled()) {
-            logger.debug("Indexing  in app scope {} entity {}", entity, applicationScope);
-        }
-
-        return indexService.indexEntity( applicationScope, entity );
-    }
-
 
     @Override
     public Observable<IndexOperationMessage> buildNewEdge( final ApplicationScope applicationScope, final Entity entity,
@@ -118,7 +105,7 @@ public class EventBuilderImpl implements EventBuilder {
     //it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter?
 
     @Override
-    public EntityDeleteResults buildEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
+    public EntityDeleteResults buildEntityDelete(final ApplicationScope applicationScope, final Id entityId ) {
         if (logger.isDebugEnabled()) {
             logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/IndexDocNotFoundException.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/IndexDocNotFoundException.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/IndexDocNotFoundException.java
new file mode 100644
index 0000000..c0e022f
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/IndexDocNotFoundException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import java.util.UUID;
+
+/**
+ * Thrown when the index batch for a given batch id cannot be found in map persistence.
+ */
+public class IndexDocNotFoundException extends RuntimeException {
+
+    final UUID batchId;
+
+    /**
+     * @param batchId the id of the batch that could not be found; a null id is reported as "null"
+     *                in the message instead of failing while the exception is being built
+     */
+    public IndexDocNotFoundException(final UUID batchId){
+
+        //plain concatenation is null-safe, batchId.toString() is not
+        super("Index batch ID " + batchId + " not found in map persistence");
+        this.batchId = batchId;
+
+    }
+
+    /**
+     * @return the id of the batch that could not be found
+     */
+    public UUID getBatchId() {
+        return batchId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 68c398f..7512c90 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.index;
 
 import java.util.Iterator;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.usergrid.persistence.index.*;
 import org.apache.usergrid.utils.UUIDUtils;
@@ -104,7 +105,8 @@ public class IndexServiceImpl implements IndexService {
 
         //do our observable for batching
         //try to send a whole batch if we can
-        final Observable<IndexOperationMessage>  batches =  sourceEdgesToIndex.buffer( indexFig.getIndexBatchSize() )
+        final Observable<IndexOperationMessage>  batches =  sourceEdgesToIndex
+            .buffer( indexFig.getIndexBatchSize(), 1000, TimeUnit.MILLISECONDS )
 
             //map into batches based on our buffer size
             .flatMap( buffer -> Observable.from( buffer )

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
index bf444b5..d47e96c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
@@ -230,10 +230,16 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
 
 
             //entity is newer than ES version, could be an update or the entity is marked as deleted
-            if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 || !entity.getEntity().isPresent()) {
+            if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ||
+                    !entity.getEntity().isPresent()  ||
+                    entity.getStatus() == MvccEntity.Status.DELETED ) {
 
-                logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
+                // when updating entities, we don't delete previous versions from ES so this action is expected
+                if(logger.isDebugEnabled()){
+                    logger.debug( "Deindexing stale entity on edge {} for entityId {} and version {}",
                         searchEdge, entityId, entityVersion);
+                }
+
                 batch.deindex( searchEdge, entityId, candidateVersion );
                 return;
             }