You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/20 00:26:53 UTC

[36/50] [abbrv] incubator-usergrid git commit: Merge branch 'two-dot-o' into USERGRID-405

Merge branch 'two-dot-o' into USERGRID-405

Conflicts:
	stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
	stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
	stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/EntityUtils.java
	stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
	stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
	stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
	stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7b5d2224
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7b5d2224
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7b5d2224

Branch: refs/heads/USERGRID-480
Commit: 7b5d2224512f0043336acbd9b9323e1e32edf888
Parents: 5c7a5f8 b53cb07
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 17 11:36:08 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 17 11:36:08 2015 -0600

----------------------------------------------------------------------
 .../main/dist/init_instance/init_rest_server.sh |   2 +
 .../dist/init_instance/install_elasticsearch.sh |  22 +-
 .../main/groovy/configure_elasticsearch.groovy  |  80 ++--
 .../src/main/groovy/configure_usergrid.groovy   |  12 +-
 stack/awscluster/ugcluster-cf.json              | 466 +++++++++++++------
 stack/core/pom.xml                              |  10 -
 .../batch/service/JobSchedulerService.java      |   5 -
 .../usergrid/corepersistence/CoreModule.java    |  36 +-
 .../corepersistence/CpEntityManager.java        |  64 ++-
 .../corepersistence/CpEntityManagerFactory.java |   4 -
 .../corepersistence/CpRelationManager.java      |  14 -
 .../events/EntityDeletedHandler.java            |  58 ++-
 .../events/EntityVersionCreatedHandler.java     |  60 ++-
 .../events/EntityVersionDeletedHandler.java     |  53 ++-
 .../results/FilteringLoader.java                |   2 +-
 .../usergrid/persistence/EntityManager.java     |  10 +-
 .../persistence/EntityManagerFactory.java       |   2 -
 .../cassandra/EntityManagerFactoryImpl.java     |   3 -
 .../cassandra/EntityManagerImpl.java            |  31 +-
 .../cassandra/RelationManagerImpl.java          |  58 +--
 .../cassandra/index/ConnectedIndexScanner.java  |   2 -
 .../cassandra/index/IndexBucketScanner.java     |   2 -
 .../corepersistence/StaleIndexCleanupTest.java  |  38 +-
 .../usergrid/persistence/EntityManagerIT.java   |   1 +
 .../PerformanceEntityRebuildIndexTest.java      |   2 +
 .../collection/EntityCollectionManager.java     |  20 +-
 .../collection/EntityDeletedFactory.java        |  34 --
 .../collection/EntityVersionCleanupFactory.java |  35 --
 .../collection/EntityVersionCreatedFactory.java |  31 --
 .../persistence/collection/FieldSet.java        |  48 ++
 .../cache/CachedEntityCollectionManager.java    |  11 +-
 .../collection/guice/CollectionModule.java      |  11 +-
 .../EntityCollectionManagerFactoryImpl.java     |  32 +-
 .../impl/EntityCollectionManagerImpl.java       | 251 ++++++++--
 .../collection/impl/EntityDeletedTask.java      |   9 +-
 .../impl/EntityVersionCleanupTask.java          |  34 +-
 .../impl/EntityVersionTaskFactory.java          |  65 +++
 .../mvcc/stage/write/WriteUniqueVerify.java     | 140 +++---
 .../UniqueValueSerializationStrategy.java       |  12 +
 .../serialization/impl/MutableFieldSet.java     |  63 +++
 .../UniqueValueSerializationStrategyImpl.java   |  27 +-
 .../collection/util/EntityUtils.java            |   4 +-
 .../collection/EntityCollectionManagerIT.java   |  68 +++
 .../mvcc/stage/write/WriteUniqueVerifyTest.java |   6 +-
 .../core/astyanax/CassandraConfig.java          |   6 +
 .../core/astyanax/CassandraConfigImpl.java      |   8 +-
 .../persistence/core/astyanax/CassandraFig.java |   9 +-
 .../persistence/core/future/BetterFuture.java   |  43 +-
 .../core/metrics/MetricsFactory.java            |   9 +
 .../core/metrics/MetricsFactoryImpl.java        | 121 +++--
 .../core/astyanax/ColumnNameIteratorTest.java   |   7 +-
 .../MultiKeyColumnNameIteratorTest.java         |   7 +-
 .../astyanax/MultiRowColumnIteratorTest.java    |   7 +-
 stack/corepersistence/graph/pom.xml             |  28 +-
 .../graph/impl/GraphManagerImpl.java            | 288 ++++++++++--
 .../usergrid/persistence/map/MapManager.java    |  14 +-
 .../persistence/map/impl/MapManagerImpl.java    |   8 +
 .../persistence/map/impl/MapSerialization.java  |   9 +
 .../map/impl/MapSerializationImpl.java          | 130 +++++-
 .../persistence/map/MapManagerTest.java         |  49 +-
 stack/corepersistence/queryindex/pom.xml        |  17 +-
 .../usergrid/persistence/index/EntityIndex.java |  26 +-
 .../persistence/index/IndexBufferConsumer.java  |  11 +
 .../persistence/index/IndexBufferProducer.java  |   1 -
 .../usergrid/persistence/index/IndexFig.java    |  73 ++-
 .../index/IndexOperationMessage.java            | 115 ++++-
 .../persistence/index/guice/IndexModule.java    |   7 +
 .../persistence/index/guice/QueueProvider.java  | 116 +++++
 .../persistence/index/impl/BatchRequest.java    |  41 ++
 .../persistence/index/impl/BufferQueue.java     |  68 +++
 .../index/impl/BufferQueueInMemoryImpl.java     | 108 +++++
 .../index/impl/BufferQueueSQSImpl.java          | 307 ++++++++++++
 .../persistence/index/impl/DeIndexRequest.java  | 115 +++++
 .../index/impl/EsEntityIndexBatchImpl.java      |  50 +-
 .../index/impl/EsEntityIndexImpl.java           | 176 ++-----
 .../index/impl/EsIndexBufferConsumerImpl.java   | 286 ++++++++----
 .../index/impl/EsIndexBufferProducerImpl.java   |  16 +-
 .../persistence/index/impl/EsIndexCache.java    | 138 +++---
 .../persistence/index/impl/IndexRequest.java    | 125 +++++
 .../index/guice/TestIndexModule.java            |   3 +-
 .../index/impl/BufferQueueSQSImplTest.java      | 169 +++++++
 .../impl/EntityConnectionIndexImplTest.java     |   5 +-
 .../persistence/index/impl/EntityIndexTest.java |  20 +-
 .../persistence/index/impl/EsTestUtils.java     |  48 --
 .../persistence/queue/QueueManager.java         |   4 +-
 .../usergrid/persistence/queue/QueueScope.java  |   2 +-
 .../persistence/queue/QueueScopeFactory.java    |  34 --
 .../persistence/queue/guice/QueueModule.java    |  17 +-
 .../queue/impl/QueueScopeFactoryImpl.java       |  48 --
 .../persistence/queue/impl/QueueScopeImpl.java  |  27 +-
 .../queue/impl/SQSQueueManagerImpl.java         | 214 +++++----
 .../persistence/queue/NoAWSCredsRule.java       |  98 ++++
 .../persistence/queue/QueueManagerTest.java     |  29 +-
 stack/pom.xml                                   |  43 --
 stack/rest/pom.xml                              |   4 -
 .../org/apache/usergrid/rest/RootResource.java  |   6 -
 stack/services/pom.xml                          |  13 +-
 .../cassandra/ManagementServiceImpl.java        |   9 +-
 .../services/AbstractCollectionService.java     |  40 +-
 .../services/AbstractConnectionsService.java    |  39 +-
 .../notifications/NotificationsService.java     |  50 +-
 .../services/notifications/QueueListener.java   |   5 +-
 .../usergrid/services/queues/QueueListener.java |   5 +-
 .../notifications/NotifiersServiceIT.java       |   6 +
 stack/test-utils/pom.xml                        |   5 -
 105 files changed, 3849 insertions(+), 1571 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/awscluster/src/main/dist/init_instance/init_rest_server.sh
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/core/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index cf927ce,161037b..5da7067
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@@ -33,21 -30,17 +30,15 @@@ import org.apache.usergrid.persistence.
  import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
  import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
  import org.apache.usergrid.persistence.collection.guice.CollectionModule;
