You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/10/09 21:23:43 UTC

[1/4] git commit: Add new "public Observable getLatestVersion( Id entityId )" method to the collection manager.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/collection_multiget f1a88aa90 -> 63094abc3


Add new "public Observable<UUID> getLatestVersion( Id entityId )" method to the collection manager.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/159e5fd4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/159e5fd4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/159e5fd4

Branch: refs/heads/collection_multiget
Commit: 159e5fd447ffbc0f3c6ee649dae48d0e44b8cef1
Parents: 185ecef
Author: Dave Johnson <dm...@apigee.com>
Authored: Thu Oct 9 11:32:16 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Thu Oct 9 11:32:16 2014 -0400

----------------------------------------------------------------------
 .../collection/EntityCollectionManager.java     | 15 +++-
 .../exception/WriteUniqueVerifyException.java   |  1 -
 .../collection/guice/CollectionModule.java      | 12 +--
 .../impl/EntityCollectionManagerImpl.java       | 78 ++++++++++++--------
 .../collection/mvcc/stage/load/GetVersion.java  | 74 +++++++++++++++++++
 .../collection/EntityCollectionManagerIT.java   | 17 +++++
 6 files changed, 154 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/159e5fd4/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index b49989f..6756cdc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -19,6 +19,7 @@
 package org.apache.usergrid.persistence.collection;
 
 
+import java.util.UUID;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -51,16 +52,22 @@ public interface EntityCollectionManager {
      */
     public Observable<Entity> load( Id entityId );
 
-    //TODO TN Change load to use multiget and return multiple entities.  Only supports loading 1k per load operation.
+    /**
+     * Return the latest version of the specified entity.
+     */
+    public Observable<UUID> getLatestVersion( Id entityId );
 
+    //TODO TN Change load to use multiget and return multiple entities.  
+    // Only supports loading 1k per load operation.
 
-    //TODO Dave add a load versions using a multiget that will return a latest version structure for a collection of entity Ids
+    //TODO Dave add a load versions using a multiget that will return a latest version 
+    //structure for a collection of entity Ids
 
 
     /**
      * Takes the change and reloads an entity with all changes applied in this entity applied.
-     * The resulting entity from calling load will be the previous version of this entity + the entity
-     * in this object applied to it.
+     * The resulting entity from calling load will be the previous version of this entity + the 
+     * entityin this object applied to it.
      * @param entity
      * @return
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/159e5fd4/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java
index 64ca777..7e7f05b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/exception/WriteUniqueVerifyException.java
@@ -22,7 +22,6 @@ import java.util.Map;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.field.Field;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/159e5fd4/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 3336166..cb0087b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -64,11 +64,10 @@ public class CollectionModule extends AbstractModule {
         install( new ServiceModule() );
 
         // create a guice factor for getting our collection manager
-        install(
-                new FactoryModuleBuilder().implement( EntityCollectionManager.class, EntityCollectionManagerImpl.class )
-                                          .implement( EntityCollectionManagerSync.class,
-                                                  EntityCollectionManagerSyncImpl.class )
-                                          .build( EntityCollectionManagerFactory.class ) );
+        install( new FactoryModuleBuilder()
+            .implement( EntityCollectionManager.class, EntityCollectionManagerImpl.class )
+            .implement( EntityCollectionManagerSync.class, EntityCollectionManagerSyncImpl.class )
+            .build( EntityCollectionManagerFactory.class ) );
 
         bind( UniqueValueSerializationStrategy.class ).to( UniqueValueSerializationStrategyImpl.class );
 
@@ -101,7 +100,8 @@ public class CollectionModule extends AbstractModule {
     @Provides
     @CollectionTaskExecutor
     public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
-        return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
+        return new NamedTaskExecutorImpl( "collectiontasks", 
+                serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/159e5fd4/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index b2b07e9..0efb21c 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -18,7 +18,6 @@
  */
 package org.apache.usergrid.persistence.collection.impl;
 
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,6 +32,7 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
 import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
 import org.apache.usergrid.persistence.collection.mvcc.stage.load.Load;
+import org.apache.usergrid.persistence.collection.mvcc.stage.load.GetVersion;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
@@ -47,6 +47,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
+import java.util.UUID;
 
 import rx.Observable;
 import rx.functions.Action1;
@@ -75,6 +76,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     private final WriteOptimisticVerify writeOptimisticVerify;
     private final WriteCommit writeCommit;
     private final RollbackAction rollback;
+    private final GetVersion getVersion;
 
     //load stages
     private final Load load;
@@ -87,15 +89,21 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     private final TaskExecutor taskExecutor;
 
     @Inject
-    public EntityCollectionManagerImpl( final UUIDService uuidService, @Write final WriteStart writeStart,
-                                        @WriteUpdate final WriteStart writeUpdate,
-                                        final WriteUniqueVerify writeVerifyUnique,
-                                        final WriteOptimisticVerify writeOptimisticVerify,
-                                        final WriteCommit writeCommit, final RollbackAction rollback, final Load load,
-                                        final MarkStart markStart, final MarkCommit markCommit,
-                                        @CollectionTaskExecutor
-                                        final TaskExecutor taskExecutor,
-                                        @Assisted final CollectionScope collectionScope) {
+    public EntityCollectionManagerImpl( 
+            final UUIDService uuidService, 
+            @Write final WriteStart writeStart,
+            @WriteUpdate final WriteStart writeUpdate,
+            final WriteUniqueVerify writeVerifyUnique,
+            final WriteOptimisticVerify writeOptimisticVerify,
+            final WriteCommit writeCommit, 
+            final RollbackAction rollback, 
+            final Load load,
+            final MarkStart markStart, 
+            final MarkCommit markCommit,
+            final GetVersion getVersion,
+            @CollectionTaskExecutor
+            final TaskExecutor taskExecutor,
+            @Assisted final CollectionScope collectionScope) {
 
         Preconditions.checkNotNull( uuidService, "uuidService must be defined" );
 
@@ -111,6 +119,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         this.load = load;
         this.markStart = markStart;
         this.markCommit = markCommit;
+        this.getVersion = getVersion;
 
         this.uuidService = uuidService;
         this.collectionScope = collectionScope;
@@ -180,8 +189,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         Preconditions.checkNotNull( entityId.getUuid(), "Entity id uuid required in load stage" );
         Preconditions.checkNotNull( entityId.getType(), "Entity id type required in load stage" );
 
-        return Observable.from( new CollectionIoEvent<Id>( collectionScope, entityId ) )
-                         .map( load );
+        return Observable.from( new CollectionIoEvent<Id>( collectionScope, entityId ) ).map(load);
     }
 
     @Override
@@ -212,7 +220,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                //we an update, signal the fix
 
                 //TODO T.N Change this to fire a task
-                Observable.from( new CollectionIoEvent<Id>(collectionScope, entityId ) ).map( load ).subscribeOn( Schedulers.io() ).subscribe();
+                Observable.from( new CollectionIoEvent<Id>(collectionScope, entityId ) )
+                        .map( load ).subscribeOn( Schedulers.io() ).subscribe();
 
 
             }
@@ -220,34 +229,39 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     }
 
     // fire the stages
