You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/08/04 20:30:29 UTC

incubator-usergrid git commit: Updates amazon async service wiring to wire to event builder

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-900 [created] d67c220dd


Updates amazon async service wiring to wire to event builder


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d67c220d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d67c220d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d67c220d

Branch: refs/heads/USERGRID-900
Commit: d67c220dd7221131a5c7bbcfebcd60f78cddffb1
Parents: b9d2d35
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Aug 4 12:30:27 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Aug 4 12:30:27 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 132 +++++++++----------
 .../asyncevents/AsyncIndexProvider.java         |  11 +-
 .../asyncevents/EventBuilder.java               |  11 +-
 .../asyncevents/EventBuilderImpl.java           |  12 +-
 .../asyncevents/InMemoryAsyncEventService.java  |  14 +-
 .../asyncevents/model/EntityIndexEvent.java     |  16 ++-
 .../index/AmazonAsyncEventServiceTest.java      |   7 +-
 7 files changed, 107 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index ec43ab7..0429af3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -27,14 +27,15 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import com.codahale.metrics.Histogram;
 import com.google.common.base.Preconditions;
-import org.apache.usergrid.corepersistence.CpEntityManager;
+
 import org.apache.usergrid.corepersistence.asyncevents.model.*;
 import org.apache.usergrid.corepersistence.index.*;
 import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
 import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexLocationStrategy;
-import org.apache.usergrid.utils.UUIDUtils;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,15 +79,19 @@ public class AmazonAsyncEventService implements AsyncEventService {
     private final QueueManager queue;
     private final QueueScope queueScope;
     private final IndexProcessorFig indexProcessorFig;
-    private final IndexService indexService;
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
     private final IndexLocationStrategyFactory indexLocationStrategyFactory;
     private final EntityIndexFactory entityIndexFactory;
+    private final EventBuilder eventBuilder;
+    private final RxTaskScheduler rxTaskScheduler;
 
     private final Timer readTimer;
     private final Timer writeTimer;
     private final Timer ackTimer;
 
+    /**
+     * This mutex is used to start/stop workers to ensure we're not concurrently modifying our subscriptions
+     */
     private final Object mutex = new Object();
 
     private final Counter indexErrorCounter;
@@ -99,19 +104,17 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
 
     @Inject
-    public AmazonAsyncEventService(final QueueManagerFactory queueManagerFactory,
-                                   final IndexProcessorFig indexProcessorFig,
-                                   final MetricsFactory metricsFactory,
-                                   final IndexService indexService,
-                                   final EntityCollectionManagerFactory entityCollectionManagerFactory,
-                                   final IndexLocationStrategyFactory indexLocationStrategyFactory,
-                                   final EntityIndexFactory entityIndexFactory
-    ) {
-
-        this.indexService = indexService;
+    public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory, final IndexProcessorFig indexProcessorFig,
+                                    final MetricsFactory metricsFactory,  final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                                    final IndexLocationStrategyFactory indexLocationStrategyFactory, final EntityIndexFactory entityIndexFactory,
+                                    final EventBuilder eventBuilder,
+                                    final RxTaskScheduler rxTaskScheduler ) {
+
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
         this.indexLocationStrategyFactory = indexLocationStrategyFactory;
         this.entityIndexFactory = entityIndexFactory;
+        this.eventBuilder = eventBuilder;
+        this.rxTaskScheduler = rxTaskScheduler;
 
         this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALLREGIONS);
         this.queue = queueManagerFactory.getQueueManager(queueScope);
@@ -144,7 +147,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         try {
             //signal to SQS
-            this.queue.sendMessage(operation);
+            this.queue.sendMessage( operation );
         } catch (IOException e) {
             throw new RuntimeException("Unable to queue message", e);
         } finally {
@@ -157,7 +160,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         try {
             //signal to SQS
-            this.queue.sendMessages(operations);
+            this.queue.sendMessages( operations );
         } catch (IOException e) {
             throw new RuntimeException("Unable to queue message", e);
         } finally {
@@ -189,33 +192,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
     /**
      * Ack message in SQS
      */
-    public void ack(final List<QueueMessage> messages) {
-
-        final Timer.Context timer = this.ackTimer.time();
-
-        try{
-            // no op
-            if (messages.size() == 0) {
-                return;
-            }
-            queue.commitMessages(messages);
-
-            //decrement our in-flight counter
-            inFlight.addAndGet(-1 * messages.size());
-
-        }catch(Exception e){
-            throw new RuntimeException("Unable to ack messages", e);
-        }
-        finally {
-            timer.stop();
-        }
-
-
-    }
-
-    /**
-     * Ack message in SQS
-     */
     public void ack(final QueueMessage message) {
 
         final Timer.Context timer = this.ackTimer.time();
@@ -290,7 +266,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
     public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
                                        final Entity entity) {
 
-        offer(new EntityIndexEvent(new EntityIdScope(applicationScope, entity.getId())));
+        offer(new EntityIndexEvent(new EntityIdScope(applicationScope, entity.getId()), 0));
     }
 
 
@@ -298,7 +274,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityIndexUpdate");
 
-        final AsyncEvent event = (AsyncEvent) message.getBody();
+        final EntityIndexEvent event = (EntityIndexEvent) message.getBody();
 
         Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityIndexUpdate");
         Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.ENTITY_INDEX, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getEventType()));
@@ -307,13 +283,14 @@ public class AmazonAsyncEventService implements AsyncEventService {
         //only process the same version, otherwise ignore
         final EntityIdScope entityIdScope = event.getEntityIdScope();
         final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
+        final Id entityId = entityIdScope.getId();
+        final long updatedAfter = event.getUpdatedAfter();
+
+        final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter);
 
-        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(applicationScope);
+        final Observable<IndexOperationMessage> observable = eventBuilder.buildEntityIndex( entityIndexOperation );
 
-        ecm.load(entityIdScope.getId())
-                .first()
-                .flatMap(entity -> indexService.indexEntity(applicationScope, entity))
-                .doOnNext(ignore -> ack(message)).subscribe();
+        subscibeAndAck( observable, message );
     }
 
 