- import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
  import org.apache.usergrid.persistence.core.guice.CommonModule;
  import org.apache.usergrid.persistence.core.migration.data.DataMigration;
- import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;
- import org.apache.usergrid.persistence.core.migration.data.MigrationPlugin;
  import org.apache.usergrid.persistence.graph.guice.GraphModule;
- import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphNode;
  import org.apache.usergrid.persistence.index.guice.IndexModule;
 -import org.apache.usergrid.persistence.index.impl.BufferQueue;
 -import org.apache.usergrid.persistence.index.impl.BufferQueueSQSImpl;
  import org.apache.usergrid.persistence.map.guice.MapModule;
  import org.apache.usergrid.persistence.queue.guice.QueueModule;
- 
- import com.google.inject.AbstractModule;
- import com.google.inject.Provider;
- import com.google.inject.TypeLiteral;
- import com.google.inject.multibindings.Multibinder;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.springframework.context.ApplicationContext;
  
  
  /**
@@@ -76,31 -69,11 +67,31 @@@ public class CoreModule  extends Abstra
          bind(EntityManagerFactory.class).toProvider( lazyEntityManagerFactoryProvider );
  
          install( new CommonModule());
 -        install(new CollectionModule());
 -        install(new GraphModule());
 -        install( new IndexModule() );
 -//        install(new MapModule());   TODO, re-enable when index module doesn't depend on queue
 -//        install(new QueueModule());
 +        install( new CollectionModule() {
 +            /**
 +             * configure our migration data provider for all entities in the system
 +             */
 +            @Override
 +           public void configureMigrationProvider() {
 +
 +                bind(new TypeLiteral< MigrationDataProvider<EntityIdScope>>(){}).to(
 +                    AllEntitiesInSystemImpl.class );
 +           }
 +        } );
 +        install( new GraphModule() {
 +
 +            /**
 +             * Override the observable that needs to be used for migration
 +             */
 +            @Override
 +            public void configureMigrationProvider() {
 +                bind( new TypeLiteral<MigrationDataProvider<GraphNode>>() {} ).to(
 +                    AllNodesInGraphImpl.class );
 +            }
 +        } );
 +        install(new IndexModule());