-    public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData, WriteStart writeState ) {
-
-        return Observable.from( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() {
-
-                    @Override
-                    public void call(
-                            final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
+    public Observable<CollectionIoEvent<MvccEntity>> stageRunner( 
+            CollectionIoEvent<Entity> writeData, WriteStart writeState ) {
 
-                        Observable<CollectionIoEvent<MvccEntity>> unique =
-                                Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
-                                          .doOnNext( writeVerifyUnique );
+        return Observable.from( writeData ).map( writeState ).doOnNext( 
+            new Action1<CollectionIoEvent<MvccEntity>>() {
 
+                @Override
+                public void call(
+                        final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
 
-                        // optimistic verification
-                        Observable<CollectionIoEvent<MvccEntity>> optimistic =
-                                Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
-                                          .doOnNext( writeOptimisticVerify );
+                    Observable<CollectionIoEvent<MvccEntity>> unique =
+                        Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io())
+                            .doOnNext( writeVerifyUnique );
 
 
-                        //wait for both to finish
-                        Observable.merge( unique, optimistic ).toBlocking().last();
+                    // optimistic verification
+                    Observable<CollectionIoEvent<MvccEntity>> optimistic =
+                        Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io())
+                            .doOnNext( writeOptimisticVerify );
 
 
-                    }
-                } );
+                    //wait for both to finish
+                    Observable.merge( unique, optimistic ).toBlocking().last();
+                }
+            } 
+        );
     }
 
 