@@ -324,7 +301,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         EdgeIndexEvent operation = new EdgeIndexEvent(applicationScope, entity.getId(), newEdge);
 
-        offer(operation);
+        offer( operation );
     }
 
     public void handleEdgeIndex(final QueueMessage message) {
@@ -339,18 +316,21 @@ public class AmazonAsyncEventService implements AsyncEventService {
         final ApplicationScope applicationScope = event.getApplicationScope();
         final Edge edge = event.getEdge();
 
-        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(applicationScope);
 
-        ecm.load(event.getEntityId())
-                .flatMap(entity -> indexService.indexEdge(applicationScope, entity, edge))
-                .doOnNext(ignore -> ack(message)).subscribe();
+
+        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+        final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(event.getEntityId()).flatMap( entity -> eventBuilder.buildNewEdge(
+            applicationScope, entity, edge ) );
+
+        subscibeAndAck( edgeIndexObservable, message );
     }
 
     @Override
     public void queueDeleteEdge(final ApplicationScope applicationScope,
                                 final Edge edge) {
 
-        offer(new EdgeDeleteEvent(applicationScope, edge));
+        offer( new EdgeDeleteEvent( applicationScope, edge ) );
     }
 
     public void handleEdgeDelete(final QueueMessage message) {
@@ -367,15 +347,16 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         if (logger.isDebugEnabled()) logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
 
-        indexService.deleteIndexEdge(applicationScope, edge)
-                .doOnNext(ignore -> ack(message)).subscribe();
+        final Observable<IndexOperationMessage> observable = eventBuilder.buildDeleteEdge( applicationScope, edge );
+
+        subscibeAndAck( observable, message );
     }
 
 
     @Override
     public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
 
-        offer(new EntityDeleteEvent(new EntityIdScope(applicationScope, entityId)));
+        offer( new EntityDeleteEvent( new EntityIdScope( applicationScope, entityId ) ) );
     }
 
     public void handleEntityDelete(final QueueMessage message) {
@@ -384,7 +365,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         final AsyncEvent event = (AsyncEvent) message.getBody();
         Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityDelete");
-        Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.ENTITY_DELETE, String.format("Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getEventType()));
+        Preconditions.checkArgument( event.getEventType() == AsyncEvent.EventType.ENTITY_DELETE,
+            String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getEventType() ) );
 
         final ApplicationScope applicationScope = event.getApplicationScope();
         final Id entityId = event.getEntityId();
@@ -392,10 +374,16 @@ public class AmazonAsyncEventService implements AsyncEventService {
         if (logger.isDebugEnabled())
             logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
 
-        ack(message);
+        ack( message );
+
+        final EventBuilderImpl.EntityDeleteResults
+            entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId );
+
+
+        final Observable merged = Observable.merge( entityDeleteResults.getEntitiesCompacted(),
+            entityDeleteResults.getIndexObservable() );
 
-        indexService.deleteEntityIndexes(applicationScope, entityId, UUIDUtils.maxTimeUUID(Long.MAX_VALUE))
-                .doOnNext(ignore -> ack(message)).subscribe();
+        subscibeAndAck( merged, message );
     }
 
 