-         install(new MapModule());
-         install(new QueueModule());
++       //        install(new MapModule());   TODO, re-enable when index module doesn't depend on queue
++       //        install(new QueueModule());
  
          bind(ManagerCache.class).to( CpManagerCache.class );
  

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
index 65dacee,01848cb..95ff9dc
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
@@@ -47,18 -53,18 +54,20 @@@ import rx.schedulers.Schedulers
  public class EntityVersionDeletedHandler implements EntityVersionDeleted {
      private static final Logger logger = LoggerFactory.getLogger(EntityVersionDeletedHandler.class );
  
-     @Inject
-     private SerializationFig serializationFig;
+ 
+ 
+ 
+     private final EntityManagerFactory emf;
  
      @Inject
-     private EntityManagerFactory emf;
+     public EntityVersionDeletedHandler( final EntityManagerFactory emf ) {this.emf = emf;}
  
  
 +
      @Override
 -    public void versionDeleted(
 -            final CollectionScope scope, final Id entityId, final List<MvccEntity> entityVersions) {
 +    public void versionDeleted( final CollectionScope scope, final Id entityId,
 +                                final List<MvccLogEntry> entityVersions ) {
 +
  
          // This check is for testing purposes: it lets a test dynamically turn deletion of
          // previous versions off and on so that it can test clean-up on read.
@@@ -87,19 -89,18 +92,19 @@@
                  scope.getName()
          );
  
-         rx.Observable.from( entityVersions )
-             .buffer(serializationFig.getBufferSize())
-             .map(new Func1<List<MvccLogEntry>, List<MvccLogEntry>>() {
+         Observable.from( entityVersions )
+             .collect( ei.createBatch(), new Action2<EntityIndexBatch, MvccEntity>() {
                  @Override
-                 public List<MvccLogEntry> call(List<MvccLogEntry> entityList) {
-                     for (MvccLogEntry entity : entityList) {
-                         eibatch.deindex(indexScope, entityId, entity.getVersion());
-                     }
-                     eibatch.execute();
-                     return entityList;
+                 public void call( final EntityIndexBatch entityIndexBatch, final MvccEntity mvccEntity ) {
+                     entityIndexBatch.deindex( indexScope, mvccEntity.getId(), mvccEntity.getVersion() );
                  }
-             }).toBlocking().last();
+             } ).doOnNext( new Action1<EntityIndexBatch>() {
+             @Override
+             public void call( final EntityIndexBatch entityIndexBatch ) {
+                 entityIndexBatch.execute();
+             }
+         } ).toBlocking().last();
      }
  
 +
  }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index d5478d4,a73d7a7..68ddab5
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@@ -21,19 -21,17 +21,14 @@@ package org.apache.usergrid.persistence
  import org.safehaus.guicyfig.GuicyFigModule;
  
  import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
- import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
- import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
- import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
- import org.apache.usergrid.persistence.collection.MvccEntity;
+ import org.apache.usergrid.persistence.collection.impl.EntityVersionTaskFactory;
 -import org.apache.usergrid.persistence.collection.MvccEntity;
  import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
  import org.apache.usergrid.persistence.collection.event.EntityDeleted;
  import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
  import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
  import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerFactoryImpl;
--import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
  import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGenerator;
  import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGeneratorImpl;
--import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
  import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
  import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
  import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index 10d85f8,23e375d..ac82181
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@@ -26,25 -25,25 +26,18 @@@ import java.util.concurrent.ExecutionEx
  import org.apache.usergrid.persistence.collection.CollectionScope;
  import org.apache.usergrid.persistence.collection.EntityCollectionManager;
  import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+ import org.apache.usergrid.persistence.collection.EntityCollectionManagerSync;
 +import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
 +import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
 +import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
  import org.apache.usergrid.persistence.collection.cache.CachedEntityCollectionManager;
  import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
--import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
- import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
- import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
- import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
 -import org.apache.usergrid.persistence.collection.guice.Write;
 -import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
+ import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 -import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
 -import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
 -import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
  import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction;
  import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
  import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
  import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
  import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
- import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
- import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
--import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
--import org.apache.usergrid.persistence.core.guice.ProxyImpl;
 -import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
--import org.apache.usergrid.persistence.core.task.TaskExecutor;
  
  import com.google.common.base.Preconditions;
  import com.google.common.cache.CacheBuilder;
@@@ -74,25 -74,22 +67,24 @@@ public class EntityCollectionManagerFac
      private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
      private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
      private final Keyspace keyspace;
-     private final EntityVersionCleanupFactory entityVersionCleanupFactory;
-     private final EntityVersionCreatedFactory entityVersionCreatedFactory;
-     private final EntityDeletedFactory entityDeletedFactory;
+     private final EntityVersionTaskFactory entityVersionTaskFactory;
      private final TaskExecutor taskExecutor;
+     private final EntityCacheFig entityCacheFig;
+     private final MetricsFactory metricsFactory;
  
 +    private EntityCacheFig entityCacheFig;
 +    private SerializationFig serializationFig;
      private LoadingCache<CollectionScope, EntityCollectionManager> ecmCache =
          CacheBuilder.newBuilder().maximumSize( 1000 )
                      .build( new CacheLoader<CollectionScope, EntityCollectionManager>() {
                          public EntityCollectionManager load( CollectionScope scope ) {
  
                                    //create the target EM that will perform logic
 -                            final EntityCollectionManager target = new EntityCollectionManagerImpl( writeStart, writeUpdate, writeVerifyUnique,
 +                            final EntityCollectionManager target = new EntityCollectionManagerImpl(
 +                                writeStart, writeVerifyUnique,
                                  writeOptimisticVerify, writeCommit, rollback, markStart, markCommit,
                                  entitySerializationStrategy, uniqueValueSerializationStrategy,
-                                 mvccLogEntrySerializationStrategy, keyspace, serializationFig,entityVersionCleanupFactory,
-                                 entityVersionCreatedFactory, entityDeletedFactory, taskExecutor, scope );
 -                                mvccLogEntrySerializationStrategy, keyspace, entityVersionTaskFactory,
 -                                taskExecutor, scope, metricsFactory );
++                                mvccLogEntrySerializationStrategy, keyspace, serializationFig,entityVersionTaskFactory, taskExecutor, scope, metricsFactory );
  
  
                              final EntityCollectionManager proxy = new CachedEntityCollectionManager(entityCacheFig, target  );
@@@ -113,14 -111,13 +105,12 @@@
                                                 final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
                                                 final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
                                                 final Keyspace keyspace,
-                                                final EntityVersionCleanupFactory entityVersionCleanupFactory,
-                                                final EntityVersionCreatedFactory entityVersionCreatedFactory,
-                                                final EntityDeletedFactory entityDeletedFactory,
+                                                final EntityVersionTaskFactory entityVersionTaskFactory,
                                                 @CollectionTaskExecutor final TaskExecutor taskExecutor,
                                                final EntityCacheFig entityCacheFig,
-                                                final SerializationFig serializationFig) {
+                                                MetricsFactory metricsFactory) {
  
          this.writeStart = writeStart;
 -        this.writeUpdate = writeUpdate;
          this.writeVerifyUnique = writeVerifyUnique;
          this.writeOptimisticVerify = writeOptimisticVerify;
          this.writeCommit = writeCommit;
@@@ -131,20 -128,21 +121,18 @@@
          this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
          this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
          this.keyspace = keyspace;
-         this.entityVersionCleanupFactory = entityVersionCleanupFactory;
-         this.entityVersionCreatedFactory = entityVersionCreatedFactory;
-         this.entityDeletedFactory = entityDeletedFactory;
+         this.entityVersionTaskFactory = entityVersionTaskFactory;
          this.taskExecutor = taskExecutor;
          this.entityCacheFig = entityCacheFig;
-         this.serializationFig = serializationFig;
+         this.metricsFactory = metricsFactory;
      }
 -
 -
      @Override
 -    public EntityCollectionManager createCollectionManager( CollectionScope collectionScope ) {
 -        Preconditions.checkNotNull( collectionScope );
 -        try {
 -            return ecmCache.get( collectionScope );
 -        }
 -        catch ( ExecutionException ee ) {
 -            throw new RuntimeException( ee );
 +    public EntityCollectionManager createCollectionManager(CollectionScope collectionScope) {
 +        Preconditions.checkNotNull(collectionScope);
 +        try{
 +            return ecmCache.get(collectionScope);
 +        }catch (ExecutionException ee){
 +            throw new RuntimeException(ee);
          }
      }
  

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 19e8937,6ba4513..001e0db
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@@ -35,6 -34,9 +37,10 @@@ import org.apache.usergrid.persistence.
  import org.apache.usergrid.persistence.collection.MvccEntity;
  import org.apache.usergrid.persistence.collection.VersionSet;
  import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
++import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
+ import org.apache.usergrid.persistence.collection.guice.Write;
+ import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
+ import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
  import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
  import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtils;
  import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
@@@ -51,8 -51,9 +57,11 @@@ import org.apache.usergrid.persistence.
  import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
  import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
  import org.apache.usergrid.persistence.core.guice.ProxyImpl;
 +import org.apache.usergrid.persistence.core.task.Task;
 +import org.apache.usergrid.persistence.core.task.TaskExecutor;
+ import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+ import org.apache.usergrid.persistence.core.task.Task;
+ import org.apache.usergrid.persistence.core.task.TaskExecutor;
  import org.apache.usergrid.persistence.core.util.Health;
  import org.apache.usergrid.persistence.core.util.ValidationUtils;
  import org.apache.usergrid.persistence.model.entity.Entity;
@@@ -69,9 -72,14 +80,17 @@@ import com.netflix.astyanax.connectionp
  import com.netflix.astyanax.model.ColumnFamily;
  import com.netflix.astyanax.model.CqlResult;
  import com.netflix.astyanax.serializers.StringSerializer;
++import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
++import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
++import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
+ import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
+ import org.apache.usergrid.persistence.core.task.Task;
+ import org.apache.usergrid.persistence.core.task.TaskExecutor;
  
+ import rx.Notification;
  import rx.Observable;
  import rx.Subscriber;
+ import rx.functions.Action0;
  import rx.functions.Action1;
  import rx.functions.Func1;
  import rx.schedulers.Schedulers;
@@@ -288,6 -368,140 +375,89 @@@ public class EntityCollectionManagerImp
          } );
      }
  
+ 
+     /**
+      * Resolves each given unique field to the entity that owns it, deleting stale unique value entries.
+      * @param fields the unique fields to resolve; fields without a stored unique value are skipped
+      * @return an Observable emitting a single FieldSet holding the entity found for each resolvable field
+      */
+     @Override
+     public Observable<FieldSet> getEntitiesFromFields( final Collection<Field> fields ) {
+         return rx.Observable.just(fields).map( new Func1<Collection<Field>, FieldSet>() {
+             @Override
+             public FieldSet call( Collection<Field> fields ) {
+                 try {
+ 
+                     final UUID startTime = UUIDGenerator.newTimeUUID();
+ 
+                     //Get back set of unique values that correspond to collection of fields
+                     UniqueValueSet set = uniqueValueSerializationStrategy.load( collectionScope, fields );
+ 
+                     //Short circuit if we don't have any uniqueValues from the given fields.
+                     if(!set.iterator().hasNext()){
+                         return new MutableFieldSet( 0 );
+                     }
+ 
+ 
+                     //loop through each field, and construct an entity load
+                     List<Id> entityIds = new ArrayList<>(fields.size());
+                     List<UniqueValue> uniqueValues = new ArrayList<>(fields.size());
+ 
+                     for(final Field expectedField: fields) {
+ 
+                         UniqueValue value = set.getValue(expectedField.getName());
+ 
+                         if(value ==null){
+                             logger.debug( "Field does not correspond to a unique value" );
+                             continue; //nothing to load for this field; dereferencing value below would NPE
+                         }
+                         entityIds.add(value.getEntityId());
+                         uniqueValues.add(value);
+                     }
+ 
+                     //Load an entity for each entityId we retrieved.
+                     final EntitySet entitySet = entitySerializationStrategy.load(collectionScope, entityIds, startTime);
+ 
+                     //now loop through and ensure the entities are there.
+                     final MutationBatch deleteBatch = keyspace.prepareMutationBatch();
+ 
+                     final MutableFieldSet response = new MutableFieldSet(fields.size());
+ 
+                     for(final UniqueValue expectedUnique: uniqueValues) {
+                         final MvccEntity entity = entitySet.getEntity(expectedUnique.getEntityId());
+ 
+                         //bad unique value, delete this, it's inconsistent
+                         if(entity == null || !entity.getEntity().isPresent()){
+                             final MutationBatch valueDelete = uniqueValueSerializationStrategy.delete(collectionScope, expectedUnique);
+                             deleteBatch.mergeShallow(valueDelete);
+                             continue;
+                         }
+ 
+ 
+                         //else add it to our result set
+                         response.addEntity(expectedUnique.getField(),entity);
+ 
+                     }
+ 
+                     //TODO: explore making this an Async process
+                     //We'll repair it again if we have to
+                     deleteBatch.execute();
+ 
+                     return response;
+ 
+ 
+                 }
+                 catch ( ConnectionException e ) {
+                     logger.error( "Failed to getEntitiesFromFields", e );
+                     throw new RuntimeException( e );
+                 }
+             }
+         } );
+     }
+ 
+ 
+ 
 -    @Override
 -    public Observable<Entity> update( final Entity entity ) {
 -
 -        logger.debug( "Starting update process" );
 -
 -        //do our input validation
 -        Preconditions.checkNotNull( entity, "Entity is required in the new stage of the mvcc write" );
 -
 -        final Id entityId = entity.getId();
 -
 -
 -        ValidationUtils.verifyIdentity( entityId );
 -
 -        // create our observable and start the write
 -        CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity );
 -
 -
 -        Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeUpdate );
 -
 -
 -        final Timer.Context timer = updateTimer.time();
 -        return observable.map( writeCommit ).doOnNext(new Action1<Entity>() {
 -            @Override
 -            public void call(final Entity entity) {
 -                logger.debug("sending entity to the queue");
 -
 -                //we an update, signal the fix
 -                taskExecutor.submit( entityVersionTaskFactory.getCreatedTask( collectionScope, entity ));
 -
 -                taskExecutor.submit( entityVersionTaskFactory.getCleanupTask( collectionScope, entity.getId(), entity.getVersion(), false) );
 -                //TODO T.N Change this to fire a task
 -                //                Observable.from( new CollectionIoEvent<Id>(collectionScope,
 -                // entityId ) ).map( load ).subscribeOn( Schedulers.io() ).subscribe();
 -
 -
 -            }
 -        }).doOnError(rollback)
 -            .doOnNext(new Action1<Entity>() {
 -                @Override
 -                public void call(Entity entity) {
 -                    updateMeter.mark();
 -                }
 -            })
 -            .doOnCompleted(new Action0() {
 -                @Override
 -                public void call() {
 -                    timer.stop();
 -                }
 -            });
 -    }
 -
