You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2017/07/09 00:14:39 UTC

usergrid git commit: Converts de-indexing of edges and entities to identify documents in the index to delete based on data in Cassandra vs. resource intensive queries to the index. Fixes issue where nodes were not actually getting deleted from graph as marked edges were being filtered out during the delete process itself.

Repository: usergrid
Updated Branches:
  refs/heads/collectionDelete a881d5b55 -> 99ba349c8


Converts de-indexing of edges and entities to identify the index documents to delete from data in Cassandra rather than from resource-intensive queries against the index. Fixes an issue where nodes were not actually being deleted from the graph because marked edges were being filtered out during the delete process itself. Updates to a newer version of jamm (used for JVM memory management in the test framework).


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/99ba349c
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/99ba349c
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/99ba349c

Branch: refs/heads/collectionDelete
Commit: 99ba349c822ebb5f4d7599854b46ca2115b13b10
Parents: a881d5b
Author: Michael Russo <ru...@google.com>
Authored: Sat Jul 8 17:10:25 2017 -0700
Committer: Michael Russo <ru...@google.com>
Committed: Sat Jul 8 17:10:25 2017 -0700

----------------------------------------------------------------------
 .../corepersistence/EntityManagerFig.java       |   2 +-
 .../asyncevents/AsyncEventServiceImpl.java      |  24 ++--
 .../asyncevents/EventBuilder.java               |  17 +--
 .../asyncevents/EventBuilderImpl.java           | 144 ++++++++++++++-----
 .../corepersistence/index/IndexService.java     |  16 +--
 .../corepersistence/index/IndexServiceImpl.java |  57 ++------
 .../read/traverse/AbstractReadGraphFilter.java  |  37 +----
 .../AbstractReadReverseGraphFilter.java         |  36 +----
 .../rx/EdgesFromSourceObservableIT.java         |   2 +-
 .../rx/EdgesToTargetObservableIT.java           |   4 +-
 .../core/src/test/resources/project.properties  |   2 +-
 .../persistence/graph/GraphManager.java         |   2 +-
 .../graph/impl/GraphManagerImpl.java            |  20 +--
 .../graph/impl/SimpleSearchByEdgeType.java      |   9 ++
 .../graph/impl/stage/NodeDeleteListener.java    |   4 +-
 .../impl/stage/NodeDeleteListenerImpl.java      |  62 +++-----
 .../graph/serialization/EdgesObservable.java    |   6 +-
 .../impl/EdgeMetadataSerializationV2Impl.java   |   1 -
 .../serialization/impl/EdgesObservableImpl.java |  20 ++-
 .../impl/TargetIdObservableImpl.java            |   4 +-
 .../impl/migration/EdgeDataMigrationImpl.java   |   2 +-
 .../graph/impl/NodeDeleteListenerTest.java      |   8 +-
 .../usergrid/persistence/index/EntityIndex.java |  12 +-
 .../index/impl/EsEntityIndexImpl.java           |  63 --------
 stack/pom.xml                                   |   8 +-
 stack/rest/pom.xml                              |   4 +-
 .../rest/src/test/resources/project.properties  |   2 +-
 stack/services/pom.xml                          |   4 +-
 .../src/test/resources/project.properties       |   2 +-
 stack/test-utils/pom.xml                        |   4 +-
 .../src/test/resources/project.properties       |   2 +-
 stack/tools/pom.xml                             |   2 +-
 stack/websocket/pom.xml                         |   2 +-
 33 files changed, 241 insertions(+), 343 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