@@ -407,9 +395,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
         Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.APPLICATION_INDEX, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getEventType()));
 
         final IndexLocationStrategy indexLocationStrategy = event.getIndexLocationStrategy();
-        final EntityIndex index = entityIndexFactory.createEntityIndex(indexLocationStrategy);
+        final EntityIndex index = entityIndexFactory.createEntityIndex( indexLocationStrategy );
         index.initialize();
-        ack(message);
+        ack( message );
     }
 
     /**
@@ -494,7 +482,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
     public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) {
         //change to id scope to avoid serialization issues
-        offer(new EntityIndexEvent(new EntityIdScope(applicationScope, id)));
+        offer(new EntityIndexEvent(new EntityIdScope(applicationScope, id), updatedSince));
     }
 
     public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
@@ -502,8 +490,18 @@ public class AmazonAsyncEventService implements AsyncEventService {
         List batch = new ArrayList<EdgeScope>();
         for ( EdgeScope e : edges){
             //change to id scope to avoid serialization issues
-            batch.add(new EntityIndexEvent(new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode())));
+            batch.add(new EntityIndexEvent(new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince));
         }
-        offerBatch(batch);
+        offerBatch( batch );
+    }
+
+
+    /**
+     * Subscribes to the observable and acks the message via SQS on completion
+     * @param observable
+     * @param message
+     */
+    private void subscibeAndAck( final Observable<?> observable, final QueueMessage message ){
+       observable.doOnCompleted( ()-> ack(message)  ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 0a58369..0e773cf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -44,7 +44,6 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
 
     private final QueueManagerFactory queueManagerFactory;
     private final MetricsFactory metricsFactory;
-    private final IndexService indexService;
     private final RxTaskScheduler rxTaskScheduler;
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
     private final EventBuilder eventBuilder;
@@ -58,7 +57,6 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
     public AsyncIndexProvider(final IndexProcessorFig indexProcessorFig,
                               final QueueManagerFactory queueManagerFactory,
                               final MetricsFactory metricsFactory,
-                              final IndexService indexService,
                               final RxTaskScheduler rxTaskScheduler,
                               final EntityCollectionManagerFactory entityCollectionManagerFactory,
                               final EventBuilder eventBuilder,
@@ -68,7 +66,6 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
         this.indexProcessorFig = indexProcessorFig;
         this.queueManagerFactory = queueManagerFactory;
         this.metricsFactory = metricsFactory;
-        this.indexService = indexService;
         this.rxTaskScheduler = rxTaskScheduler;
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
         this.eventBuilder = eventBuilder;
@@ -97,11 +94,11 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
             case LOCAL:
                 return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProcessorFig.resolveSynchronously());
             case SQS:
-                return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory, indexService,
-                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory );
+                return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory,
+                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
             case SNS:
-                return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory, indexService,
-                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory );
+                return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory,
+                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
             default:
                 throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
