You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/10/09 23:50:17 UTC

[1/3] Refactored loader to group for batching, then call filters to perform the load, verification and result construction

Repository: incubator-usergrid
Updated Branches:
  refs/heads/collection_multiget f43e4f5c3 -> 454ef4ff1


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpEntityMapUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpEntityMapUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpEntityMapUtils.java
new file mode 100644
index 0000000..feed396
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpEntityMapUtils.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+
+
+package org.apache.usergrid.corepersistence.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.Schema;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.field.ArrayField;
+import org.apache.usergrid.persistence.model.field.BooleanField;
+import org.apache.usergrid.persistence.model.field.ByteArrayField;
+import org.apache.usergrid.persistence.model.field.DoubleField;
+import org.apache.usergrid.persistence.model.field.EntityObjectField;
+import org.apache.usergrid.persistence.model.field.Field;
+import org.apache.usergrid.persistence.model.field.FloatField;
+import org.apache.usergrid.persistence.model.field.IntegerField;
+import org.apache.usergrid.persistence.model.field.ListField;
+import org.apache.usergrid.persistence.model.field.LocationField;
+import org.apache.usergrid.persistence.model.field.LongField;
+import org.apache.usergrid.persistence.model.field.SetField;
+import org.apache.usergrid.persistence.model.field.StringField;
+import org.apache.usergrid.persistence.model.field.UUIDField;
+import org.apache.usergrid.persistence.model.field.value.EntityObject;
+import org.apache.usergrid.persistence.model.field.value.Location;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utilities for converting entities to/from maps suitable for Core Persistence.
+ * Aware of unique properties via Schema.
+ */
+public class CpEntityMapUtils {
+    private static final Logger logger = LoggerFactory.getLogger( CpEntityMapUtils.class );
+
+    public static ObjectMapper objectMapper = new ObjectMapper(  );
+
+    public static Entity fromMap( Map<String, Object> map, String entityType, boolean topLevel ) {
+        return fromMap( null, map, entityType, topLevel );
+    }
+
+    public static Entity fromMap( Entity entity, Map<String, Object> map, String entityType, boolean topLevel ) {
+
+        if ( entity == null ) {
+            entity = new Entity();
+        }
+
+        for ( String fieldName : map.keySet() ) {
+
+            Object value = map.get( fieldName );
+            boolean unique = Schema.getDefaultSchema().isPropertyUnique(entityType, fieldName);
+
+//            if ( unique ) {
+//                logger.debug("{} is a unique property", fieldName );
+//            }
+
+            if ( value instanceof String ) {
+                entity.setField( new StringField( fieldName, (String)value, unique && topLevel ));
+
+            } else if ( value instanceof Boolean ) {
+                entity.setField( new BooleanField( fieldName, (Boolean)value, unique && topLevel ));
+                        
+            } else if ( value instanceof Integer ) {
+                entity.setField( new IntegerField( fieldName, (Integer)value, unique && topLevel ));
+
+            } else if ( value instanceof Double ) {
+                entity.setField( new DoubleField( fieldName, (Double)value, unique && topLevel ));
+
+		    } else if ( value instanceof Float ) {
+                entity.setField( new FloatField( fieldName, (Float)value, unique && topLevel ));
+				
+            } else if ( value instanceof Long ) {
+                entity.setField( new LongField( fieldName, (Long)value, unique && topLevel ));
+
+            } else if ( value instanceof List) {
+                entity.setField( listToListField( fieldName, (List)value, entityType ));  
+            
+            } else if ( value instanceof UUID) {
+                entity.setField( new UUIDField( fieldName, (UUID)value, unique && topLevel ));
+
+            } else if ( value instanceof Map ) {
+                processMapValue( value, fieldName, entity, entityType);
+
+            } else if ( value instanceof Enum ) {
+                entity.setField( new StringField( fieldName, value.toString(), unique && topLevel ));
+	
+			} else if ( value != null ) {
+                byte[] valueSerialized;
+                try {
+                    valueSerialized = objectMapper.writeValueAsBytes( value );
+                }
+                catch ( JsonProcessingException e ) {
+                    throw new RuntimeException( "Can't serialize object ",e );
+                }
+                catch ( IOException e ) {
+                    throw new RuntimeException( "Can't serialize object ",e );
+                }
+                ByteBuffer byteBuffer = ByteBuffer.wrap( valueSerialized );
+                ByteArrayField bf = new ByteArrayField( fieldName, byteBuffer.array(), value.getClass() );
+                entity.setField( bf );
+            }
+        }
+
+        return entity;
+    }
+
+    private static void processMapValue(
+            Object value, String fieldName, Entity entity, String entityType) {
+
+        Field field = null;
+
+        // is the map really a location element?
+        Map<String, Object> m = (Map<String, Object>)value;
+        if ( m.size() == 2) {
+            Double lat = null;
+            Double lon = null;
+            try {
+                if ( m.get("latitude") != null && m.get("longitude") != null ) {
+                    lat = Double.parseDouble( m.get("latitude").toString() );
+                    lon = Double.parseDouble( m.get("longitude").toString() );
+                    
+                } else if ( m.get("lat") != null && m.get("lon") != null ) {
+                    lat = Double.parseDouble( m.get("lat").toString() );
+                    lon = Double.parseDouble( m.get("lon").toString() );
+                }
+            } catch ( NumberFormatException ignored ) {}
+            
+            if ( lat != null && lon != null ) {
+                field = new LocationField( fieldName, new Location( lat, lon ));
+            }
+        }
+        
+        if ( field == null ) {
+            
+            // not a location element, process it as map
+            entity.setField( new EntityObjectField( fieldName,
+                    fromMap( (Map<String, Object>)value, entityType, false ))); // recursion
+            
+        } else {
+            entity.setField( field );
+        }
+    }
+
+    
+    private static ListField listToListField( String fieldName, List list, String entityType ) {
+
+        if (list.isEmpty()) {
+            return new ListField( fieldName );
+        }
+
+        Object sample = list.get(0);
+
+        if ( sample instanceof Map ) {
+            return new ListField<Entity>( fieldName, processListForField( list, entityType ));
+
+        } else if ( sample instanceof List ) {
+            return new ListField<List>( fieldName, processListForField( list, entityType ));
+            
+        } else if ( sample instanceof String ) {
+            return new ListField<String>( fieldName, (List<String>)list );
+                    
+        } else if ( sample instanceof Boolean ) {
+            return new ListField<Boolean>( fieldName, (List<Boolean>)list );
+                    
+        } else if ( sample instanceof Integer ) {
+            return new ListField<Integer>( fieldName, (List<Integer>)list );
+
+        } else if ( sample instanceof Double ) {
+            return new ListField<Double>( fieldName, (List<Double>)list );
+
+        } else if ( sample instanceof Long ) {
+            return new ListField<Long>( fieldName, (List<Long>)list );
+
+        } else {
+            throw new RuntimeException("Unknown type " + sample.getClass().getName());
+        }
+    }
+
+    
+    private static List processListForField( List list, String entityType ) {
+        if ( list.isEmpty() ) {
+            return list;
+        }
+        Object sample = list.get(0);
+
+        if ( sample instanceof Map ) {
+            List<Entity> newList = new ArrayList<Entity>();
+            for ( Map<String, Object> map : (List<Map<String, Object>>)list ) {
+                newList.add( fromMap( map, entityType, false ) );
+            }
+            return newList;
+
+        } else if ( sample instanceof List ) {
+            return processListForField( list, entityType ); // recursion
+            
+        } else { 
+            return list;
+        } 
+    }
+
+
+    /**
+     * Convert Entity to Map, adding version_ug_field and a {name}_ug_analyzed field for each
+     * StringField.
+     */
+    public static Map toMap(EntityObject entity) {
+
+        Map<String, Object> entityMap = new TreeMap<>();
+
+        for (Object f : entity.getFields().toArray()) {
+            Field field = (Field) f;
+
+            if (f instanceof ListField || f instanceof ArrayField) {
+                List list = (List) field.getValue();
+                entityMap.put(field.getName(),
+                        new ArrayList( processCollectionForMap(list)));
+
+            } else if (f instanceof SetField) {
+                Set set = (Set) field.getValue();
+                entityMap.put(field.getName(),
+                        new ArrayList( processCollectionForMap(set)));
+
+            } else if (f instanceof EntityObjectField) {
+                EntityObject eo = (EntityObject) field.getValue();
+                entityMap.put( field.getName(), toMap(eo)); // recursion
+
+            } else if (f instanceof StringField) {
+                entityMap.put(field.getName(), ((String) field.getValue()));
+
+            } else if (f instanceof LocationField) {
+                LocationField locField = (LocationField) f;
+                Map<String, Object> locMap = new HashMap<String, Object>();
+
+                // field names lat and lon trigger ElasticSearch geo location 
+                locMap.put("lat", locField.getValue().getLatitude());
+                locMap.put("lon", locField.getValue().getLongitude());
+                 entityMap.put( field.getName(), field.getValue());
+
+            } else if (f instanceof ByteArrayField) {
+                    ByteArrayField bf = ( ByteArrayField ) f;
+
+                    byte[] serilizedObj =  bf.getValue();
+                    Object o;
+                    try {
+                        o = objectMapper.readValue( serilizedObj, bf.getClassinfo() );
+                    }
+                    catch ( IOException e ) {
+                        throw new RuntimeException( "Can't deserialize object ",e );
+                    }
+                    entityMap.put( bf.getName(), o );
+            }
+            else {
+                entityMap.put( field.getName(), field.getValue());
+            }
+        }
+
+        return entityMap;
+    }
+
+    
+    private static Collection processCollectionForMap(Collection c) {
+        if (c.isEmpty()) {
+            return c;
+        }
+        List processed = new ArrayList();
+        Object sample = c.iterator().next();
+
+        if (sample instanceof Entity) {
+            for (Object o : c.toArray()) {
+                Entity e = (Entity) o;
+                processed.add(toMap(e));
+            }
+
+        } else if (sample instanceof List) {
+            for (Object o : c.toArray()) {
+                List list = (List) o;
+                processed.add(processCollectionForMap(list)); // recursion;
+            }
+
+        } else if (sample instanceof Set) {
+            for (Object o : c.toArray()) {
+                Set set = (Set) o;
+                processed.add(processCollectionForMap(set)); // recursion;
+            }
+
+        } else {
+            for (Object o : c.toArray()) {
+                processed.add(o);
+            }
+        }
+        return processed;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
new file mode 100644
index 0000000..f2fce47
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
@@ -0,0 +1,106 @@
+package org.apache.usergrid.corepersistence.util;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.usergrid.persistence.Schema;
+
+
+/**
+ * Utilises for constructing standard naming conventions for collections and connections
+ */
+public class CpNamingUtils {
+
+    /**
+     * Edge types for all types
+     */
+    public static final String ALL_TYPES = "zzzalltypeszzz";
+
+    /**
+     * Edge types for collection suffix
+     */
+    public static final String EDGE_COLL_SUFFIX = "zzzcollzzz";
+
+    /**
+     * Edge types for connection suffix
+     */
+    public static final String EDGE_CONN_SUFFIX = "zzzconnzzz";
+
+
+    public static String getCollectionScopeNameFromEntityType( String type ) {
+        String csn = EDGE_COLL_SUFFIX + Schema.defaultCollectionName( type );
+        return csn.toLowerCase();
+    }
+
+
+    public static String getCollectionScopeNameFromCollectionName( String name ) {
+        String csn = EDGE_COLL_SUFFIX + name;
+        return csn.toLowerCase();
+    }
+
+
+    public static String getConnectionScopeName( String entityType, String connectionType ) {
+        String csn = EDGE_CONN_SUFFIX + connectionType + entityType;
+        return csn.toLowerCase();
+    }
+
+
+    public static boolean isCollectionEdgeType( String type ) {
+        return type.startsWith( EDGE_COLL_SUFFIX );
+    }
+
+
+    public  static boolean isConnectionEdgeType( String type ) {
+        return type.startsWith( EDGE_CONN_SUFFIX );
+    }
+
+
+    static public String getConnectionType( String edgeType ) {
+        String[] parts = edgeType.split( "\\|" );
+        return parts[1];
+    }
+
+
+    static public String getCollectionName( String edgeType ) {
+        String[] parts = edgeType.split( "\\|" );
+        return parts[1];
+    }
+
+
+    public static String getEdgeTypeFromConnectionType( String connectionType ) {
+
+        if ( connectionType != null ) {
+            String csn = EDGE_CONN_SUFFIX + "|" + connectionType;
+            return csn;
+        }
+
+        return null;
+    }
+
+
+    public static String getEdgeTypeFromCollectionName( String collectionName ) {
+
+        if ( collectionName != null ) {
+            String csn = EDGE_COLL_SUFFIX + "|" + collectionName;
+            return csn;
+        }
+
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityMapUtilsTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityMapUtilsTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityMapUtilsTest.java
index 87f6b46..a75b9fd 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityMapUtilsTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityMapUtilsTest.java
@@ -25,6 +25,8 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.field.ListField;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index c5d5782..5a1eca2 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.UUID;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.usergrid.AbstractCoreIT;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.EntityRef;


[3/3] git commit: Merge branch 'collection_multiget' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into collection_multiget

Posted by to...@apache.org.
Merge branch 'collection_multiget' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into collection_multiget


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/454ef4ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/454ef4ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/454ef4ff

Branch: refs/heads/collection_multiget
Commit: 454ef4ff1456ed378d098054fadfde07d4145a72
Parents: 6285e60 f43e4f5
Author: Todd Nine <to...@apache.org>
Authored: Thu Oct 9 15:50:10 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Oct 9 15:50:10 2014 -0600

----------------------------------------------------------------------
 .../collection/EntityCollectionManager.java     |   8 +
 .../impl/EntityCollectionManagerImpl.java       | 193 +++++++++++--------
 .../collection/EntityCollectionManagerIT.java   |  31 +++
 3 files changed, 147 insertions(+), 85 deletions(-)
----------------------------------------------------------------------



[2/3] git commit: Refactored loader to group for batching, then call filters to perform the load, verification and result construction

Posted by to...@apache.org.
Refactored loader to group for batching, then call filters to perform the load, verification and result construction


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6285e605
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6285e605
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6285e605

Branch: refs/heads/collection_multiget
Commit: 6285e6057832580d947106f845852254397eb56c
Parents: b3515f4
Author: Todd Nine <to...@apache.org>
Authored: Thu Oct 9 15:03:51 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Oct 9 15:49:55 2014 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |   3 +
 .../corepersistence/CpEntityManagerFactory.java |   2 +
 .../corepersistence/CpEntityMapUtils.java       | 323 -------------------
 .../corepersistence/CpManagerCache.java         |   2 +-
 .../usergrid/corepersistence/CpNamingUtils.java | 106 ------
 .../corepersistence/CpRelationManager.java      | 140 +-------
 .../results/AbstractIdLoader.java               | 130 --------
 .../corepersistence/results/EntitiesLoader.java |  51 ---
 .../corepersistence/results/EntityVerifier.java | 142 ++++++++
 .../results/FilteringLoader.java                | 240 ++++++++++++++
 .../corepersistence/results/IdsLoader.java      |  46 ---
 .../corepersistence/results/IdsVerifier.java    |  65 ++++
 .../corepersistence/results/RefsLoader.java     |  46 ---
 .../corepersistence/results/RefsVerifier.java   |  62 ++++
 .../corepersistence/results/ResultsLoader.java  |   5 +-
 .../results/ResultsLoaderFactory.java           |   9 +-
 .../results/ResultsLoaderFactoryImpl.java       |  42 ++-
 .../results/ResultsVerifier.java                |  72 +++++
 .../results/VersionVerifier.java                | 103 ++++++
 .../corepersistence/util/CpEntityMapUtils.java  | 323 +++++++++++++++++++
 .../corepersistence/util/CpNamingUtils.java     | 106 ++++++
 .../corepersistence/CpEntityMapUtilsTest.java   |   2 +
 .../corepersistence/StaleIndexCleanupTest.java  |   1 +
 23 files changed, 1173 insertions(+), 848 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 1a8f17c..657ee2d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -54,6 +54,9 @@ import me.prettyprint.hector.api.query.QueryResult;
 import me.prettyprint.hector.api.query.SliceCounterQuery;
 import static org.apache.commons.lang.StringUtils.capitalize;
 import static org.apache.commons.lang.StringUtils.isBlank;
+
+import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.AggregateCounter;
 import org.apache.usergrid.persistence.AggregateCounterSet;
 import org.apache.usergrid.persistence.CollectionRef;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index f44eac3..17b33e4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -27,6 +27,8 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
 import org.apache.commons.lang.StringUtils;
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.AbstractEntity;
 import org.apache.usergrid.persistence.DynamicEntity;
 import org.apache.usergrid.persistence.Entity;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityMapUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityMapUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityMapUtils.java
deleted file mode 100644
index efbef0b..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityMapUtils.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-
-
-package org.apache.usergrid.corepersistence;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.Schema;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.field.ArrayField;
-import org.apache.usergrid.persistence.model.field.BooleanField;
-import org.apache.usergrid.persistence.model.field.ByteArrayField;
-import org.apache.usergrid.persistence.model.field.DoubleField;
-import org.apache.usergrid.persistence.model.field.EntityObjectField;
-import org.apache.usergrid.persistence.model.field.Field;
-import org.apache.usergrid.persistence.model.field.FloatField;
-import org.apache.usergrid.persistence.model.field.IntegerField;
-import org.apache.usergrid.persistence.model.field.ListField;
-import org.apache.usergrid.persistence.model.field.LocationField;
-import org.apache.usergrid.persistence.model.field.LongField;
-import org.apache.usergrid.persistence.model.field.SetField;
-import org.apache.usergrid.persistence.model.field.StringField;
-import org.apache.usergrid.persistence.model.field.UUIDField;
-import org.apache.usergrid.persistence.model.field.value.EntityObject;
-import org.apache.usergrid.persistence.model.field.value.Location;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Utilities for converting entities to/from maps suitable for Core Persistence.
- * Aware of unique properties via Schema.
- */
-public class CpEntityMapUtils {
-    private static final Logger logger = LoggerFactory.getLogger( CpEntityMapUtils.class );
-
-    public static ObjectMapper objectMapper = new ObjectMapper(  );
-
-    public static Entity fromMap( Map<String, Object> map, String entityType, boolean topLevel ) {
-        return fromMap( null, map, entityType, topLevel );
-    }
-
-    public static Entity fromMap( Entity entity, Map<String, Object> map, String entityType, boolean topLevel ) {
-
-        if ( entity == null ) {
-            entity = new Entity();
-        }
-
-        for ( String fieldName : map.keySet() ) {
-
-            Object value = map.get( fieldName );
-            boolean unique = Schema.getDefaultSchema().isPropertyUnique(entityType, fieldName);
-
-//            if ( unique ) {
-//                logger.debug("{} is a unique property", fieldName );
-//            }
-
-            if ( value instanceof String ) {
-                entity.setField( new StringField( fieldName, (String)value, unique && topLevel ));
-
-            } else if ( value instanceof Boolean ) {
-                entity.setField( new BooleanField( fieldName, (Boolean)value, unique && topLevel ));
-                        
-            } else if ( value instanceof Integer ) {
-                entity.setField( new IntegerField( fieldName, (Integer)value, unique && topLevel ));
-
-            } else if ( value instanceof Double ) {
-                entity.setField( new DoubleField( fieldName, (Double)value, unique && topLevel ));
-
-		    } else if ( value instanceof Float ) {
-                entity.setField( new FloatField( fieldName, (Float)value, unique && topLevel ));
-				
-            } else if ( value instanceof Long ) {
-                entity.setField( new LongField( fieldName, (Long)value, unique && topLevel ));
-
-            } else if ( value instanceof List) {
-                entity.setField( listToListField( fieldName, (List)value, entityType ));  
-            
-            } else if ( value instanceof UUID) {
-                entity.setField( new UUIDField( fieldName, (UUID)value, unique && topLevel ));
-
-            } else if ( value instanceof Map ) {
-                processMapValue( value, fieldName, entity, entityType);
-
-            } else if ( value instanceof Enum ) {
-                entity.setField( new StringField( fieldName, value.toString(), unique && topLevel ));
-	
-			} else if ( value != null ) {
-                byte[] valueSerialized;
-                try {
-                    valueSerialized = objectMapper.writeValueAsBytes( value );
-                }
-                catch ( JsonProcessingException e ) {
-                    throw new RuntimeException( "Can't serialize object ",e );
-                }
-                catch ( IOException e ) {
-                    throw new RuntimeException( "Can't serialize object ",e );
-                }
-                ByteBuffer byteBuffer = ByteBuffer.wrap( valueSerialized );
-                ByteArrayField bf = new ByteArrayField( fieldName, byteBuffer.array(), value.getClass() );
-                entity.setField( bf );
-            }
-        }
-
-        return entity;
-    }
-
-    private static void processMapValue(
-            Object value, String fieldName, Entity entity, String entityType) {
-
-        Field field = null;
-
-        // is the map really a location element?
-        Map<String, Object> m = (Map<String, Object>)value;
-        if ( m.size() == 2) {
-            Double lat = null;
-            Double lon = null;
-            try {
-                if ( m.get("latitude") != null && m.get("longitude") != null ) {
-                    lat = Double.parseDouble( m.get("latitude").toString() );
-                    lon = Double.parseDouble( m.get("longitude").toString() );
-                    
-                } else if ( m.get("lat") != null && m.get("lon") != null ) {
-                    lat = Double.parseDouble( m.get("lat").toString() );
-                    lon = Double.parseDouble( m.get("lon").toString() );
-                }
-            } catch ( NumberFormatException ignored ) {}
-            
-            if ( lat != null && lon != null ) {
-                field = new LocationField( fieldName, new Location( lat, lon ));
-            }
-        }
-        
-        if ( field == null ) {
-            
-            // not a location element, process it as map
-            entity.setField( new EntityObjectField( fieldName,
-                    fromMap( (Map<String, Object>)value, entityType, false ))); // recursion
-            
-        } else {
-            entity.setField( field );
-        }
-    }
-
-    
-    private static ListField listToListField( String fieldName, List list, String entityType ) {
-
-        if (list.isEmpty()) {
-            return new ListField( fieldName );
-        }
-
-        Object sample = list.get(0);
-
-        if ( sample instanceof Map ) {
-            return new ListField<Entity>( fieldName, processListForField( list, entityType ));
-
-        } else if ( sample instanceof List ) {
-            return new ListField<List>( fieldName, processListForField( list, entityType ));
-            
-        } else if ( sample instanceof String ) {
-            return new ListField<String>( fieldName, (List<String>)list );
-                    
-        } else if ( sample instanceof Boolean ) {
-            return new ListField<Boolean>( fieldName, (List<Boolean>)list );
-                    
-        } else if ( sample instanceof Integer ) {
-            return new ListField<Integer>( fieldName, (List<Integer>)list );
-
-        } else if ( sample instanceof Double ) {
-            return new ListField<Double>( fieldName, (List<Double>)list );
-
-        } else if ( sample instanceof Long ) {
-            return new ListField<Long>( fieldName, (List<Long>)list );
-
-        } else {
-            throw new RuntimeException("Unknown type " + sample.getClass().getName());
-        }
-    }
-
-    
-    private static List processListForField( List list, String entityType ) {
-        if ( list.isEmpty() ) {
-            return list;
-        }
-        Object sample = list.get(0);
-
-        if ( sample instanceof Map ) {
-            List<Entity> newList = new ArrayList<Entity>();
-            for ( Map<String, Object> map : (List<Map<String, Object>>)list ) {
-                newList.add( fromMap( map, entityType, false ) );
-            }
-            return newList;
-
-        } else if ( sample instanceof List ) {
-            return processListForField( list, entityType ); // recursion
-            
-        } else { 
-            return list;
-        } 
-    }
-
-
-    /**
-     * Convert Entity to Map, adding version_ug_field and a {name}_ug_analyzed field for each
-     * StringField.
-     */
-    public static Map toMap(EntityObject entity) {
-
-        Map<String, Object> entityMap = new TreeMap<>();
-
-        for (Object f : entity.getFields().toArray()) {
-            Field field = (Field) f;
-
-            if (f instanceof ListField || f instanceof ArrayField) {
-                List list = (List) field.getValue();
-                entityMap.put(field.getName(),
-                        new ArrayList( processCollectionForMap(list)));
-
-            } else if (f instanceof SetField) {
-                Set set = (Set) field.getValue();
-                entityMap.put(field.getName(),
-                        new ArrayList( processCollectionForMap(set)));
-
-            } else if (f instanceof EntityObjectField) {
-                EntityObject eo = (EntityObject) field.getValue();
-                entityMap.put( field.getName(), toMap(eo)); // recursion
-
-            } else if (f instanceof StringField) {
-                entityMap.put(field.getName(), ((String) field.getValue()));
-
-            } else if (f instanceof LocationField) {
-                LocationField locField = (LocationField) f;
-                Map<String, Object> locMap = new HashMap<String, Object>();
-
-                // field names lat and lon trigger ElasticSearch geo location 
-                locMap.put("lat", locField.getValue().getLatitude());
-                locMap.put("lon", locField.getValue().getLongitude());
-                 entityMap.put( field.getName(), field.getValue());
-
-            } else if (f instanceof ByteArrayField) {
-                    ByteArrayField bf = ( ByteArrayField ) f;
-
-                    byte[] serilizedObj =  bf.getValue();
-                    Object o;
-                    try {
-                        o = objectMapper.readValue( serilizedObj, bf.getClassinfo() );
-                    }
-                    catch ( IOException e ) {
-                        throw new RuntimeException( "Can't deserialize object ",e );
-                    }
-                    entityMap.put( bf.getName(), o );
-            }
-            else {
-                entityMap.put( field.getName(), field.getValue());
-            }
-        }
-
-        return entityMap;
-    }
-
-    
-    private static Collection processCollectionForMap(Collection c) {
-        if (c.isEmpty()) {
-            return c;
-        }
-        List processed = new ArrayList();
-        Object sample = c.iterator().next();
-
-        if (sample instanceof Entity) {
-            for (Object o : c.toArray()) {
-                Entity e = (Entity) o;
-                processed.add(toMap(e));
-            }
-
-        } else if (sample instanceof List) {
-            for (Object o : c.toArray()) {
-                List list = (List) o;
-                processed.add(processCollectionForMap(list)); // recursion;
-            }
-
-        } else if (sample instanceof Set) {
-            for (Object o : c.toArray()) {
-                Set set = (Set) o;
-                processed.add(processCollectionForMap(set)); // recursion;
-            }
-
-        } else {
-            for (Object o : c.toArray()) {
-                processed.add(o);
-            }
-        }
-        return processed;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
index d10025c..62ea81f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
@@ -28,7 +28,7 @@ import org.apache.usergrid.persistence.map.MapManagerFactory;
 import org.apache.usergrid.persistence.map.MapScope;
 import org.apache.usergrid.utils.LRUCache2;
 
-class CpManagerCache {
+public class CpManagerCache {
 
     private final EntityCollectionManagerFactory ecmf;
     private final EntityIndexFactory eif;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpNamingUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpNamingUtils.java
deleted file mode 100644
index 45c39ab..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpNamingUtils.java
+++ /dev/null
@@ -1,106 +0,0 @@
-package org.apache.usergrid.corepersistence;/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import org.apache.usergrid.persistence.Schema;
-
-
-/**
- * Utilises for constructing standard naming conventions for collections and connections
- */
-public class CpNamingUtils {
-
-    /**
-     * Edge types for all types
-     */
-    static final String ALL_TYPES = "zzzalltypeszzz";
-
-    /**
-     * Edge types for collection suffix
-     */
-    static final String EDGE_COLL_SUFFIX = "zzzcollzzz";
-
-    /**
-     * Edge types for connection suffix
-     */
-    static final String EDGE_CONN_SUFFIX = "zzzconnzzz";
-
-
-    static String getCollectionScopeNameFromEntityType( String type ) {
-        String csn = EDGE_COLL_SUFFIX + Schema.defaultCollectionName( type );
-        return csn.toLowerCase();
-    }
-
-
-    static String getCollectionScopeNameFromCollectionName( String name ) {
-        String csn = EDGE_COLL_SUFFIX + name;
-        return csn.toLowerCase();
-    }
-
-
-    static String getConnectionScopeName( String entityType, String connectionType ) {
-        String csn = EDGE_CONN_SUFFIX + connectionType + entityType;
-        return csn.toLowerCase();
-    }
-
-
-    static boolean isCollectionEdgeType( String type ) {
-        return type.startsWith( EDGE_COLL_SUFFIX );
-    }
-
-
-    static boolean isConnectionEdgeType( String type ) {
-        return type.startsWith( EDGE_CONN_SUFFIX );
-    }
-
-
-    static public String getConnectionType( String edgeType ) {
-        String[] parts = edgeType.split( "\\|" );
-        return parts[1];
-    }
-
-
-    static public String getCollectionName( String edgeType ) {
-        String[] parts = edgeType.split( "\\|" );
-        return parts[1];
-    }
-
-
-    static String getEdgeTypeFromConnectionType( String connectionType ) {
-
-        if ( connectionType != null ) {
-            String csn = EDGE_CONN_SUFFIX + "|" + connectionType;
-            return csn;
-        }
-
-        return null;
-    }
-
-
-    static String getEdgeTypeFromCollectionName( String collectionName ) {
-
-        if ( collectionName != null ) {
-            String csn = EDGE_COLL_SUFFIX + "|" + collectionName;
-            return csn;
-        }
-
-
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 03aada3..0f31c83 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -32,6 +32,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.usergrid.corepersistence.results.ResultsLoaderFactory;
+import org.apache.usergrid.corepersistence.results.ResultsLoaderFactoryImpl;
+import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.utils.UUIDUtils;
 import org.slf4j.Logger;
@@ -175,6 +179,8 @@ public class CpRelationManager implements RelationManager {
 
     private IndexBucketLocator indexBucketLocator;
 
+    private ResultsLoaderFactory resultsLoaderFactory;
+
 
 
     public CpRelationManager() {}
@@ -230,6 +236,8 @@ public class CpRelationManager implements RelationManager {
         // commented out because it is possible that CP entity has not been created yet
         Assert.notNull( cpHeadEntity, "cpHeadEntity cannot be null" );
 
+        this.resultsLoaderFactory = new ResultsLoaderFactoryImpl( managerCache );
+
         return this;
     }
 
@@ -625,7 +633,7 @@ public class CpRelationManager implements RelationManager {
                     memberScope.getApplication(), 
                     memberScope.getOwner(), 
                     memberScope.getName(),
-                    CpEntityMapUtils.toMap(memberEntity)
+                    CpEntityMapUtils.toMap( memberEntity )
             });
         }
 
@@ -1555,136 +1563,8 @@ public class CpRelationManager implements RelationManager {
         logger.debug("buildResults() for {} from {} candidates", collName, crs.size());
 
 
-        //TODO T.N Change to results loader here
-
-        Results results = null;
-
-        EntityIndex index = managerCache.getEntityIndex(applicationScope);
-        EntityIndexBatch indexBatch = index.createBatch();
-
-
-        // map of the latest versions, we will discard stale indexes
-        Map<Id, CandidateResult> latestVersions = new LinkedHashMap<Id, CandidateResult>();
-
-        Iterator<CandidateResult> iter = crs.iterator();
-        while ( iter.hasNext() ) {
-
-            CandidateResult cr = iter.next();
-
-            CollectionScope collScope = new CollectionScopeImpl( 
-                applicationScope.getApplication(), 
-                applicationScope.getApplication(), 
-                CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
-
-            EntityCollectionManager ecm = managerCache.getEntityCollectionManager(collScope);
-
-            UUID latestVersion = ecm.getLatestVersion( Collections.singleton( cr.getId()) ).toBlocking().last().getMaxVersion( cr.getId() ).getVersion();
-
-            if ( logger.isDebugEnabled() ) {
-                logger.debug("Getting version for entity {} from scope\n   app {}\n   owner {}\n   name {}", 
-                new Object[] { 
-                    cr.getId(),
-                    collScope.getApplication(), 
-                    collScope.getOwner(), 
-                    collScope.getName() 
-                });
-            }
-
-            if ( latestVersion == null ) {
-                logger.error("Version for Entity {}:{} not found", 
-                        cr.getId().getType(), cr.getId().getUuid());
-                continue;
-            }
-
-            if ( cr.getVersion().compareTo( latestVersion) < 0 )  {
-                logger.debug("Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", 
-                    new Object[] { cr.getId().getUuid(), cr.getId().getType(), 
-                        cr.getVersion(), latestVersion});
-
-                IndexScope indexScope = new IndexScopeImpl(
-                    cpHeadEntity.getId(),
-                    CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
-                indexBatch.deindex( indexScope, cr);
-
-                continue;
-            }
-
-            CandidateResult alreadySeen = latestVersions.get( cr.getId() ); 
-
-            if ( alreadySeen == null ) { // never seen it, so add to map
-                latestVersions.put( cr.getId(), cr );
-
-            } else {
-                // we seen this id before, only add entity if we now have newer version
-                if ( latestVersion.compareTo( alreadySeen.getVersion() ) > 0 ) {
-
-                    latestVersions.put( cr.getId(), cr);
+        final Results results = this.resultsLoaderFactory.getLoader( applicationScope, this.headEntity, query.getResultsLevel() ).getResults( crs );
 
-                    IndexScope indexScope = new IndexScopeImpl(
-                        cpHeadEntity.getId(),
-                        CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
-                    indexBatch.deindex( indexScope, alreadySeen);
-                }
-            }
-        }
-
-        indexBatch.execute();
-
-        if (query.getLevel().equals(Level.IDS)) {
-
-            List<UUID> ids = new ArrayList<UUID>();
-            for ( Id id : latestVersions.keySet() ) {
-                CandidateResult cr = latestVersions.get(id);
-                ids.add( cr.getId().getUuid() );
-            }
-            results = Results.fromIdList(ids);
-
-        } else if (query.getLevel().equals(Level.REFS)) {
-
-            List<EntityRef> refs = new ArrayList<EntityRef>();
-            for ( Id id : latestVersions.keySet() ) {
-                CandidateResult cr = latestVersions.get(id);
-                refs.add( new SimpleEntityRef( cr.getId().getType(), cr.getId().getUuid()));
-            }
-            results = Results.fromRefList( refs );
-
-        } else {
-
-            List<Entity> entities = new ArrayList<Entity>();
-            for (Id id : latestVersions.keySet()) {
-
-                CandidateResult cr = latestVersions.get(id);
-
-                CollectionScope collScope = new CollectionScopeImpl( 
-                    applicationScope.getApplication(), 
-                    applicationScope.getApplication(), 
-                    CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
-
-                EntityCollectionManager ecm = managerCache.getEntityCollectionManager(collScope);
-
-                org.apache.usergrid.persistence.model.entity.Entity e = 
-                        ecm.load( cr.getId() ).toBlocking().lastOrDefault(null);
-
-                if ( e == null ) {
-                    logger.error("Entity {}:{} not found", 
-                            cr.getId().getType(), cr.getId().getUuid());
-                    continue;
-                }
-
-                Entity entity = EntityFactory.newEntity(
-                        e.getId().getUuid(), e.getField("type").getValue().toString());
-
-                Map<String, Object> entityMap = CpEntityMapUtils.toMap(e);
-                entity.addProperties(entityMap);
-                entities.add(entity);
-            }
-
-            if (entities.size() == 1) {
-                results = Results.fromEntity(entities.get(0));
-            } else {
-                results = Results.fromEntities(entities);
-            }
-        }
 
         results.setCursor( crs.getCursor() );
         results.setQueryProcessor( new CpQueryProcessor(em, query, headEntity, collName) );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractIdLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractIdLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractIdLoader.java
deleted file mode 100644
index b034a53..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractIdLoader.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-
-package org.apache.usergrid.corepersistence.results;/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.usergrid.corepersistence.CpNamingUtils;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.IndexScope;
-import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
-import org.apache.usergrid.persistence.index.query.CandidateResult;
-import org.apache.usergrid.persistence.index.query.CandidateResults;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public abstract class AbstractIdLoader implements  ResultsLoader{
-
-    @Override
-    public Results getResults( final ApplicationScope applicationScope, final CandidateResults crs ) {
-//        Map<Id, CandidateResult> latestVersions = new LinkedHashMap<Id, CandidateResult>();
-//
-//               Iterator<CandidateResult> iter = crs.iterator();
-//               while ( iter.hasNext() ) {
-//
-//                   CandidateResult cr = iter.next();
-//
-//                   CollectionScope collScope = new CollectionScopeImpl(
-//                       applicationScope.getApplication(),
-//                       applicationScope.getApplication(),
-//                       CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
-//
-//                   EntityCollectionManager ecm = managerCache.getEntityCollectionManager(collScope);
-//
-//                   UUID latestVersion = ecm.getLatestVersion( cr.getId() ).toBlocking().lastOrDefault(null);
-//
-//                   if ( logger.isDebugEnabled() ) {
-//                       logger.debug("Getting version for entity {} from scope\n   app {}\n   owner {}\n   name {}",
-//                       new Object[] {
-//                           cr.getId(),
-//                           collScope.getApplication(),
-//                           collScope.getOwner(),
-//                           collScope.getName()
-//                       });
-//                   }
-//
-//                   if ( latestVersion == null ) {
-//                       logger.error("Version for Entity {}:{} not found",
-//                               cr.getId().getType(), cr.getId().getUuid());
-//                       continue;
-//                   }
-//
-//                   if ( cr.getVersion().compareTo( latestVersion) < 0 )  {
-//                       logger.debug("Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
-//                           new Object[] { cr.getId().getUuid(), cr.getId().getType(),
-//                               cr.getVersion(), latestVersion});
-//
-//                       IndexScope indexScope = new IndexScopeImpl(
-//                           cpHeadEntity.getId(),
-//                           CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
-//                       indexBatch.deindex( indexScope, cr);
-//
-//                       continue;
-//                   }
-//
-//                   CandidateResult alreadySeen = latestVersions.get( cr.getId() );
-//
-//                   if ( alreadySeen == null ) { // never seen it, so add to map
-//                       latestVersions.put( cr.getId(), cr );
-//
-//                   } else {
-//                       // we seen this id before, only add entity if we now have newer version
-//                       if ( latestVersion.compareTo( alreadySeen.getVersion() ) > 0 ) {
-//
-//                           latestVersions.put( cr.getId(), cr);
-//
-//                           IndexScope indexScope = new IndexScopeImpl(
-//                               cpHeadEntity.getId(),
-//                               CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
-//                           indexBatch.deindex( indexScope, alreadySeen);
-//                       }
-//                   }
-//               }
-//
-//               indexBatch.execute();
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntitiesLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntitiesLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntitiesLoader.java
deleted file mode 100644
index d20f677..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntitiesLoader.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-
-package org.apache.usergrid.corepersistence.results;/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.query.CandidateResults;
-
-
-public class EntitiesLoader implements ResultsLoader {
-    @Override
-    public Results getResults( final ApplicationScope scope, final CandidateResults crs ) {
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityVerifier.java
new file mode 100644
index 0000000..40cf499
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntityVerifier.java
@@ -0,0 +1,142 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityFactory;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.index.query.CandidateResult;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.common.base.Optional;
+
+
+/**
+ * A loader that verifies versions are correct in cassandra and match elasticsearch
+ */
+public class EntityVerifier implements ResultsVerifier {
+
+    private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
+
+    private EntitySet ids;
+
+    private Map<Id, org.apache.usergrid.persistence.model.entity.Entity> entityMapping;
+
+
+    public EntityVerifier( final int maxSize ) {
+        this.entityMapping = new HashMap<>( maxSize );
+    }
+
+
+    @Override
+    public void loadResults( final Collection<Id> idsToLoad, final EntityCollectionManager ecm ) {
+        ids = ecm.load( idsToLoad ).toBlocking().last();
+    }
+
+
+    @Override
+    public boolean isValid( final CandidateResult candidateResult ) {
+        final Id entityId = candidateResult.getId();
+
+        final MvccEntity version = ids.getEntity( entityId );
+
+        //version wasn't found ,deindex
+        if ( version == null ) {
+            logger.warn( "Version for Entity {}:{} not found", entityId.getUuid(), entityId.getUuid() );
+
+
+            return false;
+        }
+
+        final UUID savedVersion = version.getVersion();
+
+        if ( UUIDComparator.staticCompare( savedVersion, candidateResult.getVersion() ) > 0 ) {
+            logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] {
+                    entityId.getUuid(), entityId.getType(), savedVersion, candidateResult.getVersion()
+            } );
+
+            return false;
+        }
+
+
+        final Optional<org.apache.usergrid.persistence.model.entity.Entity> entity = version.getEntity();
+
+        if ( !entity.isPresent() ) {
+            return false;
+        }
+
+        entityMapping.put( entityId, entity.get() );
+
+        return true;
+    }
+
+
+    @Override
+    public Results getResults( final Collection<Id> ids ) {
+
+        final List<Entity> ugEntities = new ArrayList<>( ids.size() );
+
+        for ( final Id id : ids ) {
+            final org.apache.usergrid.persistence.model.entity.Entity cpEntity = entityMapping.get( id );
+
+            Entity entity = EntityFactory.newEntity( id.getUuid(), id.toString() );
+
+            Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity );
+            entity.addProperties( entityMap );
+            ugEntities.add( entity );
+        }
+
+        return Results.fromEntities( ugEntities );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
new file mode 100644
index 0000000..3db772b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
@@ -0,0 +1,240 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.CpManagerCache;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
+import org.apache.usergrid.persistence.index.query.CandidateResult;
+import org.apache.usergrid.persistence.index.query.CandidateResults;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+
+
+public class FilteringLoader implements ResultsLoader {
+
+    private static final Logger logger = LoggerFactory.getLogger( FilteringLoader.class );
+
+    private final CpManagerCache managerCache;
+    private final ResultsVerifier resultsLoader;
+    private final Id ownerId;
+    private final ApplicationScope applicationScope;
+
+
+     protected FilteringLoader( final CpManagerCache managerCache, final ResultsVerifier resultsLoader,
+                                final EntityRef ownerId, final ApplicationScope applicationScope ) {
+         this.managerCache = managerCache;
+         this.resultsLoader = resultsLoader;
+         this.ownerId = new SimpleId( ownerId.getUuid(), ownerId.getType());
+         this.applicationScope = applicationScope;
+     }
+
+
+    @Override
+    public Results getResults( final CandidateResults crs ) {
+
+
+        final EntityIndex index = managerCache.getEntityIndex( applicationScope );
+
+        final EntityIndexBatch indexBatch = index.createBatch();
+
+        final Map<Id, Integer> orderIndex = new HashMap<>( crs.size() );
+
+        final Map<Id, CandidateResult> idResultMapping = new HashMap<>( crs.size() );
+
+        final HashMultimap<String, CandidateResult> groupedByScopes = HashMultimap.create( crs.size(), crs.size() );
+
+        final Iterator<CandidateResult> iter = crs.iterator();
+
+
+        /**
+         * Go through the candidates and group them by scope for more efficient retrieval
+         */
+        for ( int i = 0; iter.hasNext(); i++ ) {
+
+            final CandidateResult cr = iter.next();
+
+            final String collectionType = CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() );
+
+            final Id entityId = cr.getId();
+
+            //if we've already seen this one, put which ever is greater
+
+            final CandidateResult seen = idResultMapping.get( entityId );
+
+            if ( seen == null ) {
+                idResultMapping.put( entityId, cr );
+                orderIndex.put( entityId, i );
+                groupedByScopes.put( collectionType, cr );
+            }
+
+            //we have seen it, compare them
+            else {
+                final UUID seenVersion = seen.getVersion();
+                final UUID currentVersion = cr.getVersion();
+
+                //this is a newer version, we know we already have a stale entity, add it to be cleaned up
+                if ( UUIDComparator.staticCompare( currentVersion, seenVersion ) > 0 ) {
+
+                    //de-index it
+                    logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] {
+                            entityId.getUuid(), entityId.getType(), seen, currentVersion
+                    } );
+
+                    //deindex
+                    deIndex( indexBatch, ownerId, cr );
+
+
+                    //TODO, fire the entity repair cleanup task here instead of de-indexing
+
+                    //replace the value with a more current version
+                    idResultMapping.put( entityId, cr );
+                    orderIndex.put( entityId, i );
+                    groupedByScopes.put( collectionType, cr );
+                }
+            }
+        }
+
+
+        //now everything is ordered, and older versions are removed.  Batch fetch versions to verify existence and
+        // correct versions
+
+        final TreeMap<Integer, Id> sortedResults = new TreeMap<>();
+
+        for ( final String scopeName : groupedByScopes.keys() ) {
+
+
+            final Set<CandidateResult> candidateResults = groupedByScopes.get( scopeName );
+
+            final Collection<Id> idsToLoad =
+                    Collections2.transform( candidateResults, new Function<CandidateResult, Id>() {
+                        @Nullable
+                        @Override
+                        public Id apply( @Nullable final CandidateResult input ) {
+                            if ( input == null ) {
+                                return null;
+                            }
+
+                            return input.getId();
+                        }
+                    } );
+
+
+            //now using the scope, load the collection
+
+            /**
+             * Get the collection scope and batch load all the versions
+             */
+            final CollectionScope collScope =
+                    new CollectionScopeImpl( applicationScope.getApplication(), applicationScope.getApplication(),
+                            scopeName );
+
+
+            final EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collScope );
+
+
+            //load the results into the loader for this ech
+            resultsLoader.loadResults( idsToLoad, ecm );
+
+
+
+
+
+            //now compare them
+
+            for ( final Id requestedId : idsToLoad ) {
+
+                final CandidateResult cr = idResultMapping.get( requestedId );
+
+                //ask the loader if this is valid, if not discard it
+                if(!resultsLoader.isValid( cr )){
+                    deIndex( indexBatch, ownerId, cr );
+                    continue;
+                }
+
+                //if we get here we're good, we need to add this to our results
+                final int candidateIndex = orderIndex.get( requestedId  );
+
+                sortedResults.put( candidateIndex, requestedId );
+
+            }
+        }
+
+
+        return resultsLoader.getResults(sortedResults.values());
+    }
+
+
+
+
+
+    protected void deIndex( final EntityIndexBatch batch, final Id ownerId, final CandidateResult candidateResult ) {
+        IndexScope indexScope = new IndexScopeImpl( ownerId,
+                CpNamingUtils.getCollectionScopeNameFromEntityType( candidateResult.getId().getType() ) );
+
+        batch.deindex( indexScope, candidateResult );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsLoader.java
deleted file mode 100644
index 7796028..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsLoader.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-
-package org.apache.usergrid.corepersistence.results;/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.index.query.CandidateResults;
-
-
-public class IdsLoader extends AbstractIdLoader{
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsVerifier.java
new file mode 100644
index 0000000..cf13c74
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsVerifier.java
@@ -0,0 +1,65 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+public class IdsVerifier extends VersionVerifier {
+
+
+    @Override
+    public Results getResults( final Collection<Id> ids ) {
+
+        final List<UUID> returnIds = new ArrayList<>( ids.size() );
+
+        for ( final Id id : ids ) {
+            returnIds.add( id.getUuid() );
+        }
+
+
+        return Results.fromIdList( returnIds );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsLoader.java
deleted file mode 100644
index ad852d5..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsLoader.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-
-package org.apache.usergrid.corepersistence.results;/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.index.query.CandidateResults;
-
-
-public class RefsLoader extends AbstractIdLoader{
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsVerifier.java
new file mode 100644
index 0000000..77443ee
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsVerifier.java
@@ -0,0 +1,62 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.SimpleEntityRef;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+public class RefsVerifier extends VersionVerifier {
+
+
+    @Override
+    public Results getResults( final Collection<Id> ids ) {
+        List<EntityRef> refs = new ArrayList<EntityRef>();
+        for ( Id id : ids ) {
+            refs.add( new SimpleEntityRef( id.getType(), id.getUuid() ) );
+        }
+        return Results.fromRefList( refs );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoader.java
index 8a17c6e..299327d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoader.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoader.java
@@ -41,6 +41,7 @@ package org.apache.usergrid.corepersistence.results;/*
 import org.apache.usergrid.persistence.Results;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.index.query.CandidateResults;
+import org.apache.usergrid.persistence.model.entity.Id;
 
 
 /**
@@ -50,8 +51,8 @@ public interface ResultsLoader {
 
     /**
      * Using the candidate results, get the results
-     * @param crs
+     * @param  crs The candidate result set
      * @return
      */
-    public Results getResults(final ApplicationScope scope, final CandidateResults crs);
+    public Results getResults( final CandidateResults crs);
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
index 5c5892c..9954597 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
@@ -38,6 +38,12 @@ package org.apache.usergrid.corepersistence.results;/*
  */
 
 
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
 /**
  * Factory for creating results
  */
@@ -49,5 +55,6 @@ public interface ResultsLoaderFactory {
      * Get the load for results
      * @return
      */
-    public ResultsLoader getLoader();
+    public ResultsLoader getLoader(final ApplicationScope applicationScope, final EntityRef ownerId,
+                                        final Query.Level resultsLevel );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java
index edaca78..7839a13 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java
@@ -38,33 +38,51 @@ package org.apache.usergrid.corepersistence.results;/*
  */
 
 
+import org.junit.rules.Verifier;
+
+import org.apache.usergrid.corepersistence.CpManagerCache;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.model.entity.Id;
+
 import com.google.inject.Inject;
-import com.google.inject.Singleton;
 
 
 /**
  * Factory for creating results
  */
-@Singleton
+
 public class ResultsLoaderFactoryImpl implements ResultsLoaderFactory {
 
 
-    private final EntitiesLoader entityLoader;
-    private final IdsLoader idsLoader;
-    private final RefsLoader refsLoader;
+    private final CpManagerCache managerCache;
 
 
     @Inject
-    public ResultsLoaderFactoryImpl( final EntitiesLoader entityLoader, final IdsLoader idsLoader,
-                                      final RefsLoader refsLoader ) {
-        this.entityLoader = entityLoader;
-        this.idsLoader = idsLoader;
-        this.refsLoader = refsLoader;
+    public ResultsLoaderFactoryImpl( final CpManagerCache managerCache ) {
+        this.managerCache = managerCache;
     }
 
 
     @Override
-    public ResultsLoader getLoader() {
-        return null;
+    public ResultsLoader getLoader( final ApplicationScope applicationScope, final EntityRef ownerId,
+                                    final Query.Level resultsLevel ) {
+
+
+        ResultsVerifier verifier;
+
+        if ( resultsLevel == Query.Level.REFS ) {
+            verifier = new RefsVerifier();
+        }
+        else if ( resultsLevel == Query.Level.IDS ) {
+            verifier = new RefsVerifier();
+        }
+        else {
+            verifier = new EntityVerifier(Query.MAX_LIMIT);
+        }
+
+
+        return new FilteringLoader( managerCache, verifier, ownerId, applicationScope );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsVerifier.java
new file mode 100644
index 0000000..1180ece
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsVerifier.java
@@ -0,0 +1,72 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.util.Collection;
+
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.index.query.CandidateResult;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+public interface ResultsVerifier {
+
+    /**
+     * Load all the candidate ides for verification
+     * @param ids The Id's to load
+     * @param ecm The entity collection manager
+     */
+    public void loadResults(Collection<Id> ids, EntityCollectionManager ecm);
+
+    /**
+     * Return true if the candidate result is a valid result that should be retained.  If it should not
+     * it should also be removed from the list of possible return values in this loader
+     * @param candidateResult
+     * @return
+     */
+    public boolean isValid(CandidateResult candidateResult);
+
+
+    /**
+     * Load the result set with the given ids
+     * @return
+     */
+    public Results getResults(Collection<Id> ids);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6285e605/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/VersionVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/VersionVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/VersionVerifier.java
new file mode 100644
index 0000000..778b0c3
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/VersionVerifier.java
@@ -0,0 +1,103 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.util.Collection;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.VersionSet;
+import org.apache.usergrid.persistence.index.query.CandidateResult;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+
+
+/**
+ * A loader that verifies versions are correct in cassandra and match elasticsearch
+ */
+public abstract class VersionVerifier implements ResultsVerifier {
+
+    private static final Logger logger = LoggerFactory.getLogger( VersionVerifier.class );
+
+    private VersionSet ids;
+
+
+    @Override
+    public void loadResults( final Collection<Id> idsToLoad, final EntityCollectionManager ecm ) {
+        ids = ecm.getLatestVersion( idsToLoad ).toBlocking().last();
+    }
+
+
+    @Override
+    public boolean isValid( final CandidateResult candidateResult ) {
+        final Id entityId = candidateResult.getId();
+
+        final MvccLogEntry version = ids.getMaxVersion( entityId );
+
+        //version wasn't found ,deindex
+        if ( version == null ) {
+            logger.warn( "Version for Entity {}:{} not found", entityId.getUuid(), entityId.getUuid() );
+
+
+            return false;
+        }
+
+        final UUID savedVersion = version.getVersion();
+
+        if ( UUIDComparator.staticCompare( savedVersion, candidateResult.getVersion() ) > 0 ) {
+            logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] {
+                    entityId.getUuid(), entityId.getType(), savedVersion, candidateResult.getVersion()
+            } );
+
+            return false;
+        }
+
+
+        return true;
+    }
+
+
+}