+ 
      // fire the stages
      public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
                                                                    WriteStart writeState ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
index dbe58d4,83f165d..5472645
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
@@@ -22,9 -22,8 +22,8 @@@ import com.google.inject.Inject
  import com.google.inject.assistedinject.Assisted;
  import com.netflix.astyanax.MutationBatch;
  import org.apache.usergrid.persistence.collection.CollectionScope;
- import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
  import org.apache.usergrid.persistence.collection.event.EntityDeleted;
 -import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 +import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
  import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
  import org.apache.usergrid.persistence.core.task.Task;
  import org.apache.usergrid.persistence.model.entity.Id;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index 7c39ef4,65506ec..27c3db8
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@@ -20,9 -20,17 +20,18 @@@ package org.apache.usergrid.persistence
  
  import java.util.Iterator;
  import java.util.List;
 +import java.util.Set;
  import java.util.UUID;
  
+ import com.google.inject.Inject;
+ import com.google.inject.assistedinject.Assisted;
+ import org.apache.usergrid.persistence.collection.MvccEntity;
+ import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+ import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+ import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+ import org.apache.usergrid.persistence.collection.util.EntityUtils;
+ import org.apache.usergrid.persistence.model.entity.Entity;
+ import org.apache.usergrid.persistence.model.field.Field;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -70,6 -75,7 +79,7 @@@ public class EntityVersionCleanupTask i
      private final CollectionScope scope;
      private final Id entityId;
      private final UUID version;
 -    private final int numToSkip;
