You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/10/10 00:04:34 UTC

[14/16] git commit: Refactored loader to group for batching, then call filters to perform the load, verification and result construction

Refactored loader to group for batching, then call filters to perform the load, verification and result construction


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6285e605
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6285e605
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6285e605

Branch: refs/heads/two-dot-o
Commit: 6285e6057832580d947106f845852254397eb56c
Parents: b3515f4
Author: Todd Nine <to...@apache.org>
Authored: Thu Oct 9 15:03:51 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Oct 9 15:49:55 2014 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |   3 +
 .../corepersistence/CpEntityManagerFactory.java |   2 +
 .../corepersistence/CpEntityMapUtils.java       | 323 -------------------
 .../corepersistence/CpManagerCache.java         |   2 +-
 .../usergrid/corepersistence/CpNamingUtils.java | 106 ------
 .../corepersistence/CpRelationManager.java      | 140 +-------
 .../results/AbstractIdLoader.java               | 130 --------
 .../corepersistence/results/EntitiesLoader.java |  51 ---
 .../corepersistence/results/EntityVerifier.java | 142 ++++++++
 .../results/FilteringLoader.java                | 240 ++++++++++++++
 .../corepersistence/results/IdsLoader.java      |  46 ---
 .../corepersistence/results/IdsVerifier.java    |  65 ++++
 .../corepersistence/results/RefsLoader.java     |  46 ---
 .../corepersistence/results/RefsVerifier.java   |  62 ++++
 .../corepersistence/results/ResultsLoader.java  |   5 +-
 .../results/ResultsLoaderFactory.java           |   9 +-
 .../results/ResultsLoaderFactoryImpl.java       |  42 ++-
 .../results/ResultsVerifier.java                |  72 +++++
 .../results/VersionVerifier.java                | 103 ++++++
 .../corepersistence/util/CpEntityMapUtils.java  | 323 +++++++++++++++++++
 .../corepersistence/util/CpNamingUtils.java     | 106 ++++++
 .../corepersistence/CpEntityMapUtilsTest.java   |   2 +
 .../corepersistence/StaleIndexCleanupTest.java  |   1 +
 23 files changed, 1173 insertions(+), 848 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 1a8f17c..657ee2d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -54,6 +54,9 @@ import me.prettyprint.hector.api.query.QueryResult;
 import me.prettyprint.hector.api.query.SliceCounterQuery;
 import static org.apache.commons.lang.StringUtils.capitalize;
 import static org.apache.commons.lang.StringUtils.isBlank;
+
+import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.AggregateCounter;
 import org.apache.usergrid.persistence.AggregateCounterSet;
 import org.apache.usergrid.persistence.CollectionRef;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index f44eac3..17b33e4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -27,6 +27,8 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
 import org.apache.commons.lang.StringUtils;
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.AbstractEntity;
 import org.apache.usergrid.persistence.DynamicEntity;
 import org.apache.usergrid.persistence.Entity;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityMapUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityMapUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityMapUtils.java
