You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/06/14 17:15:14 UTC

[35/44] usergrid git commit: USERGRID-1295 - Re-introduce a more efficient de-index operation upon entity deletes and entity updates. Remove the inefficient code as a safety measure so it can't be used again.

USERGRID-1295 - Re-introduce a more efficient de-index operation upon entity deletes and entity updates. Remove the inefficient code as a safety measure so it can't be used again.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7143cbaf
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7143cbaf
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7143cbaf

Branch: refs/heads/usergrid-1268-akka-211
Commit: 7143cbaf6c26b4db00d14fb3a1b9e3eb8a2e068e
Parents: 66bb5cd
Author: Michael Russo <mr...@apigee.com>
Authored: Fri Jun 3 22:18:12 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Fri Jun 3 22:18:12 2016 -0700

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |   2 +-
 .../corepersistence/EntityManagerFig.java       |   2 +-
 .../asyncevents/AsyncEventService.java          |   6 +-
 .../asyncevents/AsyncEventServiceImpl.java      |  11 +-
 .../asyncevents/EventBuilder.java               |   6 +-
 .../asyncevents/EventBuilderImpl.java           |  46 ++++---
 .../model/DeIndexOldVersionsEvent.java          |  12 +-
 .../corepersistence/index/IndexService.java     |  23 +++-
 .../corepersistence/index/IndexServiceImpl.java |  86 +++++++------
 .../collection/EntityCollectionManager.java     |  10 +-
 .../impl/EntityCollectionManagerImpl.java       |  32 ++++-
 .../serialization/SerializationFig.java         |   5 +
 .../serialization/impl/LogEntryIterator.java    | 128 +++++++++++++++++++
 .../usergrid/persistence/index/EntityIndex.java |   5 +-
 .../usergrid/persistence/index/IndexFig.java    |   2 +-
 .../index/impl/EsEntityIndexImpl.java           |  64 +++-------
 .../persistence/index/impl/EntityIndexTest.java |  41 ------
 17 files changed, 313 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 68f5d71..fd31cf6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -610,7 +610,7 @@ public class CpEntityManager implements EntityManager {
 
             // queue up an event to clean-up older versions than this one from the index
             if (entityManagerFig.getDeindexOnUpdate()) {
-                indexService.queueDeIndexOldVersion( applicationScope, entityId );
+                indexService.queueDeIndexOldVersion( applicationScope, cpEntity.getId(), cpEntity.getVersion());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
index 4c50aee..655a968 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/EntityManagerFig.java
@@ -39,6 +39,6 @@ public interface EntityManagerFig extends GuicyFig {
     int sleep();
 
     @Key( "usergrid.entityManager.enable_deindex_on_update" )
-    @Default( "false" )
+    @Default( "true" )
     boolean getDeindexOnUpdate();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 9f34604..d833cf7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -27,6 +27,8 @@ import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import java.util.UUID;
+
 
 /**
  * Low level queue service for events in the entity.  These events are fire and forget, and will always be asynchronous
@@ -83,11 +85,11 @@ public interface AsyncEventService extends ReIndexAction {
     void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage );
 
     /**
-     *
      * @param applicationScope
      * @param entityId
+     * @param markedVersion
      */
-    void queueDeIndexOldVersion(final ApplicationScope applicationScope, final Id entityId);
+    void queueDeIndexOldVersion(final ApplicationScope applicationScope, final Id entityId, UUID markedVersion);
 
     /**
      * current queue depth

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 3b01292..fa175ab 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -584,11 +584,11 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
 
     @Override
-    public void queueDeIndexOldVersion(final ApplicationScope applicationScope, final Id entityId) {
+    public void queueDeIndexOldVersion(final ApplicationScope applicationScope, final Id entityId, UUID markedVersion) {
 
         // queue the de-index of old versions to the topic so cleanup happens in all regions
         offerTopic( new DeIndexOldVersionsEvent( queueFig.getPrimaryRegion(),
-            new EntityIdScope( applicationScope, entityId)) );
+            new EntityIdScope( applicationScope, entityId), markedVersion) );
 
     }
 
@@ -596,10 +596,11 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     public IndexOperationMessage handleDeIndexOldVersionEvent ( final DeIndexOldVersionsEvent deIndexOldVersionsEvent){
 
 
-        ApplicationScope applicationScope = deIndexOldVersionsEvent.getEntityIdScope().getApplicationScope();
-        Id entityId = deIndexOldVersionsEvent.getEntityIdScope().getId();
+        final ApplicationScope applicationScope = deIndexOldVersionsEvent.getEntityIdScope().getApplicationScope();
+        final Id entityId = deIndexOldVersionsEvent.getEntityIdScope().getId();
+        final UUID markedVersion = deIndexOldVersionsEvent.getMarkedVersion();
 
-        return eventBuilder.deIndexOlderVersions( applicationScope, entityId )
+        return eventBuilder.deIndexOldVersions( applicationScope, entityId, markedVersion )
             .toBlocking().lastOrDefault(null);
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
index 1f62029..4db9f4b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -21,12 +21,12 @@ package org.apache.usergrid.corepersistence.asyncevents;
 
 
 import java.util.List;
+import java.util.UUID;
 
 import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.index.impl.IndexOperation;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -78,9 +78,11 @@ public interface EventBuilder {
      * Find all versions of the entity older than the latest and de-index them.
      * @param applicationScope
      * @param entityId
+     * @param markedVersion
      * @return
      */
-    Observable<IndexOperationMessage> deIndexOlderVersions(ApplicationScope applicationScope, Id entityId );
+    Observable<IndexOperationMessage> deIndexOldVersions( ApplicationScope applicationScope,
+                                                          Id entityId, UUID markedVersion );
 
     /**
      * A bean to hold both our observables so the caller can choose the subscription mechanism.  Note that

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index 5c827c6..02a7588 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -20,11 +20,10 @@
 package org.apache.usergrid.corepersistence.asyncevents;
 
 
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
-import org.apache.usergrid.persistence.collection.VersionSet;
 import org.apache.usergrid.utils.UUIDUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -118,7 +117,7 @@ public class EventBuilderImpl implements EventBuilder {
 
         //TODO USERGRID-1123: Implement so we don't iterate logs twice (latest DELETED version, then to get all DELETED)
 
-        MvccLogEntry mostRecentlyMarked = ecm.getVersions( entityId ).toBlocking()
+        MvccLogEntry mostRecentlyMarked = ecm.getVersionsFromMaxToMin( entityId, UUIDUtils.newTimeUUID() ).toBlocking()
             .firstOrDefault( null, mvccLogEntry -> mvccLogEntry.getState() == MvccLogEntry.State.DELETED );
 
         // De-indexing and entity deletes don't check log entries.  We must do that first. If no DELETED logs, then
@@ -127,13 +126,16 @@ public class EventBuilderImpl implements EventBuilder {
         Observable<List<MvccLogEntry>> ecmDeleteObservable = Observable.empty();
 
         if(mostRecentlyMarked != null){
+
+            // fetch entity versions to be de-index by looking in cassandra
             deIndexObservable =
-                indexService.deleteEntityIndexes( applicationScope, entityId, mostRecentlyMarked.getVersion() );
+                indexService.deIndexEntity(applicationScope, entityId, mostRecentlyMarked.getVersion(),
+                    getVersionsOlderThanMarked(ecm, entityId, mostRecentlyMarked.getVersion()));
 
             ecmDeleteObservable =
-                ecm.getVersions( entityId )
+                ecm.getVersionsFromMaxToMin( entityId, mostRecentlyMarked.getVersion() )
                     .filter( mvccLogEntry->
-                        UUIDUtils.compare(mvccLogEntry.getVersion(), mostRecentlyMarked.getVersion()) <= 0)
+                        mvccLogEntry.getVersion().timestamp() <= mostRecentlyMarked.getVersion().timestamp() )
                     .buffer( serializationFig.getBufferSize() )
                     .doOnNext( buffer -> ecm.delete( buffer ) );
         }
@@ -173,7 +175,8 @@ public class EventBuilderImpl implements EventBuilder {
 
 
     @Override
-    public Observable<IndexOperationMessage> deIndexOlderVersions(final ApplicationScope applicationScope, Id entityId ){
+    public Observable<IndexOperationMessage> deIndexOldVersions( final ApplicationScope applicationScope,
+                                                                 final Id entityId, final UUID markedVersion ){
 
         if (logger.isDebugEnabled()) {
             logger.debug("Removing old versions of entity {} from index in app scope {}", entityId, applicationScope );
@@ -181,24 +184,31 @@ public class EventBuilderImpl implements EventBuilder {
 
         final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
 
-        // find all versions of the entity that come before the provided entityId
-        VersionSet latestVersions = ecm.getLatestVersion(Collections.singletonList(entityId) ).toBlocking()
-            .firstOrDefault( null );
 
-        // If there are no versions before this, allow it to return an empty observable
-        Observable<IndexOperationMessage> deIndexObservable = Observable.empty();
+        return indexService.deIndexOldVersions( applicationScope, entityId,
+            getVersionsOlderThanMarked(ecm, entityId, markedVersion), markedVersion);
 
-        if(latestVersions.getMaxVersion(entityId) != null){
+    }
 
-            UUID latestVersion = latestVersions.getMaxVersion(entityId).getVersion();
 
-            deIndexObservable =
-                indexService.deleteEntityIndexes( applicationScope, entityId, latestVersion);
+    private List<UUID> getVersionsOlderThanMarked( final EntityCollectionManager ecm,
+                                                   final Id entityId, final UUID markedVersion ){
 
-        }
+        final List<UUID> versions = new ArrayList<>();
+
+        // only take last 5 versions to avoid eating memory. a tool can be built for massive cleanups for old usergrid
+        // clusters that do not have this in-line cleanup
+        ecm.getVersionsFromMaxToMin( entityId, markedVersion)
+            .take(5)
+            .forEach( mvccLogEntry -> {
+                if ( mvccLogEntry.getVersion().timestamp() < markedVersion.timestamp() ) {
+                    versions.add(mvccLogEntry.getVersion());
+                }
+
+            });
 
-        return  deIndexObservable;
 
+        return versions;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java
index 59694d5..1f00e14 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/DeIndexOldVersionsEvent.java
@@ -21,6 +21,8 @@ package org.apache.usergrid.corepersistence.asyncevents.model;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 
+import java.util.UUID;
+
 
 /**
  * An index event de-indexing documents for Entity versions older than the provided Entity
@@ -31,12 +33,16 @@ public final class DeIndexOldVersionsEvent extends AsyncEvent {
     @JsonProperty
     protected EntityIdScope entityIdScope;
 
+    @JsonProperty
+    protected UUID markedVersion;
+
     public DeIndexOldVersionsEvent() {
     }
 
-    public DeIndexOldVersionsEvent(String sourceRegion, EntityIdScope entityIdScope) {
+    public DeIndexOldVersionsEvent(String sourceRegion, EntityIdScope entityIdScope, UUID markedVersion) {
         super(sourceRegion);
         this.entityIdScope = entityIdScope;
+        this.markedVersion = markedVersion;
     }
 
 
@@ -47,4 +53,8 @@ public final class DeIndexOldVersionsEvent extends AsyncEvent {
     public EntityIdScope getEntityIdScope() {
         return entityIdScope;
     }
+
+    public UUID getMarkedVersion() {
+        return  markedVersion;
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
index 54eb464..b989a9c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexService.java
@@ -20,17 +20,16 @@
 package org.apache.usergrid.corepersistence.index;
 
 
+import java.util.List;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.index.IndexEdge;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import rx.Observable;
-import rx.observables.ConnectableObservable;
 
 
 /**
@@ -71,18 +70,28 @@ public interface IndexService {
     Observable<IndexOperationMessage> deleteIndexEdge(final ApplicationScope applicationScope, final Edge edge);
 
 
-
-
     /**
-     * Delete all indexes with the specified entityId
+     * De-index all documents with the specified entityId and versions provided.  This will also remove any documents
+     * where the entity is a source/target node ( index docs where this entityId is a part of connections).
      *
      * @param applicationScope
      * @param entityId
+     * @param markedVersion
      * @return
      */
-    Observable<IndexOperationMessage> deleteEntityIndexes(final ApplicationScope applicationScope, final Id entityId,
-                                                         final UUID markedVersion);
+    Observable<IndexOperationMessage> deIndexEntity(final ApplicationScope applicationScope, final Id entityId,
+                                                    final UUID markedVersion, final List<UUID> allVersionsBeforeMarked);
 
 
+    /**
+     * De-index all documents with the specified entityId and versions of the entityId provided
+     *
+     * @param applicationScope
+     * @param entityId
+     * @param markedVersion
+     * @return
+     */
+    Observable<IndexOperationMessage> deIndexOldVersions(final ApplicationScope applicationScope, final Id entityId,
+                                                         final List<UUID> versions, UUID markedVersion);
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 9509626..54b18bb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -20,12 +20,7 @@
 package org.apache.usergrid.corepersistence.index;
 
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,8 +51,6 @@ import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.utils.InflectionUtils;
-import org.apache.usergrid.utils.JsonUtils;
-import org.apache.usergrid.utils.UUIDUtils;
 
 import com.codahale.metrics.Timer;
 import com.google.common.base.Optional;
@@ -66,9 +59,7 @@ import com.google.inject.Singleton;
 
 import rx.Observable;
 
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createSearchEdgeFromSource;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromTarget;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.*;
 import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
 
 
@@ -278,42 +269,61 @@ public class IndexServiceImpl implements IndexService {
         return ObservableTimer.time( batches, addTimer );
     }
 
-    //This should look up the entityId and delete any documents with a timestamp that comes before
-    //The edges that are connected will be compacted away from the graph.
+
     @Override
-    public Observable<IndexOperationMessage> deleteEntityIndexes( final ApplicationScope applicationScope,
-                                                                  final Id entityId, final UUID markedVersion ) {
+    public Observable<IndexOperationMessage> deIndexEntity( final ApplicationScope applicationScope, final Id entityId,
+                                                            final UUID markedVersion,
+                                                            final List<UUID> allVersionsBeforeMarked ) {
 
-        //bootstrap the lower modules from their caches
-        final EntityIndex ei = entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
+        final EntityIndex ei = entityIndexFactory.
+            createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
 
-        CandidateResults crs = ei.getAllEntityVersionsBeforeMarkedVersion( entityId, markedVersion );
 
-        //If we get no search results, its possible that something was already deleted or
-        //that it wasn't indexed yet. In either case we can't delete anything and return an empty observable..
-        if(crs.isEmpty()) {
-            return Observable.empty();
-        }
+        final SearchEdge searchEdgeFromSource = createSearchEdgeFromSource( new SimpleEdge( applicationScope.getApplication(),
+            CpNamingUtils.getEdgeTypeFromCollectionName( InflectionUtils.pluralize( entityId.getType() ) ), entityId,
+            entityId.getUuid().timestamp() ) );
+
+
+        final EntityIndexBatch batch = ei.createBatch();
+
+        // de-index each version of the entity before the marked version
+        allVersionsBeforeMarked.forEach(version -> batch.deindex(searchEdgeFromSource, entityId, version));
 
-        UUID timeUUID = UUIDUtils.isTimeBased(entityId.getUuid()) ? entityId.getUuid() : UUIDUtils.newTimeUUID();
-        //not actually sure about the timestamp but ah well. works.
-        SearchEdge searchEdge = createSearchEdgeFromSource( new SimpleEdge( applicationScope.getApplication(),
+
+        // for now, query the index to remove docs where the entity is source/target node and older than markedVersion
+        // TODO: investigate getting this information from graph
+        CandidateResults candidateResults = ei.getNodeDocsOlderThanMarked(entityId, markedVersion );
+        candidateResults.forEach(candidateResult -> batch.deindex(candidateResult));
+
+        return Observable.just(batch.build());
+
+    }
+
+    @Override
+    public Observable<IndexOperationMessage> deIndexOldVersions(final ApplicationScope applicationScope,
+                                                                final Id entityId,
+                                                                final List<UUID> versions,
+                                                                UUID markedVersion) {
+
+        final EntityIndex ei = entityIndexFactory.
+            createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
+
+
+        final SearchEdge searchEdgeFromSource = createSearchEdgeFromSource( new SimpleEdge( applicationScope.getApplication(),
             CpNamingUtils.getEdgeTypeFromCollectionName( InflectionUtils.pluralize( entityId.getType() ) ), entityId,
-            timeUUID.timestamp() ) );
+             entityId.getUuid().timestamp() ) );
 
 
-        final Observable<IndexOperationMessage>  batches = Observable.from( crs )
-                //collect results into a single batch
-                .collect( () -> ei.createBatch(), ( batch, candidateResult ) -> {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Deindexing on edge {} for entity {} added to batch", searchEdge, entityId);
-                    }
-                    batch.deindex( candidateResult );
-                } )
-                    //return the future from the batch execution
-                .map( batch ->batch.build() );
+        final EntityIndexBatch batch = ei.createBatch();
+
+        versions.forEach( version -> {
+
+            batch.deindex(searchEdgeFromSource, entityId, version);
+
+        });
+
+        return Observable.just(batch.build());
 
-        return ObservableTimer.time(batches, indexTimer);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index 9de8f41..22fbb5f 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.collection;
 
 
 import java.util.Collection;
+import java.util.UUID;
 
 import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.model.entity.Entity;
@@ -98,13 +99,20 @@ public interface EntityCollectionManager {
     Observable<EntitySet> load( Collection<Id> entityIds );
 
     /**
-     * Get all versions of the log entry, from Max to min
+     * Get all versions of the log entry, from min to max
      * @param entityId
      * @return An observable stream of mvccLog entries
      */
     Observable<MvccLogEntry> getVersions(final Id entityId);
 
     /**
+     * Get all versions of the log entry, from max to min
+     * @param entityId
+     * @return An observable stream of mvccLog entries
+     */
+    Observable<MvccLogEntry> getVersionsFromMaxToMin(final Id entityId, final UUID startVersion);
+
+    /**
      * Delete these versions from cassandra.  Must be atomic so that read log entries are only removed.  Entity data
      * and log entry will be deleted
      * @param entries

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index e71e6bb..70b06ba 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.UUID;
 
 import com.netflix.astyanax.model.ConsistencyLevel;
+import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -279,6 +280,19 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         } );
     }
 
+    @Override
+    public Observable<MvccLogEntry> getVersionsFromMaxToMin( final Id entityId, final UUID startVersion ) {
+        ValidationUtils.verifyIdentity( entityId );
+
+        return Observable.create( new ObservableIterator<MvccLogEntry>( "Log entry iterator" ) {
+            @Override
+            protected Iterator<MvccLogEntry> getIterator() {
+                return new LogEntryIterator( mvccLogEntrySerializationStrategy, applicationScope, entityId, startVersion,
+                    serializationFig.getBufferSize() );
+            }
+        } );
+    }
+
 
     @Override
     public Observable<MvccLogEntry> delete( final Collection<MvccLogEntry> entries ) {
@@ -359,6 +373,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                     if ( entity == null || !entity.getEntity().isPresent() ) {
                         final MutationBatch valueDelete =
                             uniqueValueSerializationStrategy.delete( applicationScope, expectedUnique );
+
                         deleteBatch.mergeShallow( valueDelete );
                         continue;
                     }
@@ -370,10 +385,23 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                     response.addEntity( expectedUnique.getField(), entity );
                 }
 
-                //TODO: explore making this an Async process
-                //We'll repair it again if we have to
                 deleteBatch.execute();
 
+                // optionally sleep after read repair as some tasks immediately try to write after the delete
+                if ( serializationFig.getReadRepairDelay() > 0 ){
+
+                    try {
+
+                        Thread.sleep(Math.min(serializationFig.getReadRepairDelay(), 200L));
+
+                    } catch (InterruptedException e) {
+
+                        // do nothing if sleep fails; log and continue on
+                        logger.warn("Sleep during unique value read repair failed.");
+                    }
+
+                }
+
                 return response;
             }
             catch ( ConnectionException e ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
index 96759ba..12033fe 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
@@ -57,4 +57,9 @@ public interface SerializationFig extends GuicyFig {
     @Key ( "usergrid.uniqueverify.poolsize" )
     @Default( "150" )
     int getUniqueVerifyPoolSize();
+
+
+    @Key ( "collection.readrepair.delay" )
+    @Default( "0" ) // in milliseconds
+    int getReadRepairDelay();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
new file mode 100644
index 0000000..de6b2bc
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/LogEntryIterator.java
@@ -0,0 +1,128 @@
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Preconditions;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * Iterator over every {@link MvccLogEntry} version of a single entity, loaded lazily from the
+ * {@link MvccLogEntrySerializationStrategy} one page at a time. Entries are yielded in the order
+ * the strategy's load returns them (intended to be max to min version).
+ */
+public class LogEntryIterator implements Iterator<MvccLogEntry> {
+
+    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+    private final ApplicationScope scope;
+    private final Id entityId;
+    private final int pageSize;
+    private final UUID startVersion;
+
+    // current page of entries; null until the first page has been loaded
+    private Iterator<MvccLogEntry> elementItr;
+    // last version of the previous full page; null before the first page and once no pages remain
+    private UUID nextStart;
+
+
+    /**
+     * @param logEntrySerializationStrategy The serialization strategy to get the log entries
+     * @param scope The scope of the entity
+     * @param entityId The id of the entity
+     * @param startVersion The version the first page is loaded from
+     * @param pageSize The fetch size to get when querying the serialization strategy
+     * @throws IllegalArgumentException if pageSize is not greater than 0
+     */
+    public LogEntryIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+                             final ApplicationScope scope, final Id entityId,
+                             final UUID startVersion, final int pageSize ) {
+
+        Preconditions.checkArgument( pageSize > 0, "pageSize must be > 0" );
+
+        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+        this.scope = scope;
+        this.entityId = entityId;
+        this.pageSize = pageSize;
+        this.startVersion = startVersion;
+    }
+
+
+    /**
+     * Loads the first page on first use, or the next page once the current one is exhausted.
+     *
+     * @throws RuntimeException wrapping a {@link ConnectionException} if a page cannot be loaded
+     */
+    @Override
+    public boolean hasNext() {
+        if ( elementItr == null || ( !elementItr.hasNext() && nextStart != null ) ) {
+            try {
+                advance();
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to query cassandra", e );
+            }
+        }
+
+        return elementItr.hasNext();
+    }
+
+
+    @Override
+    public MvccLogEntry next() {
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "No more elements exist" );
+        }
+
+        return elementItr.next();
+    }
+
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException( "Remove is unsupported" );
+    }
+
+
+    /**
+     * Loads the next page of log entries and makes it the current page. Normally driven by
+     * {@link #hasNext()}; calling it directly replaces the current page, skipping its unread entries.
+     *
+     * @throws ConnectionException if the serialization strategy fails to load the page
+     */
+    public void advance() throws ConnectionException {
+
+        final boolean continuing = nextStart != null;
+
+        // a continuation page starts at the last entry already returned, so request one extra
+        // entry to still get a full page of new entries after that boundary entry is dropped
+        final int requestedSize = continuing ? pageSize + 1 : pageSize;
+        final UUID start = continuing ? nextStart : startVersion;
+
+        List<MvccLogEntry> results = logEntrySerializationStrategy.load( scope, entityId, start, requestedSize );
+
+        // drop the boundary entry with a view, not remove( 0 ): the strategy's list may be unmodifiable
+        if ( continuing && !results.isEmpty() && results.get( 0 ).getVersion().equals( nextStart ) ) {
+            results = results.subList( 1, results.size() );
+        }
+
+        // a full page means more entries may remain; if the boundary entry was missing (e.g. deleted),
+        // the +1 request can return pageSize + 1 new entries, hence >= rather than ==
+        if ( results.size() >= pageSize ) {
+            nextStart = results.get( results.size() - 1 ).getVersion();
+        }
+        else {
+            nextStart = null; // nothing left to load
+        }
+
+        elementItr = results.iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
index 81f900c..8ab2d41 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
@@ -113,14 +113,13 @@ public interface EntityIndex extends CPManager {
     CandidateResults getAllEdgeDocuments(final IndexEdge edge, final Id entityId);
 
     /**
-     * Returns all entity documents that match the entityId and come before the marked version
+     * Returns the docs in which entityId is the node (e.g. connections where entityId is the source node) and that are older than markedVersion.
      *
      * @param entityId      The entityId to match when searching
      * @param markedVersion The version that has been marked for deletion. All version before this one must be deleted.
      * @return
      */
-    CandidateResults getAllEntityVersionsBeforeMarkedVersion(final Id entityId, final UUID markedVersion);
-
+    CandidateResults getNodeDocsOlderThanMarked(final Id entityId, final UUID markedVersion);
     /**
      * delete all application records
      *

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index e81219a..3dbf5e5 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -202,7 +202,7 @@ public interface IndexFig extends GuicyFig {
     @Key( "elasticsearch_queue_error_sleep_ms" )
     long getSleepTimeForQueueError();
 
-    @Default("1000")
+    @Default("100")
     @Key( ELASTICSEARCH_VERSION_QUERY_LIMIT )
     int getVersionQueryLimit();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 10ee91e..3b60b57 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -533,7 +533,9 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
 
 
     @Override
-    public CandidateResults getAllEntityVersionsBeforeMarkedVersion( final Id entityId, final UUID markedVersion ) {
+    public CandidateResults getNodeDocsOlderThanMarked(final Id entityId, final UUID markedVersion ) {
+
+        // TODO: investigate exposing this as an iterator so a caller can page through the deletion until all docs are gone
 
         Preconditions.checkNotNull( entityId, "entityId cannot be null" );
         Preconditions.checkNotNull(markedVersion, "markedVersion cannot be null");
@@ -544,12 +546,8 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
 
         final long markedTimestamp = markedVersion.timestamp();
 
-        // never let the limit be less than 2 as there are potential indefinite paging issues
-        final int searchLimit = Math.max(2, indexFig.getVersionQueryLimit());
-
-        // this query will find the document for the entity itself
-        final QueryBuilder entityQuery = QueryBuilders
-            .termQuery(IndexingUtils.ENTITY_ID_FIELDNAME, IndexingUtils.entityId(entityId));
+        // never let this fetch more than 100 to save memory
+        final int searchLimit = Math.min(100, indexFig.getVersionQueryLimit());
 
         // this query will find all the documents where this entity is a source/target node
         final QueryBuilder nodeQuery = QueryBuilders
@@ -562,49 +560,25 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
 
             long queryTimestamp = 0L;
 
-            while(true){
+            QueryBuilder timestampQuery =  QueryBuilders
+                .rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME)
+                .gte(queryTimestamp)
+                .lt(markedTimestamp);
 
-                QueryBuilder timestampQuery =  QueryBuilders
-                    .rangeQuery(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME)
-                    .gte(queryTimestamp)
-                    .lte(markedTimestamp);
+            QueryBuilder finalQuery = QueryBuilders
+                .boolQuery()
+                .must(timestampQuery)
+                .must(nodeQuery);
 
-                QueryBuilder entityQueryWithTimestamp = QueryBuilders
-                    .boolQuery()
-                    .must(entityQuery)
-                    .must(timestampQuery);
+            searchResponse = srb
+                .setQuery(finalQuery)
+                .setSize(searchLimit)
+                .execute()
+                .actionGet();
 
-                QueryBuilder finalQuery = QueryBuilders
-                    .boolQuery()
-                    .should(entityQueryWithTimestamp)
-                    .should(nodeQuery)
-                    .minimumNumberShouldMatch(1);
 
-                searchResponse = srb
-                    .setQuery(finalQuery)
-                    .setSize(searchLimit)
-                    .execute()
-                    .actionGet();
+            candidates = aggregateScrollResults(candidates, searchResponse, markedVersion);
 
-                int responseSize = searchResponse.getHits().getHits().length;
-                if(responseSize == 0){
-                    break;
-                }
-
-                candidates = aggregateScrollResults(candidates, searchResponse, markedVersion);
-
-                // update queryTimestamp to be the timestamp of the last entity returned from the query
-                queryTimestamp = (long) searchResponse
-                    .getHits().getAt(responseSize - 1)
-                    .getSource().get(IndexingUtils.EDGE_TIMESTAMP_FIELDNAME);
-
-
-                if(responseSize < searchLimit){
-
-                    break;
-                }
-
-            }
         }
         catch ( Throwable t ) {
             logger.error( "Unable to communicate with Elasticsearch", t.getMessage() );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7143cbaf/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 008ec80..c84635d 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -383,47 +383,6 @@ public class EntityIndexTest extends BaseIT {
     }
 
 
-    /**
-     * Tests that we aggregate results only before the halfway version point.
-     */
-    @Test
-    public void testScollingDeindex() {
-
-        int numberOfEntities = 1000;
-        int versionToSearchFor = numberOfEntities / 2;
-
-
-        UUID entityUUID = UUID.randomUUID();
-        Id entityId = new SimpleId( "mehCar" );
-
-        Map entityMap = new HashMap() {{
-            put( "name", "Toyota Corolla" );
-            put( "introduced", 1966 );
-            put( "topspeed", 111 );
-        }};
-
-        Entity[] entity = new Entity[numberOfEntities];
-        for(int i = 0; i < numberOfEntities; i++) {
-            entity[i] = EntityIndexMapUtils.fromMap( entityMap );
-            EntityUtils.setId(entity[i], entityId);
-            EntityUtils.setVersion(entity[i], UUIDGenerator.newTimeUUID());
-            entity[i].setField(new UUIDField(IndexingUtils.ENTITY_ID_FIELDNAME, entityUUID));
-
-            IndexEdge searchEdge = new IndexEdgeImpl( appId, "mehCars", SearchEdge.NodeType.SOURCE, System.currentTimeMillis()*1000 );
-
-            //index the new entity. This is where the loop will be set to create like 100 entities.
-            indexProducer.put(entityIndex.createBatch().index( searchEdge, entity[i] ).build()).subscribe();
-
-        }
-        entityIndex.refreshAsync().toBlocking().first();
-
-        CandidateResults candidateResults = entityIndex
-            .getAllEntityVersionsBeforeMarkedVersion( entity[versionToSearchFor].getId(),
-                entity[versionToSearchFor].getVersion() );
-        assertEquals( 501, candidateResults.size() );
-    }
-
-
 
     private CandidateResults testQuery( final SearchEdge scope, final SearchTypes searchTypes,
                                       final String queryString,