-
+    @Override
+    public Observable<UUID> getLatestVersion(Id entityId) {
+        return Observable.from( 
+                new CollectionIoEvent<Id>( collectionScope, entityId ) ).map(getVersion);
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/159e5fd4/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/GetVersion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/GetVersion.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/GetVersion.java
new file mode 100644
index 0000000..2069818
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/load/GetVersion.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.mvcc.stage.load;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import java.util.List;
+import java.util.UUID;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.functions.Func1;
+
+
+/** 
+ * Gets the latest version UUID for an Entity without loading the Entity.
+ */
+public class GetVersion implements Func1<CollectionIoEvent<Id>, UUID> {
+
+    private static final Logger LOG = LoggerFactory.getLogger( GetVersion.class );
+
+    private final MvccLogEntrySerializationStrategy logStrat;
+
+
+    @Inject
+    public GetVersion( final MvccLogEntrySerializationStrategy logStrat ) {
+        Preconditions.checkNotNull( logStrat, "logStrat is required" );
+        this.logStrat = logStrat;
+    }
+
+
+    @Override
+    public UUID call( CollectionIoEvent<Id> idEvent ) {
+
+        Id id = idEvent.getEvent();
+        CollectionScope cs = idEvent.getEntityCollection();
+
+        final UUID latestVersion;
+        try {
+            List<MvccLogEntry> logEntries = logStrat.load( cs, id, UUIDGenerator.newTimeUUID(), 1 );
+            latestVersion = logEntries.get(0).getVersion();
+
+        } catch (ConnectionException ex) {
+            throw new RuntimeException("Unable to get latest version of entity " +
+                id.getType() + ":" + id.getUuid().toString(), ex );
+        }
+       
+        return latestVersion;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/159e5fd4/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
index 9e8bbe3..34061ee 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
@@ -392,4 +392,21 @@ public class EntityCollectionManagerIT {
         assertTrue( UUIDComparator.staticCompare( newVersion, oldVersion ) > 0);
     }
 
+    @Test
+    public void testGetVersion() {
+
+        CollectionScope context = new CollectionScopeImpl(
+                new SimpleId( "organization" ),  new SimpleId( "test" ), "test" );
+
+        Entity newEntity = new Entity( new SimpleId( "test" ) );
+        EntityCollectionManager manager = factory.createCollectionManager( context );
+        Observable<Entity> observable = manager.write( newEntity );
+        Entity created = observable.toBlocking().lastOrDefault( null );
+
+        assertNotNull("Id was assigned", created.getId() );
+        assertNotNull("Version was assigned", created.getVersion() );
+
+        assertTrue(UUIDComparator.staticCompare(created.getVersion(), 
+            manager.getLatestVersion( created.getId() ).toBlocking().lastOrDefault(null)) == 0);
+    }
 }


[3/4] git commit: Merge branch 'two-dot-o' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into collection_multiget

Posted by to...@apache.org.
Merge branch 'two-dot-o' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into collection_multiget

Conflicts:
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
	stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ac61f132
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ac61f132
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ac61f132

Branch: refs/heads/collection_multiget
Commit: ac61f132372f653cf14561fe7a03cb7cb6d8a79c
Parents: f1a88aa 81d4e0e
Author: Todd Nine <to...@apache.org>
Authored: Thu Oct 9 11:47:51 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Oct 9 11:47:51 2014 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      | 179 +++++++++++--------
 .../corepersistence/StaleIndexCleanupTest.java  |  43 ++++-
 .../collection/EntityCollectionManager.java     |   5 +
 .../exception/WriteUniqueVerifyException.java   |   1 -
 .../collection/guice/CollectionModule.java      |  12 +-
 .../impl/EntityCollectionManagerImpl.java       |  13 +-
 .../collection/EntityCollectionManagerIT.java   |  17 ++
 7 files changed, 175 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac61f132/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index e5917b0,6756cdc..2625078
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@@ -19,8 -19,7 +19,9 @@@
  package org.apache.usergrid.persistence.collection;
  
  
 +import java.util.Collection;
 +
+ import java.util.UUID;
  import org.apache.usergrid.persistence.model.entity.Entity;
  import org.apache.usergrid.persistence.model.entity.Id;
  
@@@ -53,17 -52,16 +54,21 @@@ public interface EntityCollectionManage
       */
      public Observable<Entity> load( Id entityId );
  
+     /**
+      * Return the latest version of the specified entity.
+      */
+     public Observable<UUID> getLatestVersion( Id entityId );
 +    //TODO TN Change load to use multiget and return multiple entities.  Only supports loading 1k per load operation.
 +
 +    /**
 +     * Load all the entityIds into the observable entity set
 +     * @param entityIds
 +     * @return
 +     */
 +    public Observable<EntitySet> load(Collection<Id> entityIds);
  
 -    //TODO TN Change load to use multiget and return multiple entities.  
 -    // Only supports loading 1k per load operation.
  
 -    //TODO Dave add a load versions using a multiget that will return a latest version 
 -    //structure for a collection of entity Ids
 +    //TODO Dave add a load versions using a multiget that will return a latest version structure for a collection of entity Ids
  
  
      /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac61f132/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 39df9cb,0efb21c..5fe2fbb
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@@ -80,7 -76,11 +83,8 @@@ public class EntityCollectionManagerImp
      private final WriteOptimisticVerify writeOptimisticVerify;
      private final WriteCommit writeCommit;
      private final RollbackAction rollback;
+     private final GetVersion getVersion;
  
 -    //load stages
 -    private final Load load;
 -
  
      //delete stages
      private final MarkStart markStart;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac61f132/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
index 823cb87,34061ee..a7f9ea2
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
@@@ -402,130 -389,24 +402,147 @@@ public class EntityCollectionManagerIT 
          assertNotNull( "A new version must be assigned", newVersion );
  
          // new Version should be > old version
 -        assertTrue( UUIDComparator.staticCompare( newVersion, oldVersion ) > 0);
 +        assertTrue( UUIDComparator.staticCompare( newVersion, oldVersion ) > 0 );
 +    }
 +
 +
 +    @Test
 +    public void writeMultiget() {
 +
 +        final CollectionScope context =
 +                new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
 +        final EntityCollectionManager manager = factory.createCollectionManager( context );
 +
 +        final int multigetSize = serializationFig.getMaxLoadSize();
 +
 +        final List<Entity> writtenEntities = new ArrayList<>( multigetSize );
 +        final List<Id> entityIds = new ArrayList<>( multigetSize );
 +
 +        for ( int i = 0; i < multigetSize; i++ ) {
 +            final Entity entity = new Entity( new SimpleId( "test" ) );
 +
 +            final Entity written = manager.write( entity ).toBlocking().last();
 +
 +            writtenEntities.add( written );
 +            entityIds.add( written.getId() );
 +        }
 +
 +
 +        final EntitySet entitySet = manager.load( entityIds ).toBlocking().lastOrDefault( null );
 +
 +        assertNotNull( entitySet );
 +
 +        assertEquals(multigetSize, entitySet.size());
 +        assertFalse(entitySet.isEmpty());
 +
 +        /**
 +         * Validate every element exists
 +         */
 +        for(int i = 0; i < multigetSize; i ++){
 +            final Entity expected = writtenEntities.get( i );
 +
 +            final MvccEntity returned = entitySet.getEntity( expected.getId() );
 +
 +            assertEquals("Same entity returned", expected, returned.getEntity().get());
 +        }
 +
 +
      }
  
 +
 +    /**
 +     * Perform a multiget where every entity will need repaired on load
 +     */
 +    @Test
 +     public void writeMultigetRepair() {
 +
 +           final CollectionScope context =
 +                   new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
 +           final EntityCollectionManager manager = factory.createCollectionManager( context );
 +
 +           final int multigetSize = serializationFig.getMaxLoadSize();
 +
 +           final List<Entity> writtenEntities = new ArrayList<>( multigetSize );
 +           final List<Id> entityIds = new ArrayList<>( multigetSize );
 +
 +           for ( int i = 0; i < multigetSize; i++ ) {
 +               final Entity entity = new Entity( new SimpleId( "test" ) );
 +
 +               final Entity written = manager.write( entity ).toBlocking().last();
 +
 +               written.setField( new BooleanField( "updated", true ) );
 +
 +               final Entity updated  = manager.update( written ).toBlocking().last();
 +
 +               writtenEntities.add( updated );
 +               entityIds.add( updated.getId() );
 +           }
 +
 +
 +           final EntitySet entitySet = manager.load( entityIds ).toBlocking().lastOrDefault( null );
 +
 +           assertNotNull( entitySet );
 +
 +           assertEquals(multigetSize, entitySet.size());
 +           assertFalse(entitySet.isEmpty());
 +
 +           /**
 +            * Validate every element exists
 +            */
 +           for(int i = 0; i < multigetSize; i ++){
 +               final Entity expected = writtenEntities.get( i );
 +
 +               final MvccEntity returned = entitySet.getEntity( expected.getId() );
 +
 +               assertEquals("Same entity returned", expected, returned.getEntity().get());
 +
 +               assertTrue( ( Boolean ) returned.getEntity().get().getField( "updated" ).getValue() );
 +           }
 +
 +
 +       }
 +
 +
 +
 +    @Test(expected = IllegalArgumentException.class)
 +    public void readTooLarge() {
 +
 +        final CollectionScope context =
 +                new CollectionScopeImpl( new SimpleId( "organization" ), new SimpleId( "test" ), "test" );
 +        final EntityCollectionManager manager = factory.createCollectionManager( context );
 +
 +        final int multigetSize = serializationFig.getMaxLoadSize() +1;
 +
 +
 +        final List<Id> entityIds = new ArrayList<>( multigetSize );
 +
 +        for ( int i = 0; i < multigetSize; i++ ) {
 +
 +            entityIds.add( new SimpleId( "simple" ) );
 +        }
 +
 +
 +        //should throw an exception
 +        manager.load( entityIds ).toBlocking().lastOrDefault( null );
 +
 +
 +
 +    }
+     @Test
+     public void testGetVersion() {
+ 
+         CollectionScope context = new CollectionScopeImpl(
+                 new SimpleId( "organization" ),  new SimpleId( "test" ), "test" );
+ 
+         Entity newEntity = new Entity( new SimpleId( "test" ) );
+         EntityCollectionManager manager = factory.createCollectionManager( context );
+         Observable<Entity> observable = manager.write( newEntity );
+         Entity created = observable.toBlocking().lastOrDefault( null );
+ 
+         assertNotNull("Id was assigned", created.getId() );
+         assertNotNull("Version was assigned", created.getVersion() );
+ 
+         assertTrue(UUIDComparator.staticCompare(created.getVersion(), 
+             manager.getLatestVersion( created.getId() ).toBlocking().lastOrDefault(null)) == 0);
+     }
  }