++    private final boolean includeVersion;
  
  
      @Inject
@@@ -89,6 -100,8 +103,8 @@@
          this.scope = scope;
          this.entityId = entityId;
          this.version = version;
+ 
 -        numToSkip = includeVersion? 0: 1;
++        this.includeVersion = includeVersion;
      }
  
  
@@@ -117,94 -130,79 +133,92 @@@
      @Override
      public Void call() throws Exception {
          //TODO Refactor this logic into a class that can be invoked from anywhere
 -        //load every entity we have history of
 -        Observable<List<MvccEntity>> deleteFieldsObservable =
 -            Observable.create(new ObservableIterator<MvccEntity>("deleteColumns") {
 -                @Override
 -                protected Iterator<MvccEntity> getIterator() {
 -                    Iterator<MvccEntity> entities =  entitySerializationStrategy.loadDescendingHistory(
 -                        scope, entityId, version, 1000); // TODO: what fetchsize should we use here?
 -                    return entities;
 -                }
 -            })
 -            //buffer them for efficiency
 -            .skip(numToSkip)
 -            .buffer(serializationFig.getBufferSize()).doOnNext(
 -            new Action1<List<MvccEntity>>() {
 -                @Override
 -                public void call(final List<MvccEntity> mvccEntities) {
 -                    final MutationBatch batch = keyspace.prepareMutationBatch();
 -                    final MutationBatch entityBatch = keyspace.prepareMutationBatch();
 -                    final MutationBatch logBatch = keyspace.prepareMutationBatch();
--
 -                    for (MvccEntity mvccEntity : mvccEntities) {
 -                        final UUID entityVersion = mvccEntity.getVersion();
--
-         //iterate all unique values
 -
 -                        //if the entity is present process the fields
 -                        if(mvccEntity.getEntity().isPresent()) {
 -                            final Entity entity = mvccEntity.getEntity().get();
 -
 -                            //remove all unique fields from the index
 -                            for ( final Field field : EntityUtils.getUniqueFields(entity )) {
 -
 -                                final UniqueValue unique = new UniqueValueImpl( field, entityId, entityVersion );
 -                                final MutationBatch deleteMutation =
 -                                    uniqueValueSerializationStrategy.delete( scope, unique );
 -                                batch.mergeShallow( deleteMutation );
++       //iterate all unique values
 +        final BlockingObservable<Long> uniqueValueCleanup =
 +                Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) {
 +                    @Override
 +                    protected Iterator<UniqueValue> getIterator() {
 +                        return uniqueValueSerializationStrategy.getAllUniqueFields( scope, entityId );
 +                    }
 +                } )
 +
 +                        //skip current versions
 +                        .skipWhile( new Func1<UniqueValue, Boolean>() {
 +                            @Override
 +                            public Boolean call( final UniqueValue uniqueValue ) {
 +                                return version.equals( uniqueValue.getEntityVersion() );
                              }
 +                        } )
 +                                //buffer our buffer size, then roll them all up in a single batch mutation
 +                        .buffer( serializationFig.getBufferSize() ).doOnNext( new Action1<List<UniqueValue>>() {
 +                    @Override
 +                    public void call( final List<UniqueValue> uniqueValues ) {
 +                        final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
 +
 +
 +                        for ( UniqueValue value : uniqueValues ) {
 +                            uniqueCleanupBatch.mergeShallow( uniqueValueSerializationStrategy.delete( scope, value ) );
                          }
  
 -                        final MutationBatch entityDelete = entitySerializationStrategy
 -                                .delete(scope, entityId, mvccEntity.getVersion());
 -                        entityBatch.mergeShallow( entityDelete );
 -                        final MutationBatch logDelete = logEntrySerializationStrategy
 -                                .delete(scope, entityId, version);
 -                        logBatch.mergeShallow(logDelete);
 +                        try {
 +                            uniqueCleanupBatch.execute();
 +                        }
 +                        catch ( ConnectionException e ) {
 +                            throw new RuntimeException( "Unable to execute batch mutation", e );
 +                        }
                      }
 +                } ).subscribeOn( Schedulers.io() ).longCount().toBlocking();
  
 -                    try {
 -                        batch.execute();
 -                    } catch (ConnectionException e1) {
 -                        throw new RuntimeException("Unable to execute " +
 -                                "unique value " +
 -                                "delete", e1);
 -                    }
 -                    fireEvents(mvccEntities);
 -                    try {
 -                        entityBatch.execute();
 -                    } catch (ConnectionException e) {
 -                        throw new RuntimeException("Unable to delete entities in cleanup", e);
 +
 +        //start calling the listeners for remove log entries
 +        BlockingObservable<Long> versionsDeletedObservable =
 +
 +                Observable.create( new ObservableIterator<MvccLogEntry>( "Log entry iterator" ) {
 +                    @Override
 +                    protected Iterator<MvccLogEntry> getIterator() {
 +
 +                        return new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, version,
 +                                serializationFig.getBufferSize() );
                      }
 +                } )
 +                        //skip current version
 +                        .skipWhile( new Func1<MvccLogEntry, Boolean>() {
 +                            @Override
 +                            public Boolean call( final MvccLogEntry mvccLogEntry ) {
 +                                return version.equals( mvccLogEntry.getVersion() );
 +                            }
 +                        } )
 +                                //buffer them for efficiency
 +                        .buffer( serializationFig.getBufferSize() ).doOnNext( new Action1<List<MvccLogEntry>>() {
 +                    @Override
 +                    public void call( final List<MvccLogEntry> mvccEntities ) {
 +
 +                        fireEvents( mvccEntities );
 +
 +                        final MutationBatch logCleanupBatch = keyspace.prepareMutationBatch();
 +
 +
 +                        for ( MvccLogEntry entry : mvccEntities ) {
 +                            logCleanupBatch.mergeShallow( logEntrySerializationStrategy.delete( scope, entityId, entry.getVersion() ));
 +                        }
  
 -                    try {
 -                        logBatch.execute();
 -                    } catch (ConnectionException e) {
 -                        throw new RuntimeException("Unable to delete entities from the log", e);
 +                        try {
 +                            logCleanupBatch.execute();
 +                        }
 +                        catch ( ConnectionException e ) {
 +                            throw new RuntimeException( "Unable to execute batch mutation", e );
 +                        }
                      }
 +                } ).subscribeOn( Schedulers.io() ).longCount().toBlocking();
 +
 +        //wait for this to complete
 +        final Long removedCount = uniqueValueCleanup.last();
  
 -                }
 -            }
 -        );
 +        logger.debug( "Removed unique values for {} entities of entity {}", removedCount, entityId );
  
 -        final int removedCount = deleteFieldsObservable.count().toBlocking().last();
 +        final Long versionCleanupCount = versionsDeletedObservable.last();
  
 -        logger.debug("Removed unique values for {} entities of entity {}",removedCount,entityId);
 +        logger.debug( "Removed {} previous entity versions of entity {}", versionCleanupCount, entityId );
  
          return null;
      }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
index 5b7898e,60275d9..d7a6407
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/UniqueValueSerializationStrategy.java
@@@ -23,9 -22,9 +23,10 @@@ import java.util.Iterator
  
  import com.netflix.astyanax.MutationBatch;
  import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+ import com.netflix.astyanax.model.ConsistencyLevel;
  import org.apache.usergrid.persistence.collection.CollectionScope;
  import org.apache.usergrid.persistence.core.migration.schema.Migration;
 +import org.apache.usergrid.persistence.model.entity.Id;
  import org.apache.usergrid.persistence.model.field.Field;
  
  
@@@ -55,27 -53,23 +56,38 @@@ public interface UniqueValueSerializati
      /**
       * Load UniqueValue that matches field from collection or null if that value does not exist.
       *
 -     * @param colScope Collection scope in which to look for field name/value
 +     * @param collectionScope scope in which to look for field name/value
       * @param fields Field name/value to search for
 +     *
       * @return UniqueValueSet containing fields from the collection that exist in cassandra
 +     *
       * @throws ConnectionException on error connecting to Cassandra
       */
 -    public UniqueValueSet load( CollectionScope colScope, Collection<Field> fields ) throws ConnectionException;
 +    public UniqueValueSet load( CollectionScope collectionScope, Collection<Field> fields ) throws ConnectionException;
  
+     /**
 -     * Load UniqueValue that matches field from collection or null if that value does not exist.
 -     *
 -     * @param colScope Collection scope in which to look for field name/value
 -     * @param consistencyLevel Consistency level of query
 -     * @param fields Field name/value to search for
 -     * @return UniqueValueSet containing fields from the collection that exist in cassandra
 -     * @throws ConnectionException on error connecting to Cassandra
 -     */
++    * Load UniqueValue that matches field from collection or null if that value does not exist.
++    *
++    * @param colScope Collection scope in which to look for field name/value
++    * @param consistencyLevel Consistency level of query
++    * @param fields Field name/value to search for
++    * @return UniqueValueSet containing fields from the collection that exist in cassandra
++    * @throws ConnectionException on error connecting to Cassandra
++    */
+     public UniqueValueSet load( CollectionScope colScope, ConsistencyLevel consistencyLevel, Collection<Field> fields ) throws ConnectionException;
++
 +
 +    /**
 +     * Loads the currently persisted history of every unique value the entity has held.  This will
 +     * start from the max version and return values in descending version order.  Note that for entities
 +     * with more than one unique field, sequential fields can be returned with the same version.
 +     * @param collectionScope The scope the entity is stored in
 +     * @param entityId The id of the entity whose unique value history should be loaded
 +     * @return An iterator over the entity's unique values, ordered from the max version downwards
 +     */
 +    public Iterator<UniqueValue> getAllUniqueFields(CollectionScope collectionScope, Id entityId);
 +
 +
      /**
       * Delete the specified Unique Value from Cassandra.
       *

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
index 7aed8db,47372ae..108f2e8
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
@@@ -38,16 -36,8 +40,17 @@@ import org.apache.usergrid.persistence.
  import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
  import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
  import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
 +import org.apache.usergrid.persistence.collection.util.EntityUtils;
 +import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
 +import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
 +import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
 +import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
 +import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
 +import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
 +import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+ import org.apache.usergrid.persistence.core.migration.schema.Migration;
  import org.apache.usergrid.persistence.core.util.ValidationUtils;
 +import org.apache.usergrid.persistence.model.entity.Entity;
  import org.apache.usergrid.persistence.model.entity.Id;
  import org.apache.usergrid.persistence.model.field.Field;
  
@@@ -78,40 -65,23 +81,43 @@@ public class UniqueValueSerializationSt
  
      private static final EntityVersionSerializer ENTITY_VERSION_SER = new EntityVersionSerializer();
  
 -    private static final MultiTennantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Field>>, EntityVersion> CF_UNIQUE_VALUES =
 -            new MultiTennantColumnFamily<>( "Unique_Values", ROW_KEY_SER,
 -                    ENTITY_VERSION_SER );
 +    private static final MultiTennantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Field>>, EntityVersion>
 +        CF_UNIQUE_VALUES = new MultiTennantColumnFamily<>( "Unique_Values", ROW_KEY_SER, ENTITY_VERSION_SER );
 +
 +
++
 +    private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
 +
 +
 +    private static final CollectionScopedRowKeySerializer<Id> ENTITY_ROW_KEY_SER =
 +        new CollectionScopedRowKeySerializer<>( ID_SER );
 +
  
 +
 +    private static final MultiTennantColumnFamily<ScopedRowKey<CollectionPrefixedKey<Id>>, UniqueFieldEntry>
 +        CF_ENTITY_UNIQUE_VALUES =
 +        new MultiTennantColumnFamily<>( "Entity_Unique_Values", ENTITY_ROW_KEY_SER, UniqueFieldEntrySerializer.get() );
 +
 +    public static final int COL_VALUE = 0x0;
 +
 +
-     private final Keyspace keyspace;
 +    private final SerializationFig serializationFig;
+     protected final Keyspace keyspace;
 -    private final CassandraFig cassandraFig;
++       private final CassandraFig cassandraFig;
 +
  
  
      /**
       * Construct serialization strategy for keyspace.
       *
       * @param keyspace Keyspace in which to store Unique Values.
 +     * @param serializationFig
       */
      @Inject
-     public UniqueValueSerializationStrategyImpl( final Keyspace keyspace, final SerializationFig serializationFig ) {
 -    public UniqueValueSerializationStrategyImpl( final Keyspace keyspace, final CassandraFig cassandraFig) {
 -        this.cassandraFig = cassandraFig;
++    public UniqueValueSerializationStrategyImpl( final Keyspace keyspace, final CassandraFig cassandraFig, final  SerializationFig serializationFig ) {
          this.keyspace = keyspace;
++        this.cassandraFig = cassandraFig;
 +        this.serializationFig = serializationFig;
      }
  
  

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/EntityUtils.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/EntityUtils.java
index beff3bd,cf964a3..20edb66
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/EntityUtils.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/util/EntityUtils.java
@@@ -2,9 -2,10 +2,10 @@@ package org.apache.usergrid.persistence
  
  
  import java.util.ArrayList;
- import java.util.Collection;
  import java.util.List;
 -import java.util.Set;
++import java.util.Collection;
  import java.util.UUID;
+ import org.apache.usergrid.persistence.model.field.Field;
  
  import org.apache.commons.lang3.reflect.FieldUtils;
  

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 8f78799,f19d613..246bbca
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@@ -96,7 -126,8 +126,8 @@@ public class GraphManagerImpl implement
                               final GraphFig graphFig,
                               final EdgeDeleteListener edgeDeleteListener,
                               final NodeDeleteListener nodeDeleteListener,
-                              final ApplicationScope scope) {
 -                             @Assisted final ApplicationScope scope,
++                             final ApplicationScope scope,
+                              MetricsFactory metricsFactory) {
  
  
          ValidationUtils.validateApplicationScope( scope );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 7fccd11,50b994d..27fe705
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@@ -19,8 -19,12 +19,9 @@@
  package org.apache.usergrid.persistence.index.guice;
  
  
- import org.apache.usergrid.persistence.core.guice.CommonModule;
+ import org.apache.usergrid.persistence.collection.guice.CollectionModule;
  import org.apache.usergrid.persistence.core.guice.TestModule;
+ import org.apache.usergrid.persistence.core.guice.CommonModule;
 -import org.apache.usergrid.persistence.index.impl.BufferQueue;
 -import org.apache.usergrid.persistence.index.impl.BufferQueueInMemoryImpl;
 -import org.apache.usergrid.persistence.index.impl.BufferQueueSQSImpl;
  
  
  public class TestIndexModule extends TestModule {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index e4043d2,ca9bf79..9cb5297
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@@ -24,10 -24,14 +24,12 @@@ import java.util.*
  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.atomic.AtomicLong;
  
+ import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
  import org.apache.usergrid.persistence.index.*;
 -import org.apache.usergrid.persistence.model.field.ArrayField;
 -import org.apache.usergrid.persistence.model.field.EntityObjectField;
 -import org.apache.usergrid.persistence.model.field.UUIDField;
 +import org.apache.usergrid.persistence.model.field.*;
  import org.apache.usergrid.persistence.model.field.value.EntityObject;
  import org.junit.Ignore;
+ import org.junit.Rule;
  import org.junit.Test;
  import org.junit.runner.RunWith;
  import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7b5d2224/stack/pom.xml
----------------------------------------------------------------------