index 872ffbb..46c7a1d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
@@ -39,7 +39,7 @@ public interface EntityManagerFig extends GuicyFig {
     int sleep();
 
     @Key( "usergrid.entityManager.enable_deindex_on_update" )
-    @Default( "true" )
+    @Default( "false" )
     boolean getDeindexOnUpdate();
 
     /**

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 5628a11..257e172 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -57,6 +57,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.persistence.queue.*;
 import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
+import org.apache.usergrid.persistence.queue.impl.SNSQueueManagerImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
@@ -668,8 +669,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         }
 
         // default this observable's return to empty index operation message if nothing is emitted
-        return eventBuilder.buildDeleteEdge(applicationScope, edge)
-            .toBlocking().lastOrDefault(new IndexOperationMessage());
+        return eventBuilder.buildDeleteEdge(applicationScope, edge);
 
     }
 
@@ -866,18 +866,11 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
         }
 
-        final EventBuilderImpl.EntityDeleteResults entityDeleteResults = markedOnly ?
+        final IndexOperationMessage indexOperationMessage = markedOnly ?
             eventBuilder.buildEntityDelete( applicationScope, entityId ) :
             eventBuilder.buildEntityDeleteAllVersions( applicationScope, entityId );
 
-
-        // Delete the entities and remove from graph separately
-        entityDeleteResults.getEntitiesDeleted().toBlocking().lastOrDefault(null);
-
-        entityDeleteResults.getCompactedNode().toBlocking().lastOrDefault(null);
-
-        // default this observable's return to empty index operation message if nothing is emitted
-        return entityDeleteResults.getIndexObservable().toBlocking().lastOrDefault(new IndexOperationMessage());
+        return indexOperationMessage;
 
     }
 
@@ -983,8 +976,13 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             startWorker(QUEUE_NAME_UTILITY);
         }
 
-        for (int i = 0; i < indexDeadCount; i++) {
-            startDeadQueueWorker(QUEUE_NAME);
+        if( indexQueue instanceof SNSQueueManagerImpl ) {
+            logger.info("Queue manager implementation supports dead letters, start dead letter queue worker.");
+            for (int i = 0; i < indexDeadCount; i++) {
+                startDeadQueueWorker(QUEUE_NAME);
+            }
+        }else{
+            logger.info("Queue manager implementation does NOT support dead letters, NOT starting dead letter queue worker.");
         }
 
         for (int i = 0; i < utilityDeadCount; i++) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
index 8618c73..081b3bc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -27,6 +27,7 @@ import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -54,7 +55,7 @@ public interface EventBuilder {
      * @param edge
      * @return
      */
-    Observable<IndexOperationMessage> buildDeleteEdge( ApplicationScope applicationScope, Edge edge );
+    IndexOperationMessage buildDeleteEdge( ApplicationScope applicationScope, Edge edge );
 
     /**
      * Return a bin with 2 observable streams for entity delete.
@@ -62,7 +63,7 @@ public interface EventBuilder {
      * @param entityId
      * @return
      */
-    EntityDeleteResults buildEntityDelete(ApplicationScope applicationScope, Id entityId );
+    IndexOperationMessage buildEntityDelete(ApplicationScope applicationScope, Id entityId );
 
     /**
      * Return a bin with 2 observable streams for entity delete. This deletes all versions -- used only for an old
@@ -71,7 +72,7 @@ public interface EventBuilder {
      * @param entityId
      * @return
      */
-    EntityDeleteResults buildEntityDeleteAllVersions(ApplicationScope applicationScope, Id entityId );
+    IndexOperationMessage buildEntityDeleteAllVersions(ApplicationScope applicationScope, Id entityId );
 
 
 
@@ -103,17 +104,17 @@ public interface EventBuilder {
 
 
 
-        private final Observable<Id> compactedNode;
+        private final Observable<MarkedEdge> deletedEdges;
 
 
 
 
         public EntityDeleteResults( final Observable<IndexOperationMessage> indexOperationMessageObservable,
                                     final Observable<List<MvccLogEntry>> entitiesDeleted,
-                                    final Observable<Id> compactedNode) {
+                                    final Observable<MarkedEdge> deletedEdges) {
             this.indexOperationMessageObservable = indexOperationMessageObservable;
             this.entitiesDeleted = entitiesDeleted;
-            this.compactedNode = compactedNode;
+            this.deletedEdges = deletedEdges;
         }
 
 
@@ -125,8 +126,8 @@ public interface EventBuilder {
             return entitiesDeleted;
         }
 
-        public Observable<Id> getCompactedNode() {
-            return compactedNode;
+        public Observable<MarkedEdge> getEdgesDeleted() {
+            return deletedEdges;
         }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index 33d384e..ade6818 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -90,22 +90,45 @@ public class EventBuilderImpl implements EventBuilder {
 
 
     @Override
-    public Observable<IndexOperationMessage> buildDeleteEdge( final ApplicationScope applicationScope, final Edge
+    public IndexOperationMessage buildDeleteEdge( final ApplicationScope applicationScope, final Edge
         edge ) {
         if (logger.isDebugEnabled()) {
             logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
         }
 
         final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
-        return gm.deleteEdge( edge )
-            .flatMap( deletedEdge -> indexService.deleteIndexEdge( applicationScope, deletedEdge ));
+        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+        final IndexOperationMessage combined = new IndexOperationMessage();
+
+        gm.deleteEdge( edge )
+            .doOnNext( deletedEdge -> {
+
+                logger.debug("Processing deleted edge for de-indexing {}", deletedEdge);
+
+                // get ALL versions of the target node as any connection from this source node needs to be removed
+                ecm.getVersionsFromMaxToMin(deletedEdge.getTargetNode(), UUIDUtils.newTimeUUID())
+                    .doOnNext(mvccLogEntry -> {
+                        logger.debug("Adding edge {} mvccLogEntry {} to de-index batch", deletedEdge.getTargetNode(), mvccLogEntry);
+                        combined.ingest(
+                            indexService
+                                .deIndexEdge(applicationScope, deletedEdge, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion())
+                                .toBlocking().lastOrDefault(new IndexOperationMessage()));
+
+                    }).toBlocking().lastOrDefault(null);
+
+            }).toBlocking().lastOrDefault(null);
+
+        return combined;
     }
 
 
     //Does the queue entityDelete mark the entity then immediately does to the deleteEntityIndex. seems like
     //it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter?
 
-    private EntityDeleteResults buildEntityDeleteCommon(final ApplicationScope applicationScope, final Id entityId, boolean markedOnly) {
+    private IndexOperationMessage buildEntityDeleteCommon(final ApplicationScope applicationScope, final Id entityId,
+                                                          boolean markedOnly) {
+
         if (logger.isDebugEnabled()) {
             logger.debug("Deleting entity id ({} versions) from index in app scope {} with entityId {}",
                 markedOnly ? "marked" : "all", applicationScope, entityId);
@@ -122,42 +145,91 @@ public class EventBuilderImpl implements EventBuilder {
             ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() ).toBlocking()
                 .firstOrDefault( null );
 
-        // De-indexing and entity deletes don't check log entries.  We must do that first. If no DELETED logs, then
-        // return an empty observable as our no-op.
-        Observable<IndexOperationMessage> deIndexObservable = Observable.empty();
-        Observable<List<MvccLogEntry>> ecmDeleteObservable = Observable.empty();
-
-        if(mostRecentToDelete != null || !markedOnly){
-
-            // fetch entity versions to be de-index by looking in cassandra
-            deIndexObservable = markedOnly ?
-                indexService.deIndexEntity(applicationScope, entityId, mostRecentToDelete.getVersion(),
-                    getVersionsOlderThanMarked(ecm, entityId, mostRecentToDelete.getVersion())) :
-                indexService.deIndexEntity(applicationScope, entityId, UUIDUtils.newTimeUUID(),
-                    getAllVersions(ecm, entityId));
-
-            ecmDeleteObservable =
-                ecm.getVersionsFromMaxToMin( entityId, mostRecentToDelete.getVersion() )
-                    .filter( mvccLogEntry->
-                        mvccLogEntry.getVersion().timestamp() <= mostRecentToDelete.getVersion().timestamp() )
-                    .buffer( serializationFig.getBufferSize() )
-                    .doOnNext( buffer -> ecm.delete( buffer ) );
-        }
 
-        // Graph compaction checks the versions inside compactNode, just build this up for the caller to subscribe to
-        final Observable<Id> graphCompactObservable = gm.compactNode(entityId);
+        // if only marked entities should be deleted and nothing is marked, then abort
+        if(markedOnly && mostRecentToDelete == null){
+            return new IndexOperationMessage();
+        }
 
-        return new EntityDeleteResults( deIndexObservable, ecmDeleteObservable, graphCompactObservable );
+        final List<MvccLogEntry> logEntries = new ArrayList<>();
+        Observable<MvccLogEntry> mvccLogEntryListObservable =
+            ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() );
+            if(markedOnly){
+                mvccLogEntryListObservable
+                    .filter(mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED);
+            }
+            mvccLogEntryListObservable
+                .filter( mvccLogEntry-> mvccLogEntry.getVersion().timestamp() <= mostRecentToDelete.getVersion().timestamp() )
+                .buffer( serializationFig.getBufferSize() )
+                .doOnNext( buffer -> ecm.delete( buffer ) )
+                .doOnNext(mvccLogEntries -> {
+                        logEntries.addAll(mvccLogEntries);
+                }).toBlocking().lastOrDefault(null);
+
+        IndexOperationMessage combined = new IndexOperationMessage();
+
+        // do the edge deletes and build up de-index messages for each edge deleted
+        // assume we have "server1" and "region1" nodes in the graph with the following relationships (edges/connections):
+        //
+        // region1  -- zzzconnzzz|has -->  server1
+        // server1  -- zzzconnzzz|in  -->  region1
+        //
+        // there will always be a relationship from the appId to each entity based on the entity type (collection):
+        //
+        // application -- zzzcollzzz|servers --> server1
+        // application -- zzzcollzzz|regions --> region1
+        //
+        // When deleting either "server1" or "region1" entity, the connections should get deleted and de-indexed along
+        // with the entry for the entity itself in the collection. The above example should have at minimum 3 things to
+        // be de-indexed. There may be more as either "server1" or "region1" could have multiple versions.
+        //
+        // Further comments using the example of deleting "server1" from the above example.
+        gm.compactNode(entityId).doOnNext(markedEdge -> {
+
+            logger.debug("Processing deleted edge for de-indexing {}", markedEdge);
+
+            // if the edge was for a connection where the entity to be deleted is the source node, we need to load
+            // the target node's versions so that all versions of connections to that entity can be de-indexed
+            // server1  -- zzzconnzzz|in  -->  region1
+            if(!markedEdge.getTargetNode().getType().equals(entityId.getType())){
+
+                // get ALL versions of the target node as any connection from this source node needs to be removed
+                ecm.getVersionsFromMaxToMin( markedEdge.getTargetNode(), UUIDUtils.newTimeUUID() )
+                    .doOnNext(mvccLogEntry -> {
+                        logger.debug("Adding edge {} mvccLogEntry {} to de-index batch", markedEdge, mvccLogEntry);
+                        combined.ingest(
+                            indexService
+                                .deIndexEdge(applicationScope, markedEdge, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion())
+                                .toBlocking().lastOrDefault(new IndexOperationMessage()));
+
+                    }).toBlocking().lastOrDefault(null);
+
+            }else {
+
+                // for each version of the entity being deleted, de-index the connections where the entity is the target
+                // node ( application -- zzzcollzzz|servers --> server1 ) or (region1  -- zzzconnzzz|has -->  server1)
+                logEntries.forEach(logEntry -> {
+                    logger.debug("Adding edge {} mvccLogEntry {} to de-index batch", markedEdge, logEntry);
+                    combined.ingest(
+                        indexService
+                            .deIndexEdge(applicationScope, markedEdge, logEntry.getEntityId(), logEntry.getVersion())
+                            .toBlocking().lastOrDefault(new IndexOperationMessage()));
+                });
+            }
+
+        }).toBlocking().lastOrDefault(null);
+
+        return combined;
     }
 
     @Override
-    public EntityDeleteResults buildEntityDelete(final ApplicationScope applicationScope, final Id entityId ) {
+    public IndexOperationMessage buildEntityDelete(final ApplicationScope applicationScope, final Id entityId ) {
         return buildEntityDeleteCommon(applicationScope, entityId, true);
     }
 
     // this deletes all versions of an entity, only used for collection delete
     @Override
-    public EntityDeleteResults buildEntityDeleteAllVersions(final ApplicationScope applicationScope, final Id entityId ) {
+    public IndexOperationMessage buildEntityDeleteAllVersions(final ApplicationScope applicationScope, final Id entityId ) {
         return buildEntityDeleteCommon(applicationScope, entityId, false);
     }
 
@@ -201,22 +273,22 @@ public class EventBuilderImpl implements EventBuilder {
 
 
         return indexService.deIndexOldVersions( applicationScope, entityId,
-            getVersionsOlderThanMarked(ecm, entityId, markedVersion), markedVersion);
+            getVersionsOlderThanOrEqualToMarked(ecm, entityId, markedVersion));
 
     }
 
 
-    private List<UUID> getVersionsOlderThanMarked( final EntityCollectionManager ecm,
-                                                   final Id entityId, final UUID markedVersion ){
+    private List<UUID> getVersionsOlderThanOrEqualToMarked(final EntityCollectionManager ecm,
+                                                           final Id entityId, final UUID markedVersion ){
 
         final List<UUID> versions = new ArrayList<>();
 
-        // only take last 5 versions to avoid eating memory. a tool can be built for massive cleanups for old usergrid
+        // only take last 100 versions to avoid eating memory. a tool can be built for massive cleanups for old usergrid
         // clusters that do not have this in-line cleanup
         ecm.getVersionsFromMaxToMin( entityId, markedVersion)
-            .take(5)
+            .take(100)
             .forEach( mvccLogEntry -> {
-                if ( mvccLogEntry.getVersion().timestamp() < markedVersion.timestamp() ) {
+                if ( mvccLogEntry.getVersion().timestamp() <= markedVersion.timestamp() ) {
                     versions.add(mvccLogEntry.getVersion());
                 }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
index b989a9c..9d43b99 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
@@ -69,18 +69,15 @@ public interface IndexService {
      */
     Observable<IndexOperationMessage> deleteIndexEdge(final ApplicationScope applicationScope, final Edge edge);
 
-
     /**
-     * De-index all documents with the specified entityId and versions provided.  This will also remove any documents
-     * where the entity is a source/target node ( index docs where this entityId is a part of connections).
-     *
+     * Delete an index edge from the specified scope for a specific entity version
      * @param applicationScope
-     * @param entityId
-     * @param markedVersion
+     * @param edge
      * @return
      */
-    Observable<IndexOperationMessage> deIndexEntity(final ApplicationScope applicationScope, final Id entityId,
-                                                    final UUID markedVersion, final List<UUID> allVersionsBeforeMarked);
+    Observable<IndexOperationMessage> deIndexEdge(final ApplicationScope applicationScope, final Edge edge,
+                                                  final Id entityId, final UUID entityVersion);
+
 
 
     /**
@@ -88,10 +85,9 @@ public interface IndexService {
      *
      * @param applicationScope
      * @param entityId
-     * @param markedVersion
      * @return
      */
     Observable<IndexOperationMessage> deIndexOldVersions(final ApplicationScope applicationScope, final Id entityId,
-                                                         final List<UUID> versions, UUID markedVersion);
+                                                         final List<UUID> versions);
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 8473b2e..c8dfc31 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -29,9 +29,7 @@ import org.apache.usergrid.persistence.Schema;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.*;
 import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
 import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
 import org.apache.usergrid.persistence.index.*;
@@ -101,7 +99,7 @@ public class IndexServiceImpl implements IndexService {
 
 
         //we always index in the target scope
-        final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entityId );
+        final Observable<Edge> edgesToTarget = edgesObservable.edgesToTarget( gm, entityId, true);
 
         //we may have to index  we're indexing from source->target here
         final Observable<IndexEdge> sourceEdgesToIndex = edgesToTarget.map( edge -> generateScopeFromSource( edge ) );
@@ -214,10 +212,8 @@ public class IndexServiceImpl implements IndexService {
         return Optional.of(defaultProperties);
     }
 
-    //Steps to delete an IndexEdge.
-    //1.Take the search edge given and search for all the edges in elasticsearch matching that search edge
-    //2. Batch Delete all of those edges returned in the previous search.
-    //TODO: optimize loops further.
+    // DO NOT USE THIS AS THE QUERY TO ES CAN CAUSE EXTREME LOAD
+    // TODO REMOVE THIS AND UPDATE THE TESTS TO NOT USE THIS METHOD
     @Override
     public Observable<IndexOperationMessage> deleteIndexEdge( final ApplicationScope applicationScope,
                                                               final Edge edge ) {
@@ -254,53 +250,28 @@ public class IndexServiceImpl implements IndexService {
         return ObservableTimer.time( batches, addTimer );
     }
 
-
     @Override
-    public Observable<IndexOperationMessage> deIndexEntity( final ApplicationScope applicationScope, final Id entityId,
-                                                            final UUID markedVersion,
-                                                            final List<UUID> allVersionsBeforeMarked ) {
-
-        final EntityIndex ei = entityIndexFactory.
-            createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
-        if (logger.isDebugEnabled()) {
-            logger.debug("deIndexEntity: entityId={}:{}, markedVersion={}, otherVersionsSize={}",
-                entityId.getUuid().toString(), entityId.getType(), markedVersion.toString(), allVersionsBeforeMarked.size());
-        }
-
-        // use LONG.MAX_VALUE in search edge because this value is not used elsewhere in lower code foe de-indexing
-        // previously .timstamp() was used on entityId, but some entities do not have type-1 UUIDS ( legacy data)
-        final SearchEdge searchEdgeFromSource = createSearchEdgeFromSource( new SimpleEdge( applicationScope.getApplication(),
-            CpNamingUtils.getEdgeTypeFromCollectionName( InflectionUtils.pluralize( entityId.getType() ) ), entityId,
-            Long.MAX_VALUE ) );
-
+    public Observable<IndexOperationMessage> deIndexEdge(final ApplicationScope applicationScope, final Edge edge,
+                                                         final Id entityId, final UUID entityVersion){
 
-
-        final EntityIndexBatch batch = ei.createBatch();
-
-        // de-index each version of the entity before the marked version
-        allVersionsBeforeMarked.forEach(version -> batch.deindex(searchEdgeFromSource, entityId, version));
-
-
-        // for now, query the index to remove docs where the entity is source/target node and older than markedVersion
-        // TODO: investigate getting this information from graph
-        CandidateResults candidateResults = ei.getNodeDocsOlderThanMarked(entityId, markedVersion );
-        candidateResults.forEach(candidateResult -> batch.deindex(candidateResult));
-
-        return Observable.just(batch.build());
+        final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope));
+        final EntityIndexBatch entityBatch = ei.createBatch();
+        entityBatch.deindex(generateScopeFromSource( edge ), entityId, entityVersion);
+        return Observable.just(entityBatch.build());
 
     }
 
+
     @Override
     public Observable<IndexOperationMessage> deIndexOldVersions(final ApplicationScope applicationScope,
                                                                 final Id entityId,
-                                                                final List<UUID> versions,
-                                                                UUID markedVersion) {
+                                                                final List<UUID> versions) {
 
         final EntityIndex ei = entityIndexFactory.
             createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
 
-        // use LONG.MAX_VALUE in search edge because this value is not used elsewhere in lower code foe de-indexing
-        // previously .timstamp() was used on entityId, but some entities do not have type-1 UUIDS ( legacy data)
+        // use LONG.MAX_VALUE in search edge because this value is not used elsewhere in lower code for de-indexing
+        // previously .timestamp() was used on entityId, but some entities do not have type-1 UUIDS ( legacy data)
         final SearchEdge searchEdgeFromSource = createSearchEdgeFromSource( new SimpleEdge( applicationScope.getApplication(),
             CpNamingUtils.getEdgeTypeFromCollectionName( InflectionUtils.pluralize( entityId.getType() ) ), entityId,
             Long.MAX_VALUE ) );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 83f4c8b..e9aa6c8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -121,12 +121,8 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
                 if (isDeleted) {
 
                     logger.info("Edge {} is deleted when seeking, deleting the edge", markedEdge);
-                    final Observable<IndexOperationMessage> indexMessageObservable = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
-
-                    indexMessageObservable
-                        .compose(applyCollector())
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
-                        .subscribe();
+                    final IndexOperationMessage indexOperationMessage = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, true);
 
                 }
 
@@ -135,18 +131,8 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
                     final Id sourceNodeId = markedEdge.getSourceNode();
                     logger.info("Edge {} has a deleted source node, deleting the entity for id {}", markedEdge, sourceNodeId);
 
-                    final EventBuilderImpl.EntityDeleteResults
-                        entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
-
-                    entityDeleteResults.getIndexObservable()
-                        .compose(applyCollector())
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
-                        .subscribe();
-
-                    Observable.merge(entityDeleteResults.getEntitiesDeleted(),
-                        entityDeleteResults.getCompactedNode())
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
-                        subscribe();
+                    final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, true);
 
                 }
 
@@ -155,19 +141,8 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
                     final Id targetNodeId = markedEdge.getTargetNode();
                     logger.info("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId);
 
-                    final EventBuilderImpl.EntityDeleteResults
-                        entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
-
-                    entityDeleteResults.getIndexObservable()
-                        .compose(applyCollector())
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
-                        .subscribe();
-
-                    Observable.merge(entityDeleteResults.getEntitiesDeleted(),
-                        entityDeleteResults.getCompactedNode())
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
-                        subscribe();
-
+                    final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, true);
                 }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
index 1afb524..1b662cc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadReverseGraphFilter.java
@@ -121,12 +121,8 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
                 if (isDeleted) {
 
                     logger.info("Edge {} is deleted when seeking, deleting the edge", markedEdge);
-                    final Observable<IndexOperationMessage> indexMessageObservable = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
-
-                    indexMessageObservable
-                        .compose(applyCollector())
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
-                        .subscribe();
+                    final IndexOperationMessage indexOperationMessage = eventBuilder.buildDeleteEdge(applicationScope, markedEdge);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, true);
 
                 }
 
@@ -135,18 +131,8 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
                     final Id sourceNodeId = markedEdge.getSourceNode();
                     logger.info("Edge {} has a deleted source node, deleting the entity for id {}", markedEdge, sourceNodeId);
 
-                    final EventBuilderImpl.EntityDeleteResults
-                        entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
-
-                    entityDeleteResults.getIndexObservable()
-                        .compose(applyCollector())
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
-                        .subscribe();
-
-                    Observable.merge(entityDeleteResults.getEntitiesDeleted(),
-                        entityDeleteResults.getCompactedNode())
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
-                        subscribe();
+                    final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope, sourceNodeId);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, true);
 
                 }
 
@@ -155,18 +141,8 @@ public abstract class AbstractReadReverseGraphFilter extends AbstractPathFilter<
                     final Id targetNodeId = markedEdge.getTargetNode();
                     logger.info("Edge {} has a deleted target node, deleting the entity for id {}", markedEdge, targetNodeId);
 
-                    final EventBuilderImpl.EntityDeleteResults
-                        entityDeleteResults = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
-
-                    entityDeleteResults.getIndexObservable()
-                        .compose(applyCollector())
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler())
-                        .subscribe();
-
-                    Observable.merge(entityDeleteResults.getEntitiesDeleted(),
-                        entityDeleteResults.getCompactedNode())
-                        .subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).
-                        subscribe();
+                    final IndexOperationMessage indexOperationMessage = eventBuilder.buildEntityDelete(applicationScope, targetNodeId);
+                    asyncEventService.queueIndexOperationMessage(indexOperationMessage, true);
 
                 }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
index 3bfe460..68834b3 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
@@ -92,7 +92,7 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT {
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        edgesToTargetObservable.edgesToTarget( gm, target ).doOnNext( new Action1<Edge>() {
+        edgesToTargetObservable.edgesToTarget( gm, target, true).doOnNext(new Action1<Edge>() {
             @Override
             public void call( final Edge edge ) {
                 final String edgeType = edge.getType();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
index 9e84219..55b77c0 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
@@ -89,7 +89,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        edgesFromSourceObservable.edgesFromSourceDescending( gm, applicationId ).doOnNext( edge -> {
+        edgesFromSourceObservable.edgesFromSourceDescending( gm, applicationId, true).doOnNext(edge -> {
             final String edgeType = edge.getType();
             final Id target = edge.getTargetNode();
 
@@ -118,7 +118,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
 
         //test connections
 
-        edgesFromSourceObservable.edgesFromSourceDescending( gm, source ).doOnNext( edge -> {
+        edgesFromSourceObservable.edgesFromSourceDescending( gm, source, true).doOnNext(edge -> {
             final String edgeType = edge.getType();
             final Id target = edge.getTargetNode();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/core/src/test/resources/project.properties
----------------------------------------------------------------------
diff --git a/stack/core/src/test/resources/project.properties b/stack/core/src/test/resources/project.properties
index 1a848bc..77a785a 100644
--- a/stack/core/src/test/resources/project.properties
+++ b/stack/core/src/test/resources/project.properties
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 target.directory=${project.build.directory}
-jamm.path=-javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+jamm.path=-javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
index 000c633..b746c61 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
@@ -93,7 +93,7 @@ public interface GraphManager extends CPManager {
      * @param node The node to remove.  This will apply a timestamp to apply the delete + compact operation.  Any edges connected to this node with a timestamp
      * <= the specified time on the mark will be removed from the graph
      */
-    Observable<Id> compactNode( final Id node );
+    Observable<MarkedEdge> compactNode( final Id node );
 
     /**
      * Get all versions of this edge where versions <= max version

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 5fcdcb4..d22ac65 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -262,24 +262,16 @@ public class GraphManagerImpl implements GraphManager {
 
 
     @Override
-    public Observable<Id> compactNode( final Id inputNode ) {
-
+    public Observable<MarkedEdge> compactNode( final Id inputNode ) {
 
         final UUID startTime = UUIDGenerator.newTimeUUID();
 
-
-        final Observable<Id> nodeObservable =
-            Observable.just( inputNode ).map( node -> nodeSerialization.getMaxVersion( scope, node ) ).takeWhile(
-                maxTimestamp -> maxTimestamp.isPresent() )
-
+        final Observable<MarkedEdge> nodeObservable =
+            Observable.just( inputNode )
+                .map( node -> nodeSerialization.getMaxVersion( scope, node ) )
+                .takeWhile(maxTimestamp -> maxTimestamp.isPresent() )
                 //map our delete listener
-                .flatMap( timestamp -> nodeDeleteListener.receive( scope, inputNode, startTime ) )
-                    //set to 0 if nothing is emitted
-                .lastOrDefault( 0 )
-                    //log for posterity
-                .doOnNext( count -> logger.trace( "Removed {} edges from node {}", count, inputNode ) )
-                    //return our id
-                .map( count -> inputNode );
+                .flatMap( timestamp -> nodeDeleteListener.receive( scope, inputNode, startTime ) );
 
         return ObservableTimer.time( nodeObservable, this.deleteNodeTimer );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
index 9392dbc..71d2f1d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
@@ -169,6 +169,15 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{
         return true;
     }
 
+    @Override
+    public String toString(){
+        return "SimpleSearchByEdgeType{node="+node
+            +", type="+type
+            +", maxTimestamp="+maxTimestamp
+            +", order="+order
+            +", filterMarked="+filterMarked
+            +", last="+last+"}";
+    }
 
     @Override
     public int hashCode() {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java
index 68569e5..2181526 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListener.java
@@ -22,6 +22,8 @@ package org.apache.usergrid.persistence.graph.impl.stage;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import rx.Observable;
@@ -42,5 +44,5 @@ public interface NodeDeleteListener {
       * @return An observable that emits each edge that has been removed with this node, both as the
       *         target and source
        */
-    Observable<Integer> receive( final ApplicationScope scope, final Id node, final UUID timestamp );
+    Observable<MarkedEdge> receive(final ApplicationScope scope, final Id node, final UUID timestamp );
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
index cd5b1a8..f13f552 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
@@ -98,50 +98,34 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
      * @return An observable that emits each edge that has been removed with this node, both as the
      *         target and source
      */
-    public Observable<Integer> receive( final ApplicationScope scope, final Id node, final UUID timestamp ) {
+    public Observable<MarkedEdge> receive( final ApplicationScope scope, final Id node, final UUID timestamp ) {
 
 
         return Observable.just( node )
 
                 //delete source and targets in parallel and merge them into a single observable
-                .flatMap( new Func1<Id, Observable<Integer>>() {
-                    @Override
-                    public Observable<Integer> call( final Id node ) {
-
-                        final Optional<Long> maxVersion = nodeSerialization.getMaxVersion( scope, node );
-
-                        if (logger.isTraceEnabled()) {
-                            logger.trace("Node with id {} has max version of {}", node, maxVersion.orNull());
-                        }
-
-
-                        if ( !maxVersion.isPresent() ) {
-                            return Observable.empty();
-                        }
-
-
-                        //do all the delete, then when done, delete the node
-                        return doDeletes( node, scope, maxVersion.get(), timestamp ).count()
-                                //if nothing is ever emitted, emit 0 so that we know no operations took place.
-                                // Finally remove
-                                // the
-                                // target node in the mark
-                                .doOnCompleted( new Action0() {
-                                    @Override
-                                    public void call() {
-                                        try {
-                                            nodeSerialization.delete( scope, node, maxVersion.get()).execute();
-                                        }
-                                        catch ( ConnectionException e ) {
-                                            throw new RuntimeException( "Unable to connect to casandra", e );
-                                        }
-                                    }
-                                } );
-                    }
-                } ).defaultIfEmpty( 0 );
-    }
+                .flatMap( id -> {
 
+                    final Optional<Long> maxVersion = nodeSerialization.getMaxVersion( scope, node );
+
+                    if (logger.isTraceEnabled()) {
+                        logger.trace("Node with id {} has max version of {}", node, maxVersion.orNull());
+                    }
+                    if ( !maxVersion.isPresent() ) {
+                        return Observable.empty();
+                    }
 
+                    // do all the edge deletes and then remove the marked node, return all edges just deleted
+                    return
+                        doDeletes( node, scope, maxVersion.get(), timestamp ).doOnCompleted( () -> {
+                            try {
+                                nodeSerialization.delete( scope, node, maxVersion.get()).execute();
+                            } catch ( ConnectionException e ) {
+                                throw new RuntimeException( "Unable to connect to cassandra", e );
+                            }
+                        });
+                });
+    }
     /**
      * Do the deletes
      */
@@ -162,7 +146,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
                             @Override
                             protected Iterator<MarkedEdge> getIterator() {
                                 return storageSerialization.getEdgesToTarget(scope,
-                                    new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent()));
+                                    new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent(), false));
                             }
                         }));
 
@@ -174,7 +158,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
                             @Override
                             protected Iterator<MarkedEdge> getIterator() {
                                 return storageSerialization.getEdgesFromSource(scope,
-                                    new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent()));
+                                    new SimpleSearchByEdgeType(node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent(), false));
                             }
                         }));
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
index 78a1d4b..5577bd0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
@@ -36,9 +36,10 @@ public interface EdgesObservable {
      * Return an observable of all edges from a source
      * @param gm
      * @param sourceNode
+     * @param filterMarked
      * @return
      */
-    Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode );
+    Observable<Edge> edgesFromSourceDescending(final GraphManager gm, final Id sourceNode, boolean filterMarked);
 
 
     /**
@@ -54,9 +55,10 @@ public interface EdgesObservable {
      * Return an observable of all edges to a target
      * @param gm
      * @param targetNode
+     * @param filterMarked
      * @return
      */
-    Observable<Edge> edgesToTarget(final GraphManager gm,  final Id targetNode);
+    Observable<Edge> edgesToTarget(final GraphManager gm, final Id targetNode, boolean filterMarked);
 
     /**
      * Return an observable of all edges from a source node.  Ordered ascending, from the startTimestamp if specified

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
index 1f81864..e9e2b28 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
@@ -400,7 +400,6 @@ public class EdgeMetadataSerializationV2Impl implements EdgeMetadataSerializatio
         ValidationUtils.validateApplicationScope( scope );
         GraphValidation.validateSearchEdgeType( search );
 
-
         final Id applicationId = scope.getApplication();
         final Id searchNode = search.getNode();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
index 20efe42..685b84c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
@@ -35,7 +35,6 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import com.google.common.base.Optional;
 
 import rx.Observable;
-import rx.functions.Func1;
 
 
 /**
@@ -55,7 +54,7 @@ public class EdgesObservableImpl implements EdgesObservable {
      * Get all edges from the source
      */
     @Override
-    public Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode ) {
+    public Observable<Edge> edgesFromSourceDescending(final GraphManager gm, final Id sourceNode, boolean filterMarked) {
         final Observable<String> edgeTypes =
             gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null ) );
 
@@ -67,7 +66,7 @@ public class EdgesObservableImpl implements EdgesObservable {
 
                 return gm.loadEdgesFromSource(
                     new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
-                        Optional.<Edge>absent() ) );
+                        Optional.<Edge>absent(), filterMarked ) );
         } );
     }
 