[4/4] git commit: Initial refactor of results loading to use multiget. Need to push load logic down to impelmentations.

Posted by to...@apache.org.
Initial refactor of results loading to use multiget.  Need to push load logic down to impelmentations.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/63094abc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/63094abc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/63094abc

Branch: refs/heads/collection_multiget
Commit: 63094abc3d52fb6139d8a0828328d0bc4c111e0c
Parents: ac61f13
Author: Todd Nine <to...@apache.org>
Authored: Thu Oct 9 13:23:35 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Oct 9 13:23:35 2014 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityDeleteListener.java |   2 +-
 .../corepersistence/CpRelationManager.java      |   6 +-
 .../results/AbstractIdLoader.java               | 130 +++++++++++++++++++
 .../corepersistence/results/EntitiesLoader.java |  51 ++++++++
 .../corepersistence/results/IdsLoader.java      |  46 +++++++
 .../corepersistence/results/RefsLoader.java     |  46 +++++++
 .../corepersistence/results/ResultsLoader.java  |  57 ++++++++
 .../results/ResultsLoaderFactory.java           |  53 ++++++++
 .../results/ResultsLoaderFactoryImpl.java       |  70 ++++++++++
 .../CpEntityDeleteListenerTest.java             |   2 +-
 .../CpEntityIndexDeleteListenerTest.java        |   3 +-
 11 files changed, 461 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/63094abc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityDeleteListener.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityDeleteListener.java