deleted file mode 100644
index efbef0b..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityMapUtils.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-
-
-package org.apache.usergrid.corepersistence;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.Schema;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.field.ArrayField;
-import org.apache.usergrid.persistence.model.field.BooleanField;
-import org.apache.usergrid.persistence.model.field.ByteArrayField;
-import org.apache.usergrid.persistence.model.field.DoubleField;
-import org.apache.usergrid.persistence.model.field.EntityObjectField;
-import org.apache.usergrid.persistence.model.field.Field;
-import org.apache.usergrid.persistence.model.field.FloatField;
-import org.apache.usergrid.persistence.model.field.IntegerField;
-import org.apache.usergrid.persistence.model.field.ListField;
-import org.apache.usergrid.persistence.model.field.LocationField;
-import org.apache.usergrid.persistence.model.field.LongField;
-import org.apache.usergrid.persistence.model.field.SetField;
-import org.apache.usergrid.persistence.model.field.StringField;
-import org.apache.usergrid.persistence.model.field.UUIDField;
-import org.apache.usergrid.persistence.model.field.value.EntityObject;
-import org.apache.usergrid.persistence.model.field.value.Location;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Utilities for converting entities to/from maps suitable for Core Persistence.
- * Aware of unique properties via Schema.
- */
-public class CpEntityMapUtils {
-    private static final Logger logger = LoggerFactory.getLogger( CpEntityMapUtils.class );
-
-    public static ObjectMapper objectMapper = new ObjectMapper(  );
-
-    public static Entity fromMap( Map<String, Object> map, String entityType, boolean topLevel ) {
-        return fromMap( null, map, entityType, topLevel );
-    }
-
-    public static Entity fromMap( Entity entity, Map<String, Object> map, String entityType, boolean topLevel ) {
-
-        if ( entity == null ) {
-            entity = new Entity();
-        }
-
-        for ( String fieldName : map.keySet() ) {
-
-            Object value = map.get( fieldName );
-            boolean unique = Schema.getDefaultSchema().isPropertyUnique(entityType, fieldName);
-
-//            if ( unique ) {
-//                logger.debug("{} is a unique property", fieldName );
-//            }
-
-            if ( value instanceof String ) {
-                entity.setField( new StringField( fieldName, (String)value, unique && topLevel ));
-
-            } else if ( value instanceof Boolean ) {
-                entity.setField( new BooleanField( fieldName, (Boolean)value, unique && topLevel ));
-                        
-            } else if ( value instanceof Integer ) {
-                entity.setField( new IntegerField( fieldName, (Integer)value, unique && topLevel ));
-
-            } else if ( value instanceof Double ) {
-                entity.setField( new DoubleField( fieldName, (Double)value, unique && topLevel ));
-
-		    } else if ( value instanceof Float ) {
-                entity.setField( new FloatField( fieldName, (Float)value, unique && topLevel ));
-				
-            } else if ( value instanceof Long ) {
-                entity.setField( new LongField( fieldName, (Long)value, unique && topLevel ));
-
-            } else if ( value instanceof List) {
-                entity.setField( listToListField( fieldName, (List)value, entityType ));  
-            
-            } else if ( value instanceof UUID) {
-                entity.setField( new UUIDField( fieldName, (UUID)value, unique && topLevel ));
-
-            } else if ( value instanceof Map ) {
-                processMapValue( value, fieldName, entity, entityType);
-
-            } else if ( value instanceof Enum ) {
-                entity.setField( new StringField( fieldName, value.toString(), unique && topLevel ));
-	
-			} else if ( value != null ) {
-                byte[] valueSerialized;
-                try {
-                    valueSerialized = objectMapper.writeValueAsBytes( value );
-                }
-                catch ( JsonProcessingException e ) {
-                    throw new RuntimeException( "Can't serialize object ",e );
-                }
-                catch ( IOException e ) {
-                    throw new RuntimeException( "Can't serialize object ",e );
-                }
-                ByteBuffer byteBuffer = ByteBuffer.wrap( valueSerialized );
-                ByteArrayField bf = new ByteArrayField( fieldName, byteBuffer.array(), value.getClass() );
-                entity.setField( bf );
-            }
-        }
-
-        return entity;
-    }
-
-    private static void processMapValue(
-            Object value, String fieldName, Entity entity, String entityType) {
-
-        Field field = null;
-
-        // is the map really a location element?
-        Map<String, Object> m = (Map<String, Object>)value;
-        if ( m.size() == 2) {
-            Double lat = null;
-            Double lon = null;
-            try {
-                if ( m.get("latitude") != null && m.get("longitude") != null ) {
-                    lat = Double.parseDouble( m.get("latitude").toString() );
-                    lon = Double.parseDouble( m.get("longitude").toString() );
-                    
-                } else if ( m.get("lat") != null && m.get("lon") != null ) {
-                    lat = Double.parseDouble( m.get("lat").toString() );
-                    lon = Double.parseDouble( m.get("lon").toString() );
-                }
-            } catch ( NumberFormatException ignored ) {}
-            
-            if ( lat != null && lon != null ) {
-                field = new LocationField( fieldName, new Location( lat, lon ));
-            }
-        }
-        
-        if ( field == null ) {
-            
-            // not a location element, process it as map
-            entity.setField( new EntityObjectField( fieldName,
-                    fromMap( (Map<String, Object>)value, entityType, false ))); // recursion
-            
-        } else {
-            entity.setField( field );
-        }
-    }
-
-    
-    private static ListField listToListField( String fieldName, List list, String entityType ) {
-
-        if (list.isEmpty()) {
-            return new ListField( fieldName );
-        }
-
-        Object sample = list.get(0);
-
-        if ( sample instanceof Map ) {
-            return new ListField<Entity>( fieldName, processListForField( list, entityType ));
-
-        } else if ( sample instanceof List ) {
-            return new ListField<List>( fieldName, processListForField( list, entityType ));
-            
-        } else if ( sample instanceof String ) {
-            return new ListField<String>( fieldName, (List<String>)list );
-                    
-        } else if ( sample instanceof Boolean ) {
-            return new ListField<Boolean>( fieldName, (List<Boolean>)list );
-                    
-        } else if ( sample instanceof Integer ) {
-            return new ListField<Integer>( fieldName, (List<Integer>)list );
-
-        } else if ( sample instanceof Double ) {
-            return new ListField<Double>( fieldName, (List<Double>)list );
-
-        } else if ( sample instanceof Long ) {
-            return new ListField<Long>( fieldName, (List<Long>)list );
-
-        } else {
-            throw new RuntimeException("Unknown type " + sample.getClass().getName());
-        }
-    }
-
-    
-    private static List processListForField( List list, String entityType ) {
-        if ( list.isEmpty() ) {
-            return list;
-        }
-        Object sample = list.get(0);
-
-        if ( sample instanceof Map ) {
-            List<Entity> newList = new ArrayList<Entity>();
-            for ( Map<String, Object> map : (List<Map<String, Object>>)list ) {
-                newList.add( fromMap( map, entityType, false ) );
-            }
-            return newList;
-
-        } else if ( sample instanceof List ) {
-            return processListForField( list, entityType ); // recursion
-            
-        } else { 
-            return list;
-        } 
-    }
-
-
-    /**
-     * Convert Entity to Map, adding version_ug_field and a {name}_ug_analyzed field for each
-     * StringField.
-     */
-    public static Map toMap(EntityObject entity) {
-
-        Map<String, Object> entityMap = new TreeMap<>();
-
-        for (Object f : entity.getFields().toArray()) {
-            Field field = (Field) f;
-
-            if (f instanceof ListField || f instanceof ArrayField) {
-                List list = (List) field.getValue();
-                entityMap.put(field.getName(),
-                        new ArrayList( processCollectionForMap(list)));
-
-            } else if (f instanceof SetField) {
-                Set set = (Set) field.getValue();
-                entityMap.put(field.getName(),
-                        new ArrayList( processCollectionForMap(set)));
-
-            } else if (f instanceof EntityObjectField) {
-                EntityObject eo = (EntityObject) field.getValue();
-                entityMap.put( field.getName(), toMap(eo)); // recursion
-
-            } else if (f instanceof StringField) {
-                entityMap.put(field.getName(), ((String) field.getValue()));
-
-            } else if (f instanceof LocationField) {
-                LocationField locField = (LocationField) f;
-                Map<String, Object> locMap = new HashMap<String, Object>();
-
-                // field names lat and lon trigger ElasticSearch geo location 
-                locMap.put("lat", locField.getValue().getLatitude());
-                locMap.put("lon", locField.getValue().getLongitude());
-                 entityMap.put( field.getName(), field.getValue());
-
-            } else if (f instanceof ByteArrayField) {
-                    ByteArrayField bf = ( ByteArrayField ) f;
-
-                    byte[] serilizedObj =  bf.getValue();
-                    Object o;
-                    try {
-                        o = objectMapper.readValue( serilizedObj, bf.getClassinfo() );
-                    }
-                    catch ( IOException e ) {
-                        throw new RuntimeException( "Can't deserialize object ",e );
-                    }
-                    entityMap.put( bf.getName(), o );
-            }
-            else {
-                entityMap.put( field.getName(), field.getValue());
-            }
-        }
-
-        return entityMap;
-    }
-
-    
-    private static Collection processCollectionForMap(Collection c) {
-        if (c.isEmpty()) {
-            return c;
-        }
-        List processed = new ArrayList();
-        Object sample = c.iterator().next();
-
-        if (sample instanceof Entity) {
-            for (Object o : c.toArray()) {
-                Entity e = (Entity) o;
-                processed.add(toMap(e));
-            }
-
-        } else if (sample instanceof List) {
-            for (Object o : c.toArray()) {
-                List list = (List) o;
-                processed.add(processCollectionForMap(list)); // recursion;
-            }
-
-        } else if (sample instanceof Set) {
-            for (Object o : c.toArray()) {
-                Set set = (Set) o;
-                processed.add(processCollectionForMap(set)); // recursion;
-            }
-
-        } else {
-            for (Object o : c.toArray()) {
-                processed.add(o);
-            }
-        }
-        return processed;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
index d10025c..62ea81f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
@@ -28,7 +28,7 @@ import org.apache.usergrid.persistence.map.MapManagerFactory;
 import org.apache.usergrid.persistence.map.MapScope;
 import org.apache.usergrid.utils.LRUCache2;
 
-class CpManagerCache {
+public class CpManagerCache {
 
     private final EntityCollectionManagerFactory ecmf;
     private final EntityIndexFactory eif;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpNamingUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpNamingUtils.java
deleted file mode 100644
index 45c39ab..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpNamingUtils.java
+++ /dev/null
@@ -1,106 +0,0 @@
-package org.apache.usergrid.corepersistence;/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import org.apache.usergrid.persistence.Schema;
-
-
-/**
- * Utilises for constructing standard naming conventions for collections and connections
- */
-public class CpNamingUtils {
-
-    /**
-     * Edge types for all types
-     */
-    static final String ALL_TYPES = "zzzalltypeszzz";
-
-    /**
-     * Edge types for collection suffix
-     */
-    static final String EDGE_COLL_SUFFIX = "zzzcollzzz";
-
-    /**
-     * Edge types for connection suffix
-     */
-    static final String EDGE_CONN_SUFFIX = "zzzconnzzz";
-
-
-    static String getCollectionScopeNameFromEntityType( String type ) {
-        String csn = EDGE_COLL_SUFFIX + Schema.defaultCollectionName( type );
-        return csn.toLowerCase();
-    }
-
-
-    static String getCollectionScopeNameFromCollectionName( String name ) {
-        String csn = EDGE_COLL_SUFFIX + name;
-        return csn.toLowerCase();
-    }
-
-
-    static String getConnectionScopeName( String entityType, String connectionType ) {
-        String csn = EDGE_CONN_SUFFIX + connectionType + entityType;
-        return csn.toLowerCase();
-    }
-
-
-    static boolean isCollectionEdgeType( String type ) {
-        return type.startsWith( EDGE_COLL_SUFFIX );
-    }
-
-
-    static boolean isConnectionEdgeType( String type ) {
-        return type.startsWith( EDGE_CONN_SUFFIX );
-    }
-
-
-    static public String getConnectionType( String edgeType ) {
-        String[] parts = edgeType.split( "\\|" );
-        return parts[1];
-    }
-
-
-    static public String getCollectionName( String edgeType ) {
-        String[] parts = edgeType.split( "\\|" );
-        return parts[1];
-    }
-
-
-    static String getEdgeTypeFromConnectionType( String connectionType ) {
-
-        if ( connectionType != null ) {
-            String csn = EDGE_CONN_SUFFIX + "|" + connectionType;
-            return csn;
-        }
-
-        return null;
-    }
-
-
-    static String getEdgeTypeFromCollectionName( String collectionName ) {
-
-        if ( collectionName != null ) {
-            String csn = EDGE_COLL_SUFFIX + "|" + collectionName;
-            return csn;
-        }
-
-
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 03aada3..0f31c83 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -32,6 +32,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.usergrid.corepersistence.results.ResultsLoaderFactory;
+import org.apache.usergrid.corepersistence.results.ResultsLoaderFactoryImpl;
+import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.utils.UUIDUtils;
 import org.slf4j.Logger;
@@ -175,6 +179,8 @@ public class CpRelationManager implements RelationManager {
 
     private IndexBucketLocator indexBucketLocator;
 
+    private ResultsLoaderFactory resultsLoaderFactory;
+
 
 
     public CpRelationManager() {}
@@ -230,6 +236,8 @@ public class CpRelationManager implements RelationManager {
         // commented out because it is possible that CP entity has not been created yet
         Assert.notNull( cpHeadEntity, "cpHeadEntity cannot be null" );
 
+        this.resultsLoaderFactory = new ResultsLoaderFactoryImpl( managerCache );
+
         return this;
     }
 
@@ -625,7 +633,7 @@ public class CpRelationManager implements RelationManager {
                     memberScope.getApplication(), 
                     memberScope.getOwner(), 
                     memberScope.getName(),
-                    CpEntityMapUtils.toMap(memberEntity)
+                    CpEntityMapUtils.toMap( memberEntity )
             });
         }
 
@@ -1555,136 +1563,8 @@ public class CpRelationManager implements RelationManager {
         logger.debug("buildResults() for {} from {} candidates", collName, crs.size());
 
 
-        //TODO T.N Change to results loader here
-
-        Results results = null;
-
-        EntityIndex index = managerCache.getEntityIndex(applicationScope);
-        EntityIndexBatch indexBatch = index.createBatch();
-
-
-        // map of the latest versions, we will discard stale indexes
-        Map<Id, CandidateResult> latestVersions = new LinkedHashMap<Id, CandidateResult>();
-
-        Iterator<CandidateResult> iter = crs.iterator();
-        while ( iter.hasNext() ) {
-
-            CandidateResult cr = iter.next();
-
-            CollectionScope collScope = new CollectionScopeImpl( 
-                applicationScope.getApplication(), 
-                applicationScope.getApplication(), 
-                CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
-
-            EntityCollectionManager ecm = managerCache.getEntityCollectionManager(collScope);
-
-            UUID latestVersion = ecm.getLatestVersion( Collections.singleton( cr.getId()) ).toBlocking().last().getMaxVersion( cr.getId() ).getVersion();
-
-            if ( logger.isDebugEnabled() ) {
-                logger.debug("Getting version for entity {} from scope\n   app {}\n   owner {}\n   name {}", 
-                new Object[] { 
-                    cr.getId(),
-                    collScope.getApplication(), 
-                    collScope.getOwner(), 
-                    collScope.getName() 
-                });
-            }
-
-            if ( latestVersion == null ) {
-                logger.error("Version for Entity {}:{} not found", 
-                        cr.getId().getType(), cr.getId().getUuid());
-                continue;
-            }
-
-            if ( cr.getVersion().compareTo( latestVersion) < 0 )  {
-                logger.debug("Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", 
-                    new Object[] { cr.getId().getUuid(), cr.getId().getType(), 
-                        cr.getVersion(), latestVersion});
-
-                IndexScope indexScope = new IndexScopeImpl(
-                    cpHeadEntity.getId(),
-                    CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
-                indexBatch.deindex( indexScope, cr);
-
-                continue;
-            }
-
-            CandidateResult alreadySeen = latestVersions.get( cr.getId() ); 
-
-            if ( alreadySeen == null ) { // never seen it, so add to map
-                latestVersions.put( cr.getId(), cr );
-
-            } else {
-                // we seen this id before, only add entity if we now have newer version
-                if ( latestVersion.compareTo( alreadySeen.getVersion() ) > 0 ) {
-
-                    latestVersions.put( cr.getId(), cr);
+        final Results results = this.resultsLoaderFactory.getLoader( applicationScope, this.headEntity, query.getResultsLevel() ).getResults( crs );
 
-                    IndexScope indexScope = new IndexScopeImpl(
-                        cpHeadEntity.getId(),
-                        CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
-                    indexBatch.deindex( indexScope, alreadySeen);
-                }
-            }
-        }
-
-        indexBatch.execute();
-
-        if (query.getLevel().equals(Level.IDS)) {
-
-            List<UUID> ids = new ArrayList<UUID>();
-            for ( Id id : latestVersions.keySet() ) {
-                CandidateResult cr = latestVersions.get(id);
-                ids.add( cr.getId().getUuid() );
-            }
-            results = Results.fromIdList(ids);
-
-        } else if (query.getLevel().equals(Level.REFS)) {
-
-            List<EntityRef> refs = new ArrayList<EntityRef>();
-            for ( Id id : latestVersions.keySet() ) {
-                CandidateResult cr = latestVersions.get(id);
-                refs.add( new SimpleEntityRef( cr.getId().getType(), cr.getId().getUuid()));
-            }
-            results = Results.fromRefList( refs );
-
-        } else {
-
-            List<Entity> entities = new ArrayList<Entity>();
-            for (Id id : latestVersions.keySet()) {
-
-                CandidateResult cr = latestVersions.get(id);
-
-                CollectionScope collScope = new CollectionScopeImpl( 
-                    applicationScope.getApplication(), 
-                    applicationScope.getApplication(), 
-                    CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
-
-                EntityCollectionManager ecm = managerCache.getEntityCollectionManager(collScope);
-
-                org.apache.usergrid.persistence.model.entity.Entity e = 
-                        ecm.load( cr.getId() ).toBlocking().lastOrDefault(null);
-
-                if ( e == null ) {
-                    logger.error("Entity {}:{} not found", 
-                            cr.getId().getType(), cr.getId().getUuid());
-                    continue;
-                }
-
-                Entity entity = EntityFactory.newEntity(
-                        e.getId().getUuid(), e.getField("type").getValue().toString());
-
-                Map<String, Object> entityMap = CpEntityMapUtils.toMap(e);
-                entity.addProperties(entityMap);
-                entities.add(entity);
-            }
-
-            if (entities.size() == 1) {
-                results = Results.fromEntity(entities.get(0));
-            } else {
-                results = Results.fromEntities(entities);
-            }
-        }
 
         results.setCursor( crs.getCursor() );
         results.setQueryProcessor( new CpQueryProcessor(em, query, headEntity, collName) );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractIdLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractIdLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractIdLoader.java
deleted file mode 100644
index b034a53..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractIdLoader.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-
-package org.apache.usergrid.corepersistence.results;/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.usergrid.corepersistence.CpNamingUtils;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.IndexScope;
-import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
-import org.apache.usergrid.persistence.index.query.CandidateResult;
-import org.apache.usergrid.persistence.index.query.CandidateResults;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public abstract class AbstractIdLoader implements  ResultsLoader{
-
-    @Override
-    public Results getResults( final ApplicationScope applicationScope, final CandidateResults crs ) {
-//        Map<Id, CandidateResult> latestVersions = new LinkedHashMap<Id, CandidateResult>();
-//
-//               Iterator<CandidateResult> iter = crs.iterator();
-//               while ( iter.hasNext() ) {
-//
-//                   CandidateResult cr = iter.next();
-//
-//                   CollectionScope collScope = new CollectionScopeImpl(
-//                       applicationScope.getApplication(),
-//                       applicationScope.getApplication(),
-//                       CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
-//
-//                   EntityCollectionManager ecm = managerCache.getEntityCollectionManager(collScope);
-//
-//                   UUID latestVersion = ecm.getLatestVersion( cr.getId() ).toBlocking().lastOrDefault(null);
-//
-//                   if ( logger.isDebugEnabled() ) {
-//                       logger.debug("Getting version for entity {} from scope\n   app {}\n   owner {}\n   name {}",
-//                       new Object[] {
-//                           cr.getId(),
-//                           collScope.getApplication(),
-//                           collScope.getOwner(),
-//                           collScope.getName()
-//                       });
-//                   }
-//
-//                   if ( latestVersion == null ) {
-//                       logger.error("Version for Entity {}:{} not found",
-//                               cr.getId().getType(), cr.getId().getUuid());
-//                       continue;
-//                   }
-//
-//                   if ( cr.getVersion().compareTo( latestVersion) < 0 )  {
-//                       logger.debug("Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
-//                           new Object[] { cr.getId().getUuid(), cr.getId().getType(),
-//                               cr.getVersion(), latestVersion});
-//
-//                       IndexScope indexScope = new IndexScopeImpl(
-//                           cpHeadEntity.getId(),
-//                           CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
-//                       indexBatch.deindex( indexScope, cr);
-//
-//                       continue;
-//                   }
-//
-//                   CandidateResult alreadySeen = latestVersions.get( cr.getId() );
-//
-//                   if ( alreadySeen == null ) { // never seen it, so add to map
-//                       latestVersions.put( cr.getId(), cr );
-//
-//                   } else {
-//                       // we seen this id before, only add entity if we now have newer version
-//                       if ( latestVersion.compareTo( alreadySeen.getVersion() ) > 0 ) {
-//
-//                           latestVersions.put( cr.getId(), cr);
-//
-//                           IndexScope indexScope = new IndexScopeImpl(
-//                               cpHeadEntity.getId(),
-//                               CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
-//                           indexBatch.deindex( indexScope, alreadySeen);
-//                       }
-//                   }
-//               }
-//
-//               indexBatch.execute();
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntitiesLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntitiesLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntitiesLoader.java
deleted file mode 100644
index d20f677..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntitiesLoader.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-
-package org.apache.usergrid.corepersistence.results;/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.query.CandidateResults;
-
-
-public class EntitiesLoader implements ResultsLoader {
-    @Override
-    public Results getResults( final ApplicationScope scope, final CandidateResults crs ) {
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityVerifier.java
new file mode 100644
index 0000000..40cf499
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityVerifier.java
@@ -0,0 +1,142 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityFactory;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.index.query.CandidateResult;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.common.base.Optional;
+
+
+/**
+ * A loader that verifies versions are correct in cassandra and match elasticsearch
+ */
+public class EntityVerifier implements ResultsVerifier {
+
+    private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
+
+    private EntitySet ids;
+
+    private Map<Id, org.apache.usergrid.persistence.model.entity.Entity> entityMapping;
+
+
+    public EntityVerifier( final int maxSize ) {
+        this.entityMapping = new HashMap<>( maxSize );
+    }
+
+
+    @Override
+    public void loadResults( final Collection<Id> idsToLoad, final EntityCollectionManager ecm ) {
+        ids = ecm.load( idsToLoad ).toBlocking().last();
+    }
+
+
+    @Override
+    public boolean isValid( final CandidateResult candidateResult ) {
+        final Id entityId = candidateResult.getId();
+
+        final MvccEntity version = ids.getEntity( entityId );
+
+        //version wasn't found ,deindex
+        if ( version == null ) {
+            logger.warn( "Version for Entity {}:{} not found", entityId.getUuid(), entityId.getUuid() );
+
+
+            return false;
+        }
+
+        final UUID savedVersion = version.getVersion();
+
+        if ( UUIDComparator.staticCompare( savedVersion, candidateResult.getVersion() ) > 0 ) {
+            logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] {
+                    entityId.getUuid(), entityId.getType(), savedVersion, candidateResult.getVersion()
+            } );
+
+            return false;
+        }
+
+
+        final Optional<org.apache.usergrid.persistence.model.entity.Entity> entity = version.getEntity();
+
+        if ( !entity.isPresent() ) {
+            return false;
+        }
+
+        entityMapping.put( entityId, entity.get() );
+
+        return true;
+    }
+
+
+    @Override
+    public Results getResults( final Collection<Id> ids ) {
+
+        final List<Entity> ugEntities = new ArrayList<>( ids.size() );
+
+        for ( final Id id : ids ) {
+            final org.apache.usergrid.persistence.model.entity.Entity cpEntity = entityMapping.get( id );
+
+            Entity entity = EntityFactory.newEntity( id.getUuid(), id.toString() );
+
+            Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity );
+            entity.addProperties( entityMap );
+            ugEntities.add( entity );
+        }
+
+        return Results.fromEntities( ugEntities );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
new file mode 100644
index 0000000..3db772b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
@@ -0,0 +1,240 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.CpManagerCache;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
+import org.apache.usergrid.persistence.index.query.CandidateResult;
+import org.apache.usergrid.persistence.index.query.CandidateResults;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+
+
+public class FilteringLoader implements ResultsLoader {
+
+    private static final Logger logger = LoggerFactory.getLogger( FilteringLoader.class );
+
+    private final CpManagerCache managerCache;
+    private final ResultsVerifier resultsLoader;
+    private final Id ownerId;
+    private final ApplicationScope applicationScope;
+
+
+     protected FilteringLoader( final CpManagerCache managerCache, final ResultsVerifier resultsLoader,
+                                final EntityRef ownerId, final ApplicationScope applicationScope ) {
+         this.managerCache = managerCache;
+         this.resultsLoader = resultsLoader;
+         this.ownerId = new SimpleId( ownerId.getUuid(), ownerId.getType());
+         this.applicationScope = applicationScope;
+     }
+
+
+    @Override
+    public Results getResults( final CandidateResults crs ) {
+
+
+        final EntityIndex index = managerCache.getEntityIndex( applicationScope );
+
+        final EntityIndexBatch indexBatch = index.createBatch();
+
+        final Map<Id, Integer> orderIndex = new HashMap<>( crs.size() );
+
+        final Map<Id, CandidateResult> idResultMapping = new HashMap<>( crs.size() );
+
+        final HashMultimap<String, CandidateResult> groupedByScopes = HashMultimap.create( crs.size(), crs.size() );
+
+        final Iterator<CandidateResult> iter = crs.iterator();
+
+
+        /**
+         * Go through the candidates and group them by scope for more efficient retrieval
+         */
+        for ( int i = 0; iter.hasNext(); i++ ) {
+
+            final CandidateResult cr = iter.next();
+
+            final String collectionType = CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() );
+
+            final Id entityId = cr.getId();
+
+            //if we've already seen this one, put which ever is greater
+
+            final CandidateResult seen = idResultMapping.get( entityId );
+
+            if ( seen == null ) {
+                idResultMapping.put( entityId, cr );
+                orderIndex.put( entityId, i );
+                groupedByScopes.put( collectionType, cr );
+            }
+
+            //we have seen it, compare them
+            else {
+                final UUID seenVersion = seen.getVersion();
+                final UUID currentVersion = cr.getVersion();
+
+                //this is a newer version, we know we already have a stale entity, add it to be cleaned up
+                if ( UUIDComparator.staticCompare( currentVersion, seenVersion ) > 0 ) {
+
+                    //de-index it
+                    logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] {
+                            entityId.getUuid(), entityId.getType(), seen, currentVersion
+                    } );
+
+                    //deindex
+                    deIndex( indexBatch, ownerId, cr );
+
+
+                    //TODO, fire the entity repair cleanup task here instead of de-indexing
+
+                    //replace the value with a more current version
+                    idResultMapping.put( entityId, cr );
+                    orderIndex.put( entityId, i );
+                    groupedByScopes.put( collectionType, cr );
+                }
+            }
+        }
+
+
+        //now everything is ordered, and older versions are removed.  Batch fetch versions to verify existence and
+        // correct versions
+
+        final TreeMap<Integer, Id> sortedResults = new TreeMap<>();
+
+        for ( final String scopeName : groupedByScopes.keys() ) {
+
+
+            final Set<CandidateResult> candidateResults = groupedByScopes.get( scopeName );
+
+            final Collection<Id> idsToLoad =
+                    Collections2.transform( candidateResults, new Function<CandidateResult, Id>() {
+                        @Nullable
+                        @Override
+                        public Id apply( @Nullable final CandidateResult input ) {
+                            if ( input == null ) {
+                                return null;
+                            }
+
+                            return input.getId();
+                        }
+                    } );
+
+
+            //now using the scope, load the collection
+
+            /**
+             * Get the collection scope and batch load all the versions
+             */
+            final CollectionScope collScope =
+                    new CollectionScopeImpl( applicationScope.getApplication(), applicationScope.getApplication(),
+                            scopeName );
+
+
+            final EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collScope );
+
+
+            //load the results into the loader for this ech
+            resultsLoader.loadResults( idsToLoad, ecm );
+
+
+
+
+
+            //now compare them
+
+            for ( final Id requestedId : idsToLoad ) {
+
+                final CandidateResult cr = idResultMapping.get( requestedId );
+
+                //ask the loader if this is valid, if not discard it
+                if(!resultsLoader.isValid( cr )){
+                    deIndex( indexBatch, ownerId, cr );
+                    continue;
+                }
+
+                //if we get here we're good, we need to add this to our results
+                final int candidateIndex = orderIndex.get( requestedId  );
+
+                sortedResults.put( candidateIndex, requestedId );
+
+            }
+        }
+
+
+        return resultsLoader.getResults(sortedResults.values());
+    }
+
+
+
+
+
+    protected void deIndex( final EntityIndexBatch batch, final Id ownerId, final CandidateResult candidateResult ) {
+        IndexScope indexScope = new IndexScopeImpl( ownerId,
+                CpNamingUtils.getCollectionScopeNameFromEntityType( candidateResult.getId().getType() ) );
+
+        batch.deindex( indexScope, candidateResult );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsLoader.java
deleted file mode 100644
index 7796028..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsLoader.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-
-package org.apache.usergrid.corepersistence.results;/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.index.query.CandidateResults;
-
-
-public class IdsLoader extends AbstractIdLoader{
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsVerifier.java
new file mode 100644
index 0000000..cf13c74
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsVerifier.java
@@ -0,0 +1,65 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+public class IdsVerifier extends VersionVerifier {
+
+
+    @Override
+    public Results getResults( final Collection<Id> ids ) {
+
+        final List<UUID> returnIds = new ArrayList<>( ids.size() );
+
+        for ( final Id id : ids ) {
+            returnIds.add( id.getUuid() );
+        }
+
+
+        return Results.fromIdList( returnIds );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsLoader.java
deleted file mode 100644
index ad852d5..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsLoader.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-
-package org.apache.usergrid.corepersistence.results;/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.index.query.CandidateResults;
-
-
-public class RefsLoader extends AbstractIdLoader{
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsVerifier.java
new file mode 100644
index 0000000..77443ee
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsVerifier.java
@@ -0,0 +1,62 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.SimpleEntityRef;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+public class RefsVerifier extends VersionVerifier {
+
+
+    @Override
+    public Results getResults( final Collection<Id> ids ) {
+        List<EntityRef> refs = new ArrayList<EntityRef>();
+        for ( Id id : ids ) {
+            refs.add( new SimpleEntityRef( id.getType(), id.getUuid() ) );
+        }
+        return Results.fromRefList( refs );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoader.java
index 8a17c6e..299327d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoader.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoader.java
@@ -41,6 +41,7 @@ package org.apache.usergrid.corepersistence.results;/*
 import org.apache.usergrid.persistence.Results;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.index.query.CandidateResults;
+import org.apache.usergrid.persistence.model.entity.Id;
 
 
 /**
@@ -50,8 +51,8 @@ public interface ResultsLoader {
 
     /**
      * Using the candidate results, get the results
-     * @param crs
+     * @param  crs The candidate result set
      * @return
      */
-    public Results getResults(final ApplicationScope scope, final CandidateResults crs);
+    public Results getResults( final CandidateResults crs);
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
index 5c5892c..9954597 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
@@ -38,6 +38,12 @@ package org.apache.usergrid.corepersistence.results;/*
  */
 
 
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
 /**
  * Factory for creating results
  */
@@ -49,5 +55,6 @@ public interface ResultsLoaderFactory {
      * Get the load for results
      * @return
      */
-    public ResultsLoader getLoader();
+    public ResultsLoader getLoader(final ApplicationScope applicationScope, final EntityRef ownerId,
+                                        final Query.Level resultsLevel );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java
index edaca78..7839a13 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java
@@ -38,33 +38,51 @@ package org.apache.usergrid.corepersistence.results;/*
  */
 
 
+import org.junit.rules.Verifier;
+
+import org.apache.usergrid.corepersistence.CpManagerCache;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.model.entity.Id;
+
 import com.google.inject.Inject;
-import com.google.inject.Singleton;
 
 
 /**
  * Factory for creating results
  */
-@Singleton
+
 public class ResultsLoaderFactoryImpl implements ResultsLoaderFactory {
 
 
-    private final EntitiesLoader entityLoader;
-    private final IdsLoader idsLoader;
-    private final RefsLoader refsLoader;
+    private final CpManagerCache managerCache;
 
 
     @Inject
-    public ResultsLoaderFactoryImpl( final EntitiesLoader entityLoader, final IdsLoader idsLoader,
-                                      final RefsLoader refsLoader ) {
-        this.entityLoader = entityLoader;
-        this.idsLoader = idsLoader;
-        this.refsLoader = refsLoader;
+    public ResultsLoaderFactoryImpl( final CpManagerCache managerCache ) {
+        this.managerCache = managerCache;
     }
 
 
     @Override
-    public ResultsLoader getLoader() {
-        return null;
+    public ResultsLoader getLoader( final ApplicationScope applicationScope, final EntityRef ownerId,
+                                    final Query.Level resultsLevel ) {
+
+
+        ResultsVerifier verifier;
+
+        if ( resultsLevel == Query.Level.REFS ) {
+            verifier = new RefsVerifier();
+        }
+        else if ( resultsLevel == Query.Level.IDS ) {
+            verifier = new RefsVerifier();
+        }
+        else {
+            verifier = new EntityVerifier(Query.MAX_LIMIT);
+        }
+
+
+        return new FilteringLoader( managerCache, verifier, ownerId, applicationScope );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsVerifier.java
new file mode 100644
index 0000000..1180ece
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsVerifier.java
@@ -0,0 +1,72 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.util.Collection;
+
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.index.query.CandidateResult;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+public interface ResultsVerifier {
+
+    /**
+     * Load all the candidate ides for verification
+     * @param ids The Id's to load
+     * @param ecm The entity collection manager
+     */
+    public void loadResults(Collection<Id> ids, EntityCollectionManager ecm);
+
+    /**
+     * Return true if the candidate result is a valid result that should be retained.  If it should not
+     * it should also be removed from the list of possible return values in this loader
+     * @param candidateResult
+     * @return
+     */
+    public boolean isValid(CandidateResult candidateResult);
+
+
+    /**
+     * Load the result set with the given ids
+     * @return
+     */
+    public Results getResults(Collection<Id> ids);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/VersionVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/VersionVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/VersionVerifier.java
new file mode 100644
index 0000000..778b0c3
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/VersionVerifier.java
@@ -0,0 +1,103 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.util.Collection;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.VersionSet;
+import org.apache.usergrid.persistence.index.query.CandidateResult;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+
+
+/**
+ * A loader that verifies versions are correct in cassandra and match elasticsearch
+ */
+public abstract class VersionVerifier implements ResultsVerifier {
+
+    private static final Logger logger = LoggerFactory.getLogger( VersionVerifier.class );
+
+    private VersionSet ids;
+
+
+    @Override
+    public void loadResults( final Collection<Id> idsToLoad, final EntityCollectionManager ecm ) {
+        ids = ecm.getLatestVersion( idsToLoad ).toBlocking().last();
+    }
+
+
+    @Override
+    public boolean isValid( final CandidateResult candidateResult ) {
+        final Id entityId = candidateResult.getId();
+
+        final MvccLogEntry version = ids.getMaxVersion( entityId );
+
+        //version wasn't found ,deindex
+        if ( version == null ) {
+            logger.warn( "Version for Entity {}:{} not found", entityId.getUuid(), entityId.getUuid() );
+
+
+            return false;
+        }
+
+        final UUID savedVersion = version.getVersion();
+
+        if ( UUIDComparator.staticCompare( savedVersion, candidateResult.getVersion() ) > 0 ) {
+            logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] {
+                    entityId.getUuid(), entityId.getType(), savedVersion, candidateResult.getVersion()
+            } );
+
+            return false;
+        }
+
+
+        return true;
+    }
+
+
+}