@@ -119,19 +118,16 @@ public class EdgesObservableImpl implements EdgesObservable {
      * Get all edges to the target
      */
     @Override
-    public Observable<Edge> edgesToTarget( final GraphManager gm, final Id targetNode ) {
-        final Observable<String> edgeTypes =
-            gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( targetNode, null, null ) );
-
-        return edgeTypes.flatMap( edgeType -> {
+    public Observable<Edge> edgesToTarget(final GraphManager gm, final Id targetNode, boolean filterMarked) {
 
-            if (logger.isTraceEnabled()) {
+        return gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( targetNode, null, null ) )
+            .flatMap( edgeType -> {
+                if (logger.isTraceEnabled()) {
                 logger.trace("Loading edges of edgeType {} to {}", edgeType, targetNode);
-            }
-
+                }
             return gm.loadEdgesToTarget(
                 new SimpleSearchByEdgeType( targetNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
-                    Optional.<Edge>absent() ) );
+                    Optional.<Edge>absent(), filterMarked ) );
         } );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
index 6a08d46..69dd43b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
@@ -21,7 +21,6 @@ package org.apache.usergrid.persistence.graph.serialization.impl;
 
 
 import com.google.inject.Inject;
-import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
 import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
@@ -29,7 +28,6 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
-import rx.functions.Func1;
 
 /**
  * Emits the id of all nodes that are target nodes from the given source node
@@ -55,7 +53,7 @@ public class TargetIdObservableImpl implements TargetIdObservable {
     public Observable<Id> getTargetNodes(final GraphManager gm, final Id sourceNode) {
 
         //only search edge types that start with collections
-        return edgesFromSourceObservable.edgesFromSourceDescending( gm, sourceNode ).map( edge -> {
+        return edgesFromSourceObservable.edgesFromSourceDescending( gm, sourceNode, true).map(edge -> {
             final Id targetNode = edge.getTargetNode();
 
             if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
index 8eccdbd..1d4331c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
@@ -89,7 +89,7 @@ public class EdgeDataMigrationImpl implements DataMigration {
             final GraphManager gm = graphManagerFactory.createEdgeManager( graphNode.applicationScope );
 
             //get edges from the source
-            return edgesFromSourceObservable.edgesFromSourceDescending( gm, graphNode.entryNode ).buffer( 1000 )
+            return edgesFromSourceObservable.edgesFromSourceDescending( gm, graphNode.entryNode, true).buffer( 1000 )
                                             .doOnNext( edges -> {
                                                     final MutationBatch batch = keyspace.prepareMutationBatch();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
index 438a978..80198de 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
@@ -135,7 +135,7 @@ public class NodeDeleteListenerTest {
         UUID eventTime = UUIDGenerator.newTimeUUID();
 
 
-        int count = deleteListener.receive( scope, sourceNode, eventTime ).toBlocking().last();
+        int count = deleteListener.receive( scope, sourceNode, eventTime ).count().toBlocking().last();
 
         assertEquals( "Mark was not set, no delete should be executed", 0, count );
 
@@ -171,7 +171,7 @@ public class NodeDeleteListenerTest {
 
         nodeSerialization.mark( scope, sourceNode, timestamp ).execute();
 
-        int count = deleteListener.receive( scope, sourceNode, deleteEventTimestamp ).toBlocking().last();
+        int count = deleteListener.receive( scope, sourceNode, deleteEventTimestamp ).count().toBlocking().last();
 
         assertEquals( 1, count );
 
@@ -256,7 +256,7 @@ public class NodeDeleteListenerTest {
 
         nodeSerialization.mark( scope, targetNode, deleteBefore ).execute();
 
-        int count = deleteListener.receive( scope, targetNode, UUIDGenerator.newTimeUUID() ).toBlocking().last();
+        int count = deleteListener.receive( scope, targetNode, UUIDGenerator.newTimeUUID() ).count().toBlocking().last();
 
         assertEquals( 1, count );
 
@@ -366,7 +366,7 @@ public class NodeDeleteListenerTest {
 
         nodeSerialization.mark( scope, toDelete, deleteVersion ).execute();
 
-        int count = deleteListener.receive( scope, toDelete, UUIDGenerator.newTimeUUID() ).toBlocking().last();
+        int count = deleteListener.receive( scope, toDelete, UUIDGenerator.newTimeUUID() ).count().toBlocking().last();
 
         assertEquals( edgeCount, count );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
index 14020a9..b444199 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
@@ -20,14 +20,12 @@
 package org.apache.usergrid.persistence.index;
 
 
-import com.google.common.base.Optional;
 import org.apache.usergrid.persistence.core.CPManager;
 import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.model.entity.Id;
 import rx.Observable;
 
 import java.util.Map;
-import java.util.UUID;
 
 
 /**
@@ -36,7 +34,7 @@ import java.util.UUID;
 public interface EntityIndex extends CPManager {
 
 
-    public static final int MAX_LIMIT = 1000;
+    int MAX_LIMIT = 1000;
 
     /**
      * Create an index and add to alias, will create alias and remove any old index from write alias if alias already exists
@@ -134,14 +132,6 @@ public interface EntityIndex extends CPManager {
     CandidateResults getAllEdgeDocuments(final IndexEdge edge, final Id entityId);
 
     /**
-     * Returns all entity docs that match the entityId being the nodeId ( aka connections where entityId = sourceNode)
-     *
-     * @param entityId      The entityId to match when searching
-     * @param markedVersion The version that has been marked for deletion. All version before this one must be deleted.
-     * @return
-     */
-    CandidateResults getNodeDocsOlderThanMarked(final Id entityId, final UUID markedVersion);
-    /**
      * delete all application records
      *
      * @return

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index a35921c..68c139a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -34,7 +34,6 @@ import org.apache.usergrid.persistence.core.migration.data.VersionedData;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.core.util.StringUtils;
-import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.index.*;
 import org.apache.usergrid.persistence.index.ElasticSearchQueryBuilder.SearchRequestBuilderStrategyV2;
 import org.apache.usergrid.persistence.index.exceptions.IndexException;
@@ -584,68 +583,6 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
     }
 
 
-    @Override
-    public CandidateResults getNodeDocsOlderThanMarked(final Id entityId, final UUID markedVersion ) {
-
-        // TODO: investigate if functionality via iterator so a caller can page the deletion until all is gone
-
-        Preconditions.checkNotNull( entityId, "entityId cannot be null" );
-        Preconditions.checkNotNull(markedVersion, "markedVersion cannot be null");
-        ValidationUtils.verifyVersion(markedVersion);
-
-        SearchResponse searchResponse;
-        List<CandidateResult> candidates = new ArrayList<>();
-
-        final long markedTimestamp = markedVersion.timestamp();
-
-        // never let this fetch more than 100 to save memory
-        final int searchLimit = Math.min(100, indexFig.getVersionQueryLimit());
-
-        // this query will find all the documents where this entity is a source/target node
-        final QueryBuilder nodeQuery = QueryBuilders
-            .termQuery(IndexingUtils.EDGE_NODE_ID_FIELDNAME, IndexingUtils.nodeId(entityId));
-
-        final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder()
-            .addSort(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME, SortOrder.ASC);
-
-        try {
-
-            long queryTimestamp = 0L;
-
-            QueryBuilder timestampQuery =  QueryBuilders
-                .rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME)
-                .gte(queryTimestamp)
-                .lt(markedTimestamp);
-
-            QueryBuilder finalQuery = QueryBuilders.constantScoreQuery(
-                QueryBuilders
-                    .boolQuery()
-                    .must(timestampQuery)
-                    .must(nodeQuery)
-            );
-
-
-            searchResponse = srb
-                .setQuery(finalQuery)
-                .setSize(searchLimit)
-                .execute()
-                .actionGet();
-
-
-            candidates = aggregateScrollResults(candidates, searchResponse, markedVersion);
-
-        }
-        catch ( Throwable t ) {
-            logger.error( "Unable to communicate with Elasticsearch", t.getMessage() );
-            failureMonitor.fail( "Unable to execute batch", t );
-            throw t;
-        }
-        failureMonitor.success();
-
-        return new CandidateResults( candidates, Collections.EMPTY_SET);
-    }
-
-
     /**
      * Completely delete an index.
      */

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index 513c894..c98c72d 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -661,9 +661,9 @@
             </dependency>
 
             <dependency>
-                <groupId>com.github.stephenc</groupId>
+                <groupId>com.github.jbellis</groupId>
                 <artifactId>jamm</artifactId>
-                <version>0.2.5</version>
+                <version>0.3.1</version>
             </dependency>
 
             <!-- Third Party Non-Commercial Dependencies -->
@@ -1311,7 +1311,7 @@
                     <useSystemClassLoader>false</useSystemClassLoader>
                     <testFailureIgnore>false</testFailureIgnore>
                     <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin}