index f9f157e..d246e2f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -37,13 +37,14 @@ import rx.Observable;
  * Interface for constructing an observable stream to perform asynchronous events
  */
 public interface EventBuilder {
+
     /**
      * Return the cold observable of entity index update operations
      * @param applicationScope
      * @param entity
      * @return
      */
-    Observable<IndexOperationMessage> queueEntityIndexUpdate( ApplicationScope applicationScope, Entity entity );
+    Observable<IndexOperationMessage> buildEntityIndexUpdate( ApplicationScope applicationScope, Entity entity );
 
     /**
      * Return the cold observable of the new edge operation
@@ -52,7 +53,7 @@ public interface EventBuilder {
      * @param newEdge
      * @return
      */
-    Observable<IndexOperationMessage> queueNewEdge( ApplicationScope applicationScope, Entity entity, Edge newEdge );
+    Observable<IndexOperationMessage> buildNewEdge( ApplicationScope applicationScope, Entity entity, Edge newEdge );
 
     /**
      * Return the cold observable of the deleted edge operations
@@ -60,7 +61,7 @@ public interface EventBuilder {
      * @param edge
      * @return
      */
-    Observable<IndexOperationMessage> queueDeleteEdge( ApplicationScope applicationScope, Edge edge );
+    Observable<IndexOperationMessage> buildDeleteEdge( ApplicationScope applicationScope, Edge edge );
 
     /**
      * Return a bean with 2 observable streams for entity delete.
@@ -68,14 +69,14 @@ public interface EventBuilder {
      * @param entityId
      * @return
      */
-    EventBuilderImpl.EntityDeleteResults queueEntityDelete( ApplicationScope applicationScope, Id entityId );
+    EventBuilderImpl.EntityDeleteResults buildEntityDelete( ApplicationScope applicationScope, Id entityId );
 
     /**
      * Re-index an entity in the scope provided
      * @param entityIndexOperation
      * @return
      */
-    Observable<IndexOperationMessage> index( EntityIndexOperation entityIndexOperation );
+    Observable<IndexOperationMessage> buildEntityIndex( EntityIndexOperation entityIndexOperation );
 
     /**
      * A bean to hold both our observables so the caller can choose the subscription mechanism.  Note that

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index e4cd4b5..46cec2e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -73,7 +73,7 @@ public class EventBuilderImpl implements EventBuilder {
 
 
     @Override
-    public Observable<IndexOperationMessage> queueEntityIndexUpdate( final ApplicationScope applicationScope,
+    public Observable<IndexOperationMessage> buildEntityIndexUpdate( final ApplicationScope applicationScope,
                                                                      final Entity entity ) {
 
         //process the entity immediately
@@ -89,7 +89,7 @@ public class EventBuilderImpl implements EventBuilder {
 
 
     @Override
-    public Observable<IndexOperationMessage> queueNewEdge( final ApplicationScope applicationScope, final Entity entity,
+    public Observable<IndexOperationMessage> buildNewEdge( final ApplicationScope applicationScope, final Entity entity,
                                                            final Edge newEdge ) {
 
         log.debug( "Indexing  in app scope {} with entity {} and new edge {}",
@@ -103,8 +103,8 @@ public class EventBuilderImpl implements EventBuilder {
 
 
     @Override
-    public Observable<IndexOperationMessage> queueDeleteEdge( final ApplicationScope applicationScope,
-                                                              final Edge edge ) {
+    public Observable<IndexOperationMessage> buildDeleteEdge( final ApplicationScope applicationScope, final Edge
+        edge ) {
         log.debug( "Deleting in app scope {} with edge {} }", applicationScope, edge );
 
         final Observable<IndexOperationMessage> edgeObservable =
@@ -121,7 +121,7 @@ public class EventBuilderImpl implements EventBuilder {
     //it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter?
 
     @Override
-    public EntityDeleteResults queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
+    public EntityDeleteResults buildEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
         log.debug( "Deleting entity id from index in app scope {} with entityId {} }", applicationScope, entityId );
 
         final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
@@ -163,7 +163,7 @@ public class EventBuilderImpl implements EventBuilder {
 
 
     @Override
-    public Observable<IndexOperationMessage> index( final EntityIndexOperation entityIndexOperation ) {
+    public Observable<IndexOperationMessage> buildEntityIndex( final EntityIndexOperation entityIndexOperation ) {
 
         final ApplicationScope applicationScope = entityIndexOperation.getApplicationScope();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index adb4a90..830033d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -20,8 +20,6 @@
 package org.apache.usergrid.corepersistence.asyncevents;
 
 
-import com.amazonaws.services.opsworks.model.App;
-import org.apache.usergrid.persistence.index.IndexLocationStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,19 +75,19 @@ public class InMemoryAsyncEventService implements AsyncEventService {
         //only process the same version, otherwise ignore
 
 
-        run( eventBuilder.queueEntityIndexUpdate(applicationScope, entity) );
+        run( eventBuilder.buildEntityIndexUpdate( applicationScope, entity ) );
     }
 
 
     @Override
     public void queueNewEdge( final ApplicationScope applicationScope, final Entity entity, final Edge newEdge ) {
-        run( eventBuilder.queueNewEdge(applicationScope, entity, newEdge) );
+        run( eventBuilder.buildNewEdge( applicationScope, entity, newEdge ) );
     }
 
 
     @Override
     public void queueDeleteEdge( final ApplicationScope applicationScope, final Edge edge ) {
-        run( eventBuilder.queueDeleteEdge(applicationScope, edge) );
+        run( eventBuilder.buildDeleteEdge( applicationScope, edge ) );
     }
 
 
@@ -97,7 +95,7 @@ public class InMemoryAsyncEventService implements AsyncEventService {
     public void queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
 
         final EventBuilderImpl.EntityDeleteResults results =
-            eventBuilder.queueEntityDelete( applicationScope, entityId );
+            eventBuilder.buildEntityDelete( applicationScope, entityId );
 
         run( results.getIndexObservable() );
         run( results.getEntitiesCompacted() );
@@ -107,7 +105,7 @@ public class InMemoryAsyncEventService implements AsyncEventService {
     public void index( final ApplicationScope applicationScope, final Id id, final long updatedSince ) {
         final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, id, updatedSince );
 
-        run(eventBuilder.index( entityIndexOperation ));
+        run(eventBuilder.buildEntityIndex( entityIndexOperation ));
     }
 
     public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
@@ -115,7 +113,7 @@ public class InMemoryAsyncEventService implements AsyncEventService {
             final EntityIndexOperation entityIndexOperation = new EntityIndexOperation(e.getApplicationScope(),
                 e.getEdge().getTargetNode(), updatedSince);
 
-            run(eventBuilder.index (entityIndexOperation));
+            run(eventBuilder.buildEntityIndex( entityIndexOperation ));
         }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
index 7b79987..81961a0 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
@@ -27,10 +27,24 @@ import org.apache.usergrid.persistence.collection.serialization.impl.migration.E
  */
 @JsonDeserialize(as = AsyncEvent.class)
 public final class EntityIndexEvent extends AsyncEvent {
+
+    private long updatedAfter;
+
     public EntityIndexEvent() {
     }
 
-    public EntityIndexEvent(EntityIdScope entityIdScope) {
+    public EntityIndexEvent(EntityIdScope entityIdScope, final long updatedAfter ) {
         super(EventType.ENTITY_INDEX, entityIdScope);
+        this.updatedAfter = updatedAfter;
+    }
+
+
+    public long getUpdatedAfter() {
+        return updatedAfter;
+    }
+
+
+    public void setUpdatedAfter( long updatedAfter ) {
+        this.updatedAfter = updatedAfter;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index 9cf896c..d37701b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.index;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.junit.Rule;
 import org.junit.runner.RunWith;
@@ -71,6 +72,9 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
     @Inject
     public RxTaskScheduler rxTaskScheduler;
 
+    @Inject
+    public EventBuilder eventBuilder;
+
 
     @Inject
     public IndexLocationStrategyFactory indexLocationStrategyFactory;
@@ -81,8 +85,7 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
 
     @Override
     protected AsyncEventService getAsyncEventService() {
-        return  new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, metricsFactory, indexService,
-                    entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory );
+        return  new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, metricsFactory,  entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler );
     }