You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/09 19:04:39 UTC

[2/2] git commit: Change query result building logic to discard stale CandidateResults in all cases, and to do repair by reindexing each stale candidate found.

Change query result building logic to discard stale CandidateResults in all cases, and to do repair by reindexing each stale candidate found.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/81d4e0ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/81d4e0ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/81d4e0ea

Branch: refs/heads/two-dot-o
Commit: 81d4e0ea24d7f18e60718fefc33086ced5f5900c
Parents: 159e5fd
Author: Dave Johnson <dm...@apigee.com>
Authored: Thu Oct 9 11:33:53 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Thu Oct 9 11:33:53 2014 -0400

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      | 179 +++++++++++--------
 .../corepersistence/StaleIndexCleanupTest.java  |  43 ++++-
 2 files changed, 136 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/81d4e0ea/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 5f595f4..bcfe215 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -1542,118 +1542,143 @@ public class CpRelationManager implements RelationManager {
     }
     
 
+    /**
+     * Build results from a set of candidates, and discard those that represent stale indexes.
+     * 
+     * @param query Query that was executed
+     * @param crs Candidates to be considered for results
+     * @param collName Name of collection or null if querying all types
+     */
     private Results buildResults(Query query, CandidateResults crs, String collName ) {
 
         logger.debug("buildResults() for {} from {} candidates", collName, crs.size());
 
         Results results = null;
 
-        if ( query.getLevel().equals( Level.IDS )) {
+        EntityIndex index = managerCache.getEntityIndex(applicationScope);
+        EntityIndexBatch indexBatch = index.createBatch();
 
-            // TODO: add stale entity logic here
-            
-            // TODO: replace this with List<Id> someday
-            List<UUID> ids = new ArrayList<UUID>();
-            Iterator<CandidateResult> iter = crs.iterator();
-            while ( iter.hasNext() ) {
-                ids.add( iter.next().getId().getUuid() );
+
+        // map of the latest versions, we will discard stale indexes
+        Map<Id, CandidateResult> latestVersions = new LinkedHashMap<Id, CandidateResult>();
+
+        Iterator<CandidateResult> iter = crs.iterator();
+        while ( iter.hasNext() ) {
+
+            CandidateResult cr = iter.next();
+
+            CollectionScope collScope = new CollectionScopeImpl( 
+                applicationScope.getApplication(), 
+                applicationScope.getApplication(), 
+                CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
+
+            EntityCollectionManager ecm = managerCache.getEntityCollectionManager(collScope);
+
+            UUID latestVersion = ecm.getLatestVersion( cr.getId() ).toBlocking().lastOrDefault(null);
+
+            if ( logger.isDebugEnabled() ) {
+                logger.debug("Getting version for entity {} from scope\n   app {}\n   owner {}\n   name {}", 
+                new Object[] { 
+                    cr.getId(),
+                    collScope.getApplication(), 
+                    collScope.getOwner(), 
+                    collScope.getName() 
+                });
+            }
+
+            if ( latestVersion == null ) {
+                logger.error("Version for Entity {}:{} not found", 
+                        cr.getId().getType(), cr.getId().getUuid());
+                continue;
             }
-            results = Results.fromIdList( ids );
 
-        } else if ( query.getLevel().equals( Level.REFS )) {
+            if ( cr.getVersion().compareTo( latestVersion) < 0 )  {
+                logger.debug("Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", 
+                    new Object[] { cr.getId().getUuid(), cr.getId().getType(), 
+                        cr.getVersion(), latestVersion});
 
-            // TODO: add stale entity logic here
-            
-            if ( crs.size() == 1 ) {
-                CandidateResult cr = crs.iterator().next();
-                results = Results.fromRef( 
-                    new SimpleEntityRef( cr.getId().getType(), cr.getId().getUuid()));
+                IndexScope indexScope = new IndexScopeImpl(
+                    cpHeadEntity.getId(),
+                    CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
+                indexBatch.deindex( indexScope, cr);
+
+                continue;
+            }
+
+            CandidateResult alreadySeen = latestVersions.get( cr.getId() ); 
+
+            if ( alreadySeen == null ) { // never seen it, so add to map
+                latestVersions.put( cr.getId(), cr );
 
             } else {
+                // we have seen this id before, only add entity if we now have a newer version
+                if ( latestVersion.compareTo( alreadySeen.getVersion() ) > 0 ) {
 
-                List<EntityRef> entityRefs = new ArrayList<EntityRef>();
-                Iterator<CandidateResult> iter = crs.iterator();
-                while ( iter.hasNext() ) {
-                    Id id = iter.next().getId();
-                    entityRefs.add( new SimpleEntityRef( id.getType(), id.getUuid() ));
-                } 
-                results = Results.fromRefList(entityRefs);
+                    latestVersions.put( cr.getId(), cr);
+
+                    IndexScope indexScope = new IndexScopeImpl(
+                        cpHeadEntity.getId(),
+                        CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
+                    indexBatch.deindex( indexScope, alreadySeen);
+                }
             }
+        }
 
-        } else {
+        indexBatch.execute();
 
-            // first, build map of latest versions of entities
-            Map<Id, org.apache.usergrid.persistence.model.entity.Entity> latestVersions = 
-                new LinkedHashMap<Id, org.apache.usergrid.persistence.model.entity.Entity>();
+        if (query.getLevel().equals(Level.IDS)) {
 
-            Iterator<CandidateResult> iter = crs.iterator();
-            while ( iter.hasNext() ) {
+            List<UUID> ids = new ArrayList<UUID>();
+            for ( Id id : latestVersions.keySet() ) {
+                CandidateResult cr = latestVersions.get(id);
+                ids.add( cr.getId().getUuid() );
+            }
+            results = Results.fromIdList(ids);
+
+        } else if (query.getLevel().equals(Level.REFS)) {
+
+            List<EntityRef> refs = new ArrayList<EntityRef>();
+            for ( Id id : latestVersions.keySet() ) {
+                CandidateResult cr = latestVersions.get(id);
+                refs.add( new SimpleEntityRef( cr.getId().getType(), cr.getId().getUuid()));
+            }
+            results = Results.fromRefList( refs );
 
-                CandidateResult cr = iter.next();
+        } else {
+
+            List<Entity> entities = new ArrayList<Entity>();
+            for (Id id : latestVersions.keySet()) {
+
+                CandidateResult cr = latestVersions.get(id);
 
                 CollectionScope collScope = new CollectionScopeImpl( 
                     applicationScope.getApplication(), 
                     applicationScope.getApplication(), 
                     CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
-                EntityCollectionManager ecm = managerCache.getEntityCollectionManager(collScope);
 
-                if ( logger.isDebugEnabled() ) {
-                    logger.debug("Loading entity {} from scope\n   app {}\n   owner {}\n   name {}", 
-                    new Object[] { 
-                        cr.getId(),
-                        collScope.getApplication(), 
-                        collScope.getOwner(), 
-                        collScope.getName() 
-                    });
-                }
+                EntityCollectionManager ecm = managerCache.getEntityCollectionManager(collScope);
 
-                org.apache.usergrid.persistence.model.entity.Entity e =
-                    ecm.load( cr.getId() ).toBlockingObservable().last();
+                org.apache.usergrid.persistence.model.entity.Entity e = 
+                        ecm.load( cr.getId() ).toBlocking().lastOrDefault(null);
 
                 if ( e == null ) {
-                    logger.error("Entity {}:{} not found", cr.getId().getType(), cr.getId().getUuid());
+                    logger.error("Entity {}:{} not found", 
+                            cr.getId().getType(), cr.getId().getUuid());
                     continue;
                 }
 
-                if ( cr.getVersion().compareTo( e.getVersion()) < 0 )  {
-                    logger.debug("Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", 
-                        new Object[] { cr.getId().getUuid(), cr.getId().getType(), 
-                            cr.getVersion(), e.getVersion()});
-                    continue;
-                }
-
-                org.apache.usergrid.persistence.model.entity.Entity alreadySeen = 
-                    latestVersions.get( e.getId() ); 
-                if ( alreadySeen == null ) { // never seen it, so add to map
-                    latestVersions.put( e.getId(), e);
-
-                } else {
-                    // we seen this id before, only add entity if newer version
-                    if ( e.getVersion().compareTo( alreadySeen.getVersion() ) > 0 ) {
-                        latestVersions.put( e.getId(), e);
-                    }
-                }
-            }
-
-            // now build collection of old-school entities
-            List<Entity> entities = new ArrayList<Entity>();
-            for ( Id id : latestVersions.keySet() ) {
-
-                org.apache.usergrid.persistence.model.entity.Entity e =
-                    latestVersions.get( id );
-
                 Entity entity = EntityFactory.newEntity(
-                    e.getId().getUuid(), e.getField("type").getValue().toString() );
+                        e.getId().getUuid(), e.getField("type").getValue().toString());
 
-                Map<String, Object> entityMap = CpEntityMapUtils.toMap( e );
-                entity.addProperties( entityMap ); 
-                entities.add( entity );
+                Map<String, Object> entityMap = CpEntityMapUtils.toMap(e);
+                entity.addProperties(entityMap);
+                entities.add(entity);
             }
 
-            if ( entities.size() == 1 ) {
-                results = Results.fromEntity( entities.get(0));
+            if (entities.size() == 1) {
+                results = Results.fromEntity(entities.get(0));
             } else {
-                results = Results.fromEntities( entities );
+                results = Results.fromEntities(entities);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/81d4e0ea/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index 5fc9af3..c5d5782 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -25,6 +25,7 @@ import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.Results;
 import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
@@ -53,6 +54,9 @@ import org.slf4j.LoggerFactory;
 public class StaleIndexCleanupTest extends AbstractCoreIT {
     private static final Logger logger = LoggerFactory.getLogger(StaleIndexCleanupTest.class );
 
+    private static final long writeDelayMs = 80;
+    //private static final long readDelayMs = 7;
+
 
     @Test
     public void testUpdateVersioning() throws Exception {
@@ -92,45 +96,66 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
 
         logger.info("Started testStaleIndexCleanup()");
 
-        final EntityManager em = app.getEntityManager();
+        // TODO: turn off post processing stuff that cleans up stale entities 
 
-        final List<Entity> things = new ArrayList<Entity>();
+        final EntityManager em = app.getEntityManager();
 
-        int numEntities = 1;
-        int numUpdates = 3;
+        int numEntities = 100;
+        int numUpdates = 10;
 
-        // create 100 entities
+        // create lots of entities
+        final List<Entity> things = new ArrayList<Entity>();
         for ( int i=0; i<numEntities; i++) {
             final String thingName = "thing" + i;
             things.add( em.create("thing", new HashMap<String, Object>() {{
                 put("name", thingName);
             }}));
+            Thread.sleep( writeDelayMs );
         }
         em.refreshIndex();
 
         CandidateResults crs = queryCollectionCp( "things", "select *");
         Assert.assertEquals( numEntities, crs.size() );
 
-        // update each one 10 times
+        // update each one a bunch of times
+        int count = 0;
         for ( Entity thing : things ) {
 
             for ( int j=0; j<numUpdates; j++) {
+
                 Entity toUpdate = em.get( thing.getUuid() );
                 thing.setProperty( "property"  + j, RandomStringUtils.randomAlphanumeric(10));
                 em.update(toUpdate);
+
+                Thread.sleep( writeDelayMs );
                 em.refreshIndex();
+                count++;
+
+                if ( count % 100 == 0 ) {
+                    logger.info("Updated {} of {} times", count, numEntities * numUpdates);
+                }
             }
         }
 
-        // new query for total number of result candidates = 1000
+        // query Core Persistence directly for total number of result candidates
+        // should be numEntities * numUpdates because of stale indexes
         crs = queryCollectionCp("things", "select *");
         Assert.assertEquals( numEntities * numUpdates, crs.size() );
 
-        // query for results, should be 100 (and it triggers background clean up of stale indexes)
+        // query EntityManager for results
+        // should return 100 because it filters out the stale entities
+        Query q = Query.fromQL("select *");
+        q.setLimit( 10000 );
+        Results results = em.searchCollection( em.getApplicationRef(), "things", q);
+        assertEquals( numEntities, results.size() );
 
+        // EntityManager should have kicked off a batch cleanup of those stale indexes
         // wait a second for batch cleanup to complete
+        Thread.sleep(600);
 
-        // query for total number of result candidates = 1000
+        // query for total number of result candidates = 100
+        crs = queryCollectionCp("things", "select *");
+        Assert.assertEquals( numEntities, crs.size() );
     }