-                        -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+                        -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
                         ${ug.argline}
                     </argLine>
                     <systemPropertyVariables>
@@ -1548,7 +1548,7 @@
                         <version>${surefire.plugin.version}</version>
                         <configuration>
                             <argLine>
-                                -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+                                -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
                                 ${ug.argline}
                                 -javaagent:${settings.localRepository}/org/jacoco/org.jacoco.agent/${jacoco.version}/org.jacoco.agent-${jacoco.version}-runtime.jar=destfile=${project.build.directory}/jacoco.exec
                             </argLine>

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/rest/pom.xml
----------------------------------------------------------------------
diff --git a/stack/rest/pom.xml b/stack/rest/pom.xml
index 9bb83a6..e7aa68a 100644
--- a/stack/rest/pom.xml
+++ b/stack/rest/pom.xml
@@ -93,7 +93,7 @@
                     <argLine>-Dwebapp.directory=${basedir}/src/main/webapp
                         -Dtest.barrier.timestamp=${maven.build.timestamp} -Dtest.clean.storage=true -Xmx${ug.heapmax}
                         -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8
-                        -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+                        -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
                         -Djava.util.logging.config.file=${basedir}/src/test/resources/logging.properties ${ug.argline}
                     </argLine>
                     <includes>
@@ -460,7 +460,7 @@
                                 -Dwebapp.directory=${basedir}/src/main/webapp
                                 -Dtest.barrier.timestamp=${maven.build.timestamp} -Dtest.clean.storage=true
                                 -Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8