index 6b6fa59..70df7d5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityDeleteListener.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityDeleteListener.java
@@ -26,7 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityDeleteEvent;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.core.entity.EntityVersion;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/63094abc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index bcfe215..03aada3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -1551,8 +1551,12 @@ public class CpRelationManager implements RelationManager {
      */
     private Results buildResults(Query query, CandidateResults crs, String collName ) {
 
+
         logger.debug("buildResults() for {} from {} candidates", collName, crs.size());
 
+
+        //TODO T.N Change to results loader here
+
         Results results = null;
 
         EntityIndex index = managerCache.getEntityIndex(applicationScope);
@@ -1574,7 +1578,7 @@ public class CpRelationManager implements RelationManager {
 
             EntityCollectionManager ecm = managerCache.getEntityCollectionManager(collScope);
 
-            UUID latestVersion = ecm.getLatestVersion( cr.getId() ).toBlocking().lastOrDefault(null);
+            UUID latestVersion = ecm.getLatestVersion( Collections.singleton( cr.getId()) ).toBlocking().last().getMaxVersion( cr.getId() ).getVersion();
 
             if ( logger.isDebugEnabled() ) {
                 logger.debug("Getting version for entity {} from scope\n   app {}\n   owner {}\n   name {}", 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/63094abc/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractIdLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractIdLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractIdLoader.java
new file mode 100644
index 0000000..b034a53
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/AbstractIdLoader.java
@@ -0,0 +1,130 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.usergrid.corepersistence.CpNamingUtils;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
+import org.apache.usergrid.persistence.index.query.CandidateResult;
+import org.apache.usergrid.persistence.index.query.CandidateResults;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+public abstract class AbstractIdLoader implements  ResultsLoader{
+
+    @Override
+    public Results getResults( final ApplicationScope applicationScope, final CandidateResults crs ) {
+//        Map<Id, CandidateResult> latestVersions = new LinkedHashMap<Id, CandidateResult>();
+//
+//               Iterator<CandidateResult> iter = crs.iterator();
+//               while ( iter.hasNext() ) {
+//
+//                   CandidateResult cr = iter.next();
+//
+//                   CollectionScope collScope = new CollectionScopeImpl(
+//                       applicationScope.getApplication(),
+//                       applicationScope.getApplication(),
+//                       CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
+//
+//                   EntityCollectionManager ecm = managerCache.getEntityCollectionManager(collScope);
+//
+//                   UUID latestVersion = ecm.getLatestVersion( cr.getId() ).toBlocking().lastOrDefault(null);
+//
+//                   if ( logger.isDebugEnabled() ) {
+//                       logger.debug("Getting version for entity {} from scope\n   app {}\n   owner {}\n   name {}",
+//                       new Object[] {
+//                           cr.getId(),
+//                           collScope.getApplication(),
+//                           collScope.getOwner(),
+//                           collScope.getName()
+//                       });
+//                   }
+//
+//                   if ( latestVersion == null ) {
+//                       logger.error("Version for Entity {}:{} not found",
+//                               cr.getId().getType(), cr.getId().getUuid());
+//                       continue;
+//                   }
+//
+//                   if ( cr.getVersion().compareTo( latestVersion) < 0 )  {
+//                       logger.debug("Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
+//                           new Object[] { cr.getId().getUuid(), cr.getId().getType(),
+//                               cr.getVersion(), latestVersion});
+//
+//                       IndexScope indexScope = new IndexScopeImpl(
+//                           cpHeadEntity.getId(),
+//                           CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
+//                       indexBatch.deindex( indexScope, cr);
+//
+//                       continue;
+//                   }
+//
+//                   CandidateResult alreadySeen = latestVersions.get( cr.getId() );
+//
+//                   if ( alreadySeen == null ) { // never seen it, so add to map
+//                       latestVersions.put( cr.getId(), cr );
+//
+//                   } else {
+//                       // we seen this id before, only add entity if we now have newer version
+//                       if ( latestVersion.compareTo( alreadySeen.getVersion() ) > 0 ) {
+//
+//                           latestVersions.put( cr.getId(), cr);
+//
+//                           IndexScope indexScope = new IndexScopeImpl(
+//                               cpHeadEntity.getId(),
+//                               CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
+//                           indexBatch.deindex( indexScope, alreadySeen);
+//                       }
+//                   }
+//               }
+//
+//               indexBatch.execute();
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/63094abc/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntitiesLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntitiesLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntitiesLoader.java
new file mode 100644
index 0000000..d20f677
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/EntitiesLoader.java
@@ -0,0 +1,51 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.query.CandidateResults;
+
+
+public class EntitiesLoader implements ResultsLoader {
+    @Override
+    public Results getResults( final ApplicationScope scope, final CandidateResults crs ) {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/63094abc/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsLoader.java
new file mode 100644
index 0000000..7796028
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdsLoader.java
@@ -0,0 +1,46 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.index.query.CandidateResults;
+
+
+public class IdsLoader extends AbstractIdLoader{
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/63094abc/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsLoader.java
new file mode 100644
index 0000000..ad852d5
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsLoader.java
@@ -0,0 +1,46 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.index.query.CandidateResults;
+
+
+public class RefsLoader extends AbstractIdLoader{
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/63094abc/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoader.java
new file mode 100644
index 0000000..8a17c6e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoader.java
@@ -0,0 +1,57 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.query.CandidateResults;
+
+
+/**
+ * Interface for loading results
+ */
+public interface ResultsLoader {
+
+    /**
+     * Using the candidate results, get the results
+     * @param crs
+     * @return
+     */
+    public Results getResults(final ApplicationScope scope, final CandidateResults crs);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/63094abc/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
new file mode 100644
index 0000000..5c5892c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
@@ -0,0 +1,53 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+/**
+ * Factory for creating results
+ */
+
+public interface ResultsLoaderFactory {
+
+
+    /**
+     * Get the load for results
+     * @return
+     */
+    public ResultsLoader getLoader();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/63094abc/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java
new file mode 100644
index 0000000..edaca78
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java
@@ -0,0 +1,70 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.corepersistence.results;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * Factory for creating results
+ */
+@Singleton
+public class ResultsLoaderFactoryImpl implements ResultsLoaderFactory {
+
+
+    private final EntitiesLoader entityLoader;
+    private final IdsLoader idsLoader;
+    private final RefsLoader refsLoader;
+
+
+    @Inject
+    public ResultsLoaderFactoryImpl( final EntitiesLoader entityLoader, final IdsLoader idsLoader,
+                                      final RefsLoader refsLoader ) {
+        this.entityLoader = entityLoader;
+        this.idsLoader = idsLoader;
+        this.refsLoader = refsLoader;
+    }
+
+
+    @Override
+    public ResultsLoader getLoader() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/63094abc/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityDeleteListenerTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityDeleteListenerTest.java
index dfea003..16d2e79 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityDeleteListenerTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityDeleteListenerTest.java
@@ -28,7 +28,7 @@ import org.junit.runner.RunWith;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityDeleteEvent;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.core.entity.EntityVersion;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/63094abc/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java
index 77c62c5..6b92d90 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java
@@ -29,9 +29,8 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
-import org.apache.usergrid.Application;
 import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityDeleteEvent;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;


[2/4] git commit: Change query result building logic to discard stale CandidateResults in all cases, and to do repair by reindexing each stale candidate found.

Posted by to...@apache.org.
Change query result building logic to discard stale CandidateResults in all cases, and to do repair by reindexing each stale candidate found.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/81d4e0ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/81d4e0ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/81d4e0ea

Branch: refs/heads/collection_multiget
Commit: 81d4e0ea24d7f18e60718fefc33086ced5f5900c
Parents: 159e5fd
Author: Dave Johnson <dm...@apigee.com>
Authored: Thu Oct 9 11:33:53 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Thu Oct 9 11:33:53 2014 -0400

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      | 179 +++++++++++--------
 .../corepersistence/StaleIndexCleanupTest.java  |  43 ++++-
 2 files changed, 136 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/81d4e0ea/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 5f595f4..bcfe215 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -1542,118 +1542,143 @@ public class CpRelationManager implements RelationManager {
     }
     
 
+    /**
+     * Build results from a set of candidates, and discard those that represent stale indexes.
+     * 
+     * @param query Query that was executed
+     * @param crs Candidates to be considered for results
+     * @param collName Name of collection or null if querying all types
+     */
     private Results buildResults(Query query, CandidateResults crs, String collName ) {
 
         logger.debug("buildResults() for {} from {} candidates", collName, crs.size());
 
         Results results = null;
 
-        if ( query.getLevel().equals( Level.IDS )) {
+        EntityIndex index = managerCache.getEntityIndex(applicationScope);
+        EntityIndexBatch indexBatch = index.createBatch();
 
-            // TODO: add stale entity logic here
-            
-            // TODO: replace this with List<Id> someday
-            List<UUID> ids = new ArrayList<UUID>();
-            Iterator<CandidateResult> iter = crs.iterator();
-            while ( iter.hasNext() ) {
-                ids.add( iter.next().getId().getUuid() );
+
+        // map of the latest versions, we will discard stale indexes
+        Map<Id, CandidateResult> latestVersions = new LinkedHashMap<Id, CandidateResult>();
+
+        Iterator<CandidateResult> iter = crs.iterator();
+        while ( iter.hasNext() ) {
+
+            CandidateResult cr = iter.next();
+
+            CollectionScope collScope = new CollectionScopeImpl( 
+                applicationScope.getApplication(), 
+                applicationScope.getApplication(), 
+                CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
+
+            EntityCollectionManager ecm = managerCache.getEntityCollectionManager(collScope);
+
+            UUID latestVersion = ecm.getLatestVersion( cr.getId() ).toBlocking().lastOrDefault(null);
+
+            if ( logger.isDebugEnabled() ) {
+                logger.debug("Getting version for entity {} from scope\n   app {}\n   owner {}\n   name {}", 
+                new Object[] { 
+                    cr.getId(),
+                    collScope.getApplication(), 
+                    collScope.getOwner(), 
+                    collScope.getName() 
+                });
+            }
+
+            if ( latestVersion == null ) {
+                logger.error("Version for Entity {}:{} not found", 
+                        cr.getId().getType(), cr.getId().getUuid());
+                continue;
             }
-            results = Results.fromIdList( ids );
 
-        } else if ( query.getLevel().equals( Level.REFS )) {
+            if ( cr.getVersion().compareTo( latestVersion) < 0 )  {
+                logger.debug("Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", 
+                    new Object[] { cr.getId().getUuid(), cr.getId().getType(), 
+                        cr.getVersion(), latestVersion});
 
-            // TODO: add stale entity logic here
-            
-            if ( crs.size() == 1 ) {
-                CandidateResult cr = crs.iterator().next();
-                results = Results.fromRef( 
-                    new SimpleEntityRef( cr.getId().getType(), cr.getId().getUuid()));
+                IndexScope indexScope = new IndexScopeImpl(
+                    cpHeadEntity.getId(),
+                    CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
+                indexBatch.deindex( indexScope, cr);
+
+                continue;
+            }
+
+            CandidateResult alreadySeen = latestVersions.get( cr.getId() ); 
+
+            if ( alreadySeen == null ) { // never seen it, so add to map
+                latestVersions.put( cr.getId(), cr );
 
             } else {
+                // we seen this id before, only add entity if we now have newer version
+                if ( latestVersion.compareTo( alreadySeen.getVersion() ) > 0 ) {
 
-                List<EntityRef> entityRefs = new ArrayList<EntityRef>();
-                Iterator<CandidateResult> iter = crs.iterator();
-                while ( iter.hasNext() ) {
-                    Id id = iter.next().getId();
-                    entityRefs.add( new SimpleEntityRef( id.getType(), id.getUuid() ));
-                } 
-                results = Results.fromRefList(entityRefs);
+                    latestVersions.put( cr.getId(), cr);
+
+                    IndexScope indexScope = new IndexScopeImpl(
+                        cpHeadEntity.getId(),
+                        CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
+                    indexBatch.deindex( indexScope, alreadySeen);
+                }
             }
+        }
 
-        } else {
+        indexBatch.execute();
 
-            // first, build map of latest versions of entities
-            Map<Id, org.apache.usergrid.persistence.model.entity.Entity> latestVersions = 
-                new LinkedHashMap<Id, org.apache.usergrid.persistence.model.entity.Entity>();
+        if (query.getLevel().equals(Level.IDS)) {
 
-            Iterator<CandidateResult> iter = crs.iterator();
-            while ( iter.hasNext() ) {
+            List<UUID> ids = new ArrayList<UUID>();
+            for ( Id id : latestVersions.keySet() ) {
+                CandidateResult cr = latestVersions.get(id);
+                ids.add( cr.getId().getUuid() );
+            }
+            results = Results.fromIdList(ids);
+
+        } else if (query.getLevel().equals(Level.REFS)) {
+
+            List<EntityRef> refs = new ArrayList<EntityRef>();
+            for ( Id id : latestVersions.keySet() ) {
+                CandidateResult cr = latestVersions.get(id);
+                refs.add( new SimpleEntityRef( cr.getId().getType(), cr.getId().getUuid()));
+            }
+            results = Results.fromRefList( refs );
 
-                CandidateResult cr = iter.next();
+        } else {
+
+            List<Entity> entities = new ArrayList<Entity>();
+            for (Id id : latestVersions.keySet()) {
+
+                CandidateResult cr = latestVersions.get(id);
 
                 CollectionScope collScope = new CollectionScopeImpl( 
                     applicationScope.getApplication(), 
                     applicationScope.getApplication(), 
                     CpNamingUtils.getCollectionScopeNameFromEntityType( cr.getId().getType() ));
-                EntityCollectionManager ecm = managerCache.getEntityCollectionManager(collScope);
 
-                if ( logger.isDebugEnabled() ) {
-                    logger.debug("Loading entity {} from scope\n   app {}\n   owner {}\n   name {}", 
-                    new Object[] { 
-                        cr.getId(),
-                        collScope.getApplication(), 
-                        collScope.getOwner(), 
-                        collScope.getName() 
-                    });
-                }
+                EntityCollectionManager ecm = managerCache.getEntityCollectionManager(collScope);
 
-                org.apache.usergrid.persistence.model.entity.Entity e =
-                    ecm.load( cr.getId() ).toBlockingObservable().last();
+                org.apache.usergrid.persistence.model.entity.Entity e = 
+                        ecm.load( cr.getId() ).toBlocking().lastOrDefault(null);
 
                 if ( e == null ) {
-                    logger.error("Entity {}:{} not found", cr.getId().getType(), cr.getId().getUuid());
+                    logger.error("Entity {}:{} not found", 
+                            cr.getId().getType(), cr.getId().getUuid());
                     continue;
                 }
 
-                if ( cr.getVersion().compareTo( e.getVersion()) < 0 )  {
-                    logger.debug("Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", 
-                        new Object[] { cr.getId().getUuid(), cr.getId().getType(), 
-                            cr.getVersion(), e.getVersion()});
-                    continue;
-                }
-
-                org.apache.usergrid.persistence.model.entity.Entity alreadySeen = 
-                    latestVersions.get( e.getId() ); 
-                if ( alreadySeen == null ) { // never seen it, so add to map
-                    latestVersions.put( e.getId(), e);
-
-                } else {
-                    // we seen this id before, only add entity if newer version
-                    if ( e.getVersion().compareTo( alreadySeen.getVersion() ) > 0 ) {
-                        latestVersions.put( e.getId(), e);
-                    }
-                }
-            }
-
-            // now build collection of old-school entities
-            List<Entity> entities = new ArrayList<Entity>();
-            for ( Id id : latestVersions.keySet() ) {
-
-                org.apache.usergrid.persistence.model.entity.Entity e =
-                    latestVersions.get( id );
-
                 Entity entity = EntityFactory.newEntity(
-                    e.getId().getUuid(), e.getField("type").getValue().toString() );
+                        e.getId().getUuid(), e.getField("type").getValue().toString());
 
-                Map<String, Object> entityMap = CpEntityMapUtils.toMap( e );
-                entity.addProperties( entityMap ); 
-                entities.add( entity );
+                Map<String, Object> entityMap = CpEntityMapUtils.toMap(e);
+                entity.addProperties(entityMap);
+                entities.add(entity);
             }
 
-            if ( entities.size() == 1 ) {
-                results = Results.fromEntity( entities.get(0));
+            if (entities.size() == 1) {
+                results = Results.fromEntity(entities.get(0));
             } else {
-                results = Results.fromEntities( entities );
+                results = Results.fromEntities(entities);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/81d4e0ea/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index 5fc9af3..c5d5782 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -25,6 +25,7 @@ import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.persistence.Entity;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.Results;
 import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
@@ -53,6 +54,9 @@ import org.slf4j.LoggerFactory;
 public class StaleIndexCleanupTest extends AbstractCoreIT {
     private static final Logger logger = LoggerFactory.getLogger(StaleIndexCleanupTest.class );
 
+    private static final long writeDelayMs = 80;
+    //private static final long readDelayMs = 7;
+
 
     @Test
     public void testUpdateVersioning() throws Exception {
@@ -92,45 +96,66 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
 
         logger.info("Started testStaleIndexCleanup()");
 
-        final EntityManager em = app.getEntityManager();
+        // TODO: turn off post processing stuff that cleans up stale entities 
 
-        final List<Entity> things = new ArrayList<Entity>();
+        final EntityManager em = app.getEntityManager();
 
-        int numEntities = 1;
-        int numUpdates = 3;
+        int numEntities = 100;
+        int numUpdates = 10;
 
-        // create 100 entities
+        // create lots of entities
+        final List<Entity> things = new ArrayList<Entity>();
         for ( int i=0; i<numEntities; i++) {
             final String thingName = "thing" + i;
             things.add( em.create("thing", new HashMap<String, Object>() {{
                 put("name", thingName);
             }}));
+            Thread.sleep( writeDelayMs );
         }
         em.refreshIndex();
 
         CandidateResults crs = queryCollectionCp( "things", "select *");
         Assert.assertEquals( numEntities, crs.size() );
 
-        // update each one 10 times
+        // update each one a bunch of times
+        int count = 0;
         for ( Entity thing : things ) {
 
             for ( int j=0; j<numUpdates; j++) {
+
                 Entity toUpdate = em.get( thing.getUuid() );
                 thing.setProperty( "property"  + j, RandomStringUtils.randomAlphanumeric(10));
                 em.update(toUpdate);
+
+                Thread.sleep( writeDelayMs );
                 em.refreshIndex();
+                count++;
+
+                if ( count % 100 == 0 ) {
+                    logger.info("Updated {} of {} times", count, numEntities * numUpdates);
+                }
             }
         }
 
-        // new query for total number of result candidates = 1000
+        // query Core Persistence directly for total number of result candidates
+        // should be entities X updates because of stale indexes 
         crs = queryCollectionCp("things", "select *");
         Assert.assertEquals( numEntities * numUpdates, crs.size() );
 
-        // query for results, should be 100 (and it triggers background clean up of stale indexes)
+        // query EntityManager for results
+        // should return 100 becuase it filters out the stale entities
+        Query q = Query.fromQL("select *");
+        q.setLimit( 10000 );
+        Results results = em.searchCollection( em.getApplicationRef(), "things", q);
+        assertEquals( numEntities, results.size() );
 
+        // EntityManager should have kicked off a batch cleanup of those stale indexes
         // wait a second for batch cleanup to complete
+        Thread.sleep(600);
 
-        // query for total number of result candidates = 1000
+        // query for total number of result candidates = 100
+        crs = queryCollectionCp("things", "select *");
+        Assert.assertEquals( numEntities, crs.size() );
     }