-                                -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+                                -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
                                 -Djava.util.logging.config.file=${basedir}/src/test/resources/logging.properties
                                 ${ug.argline}
                             </argLine>

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/rest/src/test/resources/project.properties
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/resources/project.properties b/stack/rest/src/test/resources/project.properties
index 94ef3bd..20a38f6 100644
--- a/stack/rest/src/test/resources/project.properties
+++ b/stack/rest/src/test/resources/project.properties
@@ -15,4 +15,4 @@
 # limitations under the License.
 
 target.directory=${project.build.directory}
-jamm.path=-javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+jamm.path=-javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/services/pom.xml
----------------------------------------------------------------------
diff --git a/stack/services/pom.xml b/stack/services/pom.xml
index b1df1b4..29fa311 100644
--- a/stack/services/pom.xml
+++ b/stack/services/pom.xml
@@ -102,7 +102,7 @@
                     <useSystemClassLoader>false</useSystemClassLoader>
                     <argLine>-Dtest.barrier.timestamp=${maven.build.timestamp} -Dtest.clean.storage=true
                         -Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8
-                        -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+                        -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
                         ${ug.argline} -Dlog4j.configuration=file:${basedir}/src/test/resources/log4j.properties
                     </argLine>
                     <includes>
@@ -499,7 +499,7 @@
                         <configuration>
                             <argLine>-Dtest.barrier.timestamp=${maven.build.timestamp} -Dtest.clean.storage=true
                                 -Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8
-                                -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+                                -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar
                                 ${ug.argline}
                                 -javaagent:${settings.localRepository}/org/jacoco/org.jacoco.agent/${jacoco.version}/org.jacoco.agent-${jacoco.version}-runtime.jar=destfile=${project.build.directory}/jacoco.exec
                             </argLine>

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/services/src/test/resources/project.properties
----------------------------------------------------------------------
diff --git a/stack/services/src/test/resources/project.properties b/stack/services/src/test/resources/project.properties
index d38e878..03736c0 100644
--- a/stack/services/src/test/resources/project.properties
+++ b/stack/services/src/test/resources/project.properties
@@ -16,4 +16,4 @@
 
 target.directory=${project.build.directory}
 resources.dir=${project.build.directory}
-jamm.path=-javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+jamm.path=-javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/test-utils/pom.xml
----------------------------------------------------------------------
diff --git a/stack/test-utils/pom.xml b/stack/test-utils/pom.xml
index bbcf1ff..f99d43a 100644
--- a/stack/test-utils/pom.xml
+++ b/stack/test-utils/pom.xml
@@ -59,7 +59,7 @@
                        <threadCount>${usergrid.it.threads}</threadCount>
                        <threadCountClasses></threadCountClasses>
                        <reuseForks>true</reuseForks>
-                       <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8  -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar ${ug.argline}</argLine>
+                       <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8  -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar ${ug.argline}</argLine>
                         <includes>
                            <include>**/CassandraResourceITSuite.java</include>
                         </includes>
@@ -290,7 +290,7 @@
                         <artifactId>maven-surefire-plugin</artifactId>
                         <version>${surefire.plugin.version}</version>
                         <configuration>
-                            <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/org/jacoco/org.jacoco.agent/${jacoco.version}/org.jacoco.agent-${jacoco.version}-runtime.jar=destfile=${project.build.directory}/jacoco.exec -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar ${ug.argline}</argLine>
+                            <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/org/jacoco/org.jacoco.agent/${jacoco.version}/org.jacoco.agent-${jacoco.version}-runtime.jar=destfile=${project.build.directory}/jacoco.exec -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar ${ug.argline}</argLine>
                         </configuration>
                     </plugin>
                 </plugins>

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/test-utils/src/test/resources/project.properties
----------------------------------------------------------------------
diff --git a/stack/test-utils/src/test/resources/project.properties b/stack/test-utils/src/test/resources/project.properties
index cd5b819..0bc9bb7 100644
--- a/stack/test-utils/src/test/resources/project.properties
+++ b/stack/test-utils/src/test/resources/project.properties
@@ -14,4 +14,4 @@
 # limitations under the License.
 
 target.directory=${project.build.directory}
-jamm.path=-javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar
+jamm.path=-javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/tools/pom.xml
----------------------------------------------------------------------
diff --git a/stack/tools/pom.xml b/stack/tools/pom.xml
index cbd2c1e..b34b068 100644
--- a/stack/tools/pom.xml
+++ b/stack/tools/pom.xml
@@ -61,7 +61,7 @@
             <storage-config>${basedir}/src/test/conf</storage-config>
           </systemPropertyVariables>
           <forkMode>always</forkMode>
-          <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar ${ug.argline}</argLine>
+          <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar ${ug.argline}</argLine>
         </configuration>
 
       </plugin>

http://git-wip-us.apache.org/repos/asf/usergrid/blob/99ba349c/stack/websocket/pom.xml
----------------------------------------------------------------------
diff --git a/stack/websocket/pom.xml b/stack/websocket/pom.xml
index af5ed56..72a5c9d 100644
--- a/stack/websocket/pom.xml
+++ b/stack/websocket/pom.xml
@@ -70,7 +70,7 @@
             <storage-config>${basedir}/src/test/conf</storage-config>
           </systemPropertyVariables>
           <forkMode>always</forkMode>
-          <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/org/jacoco/org.jacoco.agent/${jacoco.version}/org.jacoco.agent-${jacoco.version}-runtime.jar=destfile=${project.build.directory}/jacoco.exec -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar ${ug.argline}</argLine>
+          <argLine>-Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/org/jacoco/org.jacoco.agent/${jacoco.version}/org.jacoco.agent-${jacoco.version}-runtime.jar=destfile=${project.build.directory}/jacoco.exec -javaagent:${settings.localRepository}/com/github/jbellis/jamm/0.3.1/jamm-0.3.1.jar ${ug.argline}</argLine>
         </configuration>
       </plugin>
     </plugins>